SSE 实现实时数据推送
在现代Web应用中,实时通信成为了用户体验的重要组成部分。无论是社交媒体的即时通知,还是股票价格的实时更新,SSE(Server-Sent Events)作为一种轻量级的实时通信协议,能够帮助开发者实现高效的单向数据流推送。本篇博客将介绍如何使用Go语言实现一个基于SSE的实时消息推送系统。
什么是SSE?
Server-Sent Events (SSE) 是一种由服务器向浏览器单向推送实时更新的技术。与WebSocket不同,SSE建立在HTTP协议之上,适用于浏览器与服务器之间的单向数据传输。在许多场景中,SSE比WebSocket更加轻量,并且更易于实现,尤其是当你只需要从服务器向客户端推送数据时。
SSE的工作原理是:服务器通过HTTP连接将数据推送给客户端,客户端使用EventSource接口来接收数据流。每当服务器有新的数据时,客户端就会收到相应的更新。
为什么选择SSE?
- 简便易用:SSE的实现非常简单,客户端只需要一个EventSource对象,服务器只需按规范返回数据流。
 - HTTP协议支持:SSE基于HTTP协议,因此不需要建立复杂的连接,它与现有的Web架构兼容。
 - 自动重连:如果连接丢失,SSE会自动尝试重新连接,保证消息的持续性。
 
需求背景
在本篇博客中,我们将使用Go语言构建一个基于SSE的消息推送系统,前端将使用HTML和JavaScript与Go后端进行实时通信。
项目概述
在本项目中,我们将实现以下功能:
- 客户端:前端页面可以输入Token进行身份验证,连接到SSE服务器并接收实时消息。
 - 服务器端:后端使用Go语言处理SSE连接,通过EventSource将数据推送到客户端。我们将使用JWT进行身份验证。
 - 消息推送:后端将消息推送到与客户端建立的SSE连接。
 
技术栈
- Go 语言:后端使用Go实现,处理HTTP请求和SSE通信。
 - HTML/JavaScript:前端实现,负责显示消息并与后端进行交互。
 
后端实现
在后端实现中,我们将分为几个主要部分:
- Client和ClientManager:管理客户端连接。
 - SSE连接处理:处理客户端的SSE连接请求。
 - 消息发送:通过SendMessage方法将消息推送到客户端。
 
1. Client与ClientManager的实现
首先,我们需要定义Client和ClientManager结构体,分别用于表示每一个客户端和管理所有连接的客户端。
type Client struct {
	ID   uint32
	Send chan []byte
}
type ClientManager struct {
	clients *safety.Map[uint32, *Client]
}
- Client:每个客户端都有一个唯一的ID和一个用于发送消息的通道Send。
 - ClientManager:管理所有连接的客户端,提供方法来添加、删除和获取客户端。
 
// NewClientManager 创建新的ClientManager
func NewClientManager() *ClientManager {
	return &ClientManager{
		clients: safety.NewMap[uint32, *Client](),
	}
}
2. 处理SSE连接:NewSSEHandler
NewSSEHandler函数处理客户端的SSE连接。当客户端请求连接时,我们会验证JWT Token并将客户端添加到ClientManager中。
func NewSSEHandler(clientManager *ClientManager) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		token := r.URL.Query().Get("token")
		if token == "" {
			http.Error(w, "token is required", http.StatusBadRequest)
			return
		}
		// 设置HTTP头部,指定这是一个SSE连接
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		w.Header().Set("Connection", "keep-alive")
		claims, ok := middleware.ParseJwtClaimsFromToken(strings.TrimPrefix(token, "Bearer "))
		if !ok {
			http.Error(w, "token is invalid", http.StatusUnauthorized)
			return
		}
		client := NewClient(claims.GetUser())
		clientManager.AddClient(client)
		defer func() {
			clientManager.RemoveClient(client.ID)
		}()
		go client.WriteSSE(w)
		<-r.Context().Done()
		log.Infof("client %d disconnected", client.ID)
	}
}
在这段代码中,我们:
- 验证Token并解析JWT。
 - 如果Token有效,创建一个Client对象并将其加入ClientManager。
 - 为每个客户端开启一个goroutine来执行WriteSSE,持续推送数据流。
 
3. 消息推送:WriteSSE和SendMessage
WriteSSE方法负责向客户端推送消息流,使用HTTP流式响应将数据发送给客户端。
func (c *Client) WriteSSE(w http.ResponseWriter) {
	defer after.RecoverX()
	flusher, ok := w.(http.Flusher)
	if !ok {
		log.Errorw("err", "Streaming unsupported!")
		http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
		return
	}
	log.Debugw("msg", "listen sse client")
	for data := range c.Send {
		log.Debugw("WriteSSE", string(data))
		_, _ = fmt.Fprintf(w, "data: %s\n\n", string(data))
		flusher.Flush()
	}
	log.Debugw("WriteSSE", "data: [DONE]")
}
func (c *Client) SendMessage(message string) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("send message error: %v", r)
		}
	}()
	c.Send <- []byte(message)
	return
}
- WriteSSE:接收客户端请求后,持续推送消息流,直到连接断开。
 - SendMessage:向客户端发送消息,通过Send通道将消息发送给客户端。
 
前端实现
前端部分非常简洁,使用EventSource对象建立与后端的SSE连接。
1. 连接到SSE服务器
function connectToSSE() {
    const token = document.getElementById('token').value;
    if (!token) {
        alert("Please enter a Token.");
        return;
    }
    // 创建一个EventSource对象,连接到服务器的SSE端点
    const url = new URL(`http://localhost:9999/events`);
    url.searchParams.set("token", token);
    eventSource = new EventSource(url.toString());
    eventSource.onmessage = function(event) {
        const messagesDiv = document.getElementById('messages');
        const newMessage = document.createElement('div');
        newMessage.textContent = `New message: ${event.data}`;
        messagesDiv.appendChild(newMessage);
        messagesDiv.scrollTop = messagesDiv.scrollHeight; // 自动滚动到最新消息
    };
    eventSource.onerror = function() {
        alert("Error connecting to server.");
        eventSource.close();
    };
    eventSource.onopen = function() {
        console.log("SSE connection established.");
    };
}
在此代码中,前端通过输入Token与后端建立连接,并使用EventSource接收实时消息。当消息到达时,自动将其显示在页面上。
2. 发送消息
function sendMessage() {
    const message = document.getElementById('message').value;
    if (!message) {
        alert("Please enter a message.");
        return;
    }
    fetch(`/msg?msg=${message}`, {
        method: 'GET',
        headers: {
            'Authorization': 'Bearer ' + document.getElementById('token').value,
        }
    });
}
用户输入消息后,通过GET请求将消息发送到后端,后端根据Token验证身份并将消息推送到对应的客户端。
总结与优化建议
1. 优点
- 简单易用:SSE相较于WebSocket实现更加简单,不需要处理双向通信。
 - 高效:对于只需要单向推送数据的场景,SSE非常高效,并且可以轻松处理数千个并发连接。
 - 自动重连:SSE内建自动重连机制,保证了连接的稳定性。
 
2. 优化建议
- 连接管理:使用Redis或其他分布式缓存来管理多个实例之间的客户端连接,支持横向扩展。
 - Token验证:目前通过URL传递Token,可以进一步加强安全性,通过HTTP头部传递Token。
 
通过SSE,开发者可以轻松实现实时推送功能,而Go语言则提供了高效的并发处理能力,非常适合用于构建这种类型的服务。