Server-Sent Events (SSE) is a technology that enables a server to push real-time updates to a client over a single, long-lived HTTP connection. It’s built on top of HTTP and is particularly useful for scenarios where the server needs to send updates to the client without the client explicitly requesting them, such as live notifications, stock tickers, or real-time monitoring dashboards.


Principles of SSE

  1. Unidirectional Communication:
    • SSE establishes a one-way communication channel from the server to the client. The client opens a connection, and the server sends events as they occur.
    • Unlike WebSockets, which are bidirectional, SSE is simpler and lightweight, suitable for scenarios where the client primarily consumes updates.
  2. HTTP-Based:
    • SSE uses standard HTTP/1.1 or HTTP/2 protocols. The client initiates an HTTP request, and the server keeps the connection open, sending data in a specific format as events occur.
    • It leverages the text/event-stream MIME type to indicate the response is an event stream.
  3. Event Stream Format:
    • Data is sent as plain text in a specific format, with each event separated by double newlines (\n\n).
    • The basic structure of an SSE message includes fields like event, data, id, and retry:
      event: message\n
      id: 123\n
      data: {"message": "Hello, world!"}\n\n
      
      • event: Specifies the event type (optional; defaults to message if omitted).
      • data: The payload, which can be any text (often JSON).
      • id: A unique identifier for the event, used for reconnection or tracking.
      • retry: Specifies the reconnection time (in milliseconds) if the connection drops.
  4. Connection Management:
    • The client uses the EventSource API in browsers to connect to an SSE endpoint.
    • The connection remains open until the client closes it, the server terminates it, or a network issue occurs.
    • SSE supports automatic reconnection by the client, using the Last-Event-ID header to resume from the last received event.
  5. Advantages:
    • Simple to implement compared to WebSockets.
    • Built on HTTP, so it works well with existing infrastructure (proxies, load balancers).
    • Lightweight, with minimal overhead for unidirectional updates.
    • Native browser support via EventSource.
  6. Limitations:
    • Unidirectional (server to client only).
    • Limited to text-based data (though JSON is commonly used).
    • Connection limits in browsers (typically 6 concurrent SSE connections per domain).
    • Less flexible than WebSockets for complex, bidirectional communication.

Key Knowledge Points

  1. Client-Side (Browser):
    • The EventSource API is used to connect to an SSE endpoint:
      const source = new EventSource('/events');
      source.onmessage = (event) => {
        console.log('Received:', event.data);
      };
      source.onerror = () => {
        console.log('Error occurred');
      };
      
    • You can listen for specific event types:
      source.addEventListener('customEvent', (event) => {
        console.log('Custom event:', event.data);
      });
      
    • The client automatically reconnects if the connection is lost, with a default retry interval (adjustable via the retry field).
  2. Server-Side Requirements:
    • The server must respond with Content-Type: text/event-stream.
    • It must keep the connection open and send data in the SSE format.
    • The server should handle connection timeouts, client disconnections, and optionally support Last-Event-ID for resuming streams.
  3. Reconnection and Reliability:
    • The id field helps track events, allowing clients to resume from the last event using the Last-Event-ID header.
    • The retry field controls how long the client waits before reconnecting.
    • Servers can implement logic to buffer recent events for clients that reconnect.
  4. Scalability:
    • SSE connections are long-lived, so the server must handle many concurrent connections efficiently.
    • In Go, you can use goroutines and channels to manage event broadcasting to multiple clients.
    • For high scalability, consider using a message queue (e.g., Redis, Kafka) to decouple event generation from delivery.
  5. Security:
    • Use HTTPS to secure SSE connections.
    • Implement CORS headers if the client and server are on different domains.
    • Validate Last-Event-ID to prevent abuse.
    • Avoid sending sensitive data unless properly encrypted or authorized.

Implementing SSE in Go

Go’s standard library (net/http) is well-suited for implementing SSE, as it provides low-level control over HTTP responses. Below is a detailed example of an SSE server in Go, along with explanations.

Example: SSE Server in Go

This example demonstrates a simple SSE server that broadcasts messages to all connected clients. It uses goroutines and channels to handle event broadcasting.

package main

import (
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"
)

// Client represents a connected SSE client
type Client struct {
	send chan string // Channel to send messages to the client
}

// EventBroker manages all connected clients and broadcasts events
type EventBroker struct {
	clients    map[*Client]bool // Connected clients
	register   chan *Client     // Channel to register new clients
	unregister chan *Client     // Channel to unregister clients
	broadcast  chan string      // Channel to broadcast messages
	mu         sync.Mutex       // Mutex for thread-safe client management
}

func NewEventBroker() *EventBroker {
	return &EventBroker{
		clients:    make(map[*Client]bool),
		register:   make(chan *Client),
		unregister: make(chan *Client),
		broadcast:  make(chan string),
	}
}

// Start runs the broker's main loop
func (b *EventBroker) Start() {
	for {
		select {
		case client := <-b.register:
			b.mu.Lock()
			b.clients[client] = true
			b.mu.Unlock()
			log.Printf("New client connected. Total clients: %d", len(b.clients))
		case client := <-b.unregister:
			b.mu.Lock()
			if _, ok := b.clients[client]; ok {
				close(client.send)
				delete(b.clients, client)
			}
			b.mu.Unlock()
			log.Printf("Client disconnected. Total clients: %d", len(b.clients))
		case message := <-b.broadcast:
			b.mu.Lock()
			for client := range b.clients {
				select {
				case client.send <- message:
				default:
					// If the client's channel is blocked, close it
					close(client.send)
					delete(b.clients, client)
				}
			}
			b.mu.Unlock()
		}
	}
}

