Description
I'm aware this a not a simple issue.
gin sse support is fundamentally flawed/broken. If it ever worked, it doesn't work anymore. At least behind nginx (nginx version: nginx/1.20.2)
I essentially copied https://github.com/gin-gonic/examples/blob/master/server-sent-event/main.go
The issue is, when you have 1 client connected almost everything seems to work well. The initial messages get swallowed and don't make it to the client.
But when you have 2 clients connected either nginx or gin starts round-robin messages. You send 1 message
curl -k -X POST -H 'Content-Type: application/json' -d '{"f":"1", "t":"3", "c":"tester"}' https://restful.local/api/v1/msg
client 1 receives this message via /api/v1/sse you send this message again or another payload message Now client 1 doesn't receive this message, but client 2 receives it. It doesn't matter if a client is curl or a browser EventSource client.
With 3 clients client 1 receives a message, then client 2 receives a message, then client 3 receives a message... and so on.
Also if you search for "sse" in open issues you'll see that the possiblilty to supply your own id is missing.
It might be a nginx issue, it might not, I have yet to find out. All I know is that websockets with gin & nginx don't have that problem but SSE do.
IMHO sse support in gin should be redesigned from the ground up.
How to reproduce
server {
listen 443 ssl http2;
listen [::]:443 ssl http2;
server_name restful.local;
ssl_certificate /etc/nginx/ssl/restful.cert.pem;
ssl_certificate_key /etc/nginx/ssl/restful.key.pem;
index index.html;
error_log /var/log/nginx/restful.error;
location /api/ {
proxy_pass http://unix:/tmp/restful.sock;
proxy_read_timeout 30;
proxy_connect_timeout 30;
proxy_redirect off;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
location = /api/v1/sse {
proxy_read_timeout 86400;
proxy_connect_timeout 86400;
proxy_redirect off;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding off;
proxy_buffering off;
proxy_cache off;
proxy_pass http://unix:/tmp/restful.sock;
}
}
package event
import (
"log"
"github.com/gin-gonic/gin"
)
type MessageNotification struct {
Type string `json:"type"`
ID string `json:"id"`
SourceID string `json:"s"`
TargetID string `json:"t"`
}
// Event keeps a list of clients those are currently attached
// and broadcasting events to those clients.
type Event struct {
// Events are pushed to this channel by the main events-gathering routine
Message chan *MessageNotification
// New client connections
NewClients chan chan *MessageNotification
// Closed client connections
ClosedClients chan chan *MessageNotification
// Total client connections
TotalClients map[chan *MessageNotification]bool
}
// ClientChan - New event messages are broadcast to all registered client connection channels
type ClientChan chan *MessageNotification
// NewServer initializes event and start processing requests
func NewServer() *Event {
event := &Event{
Message: make(chan *MessageNotification),
NewClients: make(chan chan *MessageNotification),
ClosedClients: make(chan chan *MessageNotification),
TotalClients: make(map[chan *MessageNotification]bool),
}
go event.listen()
return event
}
//listen Listens all incoming requests from clients.
//Handles addition and removal of clients and broadcast messages to clients.
func (stream *Event) listen() {
for {
select {
// Add new available client
case client := <-stream.NewClients:
stream.TotalClients[client] = true
log.Printf("Client added. %d registered clients", len(stream.TotalClients))
// Remove closed client
case client := <-stream.ClosedClients:
delete(stream.TotalClients, client)
log.Printf("Removed client. %d registered clients", len(stream.TotalClients))
// Broadcast message to client
case eventMsg := <-stream.Message:
for clientMessageChan := range stream.TotalClients {
clientMessageChan <- eventMsg
}
}
}
}
func (stream *Event) ServeHTTP() gin.HandlerFunc {
return func(c *gin.Context) {
// Initialize client channel
clientChan := make(ClientChan)
// Send new connection to event server
stream.NewClients <- clientChan
defer func() {
// Send closed connection to event server
stream.ClosedClients <- clientChan
}()
go func() {
// Send connection that is closed by client to event server
<-c.Done()
stream.ClosedClients <- clientChan
}()
c.Next()
}
}
func HeadersMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
c.Writer.Header().Set("Access-Control-Allow-Headers", "Content-Type")
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
// c.Writer.Header().Set("Transfer-Encoding", "chunked")
c.Writer.Header().Set("X-Accel-Buffering", "no")
c.Next()
}
}
package restful
import (
"time"
"<base>/event"
"go.mongodb.org/mongo-driver/bson/primitive"
)
type Handler struct {
notifier *event.Event
}
func NewHandler() *Handler {
h := &Handler{
notifier: event.NewServer(),
}
return h
}
// MessageRoutes defines routes for Message handlers.
func (h *Handler) MessageRoutes(api *gin.RouterGroup) {
api.GET("/sse", event.HeadersMiddleware(), h.notifier.ServeHTTP(), h.MessageEvents)
api.POST("/msg", h.CreateMessageNoAuth)
}
func (h *Handler) MessageEvents(cx *gin.Context) {
cx.Stream(func(w io.Writer) bool {
if msg, ok := <-h.notifier.Message; ok {
cx.SSEvent("message", msg)
return true
}
return false
})
}
func (h *Handler) CreateMessageNoAuth(cx *gin.Context) {
type Message struct {
ID primitive.ObjectID `bson:"_id,omitempty" json:"id"`
From string `bson:"f" json:"f"`
To string `bson:"t" json:"t"`
Content string `bson:"c" json:"c"`
}
m := new(Message)
if e := cx.BindJSON(m); e != nil {
cx.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": e.Error()})
return
}
m.ID = primitive.NewObjectID()
h.notifier.Message <- &event.MessageNotification{
Type: time.Now().String(),
ID: m.ID.Hex(),
SourceID: m.From,
TargetID: m.To,
}
cx.SecureJSON(201, m)
}
const (
prefixTCP = "tcp:"
prefixUNIX = "unix:"
)
func main() {
r := gin.Default()
if e := r.SetTrustedProxies([]string{
"127.0.0.1",
"::1",
}); e != nil {
log.Fatal(e.Error())
}
r.SecureJsonPrefix(")]}',\n")
r.MaxMultipartMemory = 16 << 20
v1 := r.Group("/api/v1")
h := restful.NewHandler(}
h.MessageRoutes(v1)
restfulAddr := "unix:/tmp/restful.sock"
ginServe(r, restfulAddr)
}
func ginServe(r *gin.Engine, path string) error {
// serve
if strings.HasPrefix(path, prefixTCP) {
addr := strings.TrimPrefix(path, prefixTCP)
fmt.Println("listening on", addr)
return r.Run(addr)
} else if strings.HasPrefix(path, prefixUNIX) {
addr := strings.TrimPrefix(path, prefixUNIX)
if _, e := os.Stat(addr); errors.Is(e, os.ErrNotExist) {
fmt.Println("listening on", addr)
return r.RunUnix(addr)
} else {
if e := os.Remove(addr); e != nil {
return e
} else {
fmt.Println("listening on", addr)
return r.RunUnix(addr)
}
}
} else {
return nil
}
}
Expectations
1 and the same messages gets broadcast to all clients
Actual result
curl -v -k https://restful.local/api/v1/sse
see above, only 1 client is receiving the message, the next message is received by another client.
Environment
- go version: go 1.18
- gin version (or commit ref): github.com/gin-gonic/gin v1.7.7
- operating system: linux
Comment From: j-bernard
It seems there was a fix in the example, in Sept 2022 that should address your issue.
Check c.Set("clientChan", clientChan)
and c.Get("clientChan")
in the latest example version in particular.
id is still missing though, but instead of c.SSEvent
you can directly call
c.Render(-1, sse.Event{
Event: "myEvent",
Data: "myData",
Id: "myId",
})