เผยแพร่โครงสร้างเพิ่ม rabbit mq #666
This commit is contained in:
parent
2dcaa8ae4c
commit
9d9795c614
4 changed files with 105 additions and 111 deletions
|
|
@ -6,20 +6,23 @@ import CallAPI from "../interfaces/call-api";
|
|||
import HttpError from "../interfaces/http-error";
|
||||
import HttpStatusCode from "../interfaces/http-status";
|
||||
import { RequestWithUser } from "../middlewares/user";
|
||||
import { PosMaster } from "../entities/PosMaster";
|
||||
import { Profile } from "../entities/Profile";
|
||||
|
||||
export let sendToQueue: (payload: any) => void;
|
||||
|
||||
export let sendToQueueOrg: (payload: any) => void;
|
||||
export async function init() {
|
||||
//----> (1) Producer
|
||||
if (!process.env.AMQ_URL || !process.env.AMQ_QUEUE) return;
|
||||
if (!process.env.AMQ_URL || !process.env.AMQ_QUEUE || !process.env.AMQ_QUEUE_ORG) return;
|
||||
|
||||
const { AMQ_URL: url, AMQ_QUEUE: queue } = process.env; //----> (1.2) get url and queue from .env
|
||||
const { AMQ_URL: url, AMQ_QUEUE: queue, AMQ_QUEUE_ORG: queue_org } = process.env; //----> (1.2) get url and queue from .env
|
||||
|
||||
const connection = await amqp.connect(url); //----> (1.3) set up url with amqp protocol
|
||||
|
||||
const channel = await connection.createChannel(); //----> (1.4) create Channel
|
||||
|
||||
channel.assertQueue(queue, { durable: true }); //----> (1.5) assert queue and set durable (if "true" save to disk on RabbitMQ)
|
||||
channel.assertQueue(queue_org, { durable: true });
|
||||
channel.prefetch(1);
|
||||
|
||||
sendToQueue = (payload: any, persistent = true) => {
|
||||
|
|
@ -29,10 +32,14 @@ export async function init() {
|
|||
});
|
||||
};
|
||||
|
||||
sendToQueueOrg = (payload: any, persistent = true) => {
|
||||
channel.sendToQueue(queue_org, Buffer.from(JSON.stringify(payload)), { persistent });
|
||||
};
|
||||
|
||||
console.log("[AMQ] Listening for message...");
|
||||
|
||||
createConsumer(queue, channel, handler); //----> (3) Process Consumer
|
||||
// createConsumer(queue1, channel, handler1);
|
||||
createConsumer(queue_org, channel, handler_org);
|
||||
// createConsumer(queue2, channel, handler2);
|
||||
}
|
||||
|
||||
|
|
@ -107,24 +114,54 @@ async function handler(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
});
|
||||
}
|
||||
|
||||
// async function handler(msg: amqp.ConsumeMessage): Promise<boolean> { //----> condition before process consumer
|
||||
// const repo = AppDataSource.getRepository(Command);
|
||||
|
||||
// const { id, status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt } = JSON.parse(
|
||||
// msg.content.toString(),
|
||||
// );
|
||||
|
||||
// const record = await repo.findOne({
|
||||
// where: { id },
|
||||
// });
|
||||
|
||||
// if (!record) return true;
|
||||
|
||||
// Object.assign(record, { status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt });
|
||||
|
||||
// const result = await repo.save(record).catch((e) => console.log(e));
|
||||
|
||||
// if (result) return true;
|
||||
|
||||
// return false;
|
||||
// }
|
||||
async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
||||
//----> condition before process consume
|
||||
const repoPosmaster = AppDataSource.getRepository(PosMaster);
|
||||
const repoProfile = AppDataSource.getRepository(Profile);
|
||||
const { data, token, user } = JSON.parse(msg.content.toString());
|
||||
const { id, status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt } = data;
|
||||
try {
|
||||
const posMaster = await repoPosmaster.find({
|
||||
where: { orgRevisionId: id },
|
||||
relations: [
|
||||
"orgRoot",
|
||||
"orgChild4",
|
||||
"orgChild3",
|
||||
"orgChild2",
|
||||
"orgChild1",
|
||||
"positions",
|
||||
"positions.posLevel",
|
||||
"positions.posType",
|
||||
"positions.posExecutive",
|
||||
],
|
||||
});
|
||||
await Promise.all(
|
||||
posMaster.map(async (item) => {
|
||||
if (item.next_holderId != null && status == "NOW") {
|
||||
const profile = await repoProfile.findOne({
|
||||
where: { id: item.next_holderId == null ? "" : item.next_holderId },
|
||||
});
|
||||
const position = await item.positions.find((x) => x.positionIsSelected == true);
|
||||
const _null: any = null;
|
||||
if (profile != null) {
|
||||
profile.posLevelId = position?.posLevelId ?? _null;
|
||||
profile.posTypeId = position?.posTypeId ?? _null;
|
||||
profile.position = position?.positionName ?? _null;
|
||||
await repoProfile.save(profile);
|
||||
}
|
||||
}
|
||||
item.current_holderId = item.next_holderId;
|
||||
item.next_holderId = null;
|
||||
item.lastUpdateUserId = lastUpdateUserId;
|
||||
item.lastUpdateFullName = lastUpdateFullName;
|
||||
item.lastUpdatedAt = lastUpdatedAt;
|
||||
await repoPosmaster.save(item).catch((e) => console.log(e));
|
||||
}),
|
||||
);
|
||||
console.log("[AMQ] Excecute Organization Success");
|
||||
return true;
|
||||
} catch (error) {
|
||||
console.error(error);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue