import amqp from "amqplib";
import { AppDataSource } from "../database/data-source";
import { Command } from "../entities/Command";
import { commandTypePath } from "../interfaces/utils";
import CallAPI from "../interfaces/call-api";
import HttpError from "../interfaces/http-error";
import HttpStatusCode from "../interfaces/http-status";
import { RequestWithUser } from "../middlewares/user";
import { PosMaster } from "../entities/PosMaster";
import { Profile } from "../entities/Profile";

/**
 * Publishes a payload (JSON-serialised) to the command queue.
 * Only assigned once `init()` has connected; it stays undefined when the
 * AMQ_* environment variables are missing.
 */
export let sendToQueue: (payload: any) => void;

/**
 * Publishes a payload (JSON-serialised) to the organization queue.
 * Only assigned once `init()` has connected; it stays undefined when the
 * AMQ_* environment variables are missing.
 */
export let sendToQueueOrg: (payload: any) => void;

/** Number of failed attempts after which a message is acked (dropped) instead of requeued. */
const MAX_RETRIES = 3;

/** Delay before a failed message is nacked and therefore requeued. */
const RETRY_DELAY_MS = 3000;

/** Builds a publisher that JSON-serialises the payload and sends it to `queue`. */
function createPublisher(channel: amqp.Channel, queue: string) {
  // `persistent` is forwarded as the amqplib publish option of the same name.
  return (payload: any, persistent = true): void => {
    channel.sendToQueue(queue, Buffer.from(JSON.stringify(payload)), { persistent });
  };
}

/**
 * Connects to the broker, asserts both queues, wires up the exported
 * publishers and starts one consumer per queue.
 *
 * Returns without doing anything when AMQ_URL, AMQ_QUEUE or AMQ_QUEUE_ORG is
 * not set.
 */
export async function init(): Promise<void> {
  //----> (1) Producer
  // Destructure first, then guard, so the values are narrowed to `string`.
  const { AMQ_URL: url, AMQ_QUEUE: queue, AMQ_QUEUE_ORG: queue_org } = process.env; //----> (1.2) get url and queue from .env
  if (!url || !queue || !queue_org) return;

  const connection = await amqp.connect(url); //----> (1.3) set up url with amqp protocol
  const channel = await connection.createChannel(); //----> (1.4) create Channel

  // Awaited so that a failing assert rejects init() instead of becoming an
  // unhandled rejection, and so consumers only start once the queues exist.
  await channel.assertQueue(queue, { durable: true }); //----> (1.5) assert queue as durable
  await channel.assertQueue(queue_org, { durable: true });
  await channel.prefetch(1);

  //----> (2) publishers
  sendToQueue = createPublisher(channel, queue);
  sendToQueueOrg = createPublisher(channel, queue_org);

  console.log("[AMQ] Listening for message...");
  await createConsumer(queue, channel, handler); //----> (3) Process Consumer
  await createConsumer(queue_org, channel, handler_org);
}

/**
 * Registers a consumer on `queue` with manual acknowledgement.
 *
 * A message is acked when `handler` reports success, or when the failure
 * budget (MAX_RETRIES) is exhausted. Otherwise it is nacked after
 * RETRY_DELAY_MS so the broker can deliver it again.
 *
 * @param queue   Name of the queue to consume from.
 * @param channel Channel the consumer is registered on.
 * @param handler Processes one message; resolves `true` on success.
 */
async function createConsumer(
  queue: string,
  channel: amqp.Channel,
  handler: (msg: amqp.ConsumeMessage) => Promise<boolean> | boolean,
): Promise<void> {
  // One counter per consumer, so failures on one queue do not consume the
  // retry budget of the other queue.
  // NOTE(review): this counts consecutive failures on this consumer, which
  // only equals "failures of one message" if the nacked message is the next
  // one delivered - confirm against broker behaviour.
  let retries = 0;

  await channel.consume(
    queue,
    async (msg) => {
      if (!msg) return;

      let succeeded = false;
      try {
        succeeded = await handler(msg);
      } catch (error) {
        // A throwing handler (bad JSON, unknown command type, DB error, ...)
        // must still end in ack/nack; otherwise the message stays
        // unacknowledged and prefetch(1) blocks further deliveries.
        console.error(error);
      }

      if (succeeded || retries++ >= MAX_RETRIES) {
        retries = 0;
        channel.ack(msg);
        return;
      }

      await new Promise<void>((resolve) => setTimeout(resolve, RETRY_DELAY_MS));
      channel.nack(msg);
    },
    { noAck: false },
  );
}

