Description

I'm aware this a not a simple issue.

gin sse support is fundamentally flawed/broken. If it ever worked, it doesn't work anymore. At least behind nginx (nginx version: nginx/1.20.2)

I essentially copied https://github.com/gin-gonic/examples/blob/master/server-sent-event/main.go

The issue is, when you have 1 client connected almost everything seems to work well. The initial messages get swallowed and don't make it to the client.

But when you have 2 clients connected either nginx or gin starts round-robin messages. You send 1 message

curl -k -X POST -H 'Content-Type: application/json' -d '{"f":"1", "t":"3", "c":"tester"}' https://restful.local/api/v1/msg

client 1 receives this message via /api/v1/sse you send this message again or another payload message Now client 1 doesn't receive this message, but client 2 receives it. It doesn't matter if a client is curl or a browser EventSource client.

With 3 clients client 1 receives a message, then client 2 receives a message, then client 3 receives a message... and so on.

Also if you search for "sse" in open issues you'll see that the possiblilty to supply your own id is missing.

It might be a nginx issue, it might not, I have yet to find out. All I know is that websockets with gin & nginx don't have that problem but SSE do.

IMHO sse support in gin should be redesigned from the ground up.

How to reproduce

server {
    listen 443 ssl http2;
    listen [::]:443 ssl http2;
    server_name restful.local;

    ssl_certificate /etc/nginx/ssl/restful.cert.pem;
    ssl_certificate_key /etc/nginx/ssl/restful.key.pem;
    index index.html;

    error_log /var/log/nginx/restful.error;

    location /api/ {
        proxy_pass http://unix:/tmp/restful.sock;
        proxy_read_timeout 30;
        proxy_connect_timeout 30;
        proxy_redirect off;
        proxy_set_header Host               $host;
        proxy_set_header X-Real-IP          $remote_addr;
        proxy_set_header X-Forwarded-For    $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto  $scheme;
    }

    location = /api/v1/sse {
        proxy_read_timeout 86400;
        proxy_connect_timeout 86400;
        proxy_redirect off;
        proxy_set_header Host               $host;
        proxy_set_header X-Real-IP          $remote_addr;
        proxy_set_header X-Forwarded-For    $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto  $scheme;
        proxy_set_header Connection '';
        proxy_http_version 1.1;
        chunked_transfer_encoding off;
        proxy_buffering off;
        proxy_cache off;
        proxy_pass http://unix:/tmp/restful.sock;
    }
}
package event

import (
    "log"

    "github.com/gin-gonic/gin"
)

type MessageNotification struct {
    Type     string `json:"type"`
    ID       string `json:"id"`
    SourceID string `json:"s"`
    TargetID string `json:"t"`
}

// Event keeps a list of clients those are currently attached
// and broadcasting events to those clients.
type Event struct {
    // Events are pushed to this channel by the main events-gathering routine
    Message chan *MessageNotification
    // New client connections
    NewClients chan chan *MessageNotification
    // Closed client connections
    ClosedClients chan chan *MessageNotification
    // Total client connections
    TotalClients map[chan *MessageNotification]bool
}

// ClientChan - New event messages are broadcast to all registered client connection channels
type ClientChan chan *MessageNotification

// NewServer initializes event and start processing requests
func NewServer() *Event {
    event := &Event{
        Message:       make(chan *MessageNotification),
        NewClients:    make(chan chan *MessageNotification),
        ClosedClients: make(chan chan *MessageNotification),
        TotalClients:  make(map[chan *MessageNotification]bool),
    }
    go event.listen()
    return event
}

//listen Listens all incoming requests from clients.
//Handles addition and removal of clients and broadcast messages to clients.
func (stream *Event) listen() {
    for {
        select {
        // Add new available client
        case client := <-stream.NewClients:
            stream.TotalClients[client] = true
            log.Printf("Client added. %d registered clients", len(stream.TotalClients))

        // Remove closed client
        case client := <-stream.ClosedClients:
            delete(stream.TotalClients, client)
            log.Printf("Removed client. %d registered clients", len(stream.TotalClients))

        // Broadcast message to client
        case eventMsg := <-stream.Message:
            for clientMessageChan := range stream.TotalClients {
                clientMessageChan <- eventMsg
            }
        }
    }
}

