Set up Websocket API from API Gateway
We have a quick guide on how to use the websocket api here:
However, the raw websocket API does not reconnect on its own (unlike socket.io), so we need a robust mechanism to handle socket reconnection and retry on error. Luckily there is already a popular library:
npm package: reconnecting-websocket
Let's install it:
yarn add reconnecting-websocket
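To give a feel for what such a library does between reconnection attempts, here is a minimal sketch of a retry delay that grows geometrically and is capped at a maximum. It only illustrates the idea and is not the library's actual implementation; the function name `reconnectDelay` and its default values are assumptions made for this example.

```typescript
// Hypothetical helper: delay (in ms) to wait before the n-th reconnection attempt.
// The delay grows geometrically from minDelay and is capped at maxDelay.
function reconnectDelay(
    retryCount: number,
    minDelay: number = 1000,
    maxDelay: number = 10000,
    growFactor: number = 1.3
): number {
    if (retryCount <= 0) {
        return 0; // the very first attempt connects immediately
    }
    const delay = minDelay * Math.pow(growFactor, retryCount - 1);
    return Math.min(delay, maxDelay);
}

// Rounded delays for the 1st, 2nd, 3rd and 20th retry.
const exampleDelays = [1, 2, 3, 20].map((n) => Math.round(reconnectDelay(n)));
```

Capping the delay keeps a long outage from pushing the next attempt arbitrarily far into the future.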
Set up Ping Route
We need to send data to the websocket API regularly, every 9 minutes, because API Gateway closes any connection that has been idle for 10 minutes.
For that, we define a route that accepts dummy data. The dummy data will be:
const pingMessage = JSON.stringify({
    action: "ping",
    content: "ping",
});
This message is processed by our lambda function (configured when we create that route), which is as simple as:
export const handler = async (event) => {
    const response = {
        statusCode: 200,
        body: "pong",
    };
    return response;
};
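API Gateway decides which route (and therefore which lambda) receives a message by evaluating the API's route selection expression against the message body. The sketch below mimics that lookup locally, assuming the expression is `$request.body.action` (which matches the `action` field of our ping message) and that unmatched messages fall back to the `$default` route. `selectRoute` is a hypothetical helper for illustration only, not an AWS API.

```typescript
// Hypothetical local imitation of route selection on `$request.body.action`.
function selectRoute(rawMessage: string, routeKeys: string[]): string {
    try {
        const body = JSON.parse(rawMessage) as { action?: unknown };
        if (typeof body.action === "string" && routeKeys.indexOf(body.action) !== -1) {
            return body.action;
        }
    } catch (err) {
        // The message is not valid JSON (or not an object): fall through to $default.
    }
    return "$default";
}

const pingRoute = selectRoute(
    JSON.stringify({ action: "ping", content: "ping" }),
    ["ping"]
);
```

This is why the ping message carries an `action` field: without it, the message would not reach the ping route.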
Set up Route and functions to save and remove connectionId of websocket
$connect
In the following we delegate the persistence of connectionId to our own backend:
// lambda function for connection
export const handler = async (event) => {
    const connectionId = event.requestContext.connectionId;
    const userId = event.queryStringParameters.userId;
    const platform = event.queryStringParameters.platform;
    await insert(userId, connectionId, platform);
    return { statusCode: 200, body: "Connected." };
};

const insert = async (userId, connectionId, platform) => {
    const url = "https://my-own-domain:8080/gateway-websocket/connect";
    await fetch(url, {
        method: "POST",
        headers: {
            "Content-Type": "application/json",
        },
        body: JSON.stringify({
            userId: userId,
            connectionId: connectionId,
            platform: platform,
        }),
    });
};
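The handler above reads `event.queryStringParameters` directly, which can be missing when a client connects without a query string. A minimal sketch of a defensive parser follows; `ConnectEvent` is a simplified, assumed shape of the event (only the fields used here), and `parseConnectParams` is a hypothetical helper.

```typescript
// Assumed, simplified shape of the $connect event (only the fields we read).
type ConnectEvent = {
    requestContext: { connectionId: string };
    queryStringParameters?: Record<string, string | undefined> | null;
};

type ConnectParams = { connectionId: string; userId: string; platform: string };

// Returns the parameters needed by the backend, or null if any is missing.
function parseConnectParams(event: ConnectEvent): ConnectParams | null {
    const query = event.queryStringParameters || {};
    const userId = query.userId;
    const platform = query.platform;
    if (!userId || !platform) {
        return null;
    }
    return { connectionId: event.requestContext.connectionId, userId, platform };
}
```

In the handler, a `null` result could be answered with a non-2xx status code so that an incomplete record is never stored.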
Note that we can provide queryStringParameters when we make a websocket connection to the endpoint:
const url = `${webSocketURL}?userId=${userId}&platform=${platform}`;
// new WebSocket(url)
new ReconnectingWebSocket(url);
Here ReconnectingWebSocket is an enhanced version of WebSocket that we will use in the custom hook in a later section.
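Because the values are interpolated into the URL as-is, characters such as `&` or spaces in `userId` would corrupt the query string. Below is a small sketch that encodes each key and value with the standard `encodeURIComponent`; `buildSocketUrl` and the example endpoint are hypothetical.

```typescript
// Hypothetical helper: append URL-encoded query parameters to a websocket endpoint.
function buildSocketUrl(baseUrl: string, params: Record<string, string>): string {
    const query = Object.keys(params)
        .map((key) => `${encodeURIComponent(key)}=${encodeURIComponent(params[key])}`)
        .join("&");
    return query ? `${baseUrl}?${query}` : baseUrl;
}

// The endpoint below is a placeholder, not a real API Gateway URL.
const exampleUrl = buildSocketUrl("wss://example.com/dev", {
    userId: "a&b",
    platform: "pc",
});
```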
$disconnect
Again we delegate the disconnection (removal of connectionId from our database) to our own backend:
// lambda function for disconnection
export const handler = async (event) => {
    const connectionId = event.requestContext.connectionId;
    await deleteConnection(connectionId);
    return { statusCode: 200, body: "disConnected." };
};

const deleteConnection = async (connectionId) => {
    try {
        const response = await fetch(
            `https://my-own-domain:8080/gateway-websocket/disconnect/${connectionId}`,
            {
                method: "DELETE",
                headers: {
                    "Content-Type": "application/json",
                },
            }
        );
        if (!response.ok) {
            throw new Error(`HTTP error! status: ${response.status}`);
        }
        const data = await response.json();
        console.log(data);
    } catch (error) {
        console.error("Error:", error);
    }
};
Receive Data from Frontend (React)
Define a custom hook to initiate websocket connection
 1  import ReconnectingWebSocket from "reconnecting-websocket"
 2  import { useEffect, useRef } from "react"
 3  const useAWSGatewaySocket = (newCallbacks: { callback: (data: Payload) => void }[]) => {
 4      const webSocketURL = process.env.VITE_SOCKET_URL
 5      const callbackDataRef = useRef(newCallbacks)
 6      const userId = useAppSelector((s) => s.auth.userId)
 7      const socketRef = useRef<ReconnectingWebSocket | null>(null)
 8      const dispatch = useAppDispatch()
 9      const setChatSocketConnectionStatus = (on: boolean, socketId: string) => {
10          dispatch(chatSlice.actions.updateChatSocket({ on, socketId }))
11      }
12      const heartbeatInterval = useRef<ReturnType<typeof setInterval> | null>(null)
13      const HEARTBEAT_INTERVAL = 540000 // 9 minutes in milliseconds
14
15      const getSocketConnection = async (userId: string) => {
16          try {
17              if (!userId) {
18                  return
19              }
20              if (socketRef.current) {
21                  return
22              }
23              const rws = new ReconnectingWebSocket(`${webSocketURL}?userId=${userId}&platform=pc`)
24              socketRef.current = rws
25
26              const pingMessage = JSON.stringify({
27                  action: "ping",
28                  content: "ping",
29              })
30
31              socketRef.current.onerror = (e) => {
32                  dispatch(appSlice.actions.setIsConnected(false))
33              }
34
35              socketRef.current.onopen = (e) => {
36                  console.log("websocket connected")
37                  dispatch(appSlice.actions.setIsConnected(true))
38                  if (heartbeatInterval.current) {
39                      clearInterval(heartbeatInterval.current)
40                  }
41                  heartbeatInterval.current = setInterval(() => {
42                      if (rws.readyState === WebSocket.OPEN) {
43                          rws.send(pingMessage)
44                          console.log("Heartbeat sent")
45                      }
46                  }, HEARTBEAT_INTERVAL)
47              }
48
49              socketRef.current.addEventListener("message", (e: MessageEvent<string>) => {
50                  // the content in callbackDataRef is ever-changing,
51                  // based on the state change passing into the useAWSGatewaySocket hook.
52                  // we use useEffect to update the content in callbackDataRef
53                  try {
54                      const parsedData = JSON.parse(e?.data || "{}") as Payload
55                      callbackDataRef.current.forEach(({ callback }) => {
56                          callback(parsedData)
57                      })
58                  } catch (err) {
59                      console.error("websocket incoming event parsing error", err, e)
60                  }
61              })
62          } catch (err) {
63              socketRef.current = null
64              __DEV__ && msgUtil.tmpError(JSON.stringify(err))
65          }
66      }
67
68      // connect once a userId is available; close the socket when userId is cleared
69      useEffect(() => {
70          console.log("userId changed", userId)
71          if (userId) {
72              getSocketConnection(userId)
73          }
74          if (!userId) {
75              socketRef.current?.close()
76              socketRef.current = null
77          }
78      }, [userId])
79
80      useEffect(() => {
81          callbackDataRef.current = newCallbacks
82      }, [newCallbacks])
83
84      return { socketRef }
85  }
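The heartbeat part of the hook can also be isolated from React. The following is a sketch under the assumption that the socket only needs to expose `readyState` and `send`; `HeartbeatSocket`, `sendHeartbeat`, `startHeartbeat`, and the returned stop function are names invented for this example.

```typescript
const SOCKET_OPEN = 1; // numeric value of WebSocket.OPEN

// Minimal surface of a socket that the heartbeat needs (an assumption of this sketch).
type HeartbeatSocket = { readyState: number; send: (data: string) => void };

// Sends one ping if the socket is open; returns whether a ping was sent.
function sendHeartbeat(socket: HeartbeatSocket, message: string): boolean {
    if (socket.readyState !== SOCKET_OPEN) {
        return false;
    }
    socket.send(message);
    return true;
}

// Starts the periodic ping and returns a function that stops it.
function startHeartbeat(
    socket: HeartbeatSocket,
    intervalMs: number,
    message: string = JSON.stringify({ action: "ping", content: "ping" })
): () => void {
    const timer = setInterval(() => sendHeartbeat(socket, message), intervalMs);
    return () => clearInterval(timer);
}
```

Returning a stop function makes it straightforward to clear the timer at the same place where the socket is closed.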
Listeners for incoming socket messages
Note that useAWSGatewaySocket accepts the parameter (newCallbacks: { callback: (data: Payload) => void }[]), so we can register the listeners as follows:
const someState = ...
useAWSGatewaySocket([
    {
        callback: (payload) => {
            if (payload.type === SomeKey1) {
                // do something to payload.data
            }
        },
    },
    {
        callback: (payload) => {
            if (payload.type === SomeKey2) {
                // do something to payload.data
            }
        },
    },
    {
        callback: (payload) => {
            if (payload.type === SomeKey2) {
                // this will update the listener when someState changed
                if (someState === "Something") {
                    // do something to payload.data
                }
            }
        },
    }
])
- We need to agree on the type definition of payload.data with the backend developers.
- Note that with this approach the listener passed to socketRef.current.addEventListener does not capture a fixed set of callbacks; it reads callbackDataRef.current on every message, so any state change immediately changes how socket messages are handled.
- Therefore our application can behave dynamically under different app states.
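The mechanism behind this is independent of React: the listener is registered once, but on every message it looks up the current callbacks through a mutable reference. Here is a minimal sketch, with a plain object standing in for the `useRef` result; `Ref`, `makeListener`, and the sample callbacks are illustrative names only.

```typescript
type Payload = { type: string; data: unknown };
type Callback = (payload: Payload) => void;
type Ref<T> = { current: T }; // stand-in for the object returned by useRef

// The returned listener is created once, yet always uses the latest callbacks.
function makeListener(callbacksRef: Ref<Callback[]>): (raw: string) => void {
    return (raw) => {
        const payload = JSON.parse(raw) as Payload;
        callbacksRef.current.forEach((callback) => callback(payload));
    };
}

const seen: string[] = [];
const callbacksRef: Ref<Callback[]> = {
    current: [(p) => { seen.push(`old:${p.type}`); }],
};
const listener = makeListener(callbacksRef);

listener(JSON.stringify({ type: "A", data: null }));
// Simulates the effect that runs when newCallbacks changes.
callbacksRef.current = [(p) => { seen.push(`new:${p.type}`); }];
listener(JSON.stringify({ type: "B", data: null }));
```

After both calls, `seen` holds one entry from the old callback and one from the new callback, even though `listener` itself was never re-created.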
Send Data from Backend (Kotlin)
There are plenty of examples in Node.js, so let's study an example in Kotlin.
Dependencies
// build.gradle
dependencies {
    implementation(platform("aws.sdk.kotlin:bom:1.0.48"))
    implementation("aws.sdk.kotlin:apigatewaymanagementapi")
    implementation("aws.sdk.kotlin:aws-core")
    implementation("aws.smithy.kotlin:http-client-engine-okhttp4:1.3.30")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:1.7.3")
    ...
}
Bean for ApiGatewayManagementClient
import aws.sdk.kotlin.services.apigatewaymanagementapi.ApiGatewayManagementClient
import aws.smithy.kotlin.runtime.http.engine.okhttp4.OkHttp4Engine
import aws.smithy.kotlin.runtime.net.url.Url
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class ApiGatewayManagementApiClientConfig(
    @Value("\${socket.endpoint}") private val socketEndpoint: String,
    @Value("\${socket.aws-region}") private val awsRegion: String,
) {
    @Bean
    fun createClient(): ApiGatewayManagementClient {
        return ApiGatewayManagementClient {
            this.endpointUrl = Url.parse(socketEndpoint)
            this.region = awsRegion
            this.httpClient = OkHttp4Engine()
        }
    }
}
WebsocketNotificationService
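import aws.sdk.kotlin.services.apigatewaymanagementapi.ApiGatewayManagementClient
import aws.sdk.kotlin.services.apigatewaymanagementapi.model.PostToConnectionRequest
import com.google.gson.Gson
import mu.KotlinLogging // io.github.oshai.kotlinlogging.KotlinLogging for kotlin-logging 5+
import org.springframework.stereotype.Service

data class SocketMessage(
    val connectionId: String,
    val payload: Payload
) {
    data class Payload(
        val type: String,
        val data: SomeType
    )
}

@Service
class WebsocketNotificationService(
    private val apiGatewayClient: ApiGatewayManagementClient,
) {
    private val logger = KotlinLogging.logger {}
    private val gson = Gson()

    suspend fun sendMessageToConnection(socketMessage: SocketMessage) {
        val (connectionId, payload) = socketMessage
        // the payload is serialized to JSON and sent as raw bytes
        val request = PostToConnectionRequest {
            this.connectionId = connectionId
            this.data = gson.toJson(payload).toByteArray()
        }
        apiGatewayClient.postToConnection(request)
    }
}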
Back to line 54 in the "Define a custom hook to initiate websocket connection" section, we have:
type Payload = { type: string; data: any };
const parsedData = JSON.parse(e?.data || "{}") as Payload;
Here e?.data, if it exists, is exactly the JSON string of the Payload object serialized on the Kotlin side.
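Since `as Payload` is only a compile-time assertion, a malformed message passes through unchecked. Below is a hedged sketch of a runtime check that could sit in front of the callbacks; `isPayload` and `parsePayload` are hypothetical helpers, and only the `type`/`data` shape from the definition above is verified.

```typescript
type Payload = { type: string; data: unknown };

// Type guard: true only for objects with a string `type` and a `data` field.
function isPayload(value: unknown): value is Payload {
    if (typeof value !== "object" || value === null) {
        return false;
    }
    const candidate = value as Record<string, unknown>;
    return typeof candidate.type === "string" && "data" in candidate;
}

// Parses a raw websocket message; returns null instead of throwing on bad input.
function parsePayload(raw: string | undefined): Payload | null {
    try {
        const parsed: unknown = JSON.parse(raw || "{}");
        return isPayload(parsed) ? parsed : null;
    } catch (err) {
        return null;
    }
}
```

With such a guard, the empty-object fallback `"{}"` is rejected as well, so callbacks never see a payload without a `type`.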