1. Queues Fundamentals
1// Service A (Producer) 2async function serviceA() { 3 const channel = await connection.createChannel(); 4 await channel.assertExchange('orders_exchange', 'topic'); 5 6 // Only publishes, doesn't need to know about queues 7 channel.publish('orders_exchange', 'order.created', 8 Buffer.from('new order')); 9} 10 11// Service B (Consumer) 12async function serviceB() { 13 const channel = await connection.createChannel(); 14 await channel.assertExchange('orders_exchange', 'topic'); 15 await channel.assertQueue('order_processing_queue'); 16 await channel.bindQueue('order_processing_queue', 'orders_exchange', 17 'order.created'); 18 19 channel.consume('order_processing_queue', msg => { 20 // Process order 21 }); 22}
2. Queues Structure
3. File Structure

Let's explore these files one by one.
3.1. QueueName.ts
1enum QueueName { 2 LLM_SUMMARY_STEP = "LLM_SUMMARY_STEP", 3 LLM_TRANSLATION_STEP = "LLM_TRANSLATION_STEP", 4 LLM_REPLY_STEP = "LLM_REPLY_STEP", 5 INSERT_ALGOLIOA = "INSERT_ALGOLIOA", 6 UPDATE_ALGOLIA = "UPDATE_ALGOLIA", 7 FLASK_EXCEL_GENERATION = "FLASK_EXCEL_GENERATION", 8 SNOOZE_AND_PIN = "SNOOZE_AND_PIN", 9 SNOOZE_AND_PIN_DEAD_LETTER = "SNOOZE_AND_PIN_DEAD_LETTER", 10 EXPORT_REPORT = "EXPORT_REPORT", 11 LLM_USAGE_LOG = "LLM_USAGE_LOG", 12 LLM_IMPACT_UPDATE = "LLM_IMPACT_UPDATE" 13} 14 15export default QueueName;
3.2. RoutingKey.ts
1enum QueueName { 2 LLM_SUMMARY_STEP = "LLM_SUMMARY_STEP", 3 LLM_TRANSLATION_STEP = "LLM_TRANSLATION_STEP", 4 LLM_REPLY_STEP = "LLMREPLY_STEP", 5 INSERT_ALGOLIOA = "INSERT_ALGOLIOA", 6 UPDATE_ALGOLIA = "UPDATE_ALGOLIA", 7 FLASK_EXCEL_GENERATION = "FLASK_EXCEL_GENERATION", 8 SNOOZE_AND_PIN = "SNOOZE_AND_PIN", 9 SNOOZE_AND_PIN_DEAD_LETTER = "SNOOZE_AND_PIN_DEAD_LETTER", 10 EXPORT_REPORT = "EXPORT_REPORT", 11 LLM_USAGE_LOG = "LLM_USAGE_LOG", 12 LLM_IMPACT_UPDATE = "LLM_IMPACT_UPDATE" 13} 14 15export default QueueName;
3.3. channels.ts
1import amqplib, { Channel, Replies } from "amqplib"; 2import logger from "../util/logger"; 3import QueueName from "./QueueName"; 4import queueBinding from "./queueBinding"; 5 6const AMQP_URL = process.env.AMQP_URL || "" 7 8export const NORMAL_EXCHANGE = "Billie" 9export const GENERAL_DEAD_EXCHANGE = "GENERAL_DEAD_EXCHANGE" 10 11const Q_CAPACITY = Number(process.env.Q_CAPACITY || "10"); 12 13type ChannelRef = { current: Channel | null }; 14 15const llmTaskChannelRef: ChannelRef = { current: null }; 16const normalTaskChanenlRef: ChannelRef = { current: null }; 17 18 19const getLLMTaskChannel = () => llmTaskChannelRef.current; 20const getNormalTaskChannel = () => normalTaskChanenlRef.current; 21 22const queues: { [k in QueueName]?: Replies.AssertQueue } = {}; 23 24const getQueue = (queueName: QueueName) => queues?.[queueName]; 25 26const initChannels = async (consumptions: (() => void)[]) => { 27 // create channels 28 const connection = await amqplib.connect(AMQP_URL); 29 const llmTaskChannel = await connection.createChannel(); 30 const normalTaskChannel = await connection.createChannel(); 31 32 llmTaskChannel.prefetch(Q_CAPACITY); 33 llmTaskChannelRef.current = llmTaskChannel; 34 normalTaskChanenlRef.current = normalTaskChannel; 35 36 try { 37 await llmTaskChannel.assertExchange(NORMAL_EXCHANGE, "direct", { durable: true }); 38 await llmTaskChannel.assertExchange(GENERAL_DEAD_EXCHANGE, "direct", { durable: true }); 39 } catch (err) { 40 console.log(err); 41 } 42 43 await queueBinding({ queues, llmTaskChannel, normalTaskChannel }); 44 45 for (const consumption of consumptions) { 46 consumption(); 47 } 48 49 logger.info("Channels Inited") 50} 51 52export default { 53 getQueue, 54 getLLMTaskChannel, 55 getNormalTaskChannel, 56 initChannels 57}
- Here
queueBindingandconsumptionsare the most important components of our queue system.
3.4. queueBinding.ts
1import { Channel, Replies } from "amqplib"; 2import QueueName from "./QueueName"; 3import RoutingKey from "./RoutingKey"; 4import { GENERAL_DEAD_EXCHANGE, NORMAL_EXCHANGE } from "./channels"; 5const SNOOZE_PIN_TTL = Number(process.env.SNOOZE_PIN_TTL || "604800000"); 6 7type KeyBindingConfig<Q, R> = { 8 queueName: Q 9 routingKey: R, 10 channel: Channel, 11 exchange: string, 12 deadLetter?: { 13 deadLetterExchange?: string, 14 deadLetterRoutingKey?: R, 15 messageTtl?: number 16 } 17} 18 19const bind = async ( 20 queues: { [key in QueueName]?: Replies.AssertQueue }, 21 config: KeyBindingConfig<QueueName, RoutingKey> 22) => { 23 const { channel, queueName, exchange, routingKey, 24 deadLetter = {} 25 } = config; 26 const q = await channel.assertQueue(queueName, { 27 durable: true, 28 ...deadLetter 29 }); 30 queues[queueName] = q; 31 await channel.bindQueue(queueName, exchange, routingKey); 32} 33 34const queueBinding = async (args: { 35 queues: { [key in QueueName]?: Replies.AssertQueue }, 36 llmTaskChannel: Channel, 37 normalTaskChannel: Channel 38}) => { 39 const { queues, llmTaskChannel, normalTaskChannel } = args; 40 41 const queueConfigs: KeyBindingConfig<QueueName, RoutingKey>[] = [ 42 { 43 queueName: QueueName.LLM_SUMMARY_STEP, 44 routingKey: RoutingKey.LLM_SUMMARY_STEP, 45 channel: llmTaskChannel, 46 exchange: NORMAL_EXCHANGE 47 }, 48 { 49 queueName: QueueName.LLM_TRANSLATION_STEP, 50 routingKey: RoutingKey.LLM_TRANSLATION_STEP, 51 channel: llmTaskChannel, 52 exchange: NORMAL_EXCHANGE 53 }, 54 { 55 queueName: QueueName.LLM_REPLY_STEP, 56 routingKey: RoutingKey.LLM_REPLY_STEP, 57 channel: llmTaskChannel, 58 exchange: NORMAL_EXCHANGE 59 }, 60 { 61 queueName: QueueName.SNOOZE_AND_PIN, 62 routingKey: RoutingKey.SNOOZE_AND_PIN, 63 channel: normalTaskChannel, 64 exchange: NORMAL_EXCHANGE, 65 deadLetter: { 66 deadLetterExchange: GENERAL_DEAD_EXCHANGE, 67 deadLetterRoutingKey: RoutingKey.SNOOZE_AND_PIN_DEAD_LETTER, 68 messageTtl: SNOOZE_PIN_TTL 69 } 70 }, 71 { 72 queueName: QueueName.SNOOZE_AND_PIN_DEAD_LETTER, 73 routingKey: RoutingKey.SNOOZE_AND_PIN_DEAD_LETTER, 74 channel: normalTaskChannel, 75 exchange: GENERAL_DEAD_EXCHANGE, 76 }, 77 { 78 queueName: QueueName.INSERT_ALGOLIOA, 79 routingKey: RoutingKey.INSERT_ALGOLIOA, 80 channel: normalTaskChannel, 81 exchange: NORMAL_EXCHANGE, 82 }, 83 { 84 queueName: QueueName.UPDATE_ALGOLIA, 85 routingKey: RoutingKey.UPDATE_ALGOLIA, 86 channel: normalTaskChannel, 87 exchange: NORMAL_EXCHANGE, 88 }, 89 { 90 queueName: QueueName.FLASK_EXCEL_GENERATION, 91 routingKey: RoutingKey.FLASK_EXCEL_GENERATION, 92 channel: normalTaskChannel, 93 exchange: NORMAL_EXCHANGE, 94 }, 95 { 96 queueName: QueueName.EXPORT_REPORT, 97 routingKey: RoutingKey.EXPORT_REPORT, 98 channel: normalTaskChannel, 99 exchange: NORMAL_EXCHANGE, 100 }, 101 { 102 queueName: QueueName.LLM_USAGE_LOG, 103 routingKey: RoutingKey.LLM_USAGE_LOG, 104 channel: normalTaskChannel, 105 exchange: NORMAL_EXCHANGE 106 }, 107 { 108 queueName: QueueName.LLM_IMPACT_UPDATE, 109 routingKey: RoutingKey.LLM_IMPACT_UPDATE, 110 channel: normalTaskChannel, 111 exchange: NORMAL_EXCHANGE 112 } 113 ] 114 115 for (const queueConfig of queueConfigs) { 116 await bind(queues, queueConfig); 117 } 118} 119 120export default queueBinding;
3.5. consumptions.ts
1import llmSummaryQueue from "./llmSummaryQueue"; 2import algoliaUpdateQueue from "./algoliaUpdateQueue"; 3import excelGenReqToFlaskQueue from "./excelGenReqToFlaskQueue"; 4import llmReplyQueue from "./llmReplyQueues"; 5import llmTranslationQueue from "./llmTranslationQueue"; 6import llmUsageLogQueue from "./llmUsageLogQueues"; 7import snoozeAndPinDeadLetterQueue from "./snoozeAndPinDeadLetterQueue"; 8import snoozeAndPinQueue from "./snoozeAndPinQueue"; 9import llmImpactUpdateQueue from "./llmImpactUpdateQueue"; 10 11export default [ 12 algoliaUpdateQueue.initConsumption, 13 excelGenReqToFlaskQueue.initConsumption, 14 llmReplyQueue.initConsumption, 15 llmSummaryQueue.initConsumption, 16 llmTranslationQueue.initConsumption, 17 llmUsageLogQueue.initConsumption, 18 snoozeAndPinDeadLetterQueue.initConsumption, 19 // snoozeAndPinQueue.initConsumption, <---- don't add this 20 llmImpactUpdateQueue.initConsumption, 21];
initConsumption is a method of our custom MessageQueue class which simplify our code by templating the boilerplate code:
3.6. MessageQueue Class (model/MessageQueue.ts)
1import { Channel } from "amqplib"; 2import { MessageErrorModel } from "../../db/mongo/models/MessageErrorLog"; 3import gmailService from "../../service/gmailService"; 4import logger from "../../util/logger"; 5import QueueName from "../QueueName"; 6import RoutingKey from "../RoutingKey"; 7import channels, { NORMAL_EXCHANGE } from "../channels"; 8 9const ERROR_EMAIL_RECEIVER = process.env.ERROR_EMAIL_RECEIVER; 10const env = process.env.env; 11 12export default class MessageQueue<MessageType> { 13 private queueName: QueueName; 14 private routingKey: RoutingKey; 15 private channel: () => (Channel | null); 16 private exchange = NORMAL_EXCHANGE; 17 public consumption?: (decoded: MessageType) => void | Promise<void>; 18 19 constructor(args: { 20 queueName: QueueName, 21 routingKey: RoutingKey, 22 channel: () => (Channel | null), 23 consumption?: (decoded: MessageType) => void | Promise<void>, 24 exchange?: string 25 }) { 26 const { exchange = NORMAL_EXCHANGE } = args 27 this.queueName = args.queueName; 28 this.routingKey = args.routingKey; 29 this.channel = args.channel; 30 this.consumption = args.consumption; 31 this.exchange = exchange; 32 } 33 34 public publish = (msg: MessageType) => { 35 const refinedMsg = msg as MessageType & { routingKey: RoutingKey }; 36 refinedMsg.routingKey = this.routingKey; 37 const msg_ = JSON.stringify(refinedMsg); 38 39 this.channel()?.publish( 40 this.exchange, 41 this.routingKey, 42 Buffer.from(msg_) 43 ); 44 } 45 46 public initConsumption = () => { 47 const q = channels.getQueue(this.queueName); 48 if (!this.channel()) { 49 throw new Error(`llmTaskChannel cannot be established`); 50 } 51 if (!q) { 52 throw new Error(` q: ${this.routingKey} cannot be established`); 53 } 54 this.channel()?.consume(q.queue, (msg) => { 55 const msg_ = msg?.content.toString(); 56 const decodedMsg: MessageType = JSON.parse(msg_ || '{ "message": "msg_ is null" }'); 57 if (!msg?.content) { 58 logger.info("null message"); 59 return; 60 } 61 try { 62 logger.info(`[${this.routingKey}]: processing msg ${msg_}`) 63 const result = this.consumption?.(decodedMsg); 64 if (result instanceof Promise) { 65 // synchronous call cannot catch the error thrown inside a promise. 66 result 67 .then(() => { 68 this.channel()?.ack(msg); 69 }).catch(err => { 70 const errorLog = new MessageErrorModel({ 71 msg: { 72 err: err?.message || "", 73 step: (decodedMsg as { routingKey?: string }).routingKey || "", 74 param: decodedMsg 75 } 76 }) 77 gmailService.sendEmail({ 78 to: ERROR_EMAIL_RECEIVER || "", 79 html: `<div> 80 <div>Error Message: 81 <p/> 82 <div> 83 ${JSON.stringify(decodedMsg, null, 2)} 84 </div> 85 <p>The same message is also logged in mongodb.</p> 86 </div>`, 87 subject: `Error message from ${env?.toUpperCase()} environment`, 88 text: `Error message from ${env?.toUpperCase()} environment` 89 }) 90 errorLog.save().then(() => { 91 this.channel()?.nack(msg, false, false); 92 }) 93 }); 94 } else { 95 this.channel()?.ack(msg); 96 } 97 } catch (err) { 98 const errorLog = new MessageErrorModel({ 99 msg: { 100 err: (err as { message?: string })?.message || "", 101 step: (decodedMsg as { routingKey?: string }).routingKey || "", 102 param: decodedMsg 103 } 104 }) 105 gmailService.sendEmail({ 106 to: ERROR_EMAIL_RECEIVER || "", 107 html: `<div> 108 <div>Error Message: 109 <p/> 110 <div> 111 ${JSON.stringify(decodedMsg, null, 2)} 112 </div> 113 <p>The same message is also logged in mongodb.</p> 114 </div>`, 115 subject: `Error message from ${env?.toUpperCase()} environment`, 116 text: `Error message from ${env?.toUpperCase()} environment` 117 }) 118 errorLog.save().then(() => { 119 this.channel()?.nack(msg, false, false); 120 }) 121 } 122 }, { noAck: false }); 123 } 124}
- Here the
try-catchlogic seems a bit repetitive. - But note that a normal
try-catchwould not catch the error thrown inside a promise, therefore we need to repeatedly catch the error. - Here we reject a message by either throwing an error explicitly (for example, we might want to try catch to execute custom logging logic, and then throw the error again)
- or by letting the program throw any error.
4. Example of Queues
Note that by using MessageQueue class we can pay all our attention to writing consumption logic.
4.1. Normal Task Queue
4.1.1. excelGenReqToFlaskQueue
1import LLMStatus from "../../constants/LLMStatus"; 2import { db } from "../../db/kysely/database"; 3 4import { MessageErrorModel } from "../../db/mongo/models/MessageErrorLog"; 5import { SummaryLangChoice } from "../../dto/dto"; 6import chatService from "../../service/chatService"; 7import RedisUtil from "../../util/RedisUtil"; 8import QueueName from "../QueueName"; 9import RoutingKey from "../RoutingKey"; 10import channels from "../channels"; 11import MessageQueue from "../model/MessageQueue"; 12 13const llmTaskChannel = () => channels.getLLMTaskChannel(); 14 15const excelGenReqToFlaskQueue = new MessageQueue<{ 16 roomId: string, lang: SummaryLangChoice 17}>({ 18 channel: llmTaskChannel, 19 queueName: QueueName.FLASK_EXCEL_GENERATION, 20 routingKey: RoutingKey.FLASK_EXCEL_GENERATION, 21 consumption: async (decoded) => { 22 const { lang, roomId } = decoded; 23 try { 24 await chatService.dispatchExcelGenerationTaskToFlask(roomId, lang); 25 await RedisUtil.setLLMStatus(roomId, LLMStatus.FINISHED); 26 } catch (err) { 27 const errorLog = new MessageErrorModel({ 28 msg: { 29 err: err, 30 step: "excelGenReqToFlaskQueue", 31 param: decoded 32 } 33 }) 34 // only session will use this excel generation function 35 await db.updateTable("MessagesSession") 36 .set({ isLiveEnded: false }).where("MessagesSession.id", "=", roomId).execute(); 37 await errorLog.save(); 38 RedisUtil.setLLMStatus(roomId, LLMStatus.FAILED); 39 throw err; 40 } 41 } 42}) 43 44export default excelGenReqToFlaskQueue;
4.2. Dead-Letter Queues
4.2.1. snoozeAndPinQueue.ts
Note that this queue is supposed to be a delayed task queue, no consumption should be inited. Otherwise we have to at least ack, nack, reject which violates our purpose to let the message expire automatically.
1import { SnoozeAndPinMessage } from "../../dto/dto"; 2import QueueName from "../QueueName"; 3import RoutingKey from "../RoutingKey"; 4import channels from "../channels"; 5import MessageQueue from "../model/MessageQueue"; 6 7const normalTaskChannel = () => channels.getNormalTaskChannel(); 8 9const snoozeAndPinQueue = new MessageQueue<SnoozeAndPinMessage> ({ 10 channel: normalTaskChannel, 11 queueName: QueueName.SNOOZE_AND_PIN, 12 routingKey: RoutingKey.SNOOZE_AND_PIN, 13}); 14 15export default snoozeAndPinQueue;
Here we have leave the consumption field empty and we have not put it inside the list of consumptions.ts.
4.2.2. snoozeAndPinDeadLetterQueue.ts
According to our configuration in queueBinding.ts, after SNOOZE_PIN_TTL ms, the message from QueueName.SNOOZE_AND_PIN will be redirected to RoutingKey.SNOOZE_AND_PIN_DEAD_LETTER via GENERAL_DEAD_EXCHANGE exchange.
1import nonDraftsCache from "../../caching/nonDraftsCache"; 2import { db } from "../../db/kysely/database"; 3import { SnoozeAndPinMessage } from "../../dto/dto"; 4import QueueName from "../QueueName"; 5import RoutingKey from "../RoutingKey"; 6import channels from "../channels"; 7import MessageQueue from "../model/MessageQueue"; 8 9const normalTaskChannel = () => channels.getNormalTaskChannel(); 10 11const snoozeAndPinDeadLetterQueue = new MessageQueue<SnoozeAndPinMessage>({ 12 channel: normalTaskChannel, 13 queueName: QueueName.SNOOZE_AND_PIN_DEAD_LETTER, 14 routingKey: RoutingKey.SNOOZE_AND_PIN_DEAD_LETTER, 15 consumption: async (decoded) => { 16 const { sessionId, channelId, isAdmin } = decoded; 17 await db.updateTable("MessagesSession") 18 .set({ 19 prioritizedOrSnoozedAt: 0, 20 sortingTimestamp: eb => eb.selectFrom("MessagesSession as NewSession") 21 .select("NewSession.createdAt") 22 .where("NewSession.id", "=", sessionId) 23 }) 24 .where("MessagesSession.id", "=", sessionId) 25 .execute(); 26 27 const { customClearCache } = nonDraftsCache.setCacheKey({ channelId, isAdmin }); 28 await customClearCache(); 29 } 30}); 31 32export default snoozeAndPinDeadLetterQueue;