/**
 * Handles one message from the command queue: loads the command, posts its
 * receivers to the command type's "/excecute" endpoint and, on success,
 * stores the new status and audit fields on the command.
 *
 * @returns `true` when the message is done (including "command not found"),
 *          `false` when the API call or the save failed.
 * @throws HttpError (NOT_FOUND) when the command type has no known path.
 */
async function handler(msg: amqp.ConsumeMessage): Promise<boolean> {
  //----> condition before process consumer
  const repo = AppDataSource.getRepository(Command);
  const { data, token } = JSON.parse(msg.content.toString());
  const { id, status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt } = data;

  const command = await repo.findOne({
    where: { id: id },
    relations: ["commandType", "commandRecives"],
  });
  // Nothing to execute; report success so the message is acked.
  if (!command) return true;

  const path = commandTypePath(command.commandType.code);
  if (path == null) throw new HttpError(HttpStatusCode.NOT_FOUND, "ไม่พบประเภทคำสั่งนี้ในระบบ");

  const refIds = command.commandRecives
    .filter((x) => x.refId != null)
    .map((x) => ({
      refId: x.refId,
      commandAffectDate: command.commandAffectDate,
      commandNo: command.commandNo,
      commandYear: command.commandYear,
      commandId: command.id,
      templateDoc: command.positionDetail,
      amount: x.amount,
      positionSalaryAmount: x.positionSalaryAmount,
      mouthSalaryAmount: x.mouthSalaryAmount,
    }));

  try {
    await new CallAPI().PostData(
      {
        // time bomb - NOTE(review): the token travels inside the queued
        // message; presumably it can expire before the message is processed.
        headers: { authorization: token },
      },
      path + "/excecute",
      { refIds },
      false,
    );
  } catch (e) {
    console.error(e);
    return false;
  }

  console.log("[AMQ] Excecute Command Success");
  Object.assign(command, { status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt });

  try {
    // NOTE(review): a failed save returns false, so the message is retried
    // and the API call above runs again - confirm the endpoint tolerates that.
    const saved = await repo.save(command);
    return Boolean(saved);
  } catch (e) {
    console.log(e);
    return false;
  }
}

/**
 * Handles one message from the organization queue: for every PosMaster of the
 * given org revision, moves `next_holderId` into `current_holderId`, clears
 * `next_holderId` and stamps the audit fields. When the status is "NOW" and a
 * next holder exists, that holder's profile is first updated from the
 * position flagged `positionIsSelected`.
 *
 * @returns `true` when the batch ran through, `false` when loading or a
 *          profile update threw.
 */
async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
  //----> condition before process consume
  const repoPosmaster = AppDataSource.getRepository(PosMaster);
  const repoProfile = AppDataSource.getRepository(Profile);
  const { data } = JSON.parse(msg.content.toString());
  const { id, status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt } = data;

  try {
    const posMaster = await repoPosmaster.find({
      where: { orgRevisionId: id },
      relations: [
        "orgRoot",
        "orgChild4",
        "orgChild3",
        "orgChild2",
        "orgChild1",
        "positions",
        "positions.posLevel",
        "positions.posType",
        "positions.posExecutive",
      ],
    });

    // NOTE(review): this batch is not transactional. If it fails halfway and
    // the message is retried, rows already saved have next_holderId = null
    // and would get current_holderId overwritten with null - confirm.
    await Promise.all(
      posMaster.map(async (item) => {
        const nextHolderId = item.next_holderId;
        if (nextHolderId != null && status === "NOW") {
          const profile = await repoProfile.findOne({ where: { id: nextHolderId } });
          const position = item.positions.find((x) => x.positionIsSelected == true);
          // `any`-typed null so it can be assigned whatever the declared
          // nullability of the Profile fields is.
          const _null: any = null;
          if (profile != null) {
            profile.posLevelId = position?.posLevelId ?? _null;
            profile.posTypeId = position?.posTypeId ?? _null;
            profile.position = position?.positionName ?? _null;
            await repoProfile.save(profile);
          }
        }

        item.current_holderId = item.next_holderId;
        item.next_holderId = null;
        item.lastUpdateUserId = lastUpdateUserId;
        item.lastUpdateFullName = lastUpdateFullName;
        item.lastUpdatedAt = lastUpdatedAt;
        // Best-effort: a failed save is only logged and does not fail the batch.
        await repoPosmaster.save(item).catch((e) => console.log(e));
      }),
    );

    console.log("[AMQ] Excecute Organization Success");
    return true;
  } catch (error) {
    console.error(error);
    return false;
  }
}