hrms-api-org/src/services/rabbitmq.ts

173 lines
6.2 KiB
TypeScript
Raw Normal View History

2024-10-11 11:05:31 +07:00
import amqp from "amqplib";
import { AppDataSource } from "../database/data-source";
import { Command } from "../entities/Command";
import { commandTypePath } from "../interfaces/utils";
import CallAPI from "../interfaces/call-api";
import HttpError from "../interfaces/http-error";
import HttpStatusCode from "../interfaces/http-status";
import { RequestWithUser } from "../middlewares/user";
import { PosMaster } from "../entities/PosMaster";
import { Profile } from "../entities/Profile";
2024-10-11 11:05:31 +07:00
/**
 * Publishes `payload`, JSON-serialised, to the queue named by `AMQ_QUEUE`.
 *
 * Assigned inside `init()` once the channel is open. It is unassigned before
 * that, and stays unassigned when `init()` returns early because one of the
 * `AMQ_URL` / `AMQ_QUEUE` / `AMQ_QUEUE_ORG` environment variables is missing.
 */
export let sendToQueue: (payload: any) => void;
/**
 * Publishes `payload`, JSON-serialised, to the queue named by `AMQ_QUEUE_ORG`.
 *
 * Assigned inside `init()` under the same conditions as `sendToQueue`.
 */
export let sendToQueueOrg: (payload: any) => void;
2024-10-18 16:11:22 +07:00
/**
 * Connects to RabbitMQ, declares both durable queues, installs the module-level
 * publishers (`sendToQueue`, `sendToQueueOrg`) and starts one consumer per queue.
 *
 * Returns without doing anything when `AMQ_URL`, `AMQ_QUEUE` or `AMQ_QUEUE_ORG`
 * is not set; the publishers are left unassigned in that case.
 *
 * Rejects with the underlying amqplib error when connecting, opening the
 * channel, declaring a queue or setting the prefetch fails.
 */
export async function init() {
  //----> (1) Producer
  if (!process.env.AMQ_URL || !process.env.AMQ_QUEUE || !process.env.AMQ_QUEUE_ORG) return;

  //----> (1.2) read the broker url and the two queue names from the environment
  const { AMQ_URL: url, AMQ_QUEUE: queue, AMQ_QUEUE_ORG: queue_org } = process.env;

  //----> (1.3) open the connection; a failure rejects, so reaching the log means success
  const connection = await amqp.connect(url);
  console.log("[AMQ] Connection success");

  //----> (1.4) open the channel shared by the publishers and both consumers
  const channel = await connection.createChannel();
  console.log("[AMQ] Create channel success");

  //----> (1.5) declare the queues (durable: true) and set prefetch to 1.
  // Each call is awaited so that a broker-side failure surfaces here instead of
  // becoming an unhandled rejection, and so that nothing publishes or consumes
  // before the queues are confirmed.
  await channel.assertQueue(queue, { durable: true });
  await channel.assertQueue(queue_org, { durable: true });
  await channel.prefetch(1);

  //----> (2) publishers: serialise the payload as JSON and publish it with the
  // `persistent` message option (defaults to true)
  sendToQueue = (payload: any, persistent = true) => {
    channel.sendToQueue(queue, Buffer.from(JSON.stringify(payload)), {
      persistent,
    });
  };

  sendToQueueOrg = (payload: any, persistent = true) => {
    channel.sendToQueue(queue_org, Buffer.from(JSON.stringify(payload)), { persistent });
  };

  //----> (3) consumers
  console.log("[AMQ] Listening for message...");
  createConsumer(queue, channel, handler);
  createConsumer(queue_org, channel, handler_org);
}
2024-10-30 10:49:58 +07:00
/**
 * Registers a manual-acknowledgement consumer on `queue`.
 *
 * For every delivered message `handler` is invoked:
 *  - a truthy result acks the message;
 *  - a falsy result, a throw or a rejection counts as a failure. The message is
 *    nacked after a 3 second pause, and once it has failed more than three
 *    times in a row it is acked anyway so it cannot block the queue forever.
 *
 * The failure counter is private to this consumer, so failures on one queue do
 * not consume the retry budget of another queue.
 *
 * @param queue   Name of the queue to consume from.
 * @param channel Channel the consumer is registered on and acks/nacks through.
 * @param handler Message processor; returns (or resolves to) true on success.
 */