// SSEHandler handles SSE connections
func (b *EventBroker) SSEHandler(w http.ResponseWriter, r *http.Request) {
	// Set headers for SSE
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*") // Adjust for CORS if needed

	// Create a new client
	client := &Client{send: make(chan string)}
	b.register <- client

	// Clean up when the client disconnects
	defer func() {
		b.unregister <- client
	}()

	// Ensure the response is flushed immediately
	if flusher, ok := w.(http.Flusher); ok {
		flusher.Flush()
	}

	// Send messages to the client
	for {
		select {
		case message, ok := <-client.send:
			if !ok {
				return
			}
			// Format the message as an SSE event
			fmt.Fprintf(w, "data: %s\n\n", message)
			if flusher, ok := w.(http.Flusher); ok {
				flusher.Flush()
			}
		case <-r.Context().Done():
			// Client closed the connection
			return
		}
	}
}

func main() {
	// Create and start the event broker
	broker := NewEventBroker()
	go broker.Start()

	// Simulate broadcasting messages every 5 seconds
	go func() {
		for i := 1; ; i++ {
			message := fmt.Sprintf("Event %d: %s", i, time.Now().Format(time.RFC3339))
			broker.broadcast <- message
			time.Sleep(5 * time.Second)
		}
	}()

	// Set up the SSE endpoint
	http.HandleFunc("/events", broker.SSEHandler)

	// Start the server
	log.Println("SSE server running on :8080")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatal(err)
	}
}

How It Works

  1. EventBroker:
    • Manages all connected clients and handles registration, unregistration, and broadcasting.
    • Uses channels (register, unregister, broadcast) for safe, concurrent communication.
    • A mutex (mu) ensures thread-safe access to the clients map.
  2. Client:
    • Each client has a dedicated send channel for receiving messages.
    • When a client disconnects, its channel is closed, and it’s removed from the broker.
  3. SSEHandler:
    • Sets the required HTTP headers for SSE (Content-Type: text/event-stream, etc.).
    • Registers a new client with the broker.
    • Listens for messages on the client’s send channel and writes them in SSE format (data: <message>\n\n).
    • Uses http.Flusher to ensure messages are sent immediately.
    • Detects client disconnection via r.Context().Done().
  4. Broadcasting:
    • A goroutine simulates sending messages every 5 seconds by pushing them to the broadcast channel.
    • The broker forwards these messages to all connected clients.
  5. Client-Side Example: To test this server, you can use the following JavaScript in a browser:
    const source = new EventSource('http://localhost:8080/events');
    source.onmessage = (event) => {
      console.log('Received:', event.data);
    };
    source.onerror = () => {
      console.log('Error occurred');
    };
    

Running the Example

  1. Save the Go code as main.go.
  2. Run it with go run main.go.
  3. Open a browser and use the JavaScript above (e.g., in the console or a web page) to connect to http://localhost:8080/events.
  4. You’ll see messages logged every 5 seconds, like Received: Event 1: 2025-04-17T12:00:00Z.

Advanced Considerations for Go

  1. Handling Last-Event-ID:
    • Check the Last-Event-ID header in the request to resume from a specific event:
      lastEventID := r.Header.Get("Last-Event-ID")
      if lastEventID != "" {
        // Logic to send missed events based on lastEventID
      }
      
    • Store recent events in memory or a database to support this.
  2. Scaling with Redis:
    • Use a Redis Pub/Sub channel to broadcast events to multiple server instances:
      import "github.com/redis/go-redis/v9"
      
      func subscribeToRedis(b *EventBroker) {
        client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
        pubsub := client.Subscribe(context.Background(), "events")
        for msg := range pubsub.Channel() {
          b.broadcast <- msg.Payload
        }
      }
      
    • Servers subscribe to the Redis channel, and a separate process publishes events.
  3. Connection Timeouts:
    • Set a timeout to close idle connections:
      http.Server{
        Addr:         ":8080",
        Handler:      nil,
        ReadTimeout:  30 * time.Second,
        WriteTimeout: 30 * time.Second,
      }.ListenAndServe()
      
  4. Error Handling:
    • Log errors when writing to clients fails (e.g., broken connections).
    • Gracefully handle client disconnections to avoid goroutine leaks.
  5. Testing:
    • Use tools like curl to test the SSE endpoint:
      curl http://localhost:8080/events
      
    • Write unit tests for the broker and handler using Go’s testing package.

Best Practices

  1. Keep It Simple:
    • Use SSE for scenarios where unidirectional updates suffice. For bidirectional communication, consider WebSockets.
  2. Optimize Resource Usage:
    • Limit the number of concurrent connections per server instance.
    • Use a load balancer to distribute clients across multiple instances.
  3. Monitor Connections:
    • Log connection and disconnection events to track usage.
    • Monitor memory and CPU usage, as long-lived connections can accumulate.
  4. Secure the Endpoint:
    • Require authentication (e.g., via tokens in query parameters or headers).
    • Use HTTPS to prevent eavesdropping.
  5. Graceful Shutdown:
    • Implement graceful shutdown to close client connections cleanly:
      server := &http.Server{Addr: ":8080"}
      go server.ListenAndServe()
      // On shutdown
      server.Shutdown(context.Background())
      

Conclusion

SSE is a powerful, lightweight technology for real-time server-to-client updates, and Go’s concurrency model makes it an excellent choice for building scalable SSE servers. The example above provides a foundation for broadcasting events, which you can extend with features like event persistence, reconnection support, or integration with message queues.

Comments