func (stream *Event) ServeHTTP() gin.HandlerFunc {
    return func(c *gin.Context) {
        // Initialize client channel
        clientChan := make(ClientChan)

        // Send new connection to event server
        stream.NewClients <- clientChan

        defer func() {
            // Send closed connection to event server
            stream.ClosedClients <- clientChan
        }()

        go func() {
            // Send connection that is closed by client to event server
            <-c.Done()
            stream.ClosedClients <- clientChan
        }()

        c.Next()
    }
}

func HeadersMiddleware() gin.HandlerFunc {
    return func(c *gin.Context) {
        c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
        c.Writer.Header().Set("Access-Control-Allow-Headers", "Content-Type")
        c.Writer.Header().Set("Content-Type", "text/event-stream")
        c.Writer.Header().Set("Cache-Control", "no-cache")
        c.Writer.Header().Set("Connection", "keep-alive")
        // c.Writer.Header().Set("Transfer-Encoding", "chunked")
        c.Writer.Header().Set("X-Accel-Buffering", "no")
        c.Next()
    }
}
package restful

import (
    "time"

    "<base>/event"
        "go.mongodb.org/mongo-driver/bson/primitive"
)

type Handler struct {
    notifier    *event.Event
}

func NewHandler() *Handler {
    h := &Handler{
        notifier:    event.NewServer(),
    }
    return h
}

// MessageRoutes defines routes for Message handlers.
func (h *Handler) MessageRoutes(api *gin.RouterGroup) {
    api.GET("/sse", event.HeadersMiddleware(), h.notifier.ServeHTTP(), h.MessageEvents)
    api.POST("/msg", h.CreateMessageNoAuth)
}

func (h *Handler) MessageEvents(cx *gin.Context) {
    cx.Stream(func(w io.Writer) bool {
        if msg, ok := <-h.notifier.Message; ok {
            cx.SSEvent("message", msg)
            return true
        }
        return false
    })
}

func (h *Handler) CreateMessageNoAuth(cx *gin.Context) {
    type Message struct {
            ID      primitive.ObjectID `bson:"_id,omitempty" json:"id"`
            From    string             `bson:"f" json:"f"`
            To      string             `bson:"t" json:"t"`
            Content string             `bson:"c" json:"c"`
    }
    m := new(Message)
    if e := cx.BindJSON(m); e != nil {
        cx.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": e.Error()})
        return
    }

    m.ID = primitive.NewObjectID()

    h.notifier.Message <- &event.MessageNotification{
        Type:     time.Now().String(),
        ID:       m.ID.Hex(),
        SourceID: m.From,
        TargetID: m.To,
    }

    cx.SecureJSON(201, m)
}

const (
    prefixTCP  = "tcp:"
    prefixUNIX = "unix:"
)

func main() {
        r := gin.Default()
        if e := r.SetTrustedProxies([]string{
            "127.0.0.1",
            "::1",
        }); e != nil {
            log.Fatal(e.Error())
        }
        r.SecureJsonPrefix(")]}',\n")
        r.MaxMultipartMemory = 16 << 20
                v1 := r.Group("/api/v1")
        h := restful.NewHandler(}
                h.MessageRoutes(v1)
               restfulAddr := "unix:/tmp/restful.sock"
        ginServe(r, restfulAddr)
}

func ginServe(r *gin.Engine, path string) error {
    // serve
    if strings.HasPrefix(path, prefixTCP) {
        addr := strings.TrimPrefix(path, prefixTCP)
        fmt.Println("listening on", addr)
        return r.Run(addr)
    } else if strings.HasPrefix(path, prefixUNIX) {
        addr := strings.TrimPrefix(path, prefixUNIX)
        if _, e := os.Stat(addr); errors.Is(e, os.ErrNotExist) {
            fmt.Println("listening on", addr)
            return r.RunUnix(addr)
        } else {
            if e := os.Remove(addr); e != nil {
                return e
            } else {
                fmt.Println("listening on", addr)
                return r.RunUnix(addr)
            }
        }
    } else {
        return nil
    }
}

Expectations

1 and the same messages gets broadcast to all clients

Actual result

curl -v -k https://restful.local/api/v1/sse

see above, only 1 client is receiving the message, the next message is received by another client.

Environment

  • go version: go 1.18
  • gin version (or commit ref): github.com/gin-gonic/gin v1.7.7
  • operating system: linux

Comment From: j-bernard

It seems there was a fix in the example, in Sept 2022 that should address your issue. Check c.Set("clientChan", clientChan) and c.Get("clientChan") in the latest example version in particular.

id is still missing though, but instead of c.SSEvent you can directly call

c.Render(-1, sse.Event{
    Event: "myEvent",
    Data:  "myData",
    Id:    "myId",
})