186 lines
6.5 KiB
TypeScript
186 lines
6.5 KiB
TypeScript
import amqp from "amqplib";
|
|
import { AppDataSource } from "../database/data-source";
|
|
import { Command } from "../entities/Command";
|
|
import { commandTypePath } from "../interfaces/utils";
|
|
import CallAPI from "../interfaces/call-api";
|
|
import HttpError from "../interfaces/http-error";
|
|
import HttpStatusCode from "../interfaces/http-status";
|
|
import { RequestWithUser } from "../middlewares/user";
|
|
import { PosMaster } from "../entities/PosMaster";
|
|
import { Profile } from "../entities/Profile";
|
|
|
|
export let sendToQueue: (payload: any) => void;
|
|
export let sendToQueueOrg: (payload: any) => void;
|
|
export async function init() {
|
|
//----> (1) Producer
|
|
if (!process.env.AMQ_URL || !process.env.AMQ_QUEUE || !process.env.AMQ_QUEUE_ORG) return;
|
|
|
|
const { AMQ_URL: url, AMQ_QUEUE: queue, AMQ_QUEUE_ORG: queue_org } = process.env; //----> (1.2) get url and queue from .env
|
|
|
|
const connection = await amqp.connect(url); //----> (1.3) set up url with amqp protocol
|
|
|
|
console.log(connection ? "[AMQ] connection success" : "[AMQ] connection failed");
|
|
|
|
const channel = await connection.createChannel(); //----> (1.4) create Channel
|
|
|
|
console.log(channel ? "[AMQ] create channel success" : "[AMQ] create channel failed");
|
|
|
|
channel.assertQueue(queue, { durable: true }); //----> (1.5) assert queue and set durable (if "true" save to disk on RabbitMQ)
|
|
channel.assertQueue(queue_org, { durable: true });
|
|
channel.prefetch(1);
|
|
|
|
sendToQueue = (payload: any, persistent = true) => {
|
|
//----> (2) sendQueue To RabbitMQ and set persistent (if "true" redo the failed queue when server run again)
|
|
channel.sendToQueue(queue, Buffer.from(JSON.stringify(payload)), {
|
|
persistent,
|
|
});
|
|
};
|
|
|
|
sendToQueueOrg = (payload: any, persistent = true) => {
|
|
channel.sendToQueue(queue_org, Buffer.from(JSON.stringify(payload)), { persistent });
|
|
};
|
|
|
|
console.log("[AMQ] Listening for message...");
|
|
|
|
createConsumer(queue, channel, handler); //----> (3) Process Consumer
|
|
createConsumer(queue_org, channel, handler_org);
|
|
// createConsumer(queue2, channel, handler2);
|
|
}
|
|
|
|
let retries = 0;
|
|
|
|
function createConsumer( //----> consumer
|
|
queue: string,
|
|
channel: amqp.Channel,
|
|
handler: (msg: amqp.ConsumeMessage) => Promise<boolean> | boolean,
|
|
) {
|
|
channel.consume(
|
|
queue,
|
|
async (msg) => {
|
|
if (!msg) return;
|
|
|
|
if ((await handler(msg)) || retries++ >= 3) {
|
|
retries = 0;
|
|
return channel.ack(msg);
|
|
}
|
|
|
|
return await new Promise((resolve) => setTimeout(() => resolve(channel.nack(msg)), 3000));
|
|
},
|
|
{ noAck: false },
|
|
);
|
|
console.log(queue ? "[AMQ] create consumer success" : "[AMQ] create consumer failed");
|
|
}
|
|
|
|
async function handler(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|
console.log(msg ? "[AMQ] on handler" : "[AMQ] don't handler");
|
|
//----> condition before process consumer
|
|
const repo = AppDataSource.getRepository(Command);
|
|
const { data, token, user } = JSON.parse(msg.content.toString());
|
|
|
|
console.log(repo ? "[AMQ] have repo" : "[AMQ] don't have repo");
|
|
|
|
const { id, status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt } = data;
|
|
|
|
console.log(data ? "[AMQ] have data" : "[AMQ] don't have data");
|
|
|
|
const command = await repo.findOne({
|
|
where: { id: id },
|
|
relations: ["commandType", "commandRecives"],
|
|
});
|
|
|
|
console.log(command ? "[AMQ] have command" : "[AMQ] don't have command");
|
|
|
|
if (!command) return true;
|
|
|
|
const path = commandTypePath(command.commandType.code);
|
|
if (path == null) throw new HttpError(HttpStatusCode.NOT_FOUND, "ไม่พบประเภทคำสั่งนี้ในระบบ");
|
|
|
|
console.log(path ? "[AMQ] have path" : "[AMQ] don't have path");
|
|
|
|
return await new CallAPI()
|
|
.PostData(
|
|
{
|
|
headers: { authorization: token }, //time bomb
|
|
},
|
|
path + "/excecute",
|
|
{
|
|
refIds: command.commandRecives
|
|
.filter((x) => x.refId != null)
|
|
.map((x) => ({
|
|
refId: x.refId,
|
|
commandAffectDate: command.commandAffectDate,
|
|
commandNo: command.commandNo,
|
|
commandYear: command.commandYear,
|
|
commandId: command.id,
|
|
templateDoc: command.positionDetail,
|
|
amount: x.amount,
|
|
positionSalaryAmount: x.positionSalaryAmount,
|
|
mouthSalaryAmount: x.mouthSalaryAmount,
|
|
})),
|
|
},
|
|
false,
|
|
)
|
|
.then(async (res) => {
|
|
console.log("[AMQ] Excecute Command Success");
|
|
Object.assign(command, { status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt });
|
|
const result = await repo.save(command).catch((e) => console.log(e));
|
|
if (result) return true;
|
|
return false;
|
|
})
|
|
.catch((e) => {
|
|
console.error(e);
|
|
return false;
|
|
});
|
|
}
|
|
|
|
async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|
//----> condition before process consume
|
|
const repoPosmaster = AppDataSource.getRepository(PosMaster);
|
|
const repoProfile = AppDataSource.getRepository(Profile);
|
|
const { data, token, user } = JSON.parse(msg.content.toString());
|
|
const { id, status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt } = data;
|
|
try {
|
|
const posMaster = await repoPosmaster.find({
|
|
where: { orgRevisionId: id },
|
|
relations: [
|
|
"orgRoot",
|
|
"orgChild4",
|
|
"orgChild3",
|
|
"orgChild2",
|
|
"orgChild1",
|
|
"positions",
|
|
"positions.posLevel",
|
|
"positions.posType",
|
|
"positions.posExecutive",
|
|
],
|
|
});
|
|
await Promise.all(
|
|
posMaster.map(async (item) => {
|
|
if (item.next_holderId != null && status == "NOW") {
|
|
const profile = await repoProfile.findOne({
|
|
where: { id: item.next_holderId == null ? "" : item.next_holderId },
|
|
});
|
|
const position = await item.positions.find((x) => x.positionIsSelected == true);
|
|
const _null: any = null;
|
|
if (profile != null) {
|
|
profile.posLevelId = position?.posLevelId ?? _null;
|
|
profile.posTypeId = position?.posTypeId ?? _null;
|
|
profile.position = position?.positionName ?? _null;
|
|
await repoProfile.save(profile);
|
|
}
|
|
}
|
|
item.current_holderId = item.next_holderId;
|
|
item.next_holderId = null;
|
|
item.lastUpdateUserId = lastUpdateUserId;
|
|
item.lastUpdateFullName = lastUpdateFullName;
|
|
item.lastUpdatedAt = lastUpdatedAt;
|
|
await repoPosmaster.save(item).catch((e) => console.log(e));
|
|
}),
|
|
);
|
|
console.log("[AMQ] Excecute Organization Success");
|
|
return true;
|
|
} catch (error) {
|
|
console.error(error);
|
|
return false;
|
|
}
|
|
}
|