Stream-Reponse Result:
Usages
Frontend
In my app when I leave a room, I will send an SSE event to the backend, trigger excel file geneation of the room and get the status from event stream:
const leave = () => { dispatch(appSlice.actions.closeAppDialog()); setTimeout(() => { router.replace("/(billie)/"); SSE.createSSE({ eventSource: apiRoutes.GET_SSE_EXCEL_STATUS(roomOid), token: token, subscriptions: [ { key: "EXCEL_STATUS", action: (data: string) => { dispatch( chatSlice.actions.setRoomExcelStatusOnLeave({ roomOid, status: data, }) ); }, }, ], endEvent: { key: "EXCEL_STATUS_END", }}); }, 300); };
When the backend sends an endEventKey
, we will close our sse
instance.
Backend
Suppose that I have an SSE GET
route that has the following controller:
export const getExcel = async (req: Request, res: Response) => { const excelStatusChannel = new SSEChannel({ eventEmitter: chatService.Cache.eventEmitter, channelKey: `EXCEL_STATUS_${req.user?.userOid || ""}`, SSEMsgKey: "EXCEL_STATUS", SSEEndKey: "EXCEL_STATUS_END", res: res }); const ssePublisher = excelStatusChannel.getSSEPublisher(); const { roomOid } = req.params; if (ssePublisher) { await chatService.generateSaveAndSendExcelReport(roomOid, ssePublisher); } }
- Note that we also pass a
WriteStream
res
intoSSE
channel so that - later our
ssePublisher
can write a stream-response to end the channel (seessePublisher.closeChannel()
, which executeskillChannel()
fromSSEChannel
).
Next the final function call is:
const generateSaveAndSendExcelReport = async ( roomOid: string, ssePublisher: SSEChannelPublisher ) => { await requestAndSaveLLMSummaryFromRoomOid(roomOid, ssePublisher); const { excelUrl, room } = await dispatchExcelGenerationTaskToFlask( roomOid, ssePublisher ); ssePublisher.emit("Finished"); ssePublisher.closeChannel(); await sendEmail({ room, excelUrl }); };
Each of requestAndSaveLLMSummaryFromRoomOid
and dispatchExcelGenerationTaskToFlask
has a setInterval
to publish messages to frontend by using ssePublisher.emit("something")
.
Code Implementation
- Here we assume access token is passed by header.
- In case the reader uses cookie to pass token, we just need to modify the function call of the constructor of
Eventsource
to usewithCredential: true
as an option.
On SSE Request Header
Default EventSource
in react
and react-native
does not provide any option to pass headers via the new EventSource()
constructor. We need additional package to replace the native one.
- For
react
, we use eventsource - For
react-native
we use react-native-event-source
Custom SSE Class:
// util/SSE.ts const SSE_MAX_RETRY_COUNT = 5 import Eventsource from "react-native-event-source"; export const sseStore: { current: SSE | null } = { current: null }; type SSEProps = { eventSource: string, subscriptions: { key: string, action: (data: string) => void }[], endEvent: { key: string } | null, token: string, } class SSE { private reconnectionTries: number = 0 private eventSource: SSEProps["eventSource"] = "" private subscriptions: SSEProps["subscriptions"] = [] private endEvent: SSEProps["endEvent"] = null private sse: Eventsource | null = null; private token: string = ""; constructor(params: SSEProps) { const { endEvent, eventSource, subscriptions, token } = params; this.eventSource = eventSource; this.subscriptions = subscriptions; this.endEvent = endEvent this.token = token; } public close = () => { this.sse?.close(); } public subscribe = () => { try { this.sse = new Eventsource(this.eventSource, { headers: { "Authorization": "Bearer " + this.token, } }); } catch (err) { console.log(err); } if (!this.sse) { return; } // listen to all subscriptions this.subscriptions.forEach((event) => { const { action, key } = event; this.sse!.addEventListener(key, (event) => { const data = event.data as string; action(data); }); }) // listen to the only Kill Subscription Event Key if (this.endEvent) { this.sse.addEventListener(this.endEvent.key, () => { this.sse!.close(); }); } // handle connection error if any this.sse.addEventListener("error", () => { if (this.reconnectionTries < SSE_MAX_RETRY_COUNT) { this.reconnectionTries++; console.log("err event, retry"); } else { if (this.sse) { console.log( `${this.reconnectionTries + 1}th attempt, close connection` ); this.sse.close(); } } }) } } const createSSE = (props: SSEProps) => { if (sseStore.current) { sseStore.current.close(); } sseStore.current = new SSE(props); sseStore.current.subscribe(); }; const closeSSE = () => { sseStore.current?.close(); sseStore.current = null; } export default { createSSE, closeSSE };
Code Implementation: SSEChannel class and SSEChannelPublisher class
Let's fix a cached EventEmitter
instance. Let's identify each event emission key as a channel.
// util/SSEChannel.ts import { Response } from "express"; import { EventEmitter, Writable } from "stream"; import logger from "./logger"; import chatService from "../service/chatService"; export class SSEChannelPublisher { private channelKey = ""; private killChannel = () => { }; constructor(props: { channelKey: string, killChannel: () => void }) { this.channelKey = props.channelKey; this.killChannel = props.killChannel; } public closeChannel = () => { this.killChannel(); chatService.Cache.eventEmitter.removeAllListeners(this.channelKey); } public emit = (data: string) => { chatService.Cache.eventEmitter.emit(this.channelKey, data) } } type ChannelProps = { res: Response, eventEmitter: EventEmitter, channelKey: string SSEMsgKey: string SSEEndKey: string } /** * SSE specific Pub/Sub, the behaviour of our subscription is fixed, we just create publisher. */ class SSEChannel { private channelOption: ChannelProps | null = null; private channelEmitter: EventEmitter | null = null; private eventId: number = 0; constructor(props: ChannelProps) { this.channelOption = props; this.channelEmitter = this.channelOption.eventEmitter; this.listen(); } private killChannel = () => { logger.info("Killing the channel ..."); this.writeMessage({ message: "", SSEMsgKey: this.channelOption?.SSEEndKey || "" }); this.channelEmitter?.removeAllListeners(this.channelOption?.channelKey || ""); } public getSSEMsgKey = () => { return this.channelOption?.SSEMsgKey || ""; } public getSSEPublisher = () => { return new SSEChannelPublisher({ channelKey: this.channelOption?.channelKey || "", killChannel: this.killChannel }); } private writeMessage = (props: { SSEMsgKey: string, message: string }) => { const res = this.channelOption?.res; if (!res) { return; } this.eventId++; const { SSEMsgKey: frontendKey, message } = props; res.write(`event: ${frontendKey}\n`); res.write(`data: ${message}\n`); res.write(`id: ${this.eventId}\n\n`); } public listen = () => { const res = this.channelOption?.res; if (!res) { return; } res.writeHead(200, { "Content-Type": "text/event-stream", "Connection": "keep-alive", "Cache-Control": "no-cache", }); const { channelKey, SSEMsgKey } = this.channelOption!; this.channelEmitter!.removeAllListeners(channelKey); this.channelEmitter!.on( channelKey, (message: string) => { logger.info(`Pushing Status ${message} to frontend`); this.writeMessage({ SSEMsgKey, message }); } ); } } export default SSEChannel;