import amqp from "amqplib";
import { AppDataSource } from "../database/data-source";
import { Command } from "../entities/Command";
import { commandTypePath } from "../interfaces/utils";
import CallAPI from "../interfaces/call-api";
import HttpError from "../interfaces/http-error";
import HttpStatusCode from "../interfaces/http-status";
import { RequestWithUser } from "../middlewares/user";

/** How long a failed message is held before it is handed back to the broker. */
const REQUEUE_DELAY_MS = 3000;

/**
 * Publishes a JSON-serialisable payload to the configured queue.
 *
 * Only assigned once `init()` has connected successfully; it stays `undefined`
 * when `AMQ_URL` / `AMQ_QUEUE` are not set.
 *
 * `persistent` (default `true`) is forwarded to amqplib's publish options.
 */
export let sendToQueue: (payload: unknown, persistent?: boolean) => void;

/**
 * Connects to the broker, prepares the queue, installs `sendToQueue` and
 * starts the consumer.
 *
 * Does nothing when `AMQ_URL` or `AMQ_QUEUE` is missing from the environment.
 * Rejects if connecting, creating the channel, asserting the queue, setting
 * prefetch or registering the consumer fails.
 */
export async function init(): Promise<void> {
  // Destructure first so the guard narrows the very values used below.
  const { AMQ_URL: url, AMQ_QUEUE: queue } = process.env;
  if (!url || !queue) return;

  const connection = await amqp.connect(url);
  const channel = await connection.createChannel();

  // Both calls return promises; await them so failures surface here and the
  // queue is guaranteed to exist before anything is published or consumed.
  await channel.assertQueue(queue, { durable: true });
  await channel.prefetch(1); // at most one unacknowledged message at a time

  sendToQueue = (payload, persistent = true) => {
    channel.sendToQueue(queue, Buffer.from(JSON.stringify(payload)), { persistent });
  };

  await createConsumer(queue, channel, handler);
  console.log("[AMQ] Listening for message...");
  // Additional queues can be wired the same way:
  // createConsumer(queue1, channel, handler1);
  // createConsumer(queue2, channel, handler2);
}

/**
 * Registers a manually-acknowledged consumer on `queue`.
 *
 * @param queue   Name of the queue to consume from.
 * @param channel Channel used to consume and to ack/nack.
 * @param handler Returns (or resolves to) `true` when the message is finished
 *                and should be acked. `false` or a thrown error means the
 *                message is nacked after `REQUEUE_DELAY_MS`.
 */
async function createConsumer(
  queue: string,
  channel: amqp.Channel,
  handler: (msg: amqp.ConsumeMessage) => Promise<boolean> | boolean,
): Promise<void> {
  // ack/nack throw when the channel has already been closed; log instead of
  // letting that escape from the consume callback or the timer.
  const settle = (action: () => void): void => {
    try {
      action();
    } catch (e: unknown) {
      console.error("[AMQ] Failed to acknowledge message", e);
    }
  };

  await channel.consume(
    queue,
    async (msg) => {
      if (!msg) return;

      let handled = false;
      try {
        handled = await handler(msg);
      } catch (e: unknown) {
        // Without this catch the message would never be acked or nacked and,
        // with prefetch(1), no further message would ever be delivered.
        console.error("[AMQ] Handler threw, message will be retried", e);
      }

      if (handled) {
        settle(() => channel.ack(msg));
        return;
      }

      // Back off briefly so a failing message does not spin in a tight loop.
      await new Promise<void>((resolve) =>
        setTimeout(() => {
          settle(() => channel.nack(msg));
          resolve();
        }, REQUEUE_DELAY_MS),
      );
    },
    { noAck: false },
  );
}

/**
 * Processes one queued command message: loads the command, posts its
 * receivers to the command type's "/excecute" endpoint and, on success, stores
 * the status / last-update fields carried by the message.
 *
 * @returns `true` when the message is finished (processed, or unprocessable
 *          and therefore discarded); `false` when the API call or the save
 *          failed and the message should be retried.
 * @throws HttpError (NOT_FOUND) when the command type has no known path.
 */
async function handler(msg: amqp.ConsumeMessage): Promise<boolean> {
  const repo = AppDataSource.getRepository(Command);

  // JSON.parse yields `any`; validate the shape at runtime before using it.
  let payload: any;
  try {
    payload = JSON.parse(msg.content.toString());
  } catch (e: unknown) {
    // A malformed body can never succeed, so retrying it is pointless.
    console.error("[AMQ] Discarding message with invalid JSON body", e);
    return true;
  }

  const data = payload?.data;
  const token = payload?.token;
  if (data == null || typeof data !== "object" || data.id == null) {
    // NOTE(review): TypeORM by default ignores `undefined` values in `where`,
    // so a missing id could match an arbitrary command — confirm against the
    // TypeORM version in use.
    console.error("[AMQ] Discarding message without a command id");
    return true;
  }
  const { id, status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt } = data;

  const command = await repo.findOne({
    where: { id: id },
    relations: ["commandType", "commandRecives"],
  });
  if (!command) return true; // nothing to execute for an unknown command

  const path = commandTypePath(command.commandType.code);
  if (path == null) throw new HttpError(HttpStatusCode.NOT_FOUND, "ไม่พบประเภทคำสั่งนี้ในระบบ");

  // Only receivers that reference a record are sent to the target service.
  const refIds = command.commandRecives
    .filter((x) => x.refId != null)
    .map((x) => ({
      refId: x.refId,
      commandAffectDate: command.commandAffectDate,
      commandNo: command.commandNo,
      commandYear: command.commandYear,
      commandId: command.id,
      templateDoc: command.positionDetail,
      amount: x.amount,
      positionSalaryAmount: x.positionSalaryAmount,
      mouthSalaryAmount: x.mouthSalaryAmount,
    }));

  try {
    await new CallAPI().PostData(
      {
        // NOTE(review): flagged "time bomb" by the original author — presumably
        // because the forwarded token can expire while the message is queued or
        // being retried; confirm.
        headers: { authorization: token },
      },
      path + "/excecute",
      { refIds },
      false,
    );
  } catch (e: unknown) {
    console.error(e);
    return false;
  }
  console.log("[AMQ] Excecute Command Success");

  Object.assign(command, { status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt });
  try {
    await repo.save(command);
    return true;
  } catch (e: unknown) {
    // NOTE(review): returning false retries the whole message, which posts to
    // "/excecute" again — confirm that endpoint tolerates repeats.
    console.error(e);
    return false;
  }
}

// Previous handler, kept for reference: it only updated the command record
// and did not call the execute API.
//
// async function handler(msg: amqp.ConsumeMessage): Promise<boolean> {
//   const repo = AppDataSource.getRepository(Command);
//   const { id, status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt } = JSON.parse(
//     msg.content.toString(),
//   );
//   const record = await repo.findOne({
//     where: { id },
//   });
//   if (!record) return true;
//   Object.assign(record, { status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt });
//   const result = await repo.save(record).catch((e) => console.log(e));
//   if (result) return true;
//   return false;
// }