add rabbit noti
This commit is contained in:
parent
968d5f6dad
commit
2a9d00ada3
2 changed files with 83 additions and 124 deletions
|
|
@ -28,13 +28,15 @@ import { PermissionOrg } from "../entities/PermissionOrg";
|
|||
export let sendToQueue: (payload: any) => void;
|
||||
export let sendToQueueOrg: (payload: any) => void;
|
||||
export let sendToQueueOrgDraft: (payload: any) => void;
|
||||
export let sendToQueueCommandNoti: (payload: any) => void;
|
||||
export async function init() {
|
||||
//----> (1) Producer
|
||||
if (
|
||||
!process.env.AMQ_URL ||
|
||||
!process.env.AMQ_QUEUE ||
|
||||
!process.env.AMQ_QUEUE_ORG ||
|
||||
!process.env.AMQ_QUEUE_ORG_DRAFT
|
||||
!process.env.AMQ_QUEUE_ORG_DRAFT ||
|
||||
!process.env.AMQ_QUEUE_COMMAND_NOTI
|
||||
)
|
||||
return;
|
||||
|
||||
|
|
@ -43,6 +45,7 @@ export async function init() {
|
|||
AMQ_QUEUE: queue,
|
||||
AMQ_QUEUE_ORG: queue_org,
|
||||
AMQ_QUEUE_ORG_DRAFT: queue_org_draft,
|
||||
AMQ_QUEUE_COMMAND_NOTI: queue_command_noti,
|
||||
} = process.env; //----> (1.2) get url and queue from .env
|
||||
|
||||
const connection = await amqp.connect(url); //----> (1.3) set up url with amqp protocol
|
||||
|
|
@ -54,9 +57,10 @@ export async function init() {
|
|||
console.log(channel ? "[AMQ] Create channel success" : "[AMQ] Create channel failed");
|
||||
|
||||
channel.assertQueue(queue, { durable: true }), //----> (1.5) assert queue and set durable (if "true" save to disk on RabbitMQ)
|
||||
channel.assertQueue(queue_org, { durable: true }),
|
||||
channel.assertQueue(queue_org_draft, { durable: true }),
|
||||
channel.prefetch(1);
|
||||
channel.assertQueue(queue_org, { durable: true }),
|
||||
channel.assertQueue(queue_org_draft, { durable: true }),
|
||||
channel.assertQueue(queue_command_noti, { durable: true }),
|
||||
channel.prefetch(1);
|
||||
|
||||
sendToQueue = (payload: any, persistent = true) => {
|
||||
//----> (2) sendQueue To RabbitMQ and set persistent (if "true" redo the failed queue when server run again)
|
||||
|
|
@ -73,10 +77,15 @@ export async function init() {
|
|||
channel.sendToQueue(queue_org_draft, Buffer.from(JSON.stringify(payload)), { persistent });
|
||||
};
|
||||
|
||||
sendToQueueCommandNoti = (payload: any, persistent = true) => {
|
||||
channel.sendToQueue(queue_command_noti, Buffer.from(JSON.stringify(payload)), { persistent });
|
||||
};
|
||||
|
||||
console.log("[AMQ] Listening for message...");
|
||||
createConsumer(queue, channel, handler), //----> (3) Process Consumer
|
||||
createConsumer(queue_org, channel, handler_org);
|
||||
createConsumer(queue_org, channel, handler_org);
|
||||
createConsumer(queue_org_draft, channel, handler_org_draft);
|
||||
createConsumer(queue_command_noti, channel, handler_command_noti);
|
||||
// createConsumer(queue2, channel, handler2);
|
||||
}
|
||||
|
||||
|
|
@ -156,6 +165,64 @@ async function handler(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
});
|
||||
}
|
||||
|
||||
async function handler_command_noti(msg: amqp.ConsumeMessage): Promise<boolean> {
|
||||
const { data, token, user } = JSON.parse(msg.content.toString());
|
||||
const { profiles, command } = data;
|
||||
|
||||
try {
|
||||
const profilesNotiRequest = new CallAPI()
|
||||
.PostData(
|
||||
{ headers: { authorization: token } },
|
||||
"/placement/noti/profiles",
|
||||
{
|
||||
subject: `${command.issue}`,
|
||||
body: `${command.issue}`,
|
||||
receiverUserIds: profiles,
|
||||
payload: "",// แนบไฟล์ (ถ้าจำเป็น)
|
||||
isSendMail: true,
|
||||
isSendInbox: true,
|
||||
isSendNotification: true,
|
||||
},
|
||||
false
|
||||
);
|
||||
|
||||
let profilesSend = command && command.commandSends.length > 0
|
||||
? command.commandSends
|
||||
.filter((x: any) => x.profileId != null)
|
||||
.map((x: any) => ({
|
||||
receiverUserId: x.profileId,
|
||||
notiLink: "",
|
||||
isSendMail: x.commandSendCCs.map((x: any) => x.name == "EMAIL").length > 0,
|
||||
isSendInbox: x.commandSendCCs.map((x: any) => x.name == "INBOX").length > 0,
|
||||
isSendNotification: true,
|
||||
}))
|
||||
: [];
|
||||
|
||||
const profilesSendRequest = new CallAPI()
|
||||
.PostData(
|
||||
{ headers: { authorization: token } },
|
||||
"/placement/noti/profiles-send",
|
||||
{
|
||||
subject: `${command.issue}`,
|
||||
body: `${command.issue}`,
|
||||
receiverUserIds: profilesSend,
|
||||
payload: "", // แนบไฟล์ (ถ้าจำเป็น)
|
||||
},
|
||||
false
|
||||
);
|
||||
|
||||
await Promise.all([profilesNotiRequest, profilesSendRequest]);
|
||||
|
||||
console.log("[AMQ] Send Notification Success");
|
||||
return true;
|
||||
|
||||
} catch (error) {
|
||||
console.error("[AMQ] Error:", error);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
||||
//----> condition before process consume
|
||||
const repoPosmaster = AppDataSource.getRepository(PosMaster);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue