diff --git a/src/interfaces/utils.ts b/src/interfaces/utils.ts index 931fe446..244da06c 100644 --- a/src/interfaces/utils.ts +++ b/src/interfaces/utils.ts @@ -553,6 +553,14 @@ export async function checkQueueInProgress(queueName: string) { // return false; } +export function chunkArray(array: any, size: number) { + const result = []; + for (let i = 0; i < array.length; i += size) { + result.push(array.slice(i, i + size)); + } + return result; +} + export function commandTypePath(commandCode: string): string | null { switch (commandCode) { case "C-PM-01": diff --git a/src/services/rabbitmq.ts b/src/services/rabbitmq.ts index e1392178..0f3614cd 100644 --- a/src/services/rabbitmq.ts +++ b/src/services/rabbitmq.ts @@ -1,7 +1,7 @@ import amqp from "amqplib"; import { AppDataSource } from "../database/data-source"; import { Command } from "../entities/Command"; -import { commandTypePath } from "../interfaces/utils"; +import { chunkArray, commandTypePath } from "../interfaces/utils"; import CallAPI from "../interfaces/call-api"; import HttpError from "../interfaces/http-error"; import HttpStatusCode from "../interfaces/http-status"; @@ -115,95 +115,180 @@ function createConsumer( //----> consumer async function handler(msg: amqp.ConsumeMessage): Promise { //----> condition before process consumer + // const repo = AppDataSource.getRepository(Command); + // const { data, token, user } = JSON.parse(msg.content.toString()); + // const { id, status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt } = data; + // const command = await repo.findOne({ + // where: { id: id }, + // relations: ["commandType", "commandRecives"], + // }); + // if (!command) return true; + // let waiting_message = `ระบบทำการออกคำสั่งเลขที่ ${command.commandNo}/${command.commandYear + 543}`; + // let success_message = `ระบบออกคำสั่งเลขที่ ${command.commandNo}/${command.commandYear + 543} เสร็จสิ้น`; + // let error_message = `ระบบออกคำสั่งเลขที่ ${command.commandNo}/${command.commandYear + 543} ผิดพลาด` + // if(command.commandType?.code == "C-PM-47"){ + // waiting_message = `ระบบทำการออกคำสั่งเลขที่ ${command.commandNo}`; + // success_message = `ระบบออกคำสั่งเลขที่ ${command.commandNo} เสร็จสิ้น`; + // error_message = `ระบบออกคำสั่งเลขที่ ${command.commandNo} ผิดพลาด`; + // } + // if (user) { + // sendWebSocket( + // "send-command-notification", + // { + // success: true, + // message: waiting_message, + // payload: command, + // }, + // { userId: user?.sub }, + // ).catch(console.error); + // } + // const path = commandTypePath(command.commandType.code); + // if (path == null) throw new HttpError(HttpStatusCode.NOT_FOUND, "ไม่พบประเภทคำสั่งนี้ในระบบ"); + // return await new CallAPI() + // //chunk 50 + // .PostData( + // { + // headers: { authorization: token }, + // }, + // path + "/excecute", + // { + // refIds: command.commandRecives //chunk + // .filter((x) => x.refId != null) + // .map((x) => ({ + // refId: x.refId, + // commandNo: command.commandNo, + // commandYear: command.commandYear, + // commandId: command.id, + // remark: command.positionDetail, + // amount: x.amount, + // amountSpecial: x.amountSpecial, + // positionSalaryAmount: x.positionSalaryAmount, + // mouthSalaryAmount: x.mouthSalaryAmount, + // commandCode: command.commandType.commandCode, + // commandName: command.commandType.name, + // commandDateAffect: command.commandExcecuteDate, + // commandDateSign: command.commandAffectDate, + // })), + // }, + // false, + // ) + // .then(async (res) => { + // console.log("[AMQ] Excecute Command Success"); + // Object.assign(command, { status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt }); + // const result = await repo.save(command).catch((e) => console.log(e)); + // if (user) { + // sendWebSocket( + // "send-command-notification", + // { + // success: true, + // message: success_message, + // payload: command, + // }, + // { userId: user?.sub }, + // ).catch(console.error); + // } + // return !!result; + // }) + // .catch((e) => { + // console.error(e); + // if (user) { + // sendWebSocket( + // "send-command-notification", + // { + // success: false, + // message: error_message, + // payload: command, + // }, + // { userId: user?.sub }, + // ).catch(console.error); + // } + // return false; + // }); const repo = AppDataSource.getRepository(Command); const { data, token, user } = JSON.parse(msg.content.toString()); const { id, status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt } = data; + const command = await repo.findOne({ where: { id: id }, relations: ["commandType", "commandRecives"], }); if (!command) return true; + let waiting_message = `ระบบทำการออกคำสั่งเลขที่ ${command.commandNo}/${command.commandYear + 543}`; let success_message = `ระบบออกคำสั่งเลขที่ ${command.commandNo}/${command.commandYear + 543} เสร็จสิ้น`; - let error_message = `ระบบออกคำสั่งเลขที่ ${command.commandNo}/${command.commandYear + 543} ผิดพลาด` - if(command.commandType?.code == "C-PM-47"){ + let error_message = `ระบบออกคำสั่งเลขที่ ${command.commandNo}/${command.commandYear + 543} ผิดพลาด`; + + if (command.commandType?.code == "C-PM-47") { waiting_message = `ระบบทำการออกคำสั่งเลขที่ ${command.commandNo}`; success_message = `ระบบออกคำสั่งเลขที่ ${command.commandNo} เสร็จสิ้น`; error_message = `ระบบออกคำสั่งเลขที่ ${command.commandNo} ผิดพลาด`; } + if (user) { sendWebSocket( "send-command-notification", - { - success: true, - message: waiting_message, - payload: command, - }, + { success: true, message: waiting_message, payload: command }, { userId: user?.sub }, ).catch(console.error); } + const path = commandTypePath(command.commandType.code); if (path == null) throw new HttpError(HttpStatusCode.NOT_FOUND, "ไม่พบประเภทคำสั่งนี้ในระบบ"); - return await new CallAPI() - - .PostData( - { - headers: { authorization: token }, - }, - path + "/excecute", - { - refIds: command.commandRecives - .filter((x) => x.refId != null) - .map((x) => ({ - refId: x.refId, - commandNo: command.commandNo, - commandYear: command.commandYear, - commandId: command.id, - remark: command.positionDetail, - amount: x.amount, - amountSpecial: x.amountSpecial, - positionSalaryAmount: x.positionSalaryAmount, - mouthSalaryAmount: x.mouthSalaryAmount, - commandCode: command.commandType.commandCode, - commandName: command.commandType.name, - commandDateAffect: command.commandExcecuteDate, - commandDateSign: command.commandAffectDate, - })), - }, - false, - ) - .then(async (res) => { - console.log("[AMQ] Excecute Command Success"); - Object.assign(command, { status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt }); - const result = await repo.save(command).catch((e) => console.log(e)); - if (user) { - sendWebSocket( - "send-command-notification", - { - success: true, - message: success_message, - payload: command, - }, - { userId: user?.sub }, - ).catch(console.error); - } - return !!result; - }) - .catch((e) => { - console.error(e); - if (user) { - sendWebSocket( - "send-command-notification", - { - success: false, - message: error_message, - payload: command, - }, - { userId: user?.sub }, - ).catch(console.error); - } - return false; - }); + + try { + const chunks = chunkArray( + command.commandRecives.filter((x) => x.refId != null).map((x) => ({ + refId: x.refId, + commandNo: command.commandNo, + commandYear: command.commandYear, + commandId: command.id, + remark: command.positionDetail, + amount: x.amount, + amountSpecial: x.amountSpecial, + positionSalaryAmount: x.positionSalaryAmount, + mouthSalaryAmount: x.mouthSalaryAmount, + commandCode: command.commandType.commandCode, + commandName: command.commandType.name, + commandDateAffect: command.commandExcecuteDate, + commandDateSign: command.commandAffectDate, + })), + 50 + ); + + for (const chunk of chunks) { + await new CallAPI().PostData( + { headers: { authorization: token } }, + path + "/excecute", + { refIds: chunk }, + false + ); + } + + Object.assign(command, { status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt }); + const result = await repo.save(command); + + if (user) { + sendWebSocket( + "send-command-notification", + { success: true, message: success_message, payload: command }, + { userId: user?.sub }, + ).catch(console.error); + } + + console.log("[AMQ] Excecute Command Success"); + return !!result; + + } catch (e) { + console.error(e); + if (user) { + sendWebSocket( + "send-command-notification", + { success: false, message: error_message, payload: command }, + { userId: user?.sub }, + ).catch(console.error); + } + return false; + } } // async function handler(msg: amqp.ConsumeMessage): Promise {