rabbitMQ command
This commit is contained in:
parent
cb4fc6defb
commit
608c8967bc
7 changed files with 440 additions and 37 deletions
126
src/services/rabbitmq.ts
Normal file
126
src/services/rabbitmq.ts
Normal file
|
|
@ -0,0 +1,126 @@
|
|||
import amqp from "amqplib";
|
||||
import { AppDataSource } from "../database/data-source";
|
||||
import { Command } from "../entities/Command";
|
||||
import { commandTypePath } from "../interfaces/utils";
|
||||
import CallAPI from "../interfaces/call-api";
|
||||
import HttpError from "../interfaces/http-error";
|
||||
import HttpStatusCode from "../interfaces/http-status";
|
||||
import { RequestWithUser } from "../middlewares/user";
|
||||
|
||||
export let sendToQueue: (payload: any) => void;
|
||||
|
||||
export async function init() { //----> (1) Producer
|
||||
if (!process.env.AMQ_URL || !process.env.AMQ_QUEUE) return;
|
||||
|
||||
const { AMQ_URL: url, AMQ_QUEUE: queue } = process.env; //----> (1.2) get url and queue from .env
|
||||
|
||||
const connection = await amqp.connect(url); //----> (1.3) set up url with amqp protocol
|
||||
|
||||
const channel = await connection.createChannel(); //----> (1.4) create Channel
|
||||
|
||||
channel.assertQueue(queue, { durable: true }); //----> (1.5) assert queue and set durable (if "true" save to disk on RabbitMQ)
|
||||
channel.prefetch(1);
|
||||
|
||||
sendToQueue = (payload: any, persistent = true) => { //----> (2) sendQueue To RabbitMQ and set persistent (if "true" redo the failed queue when server run again)
|
||||
channel.sendToQueue(queue, Buffer.from(JSON.stringify(payload)), {
|
||||
persistent,
|
||||
});
|
||||
};
|
||||
|
||||
console.log("[AMQ] Listening for message...");
|
||||
|
||||
createConsumer(queue, channel, handler); //----> (3) Process Consumer
|
||||
// createConsumer(queue1, channel, handler1);
|
||||
// createConsumer(queue2, channel, handler2);
|
||||
}
|
||||
|
||||
function createConsumer( //----> consumer
|
||||
queue: string,
|
||||
channel: amqp.Channel,
|
||||
handler: (msg: amqp.ConsumeMessage) => Promise<boolean> | boolean,
|
||||
) {
|
||||
channel.consume(
|
||||
queue,
|
||||
async (msg) => {
|
||||
if (!msg) return;
|
||||
|
||||
if (await handler(msg)) return channel.ack(msg);
|
||||
|
||||
return await new Promise((resolve) => setTimeout(() => resolve(channel.nack(msg)), 3000));
|
||||
},
|
||||
{ noAck: false },
|
||||
);
|
||||
}
|
||||
|
||||
async function handler(msg: amqp.ConsumeMessage): Promise<boolean> { //----> condition before process consumer
|
||||
const repo = AppDataSource.getRepository(Command);
|
||||
const { data, token, user } = JSON.parse(msg.content.toString());
|
||||
|
||||
const { id, status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt } = data;
|
||||
|
||||
const command = await repo.findOne({
|
||||
where: { id: id },
|
||||
relations: ["commandType", "commandRecives"],
|
||||
});
|
||||
|
||||
if (!command) return true;
|
||||
|
||||
const path = commandTypePath(command.commandType.code);
|
||||
if (path == null) throw new HttpError(HttpStatusCode.NOT_FOUND, "ไม่พบประเภทคำสั่งนี้ในระบบ");
|
||||
|
||||
return await new CallAPI()
|
||||
.PostData(
|
||||
{
|
||||
headers: { authorization: token }, //time bomb
|
||||
},
|
||||
path + "/excecute",
|
||||
{
|
||||
refIds: command.commandRecives
|
||||
.filter((x) => x.refId != null)
|
||||
.map((x) => ({
|
||||
refId: x.refId,
|
||||
commandAffectDate: command.commandAffectDate,
|
||||
commandNo: command.commandNo,
|
||||
commandYear: command.commandYear,
|
||||
templateDoc: command.positionDetail,
|
||||
amount: x.amount,
|
||||
positionSalaryAmount: x.positionSalaryAmount,
|
||||
mouthSalaryAmount: x.mouthSalaryAmount,
|
||||
})),
|
||||
},
|
||||
false,
|
||||
)
|
||||
.then(async (res) => {
|
||||
console.log("[AMQ] Excecute Command Success");
|
||||
Object.assign(command, { status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt });
|
||||
const result = await repo.save(command).catch((e) => console.log(e));
|
||||
if (result) return true;
|
||||
return false;
|
||||
})
|
||||
.catch((e) => {
|
||||
console.error(e);
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
// async function handler(msg: amqp.ConsumeMessage): Promise<boolean> { //----> condition before process consumer
|
||||
// const repo = AppDataSource.getRepository(Command);
|
||||
|
||||
// const { id, status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt } = JSON.parse(
|
||||
// msg.content.toString(),
|
||||
// );
|
||||
|
||||
// const record = await repo.findOne({
|
||||
// where: { id },
|
||||
// });
|
||||
|
||||
// if (!record) return true;
|
||||
|
||||
// Object.assign(record, { status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt });
|
||||
|
||||
// const result = await repo.save(record).catch((e) => console.log(e));
|
||||
|
||||
// if (result) return true;
|
||||
|
||||
// return false;
|
||||
// }
|
||||
Loading…
Add table
Add a link
Reference in a new issue