Server-Sent Events (SSE) is a technology that enables a server to push real-time updates to a client over a single, long-lived HTTP connection. It’s built on top of HTTP and is particularly useful for scenarios where the server needs to send updates to the client without the client explicitly requesting them, such as live notifications, stock tickers, or real-time monitoring dashboards.
Principles of SSE
- Unidirectional Communication:
- SSE establishes a one-way communication channel from the server to the client. The client opens a connection, and the server sends events as they occur.
- Unlike WebSockets, which are bidirectional, SSE is simpler and more lightweight, which suits scenarios where the client primarily consumes updates.
- HTTP-Based:
- SSE uses standard HTTP/1.1 or HTTP/2 protocols. The client initiates an HTTP request, and the server keeps the connection open, sending data in a specific format as events occur.
- It leverages the `text/event-stream` MIME type to indicate the response is an event stream.
- Event Stream Format:
- Data is sent as plain text in a specific format, with each event separated by double newlines (`\n\n`).
- The basic structure of an SSE message includes fields like `event`, `data`, `id`, and `retry`: `event: message\nid: 123\ndata: {"message": "Hello, world!"}\n\n`
- `event`: Specifies the event type (optional; defaults to `message` if omitted).
- `data`: The payload, which can be any text (often JSON).
- `id`: A unique identifier for the event, used for reconnection or tracking.
- `retry`: Specifies the reconnection time (in milliseconds) if the connection drops.
- Connection Management:
- The client uses the `EventSource` API in browsers to connect to an SSE endpoint.
- The connection remains open until the client closes it, the server terminates it, or a network issue occurs.
- SSE supports automatic reconnection by the client, using the `Last-Event-ID` header to resume from the last received event.
- Advantages:
- Simple to implement compared to WebSockets.
- Built on HTTP, so it works well with existing infrastructure (proxies, load balancers).
- Lightweight, with minimal overhead for unidirectional updates.
- Native browser support via `EventSource`.
- Limitations:
- Unidirectional (server to client only).
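- Limited to UTF-8 text; binary payloads must be encoded first (for example as Base64), and JSON is the common choice for structured data.
- Connection limits in browsers: over HTTP/1.1, browsers typically allow about 6 concurrent connections per domain, shared across all tabs. Over HTTP/2 the limit is negotiated between client and server and is much higher (100 streams by default).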
- Less flexible than WebSockets for complex, bidirectional communication.
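The event stream format described above can be sketched as a small Go helper. This is a minimal illustration, not a library API: the `Event` type and `formatEvent` function are hypothetical names introduced here. It also shows one detail the field list implies: a payload containing newlines has to be split across several `data:` lines, because a blank line would otherwise end the event early.

```go
package main

import (
	"fmt"
	"strings"
)

// Event mirrors the four SSE fields. The type and its field names are
// illustrative assumptions, not part of any library.
type Event struct {
	Name  string // "event:" field; empty means the default "message" type
	ID    string // "id:" field; empty means the field is omitted
	Data  string // "data:" payload; may span several lines
	Retry int    // "retry:" in milliseconds; 0 means the field is omitted
}

// formatEvent renders e in text/event-stream wire format.
func formatEvent(e Event) string {
	var b strings.Builder
	if e.Name != "" {
		fmt.Fprintf(&b, "event: %s\n", e.Name)
	}
	if e.ID != "" {
		fmt.Fprintf(&b, "id: %s\n", e.ID)
	}
	if e.Retry > 0 {
		fmt.Fprintf(&b, "retry: %d\n", e.Retry)
	}
	// Every payload line gets its own "data:" prefix.
	for _, line := range strings.Split(e.Data, "\n") {
		fmt.Fprintf(&b, "data: %s\n", line)
	}
	b.WriteString("\n") // the blank line terminates the event
	return b.String()
}

func main() {
	fmt.Print(formatEvent(Event{
		Name: "message",
		ID:   "123",
		Data: `{"message": "Hello, world!"}`,
	}))
}
```

A handler could write the returned string to the response and flush after each event.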
Key Knowledge Points
- Client-Side (Browser):
- The `EventSource` API is used to connect to an SSE endpoint:

```javascript
const source = new EventSource('/events');
source.onmessage = (event) => {
  console.log('Received:', event.data);
};
source.onerror = () => {
  console.log('Error occurred');
};
```

- You can listen for specific event types:

```javascript
source.addEventListener('customEvent', (event) => {
  console.log('Custom event:', event.data);
});
```

- The client automatically reconnects if the connection is lost, with a default retry interval (adjustable via the `retry` field).
- Server-Side Requirements:
- The server must respond with `Content-Type: text/event-stream`.
- It must keep the connection open and send data in the SSE format.
- The server should handle connection timeouts, client disconnections, and optionally support `Last-Event-ID` for resuming streams.
- Reconnection and Reliability:
- The `id` field helps track events, allowing clients to resume from the last event using the `Last-Event-ID` header.
- The `retry` field controls how long the client waits before reconnecting.
- Servers can implement logic to buffer recent events for clients that reconnect.
- Scalability:
- SSE connections are long-lived, so the server must handle many concurrent connections efficiently.
- In Go, you can use goroutines and channels to manage event broadcasting to multiple clients.
- For high scalability, consider using a message queue (e.g., Redis, Kafka) to decouple event generation from delivery.
- Security:
- Use HTTPS to secure SSE connections.
- Implement CORS headers if the client and server are on different domains.
- Validate `Last-Event-ID` to prevent abuse.
- Avoid sending sensitive data unless properly encrypted or authorized.
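The buffering and validation points above can be combined in one small structure. The sketch below assumes event IDs are sequential integers assigned by the server; `replayBuffer`, `storedEvent`, `add`, and `since` are hypothetical names introduced for illustration. A malformed, empty, or not-yet-issued `Last-Event-ID` is treated as "nothing to replay" rather than trusted.

```go
package main

import (
	"fmt"
	"strconv"
)

// storedEvent pairs a sequential ID with its payload.
type storedEvent struct {
	ID   uint64
	Data string
}

// replayBuffer keeps the most recent events so that reconnecting clients
// can catch up. It is not safe for concurrent use; a real server would
// guard it with a mutex or own it from a single goroutine.
type replayBuffer struct {
	max    int
	events []storedEvent
	nextID uint64
}

func newReplayBuffer(max int) *replayBuffer {
	return &replayBuffer{max: max, nextID: 1}
}

// add stores data under the next ID and evicts the oldest event when full.
func (b *replayBuffer) add(data string) storedEvent {
	ev := storedEvent{ID: b.nextID, Data: data}
	b.nextID++
	b.events = append(b.events, ev)
	if len(b.events) > b.max {
		b.events = b.events[1:]
	}
	return ev
}

// since returns the buffered events newer than the given Last-Event-ID
// header value. Invalid or future IDs yield nil.
func (b *replayBuffer) since(header string) []storedEvent {
	last, err := strconv.ParseUint(header, 10, 64)
	if err != nil || last >= b.nextID {
		return nil
	}
	var missed []storedEvent
	for _, ev := range b.events {
		if ev.ID > last {
			missed = append(missed, ev)
		}
	}
	return missed
}

func main() {
	buf := newReplayBuffer(2)
	buf.add("a")
	buf.add("b")
	buf.add("c")
	for _, ev := range buf.since("1") {
		fmt.Printf("id: %d\ndata: %s\n\n", ev.ID, ev.Data)
	}
}
```

In a handler, the argument to `since` would come from `r.Header.Get("Last-Event-ID")`, and the returned events would be written before the live stream starts.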
Implementing SSE in Go
Go’s standard library (net/http) is well-suited for implementing SSE, as it provides low-level control over HTTP responses. Below is a detailed example of an SSE server in Go, along with explanations.
Example: SSE Server in Go
This example demonstrates a simple SSE server that broadcasts messages to all connected clients. It uses goroutines and channels to handle event broadcasting.
```go
package main

import (
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"
)

// Client represents a connected SSE client
type Client struct {
	send chan string // Channel to send messages to the client
}

// EventBroker manages all connected clients and broadcasts events
type EventBroker struct {
	clients    map[*Client]bool // Connected clients
	register   chan *Client     // Channel to register new clients
	unregister chan *Client     // Channel to unregister clients
	broadcast  chan string      // Channel to broadcast messages
	mu         sync.Mutex       // Mutex for thread-safe client management
}

func NewEventBroker() *EventBroker {
	return &EventBroker{
		clients:    make(map[*Client]bool),
		register:   make(chan *Client),
		unregister: make(chan *Client),
		broadcast:  make(chan string),
	}
}

// Start runs the broker's main loop
func (b *EventBroker) Start() {
	for {
		select {
		case client := <-b.register:
			b.mu.Lock()
			b.clients[client] = true
			b.mu.Unlock()
			log.Printf("New client connected. Total clients: %d", len(b.clients))
		case client := <-b.unregister:
			b.mu.Lock()
			if _, ok := b.clients[client]; ok {
				close(client.send)
				delete(b.clients, client)
			}
			b.mu.Unlock()
			log.Printf("Client disconnected. Total clients: %d", len(b.clients))
		case message := <-b.broadcast:
			b.mu.Lock()
			for client := range b.clients {
				select {
				case client.send <- message:
				default:
					// If the client's buffer is full, drop the client
					close(client.send)
					delete(b.clients, client)
				}
			}
			b.mu.Unlock()
		}
	}
}

// SSEHandler handles SSE connections
func (b *EventBroker) SSEHandler(w http.ResponseWriter, r *http.Request) {
	// Streaming requires a ResponseWriter that can flush
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	// Set headers for SSE
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*") // Adjust for CORS if needed

	// Create a new client. The small buffer keeps a client that is busy
	// writing from being dropped by the broker's non-blocking send.
	client := &Client{send: make(chan string, 16)}
	b.register <- client

	// Clean up when the client disconnects
	defer func() {
		b.unregister <- client
	}()

	// Ensure the response headers are flushed immediately
	flusher.Flush()

	// Send messages to the client
	for {
		select {
		case message, ok := <-client.send:
			if !ok {
				return
			}
			// Format the message as an SSE event
			fmt.Fprintf(w, "data: %s\n\n", message)
			flusher.Flush()
		case <-r.Context().Done():
			// Client closed the connection
			return
		}
	}
}

func main() {
	// Create and start the event broker
	broker := NewEventBroker()
	go broker.Start()

	// Simulate broadcasting messages every 5 seconds
	go func() {
		for i := 1; ; i++ {
			message := fmt.Sprintf("Event %d: %s", i, time.Now().Format(time.RFC3339))
			broker.broadcast <- message
			time.Sleep(5 * time.Second)
		}
	}()

	// Set up the SSE endpoint
	http.HandleFunc("/events", broker.SSEHandler)

	// Start the server
	log.Println("SSE server running on :8080")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatal(err)
	}
}
```
How It Works
- EventBroker:
- Manages all connected clients and handles registration, unregistration, and broadcasting.
- Uses channels (`register`, `unregister`, `broadcast`) for safe, concurrent communication.
- A mutex (`mu`) ensures thread-safe access to the `clients` map.
- Client:
- Each client has a dedicated `send` channel for receiving messages.
- When a client disconnects, its channel is closed, and it’s removed from the broker.
- SSEHandler:
- Sets the required HTTP headers for SSE (`Content-Type: text/event-stream`, etc.).
- Registers a new client with the broker.
- Listens for messages on the client’s `send` channel and writes them in SSE format (`data: <message>\n\n`).
- Uses `http.Flusher` to ensure messages are sent immediately.
- Detects client disconnection via `r.Context().Done()`.
- Broadcasting:
- A goroutine simulates sending messages every 5 seconds by pushing them to the `broadcast` channel.
- The broker forwards these messages to all connected clients.
- Client-Side Example:
To test this server, you can use the following JavaScript in a browser:
```javascript
const source = new EventSource('http://localhost:8080/events');
source.onmessage = (event) => {
  console.log('Received:', event.data);
};
source.onerror = () => {
  console.log('Error occurred');
};
```
Running the Example
- Save the Go code as `main.go`.
- Run it with `go run main.go`.
- Open a browser and use the JavaScript above (e.g., in the console or a web page) to connect to `http://localhost:8080/events`.
- You’ll see messages logged every 5 seconds, like `Received: Event 1: 2025-04-17T12:00:00Z`.
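The stream can also be consumed without a browser. The following is a rough sketch of a Go-side parser for the event stream format, assuming lines stay under `bufio.Scanner`'s default 64 KiB limit; `Event`, `readEvents`, and `parseString` are hypothetical names introduced here. It handles only the `event`, `id`, and `data` fields and skips comment lines, which is a subset of what a browser's `EventSource` does.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// Event is a parsed SSE event (illustrative type, not a library API).
type Event struct {
	Name string
	ID   string
	Data string
}

// readEvents parses a text/event-stream body and calls handle once per
// complete event. It returns when the reader is exhausted or fails.
func readEvents(r io.Reader, handle func(Event)) error {
	sc := bufio.NewScanner(r)
	name, lastID := "message", ""
	var data []string
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "":
			// A blank line ends the event; events without data are skipped.
			if len(data) > 0 {
				handle(Event{Name: name, ID: lastID, Data: strings.Join(data, "\n")})
			}
			name, data = "message", nil
		case strings.HasPrefix(line, ":"):
			// Comment line; ignored.
		default:
			field, value, _ := strings.Cut(line, ":")
			value = strings.TrimPrefix(value, " ")
			switch field {
			case "event":
				name = value
			case "id":
				lastID = value // carried over to later events
			case "data":
				data = append(data, value)
			}
		}
	}
	return sc.Err()
}

// parseString is a convenience wrapper for trying the parser on a literal.
func parseString(s string) []Event {
	var out []Event
	readEvents(strings.NewReader(s), func(e Event) { out = append(out, e) })
	return out
}

func main() {
	sample := "event: tick\nid: 1\ndata: a\ndata: b\n\n: ping\n\ndata: c\n\n"
	for _, e := range parseString(sample) {
		fmt.Printf("%s %q %q\n", e.Name, e.ID, e.Data)
	}
}
```

To read from the example server instead of a literal, the `Body` of the response returned by `http.Get("http://localhost:8080/events")` can be passed to `readEvents`.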
Advanced Considerations for Go
- Handling Last-Event-ID:
- Check the `Last-Event-ID` header in the request to resume from a specific event:

```go
lastEventID := r.Header.Get("Last-Event-ID")
if lastEventID != "" {
	// Logic to send missed events based on lastEventID
}
```

- Store recent events in memory or a database to support this.
- Scaling with Redis:
- Use a Redis Pub/Sub channel to broadcast events to multiple server instances:

```go
import (
	"context"

	"github.com/redis/go-redis/v9"
)

// subscribeToRedis forwards every message published on the "events"
// channel to the local broker. Run it in its own goroutine.
func subscribeToRedis(b *EventBroker) {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	pubsub := client.Subscribe(context.Background(), "events")
	defer pubsub.Close()
	for msg := range pubsub.Channel() {
		b.broadcast <- msg.Payload
	}
}
```

- Servers subscribe to the Redis channel, and a separate process publishes events.
- Connection Timeouts:
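- Set timeouts with care. `WriteTimeout` covers the entire response, so a 30-second value would cut off every SSE stream after 30 seconds. Bound the header-reading phase and idle keep-alive connections instead, and leave `WriteTimeout` unset for streaming endpoints:

```go
server := &http.Server{
	Addr:              ":8080",
	Handler:           nil, // nil uses http.DefaultServeMux
	ReadHeaderTimeout: 10 * time.Second,
	IdleTimeout:       60 * time.Second,
	// WriteTimeout stays at zero (no limit) so streams are not cut off.
}
log.Fatal(server.ListenAndServe())
```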
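If a server-wide write timeout is still wanted for ordinary endpoints, one option is to extend the write deadline per event inside the streaming handler. The sketch below assumes Go 1.20 or later, where `http.NewResponseController` is available; `writeEvent` and `demo` are hypothetical names, and the 10-second deadline and the very short `WriteTimeout` are arbitrary values chosen so the effect shows up quickly.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"time"
)

// writeEvent pushes the write deadline forward for this one write, then
// sends and flushes a single SSE data frame.
func writeEvent(w http.ResponseWriter, data string) error {
	rc := http.NewResponseController(w)
	if err := rc.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
		return err
	}
	if _, err := fmt.Fprintf(w, "data: %s\n\n", data); err != nil {
		return err
	}
	return rc.Flush()
}

// demo streams three events over a test server whose WriteTimeout is
// shorter than the pause between events, and returns the received body.
func demo() (string, error) {
	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "text/event-stream")
		for i := 0; i < 3; i++ {
			if err := writeEvent(w, fmt.Sprint(i)); err != nil {
				return
			}
			time.Sleep(80 * time.Millisecond)
		}
	})
	srv := httptest.NewUnstartedServer(handler)
	srv.Config.WriteTimeout = 50 * time.Millisecond
	srv.Start()
	defer srv.Close()

	resp, err := http.Get(srv.URL)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	body, err := demo()
	fmt.Printf("%q %v\n", body, err)
}
```

The per-write deadline also gives a natural place to detect stalled clients: a failed `writeEvent` means the handler can return and release its resources.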
- Error Handling:
- Log errors when writing to clients fails (e.g., broken connections).
- Gracefully handle client disconnections to avoid goroutine leaks.
- Testing:
- Use tools like `curl` to test the SSE endpoint: `curl http://localhost:8080/events`
- Write unit tests for the broker and handler using Go’s `testing` package.
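For the unit-testing point, a streaming handler can be exercised in-process with `net/http/httptest`, whose `ResponseRecorder` supports flushing. The sketch below uses a cut-down stand-in for the article's handler so that it is self-contained; `streamHandler`, `record`, and `recordDisconnect` are hypothetical names. Inside a real `_test.go` file, the same calls would sit in a `TestXxx(t *testing.T)` function with `t.Errorf` checks on the returned values.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// streamHandler writes each message from events as an SSE data frame until
// the channel is closed or the request context is cancelled.
func streamHandler(events <-chan string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "text/event-stream")
		for {
			select {
			case msg, ok := <-events:
				if !ok {
					return
				}
				fmt.Fprintf(w, "data: %s\n\n", msg)
				flusher.Flush()
			case <-r.Context().Done():
				return
			}
		}
	}
}

// record feeds msgs through the handler and returns what it wrote.
func record(msgs ...string) (contentType, body string) {
	events := make(chan string, len(msgs))
	for _, m := range msgs {
		events <- m
	}
	close(events) // the handler returns once the buffered messages are drained
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/events", nil)
	streamHandler(events)(rec, req)
	return rec.Header().Get("Content-Type"), rec.Body.String()
}

// recordDisconnect simulates a client that has already gone away: the
// handler should return instead of blocking on an empty channel.
func recordDisconnect() string {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/events", nil).WithContext(ctx)
	streamHandler(make(chan string))(rec, req)
	return rec.Body.String()
}

func main() {
	ct, body := record("a", "b")
	fmt.Printf("%s %q\n", ct, body)
	fmt.Printf("%q\n", recordDisconnect())
}
```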
Best Practices
- Keep It Simple:
- Use SSE for scenarios where unidirectional updates suffice. For bidirectional communication, consider WebSockets.
- Optimize Resource Usage:
- Limit the number of concurrent connections per server instance.
- Use a load balancer to distribute clients across multiple instances.
- Monitor Connections:
- Log connection and disconnection events to track usage.
- Monitor memory and CPU usage, as long-lived connections can accumulate.
- Secure the Endpoint:
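- Require authentication. The browser `EventSource` API cannot set custom request headers, so browser clients typically authenticate with cookies (using the `withCredentials` option) or a token in the query string; non-browser clients can send an `Authorization` header.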
- Use HTTPS to prevent eavesdropping.
- Graceful Shutdown:
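- Implement graceful shutdown so the server stops accepting connections and exits cleanly. `Shutdown` waits for active connections to finish and does not interrupt them, so give it a deadline and signal open SSE handlers to return (for example with `RegisterOnShutdown`):

```go
server := &http.Server{Addr: ":8080"}
go server.ListenAndServe()

// On shutdown: wait up to 10 seconds for handlers to return.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
server.Shutdown(ctx)
```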
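Because SSE handlers stay active until told otherwise, a shutdown path usually needs a way to tell them to stop. One possible arrangement, sketched below, closes a channel from `RegisterOnShutdown` and has the handler select on it. `newServer` and `demo` are hypothetical names, and the handler is a minimal stand-in that sends one event and then waits.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"time"
)

// newServer returns a server whose streaming handler exits as soon as
// Shutdown is called.
func newServer() *http.Server {
	stop := make(chan struct{})
	mux := http.NewServeMux()
	mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "text/event-stream")
		fmt.Fprint(w, "data: hello\n\n")
		if f, ok := w.(http.Flusher); ok {
			f.Flush()
		}
		select {
		case <-stop: // server is shutting down
		case <-r.Context().Done(): // client went away
		}
	})
	srv := &http.Server{Handler: mux}
	srv.RegisterOnShutdown(func() { close(stop) })
	return srv
}

// demo opens one stream against a local listener, then shuts the server
// down and reports whether Shutdown finished within its deadline.
func demo() error {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err
	}
	srv := newServer()
	go srv.Serve(ln) // returns http.ErrServerClosed after Shutdown

	resp, err := http.Get("http://" + ln.Addr().String() + "/events")
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// Read the first event so the handler is known to be running.
	first := make([]byte, len("data: hello\n\n"))
	if _, err := io.ReadFull(resp.Body, first); err != nil {
		return err
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	return srv.Shutdown(ctx)
}

func main() {
	fmt.Println("shutdown error:", demo())
}
```

In a long-running program, the call to `Shutdown` would typically be triggered by an OS signal, for instance through a context created with `signal.NotifyContext`.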
Conclusion
SSE is a powerful, lightweight technology for real-time server-to-client updates, and Go’s concurrency model makes it an excellent choice for building scalable SSE servers. The example above provides a foundation for broadcasting events, which you can extend with features like event persistence, reconnection support, or integration with message queues.