diff --git a/src/services/rabbitmq.ts b/src/services/rabbitmq.ts index 0dfdfc85..f97fde19 100644 --- a/src/services/rabbitmq.ts +++ b/src/services/rabbitmq.ts @@ -25,11 +25,9 @@ export async function init() { console.log(channel ? "[AMQ] Create channel success" : "[AMQ] Create channel failed"); - await Promise.all([ - channel.assertQueue(queue, { durable: true }), //----> (1.5) assert queue and set durable (if "true" save to disk on RabbitMQ) - channel.assertQueue(queue_org, { durable: true }), - channel.prefetch(1) - ]); + channel.assertQueue(queue, { durable: true }), //----> (1.5) assert queue and set durable (if "true" save to disk on RabbitMQ) + channel.assertQueue(queue_org, { durable: true }), + channel.prefetch(1) sendToQueue = (payload: any, persistent = true) => { //----> (2) sendQueue To RabbitMQ and set persistent (if "true" redo the failed queue when server run again) @@ -43,16 +41,14 @@ export async function init() { }; console.log("[AMQ] Listening for message..."); - await Promise.all([ - createConsumer(queue, channel, handler), //----> (3) Process Consumer - createConsumer(queue_org, channel, handler_org) - // createConsumer(queue2, channel, handler2); - ]); + createConsumer(queue, channel, handler), //----> (3) Process Consumer + createConsumer(queue_org, channel, handler_org) + // createConsumer(queue2, channel, handler2); } let retries = 0; -async function createConsumer( //----> consumer +function createConsumer( //----> consumer queue: string, channel: amqp.Channel, handler: (msg: amqp.ConsumeMessage) => Promise | boolean, @@ -60,14 +56,13 @@ async function createConsumer( //----> consumer channel.consume( queue, async (msg) => { - console.log("[AMQ] MSQ",msg); if (!msg) return; if ((await handler(msg)) || retries++ >= 3) { retries = 0; - console.log("[AMQ] Process consumer success"); + console.log("[AMQ] Process Consumer success"); return channel.ack(msg); } - console.log("[AMQ] Process consumer fail"); + console.log("[AMQ] Process Consumer fail"); return await new Promise((resolve) => setTimeout(() => resolve(channel.nack(msg)), 3000)); }, { noAck: false }, @@ -75,7 +70,6 @@ async function createConsumer( //----> consumer } async function handler(msg: amqp.ConsumeMessage): Promise { - console.log(msg ? "[AMQ] On handler" : "[AMQ] Not handler"); //----> condition before process consumer const repo = AppDataSource.getRepository(Command); const { data, token, user } = JSON.parse(msg.content.toString());