function createConsumer(
  queue: string,
  channel: amqp.Channel,
  handler: (msg: amqp.ConsumeMessage) => Promise<boolean> | boolean,
) {
  const maxRetries = 3;
  const retryDelayMs = 3000;
  // Consecutive failures seen by this consumer since its last acked message.
  let retries = 0;

  // ack/nack throw when the channel has already been closed; log instead of
  // letting the error escape from the consumer callback.
  const settle = (action: () => void) => {
    try {
      action();
    } catch (e) {
      console.error(e);
    }
  };

  channel
    .consume(
      queue,
      async (msg) => {
        if (!msg) return;

        let succeeded = false;
        try {
          succeeded = await handler(msg);
        } catch (e) {
          // A throwing handler must still end in an ack or a nack, otherwise the
          // message stays unacknowledged and stalls the consumer.
          console.error(e);
        }

        if (succeeded) {
          retries = 0;
          console.log("[AMQ] Process Consumer success");
          return settle(() => channel.ack(msg));
        }

        if (retries++ >= maxRetries) {
          retries = 0;
          console.log("[AMQ] Process Consumer failed, retries exhausted - message dropped");
          return settle(() => channel.ack(msg));
        }

        console.log("[AMQ] Process Consumer failed");
        await new Promise<void>((resolve) => setTimeout(resolve, retryDelayMs));
        settle(() => channel.nack(msg));
      },
      { noAck: false },
    )
    // Registering the consumer can fail (e.g. closed channel); report it rather
    // than leaving an unhandled rejection.
    .catch((e) => console.error(e));
}
2024-10-18 16:11:22 +07:00
/**
 * Consumer handler for the command queue.
 *
 * Parses the message as JSON (`{ data, token }`), loads the Command whose id is
 * `data.id`, posts its recipients to `<commandTypePath>/excecute`, and then
 * copies `status` and the last-update fields from the message onto the command
 * and saves it.
 *
 * Never rejects: any failure (malformed message, unknown command type, API
 * error, database error) is logged and reported as `false`.
 *
 * @param msg Delivered message; `msg.content` must hold a JSON document.
 * @returns `true` when the command was executed and saved, or when no command
 *          with that id exists (nothing to do); `false` on any failure.
 */
async function handler(msg: amqp.ConsumeMessage): Promise<boolean> {
  //----> condition before process consumer
  try {
    const { data, token } = JSON.parse(msg.content.toString());
    const { id, status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt } = data;

    const repo = AppDataSource.getRepository(Command);
    const command = await repo.findOne({
      where: { id: id },
      relations: ["commandType", "commandRecives"],
    });
    if (!command) return true;

    const path = commandTypePath(command.commandType.code);
    if (path == null) throw new HttpError(HttpStatusCode.NOT_FOUND, "ไม่พบประเภทคำสั่งนี้ในระบบ");

    // Only recipients that carry a refId are forwarded to the execute endpoint.
    const refIds = command.commandRecives
      .filter((x) => x.refId != null)
      .map((x) => ({
        refId: x.refId,
        commandAffectDate: command.commandAffectDate,
        commandNo: command.commandNo,
        commandYear: command.commandYear,
        commandId: command.id,
        templateDoc: command.positionDetail,
        amount: x.amount,
        amountSpecial: x.amountSpecial,
        positionSalaryAmount: x.positionSalaryAmount,
        mouthSalaryAmount: x.mouthSalaryAmount,
      }));

    await new CallAPI().PostData(
      {
        headers: { authorization: token },
      },
      path + "/excecute",
      { refIds },
      false,
    );
    console.log("[AMQ] Excecute Command Success");

    Object.assign(command, { status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt });
    await repo.save(command);
    return true;
  } catch (e) {
    console.error(e);
    return false;
  }
}
/**
 * Consumer handler for the organization queue.
 *
 * Parses the message as JSON (`{ data }`), loads every PosMaster whose
 * `orgRevisionId` equals `data.id`, and for each one:
 *  - when it has a `next_holderId` and `data.status` is "NOW", copies the
 *    selected position's level, type and name onto that holder's Profile;
 *  - moves `next_holderId` into `current_holderId`, clears `next_holderId`,
 *    stamps the last-update fields from the message and saves the row.
 *
 * Never rejects: a malformed message or a database error is logged and
 * reported as `false`.
 *
 * NOTE(review): the per-row update is not idempotent. Running it again on rows
 * that were already processed copies the cleared `next_holderId` (null) into
 * `current_holderId`. Confirm that redelivery of the same message is acceptable.
 *
 * @param msg Delivered message; `msg.content` must hold a JSON document.
 * @returns `true` when all rows were processed, `false` on failure.
 */
async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
  //----> condition before process consume
  try {
    const { data } = JSON.parse(msg.content.toString());
    const { id, status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt } = data;

    const repoPosmaster = AppDataSource.getRepository(PosMaster);
    const repoProfile = AppDataSource.getRepository(Profile);

    const posMaster = await repoPosmaster.find({
      where: { orgRevisionId: id },
      relations: [
        "orgRoot",
        "orgChild4",
        "orgChild3",
        "orgChild2",
        "orgChild1",
        "positions",
        "positions.posLevel",
        "positions.posType",
        "positions.posExecutive",
      ],
    });

    await Promise.all(
      posMaster.map(async (item) => {
        const nextHolderId = item.next_holderId;
        if (nextHolderId != null && status == "NOW") {
          const profile = await repoProfile.findOne({
            where: { id: nextHolderId },
          });
          const position = item.positions.find((x) => x.positionIsSelected == true);
          // Typed as any so that null can be written to the profile columns below.
          const _null: any = null;
          if (profile != null) {
            profile.posLevelId = position?.posLevelId ?? _null;
            profile.posTypeId = position?.posTypeId ?? _null;
            profile.position = position?.positionName ?? _null;
            await repoProfile.save(profile);
          }
        }

        item.current_holderId = item.next_holderId;
        item.next_holderId = null;
        item.lastUpdateUserId = lastUpdateUserId;
        item.lastUpdateFullName = lastUpdateFullName;
        item.lastUpdatedAt = lastUpdatedAt;
        // Best effort: a failed save of one row is logged and does not fail the message.
        await repoPosmaster.save(item).catch((e) => console.log(e));
      }),
    );

    console.log("[AMQ] Excecute Organization Success");
    return true;
  } catch (error) {
    console.error(error);
    return false;
  }
}