diff --git a/src/services/rabbitmq.ts b/src/services/rabbitmq.ts index 42c386e0..097473f8 100644 --- a/src/services/rabbitmq.ts +++ b/src/services/rabbitmq.ts @@ -24,9 +24,27 @@ import { In, Not } from "typeorm"; import { PosMasterAct } from "../entities/PosMasterAct"; import { PermissionOrg } from "../entities/PermissionOrg"; import { sendWebSocket } from "./webSocket"; -import { CreatePosMasterHistoryOfficer } from "./PositionService"; import { PayloadSendNoti } from "../interfaces/utils"; import { PermissionProfile } from "../entities/PermissionProfile"; +import { PosMasterHistory } from "../entities/PosMasterHistory"; + +let reconnectTimer: ReturnType | null = null; + +function scheduleReconnect() { + if (reconnectTimer) { + return; + } + + reconnectTimer = setTimeout(async () => { + reconnectTimer = null; + try { + await init(); + } catch (error) { + console.error("[AMQ] Reconnect failed:", error); + scheduleReconnect(); + } + }, 1000); +} export let sendToQueue: (payload: any) => void; export let sendToQueueOrg: (payload: any) => void; @@ -55,10 +73,28 @@ export async function init() { console.log(connection ? "[AMQ] Connection success" : "[AMQ] Connection failed"); + connection.on("error", (error) => { + console.error("[AMQ] Connection error:", error); + }); + + connection.on("close", () => { + console.error("[AMQ] Connection closed. Scheduling reconnect..."); + scheduleReconnect(); + }); + const channel = await connection.createChannel(); //----> (1.4) create Channel console.log(channel ? "[AMQ] Create channel success" : "[AMQ] Create channel failed"); + channel.on("error", (error) => { + console.error("[AMQ] Channel error:", error); + }); + + channel.on("close", () => { + console.error("[AMQ] Channel closed. Scheduling reconnect..."); + scheduleReconnect(); + }); + channel.assertQueue(queue, { durable: true }), //----> (1.5) assert queue and set durable (if "true" save to disk on RabbitMQ) channel.assertQueue(queue_org, { durable: true }), channel.assertQueue(queue_org_draft, { durable: true }), @@ -97,18 +133,26 @@ function createConsumer( //----> consumer channel: amqp.Channel, handler: (msg: amqp.ConsumeMessage) => Promise | boolean, ) { - let retries = 0; channel.consume( queue, async (msg) => { if (!msg) return; - if ((await handler(msg)) || retries++ >= 3) { - retries = 0; - console.log("[AMQ] Process Consumer success"); + try { + if (await handler(msg)) { + console.log("[AMQ] Process Consumer success"); + return channel.ack(msg); + } + console.error(`[AMQ] Process Consumer failed on queue ${queue}, acknowledging without retry`); return channel.ack(msg); + } catch (error) { + console.error(`[AMQ] Consumer processing error on queue ${queue}:`, error); + try { + console.error(`[AMQ] Acknowledging failed message on queue ${queue} without retry`); + channel.ack(msg); + } catch (channelError) { + console.error(`[AMQ] Failed to ack/nack message on queue ${queue}:`, channelError); + } } - console.log("[AMQ] Process Consumer failed"); - return await new Promise((resolve) => setTimeout(() => resolve(channel.nack(msg)), 3000)); }, { noAck: false }, ); @@ -740,7 +784,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { // 3. เตรียม arrays สำหรับ batch operations const profilesToSave: Profile[] = []; const posMasterAssignsToSave: PosMasterAssign[] = []; - const historyCreateIds: string[] = []; + const historyRowsToSave: Partial[] = []; const posMasterUpdates: { id: string; current_holderId: string | null | undefined }[] = []; // ===== LOOP: เก็บข้อมูลทั้งหมด ===== @@ -809,12 +853,54 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { const isHolderChanged = oldHolderId !== newHolderId; if (isHolderChanged) { - historyCreateIds.push(item.id); + const nextHolderProfile = + item.next_holderId != null && item.next_holderId !== "" + ? profilesMap.get(item.next_holderId) + : null; + const selectedPosition = + item.positions.length > 0 + ? item.positions.find((position) => position.positionIsSelected === true) ?? null + : null; + const shortName = + [ + item.orgChild4?.orgChild4ShortName, + item.orgChild3?.orgChild3ShortName, + item.orgChild2?.orgChild2ShortName, + item.orgChild1?.orgChild1ShortName, + item.orgRoot?.orgRootShortName, + ].find((name) => typeof name === "string" && name.trim().length > 0) ?? _null; + + historyRowsToSave.push({ + ancestorDNA: item.ancestorDNA ? item.ancestorDNA : _null, + prefix: nextHolderProfile?.prefix || _null, + firstName: nextHolderProfile?.firstName || _null, + lastName: nextHolderProfile?.lastName || _null, + shortName, + posMasterNoPrefix: item.posMasterNoPrefix ?? _null, + posMasterNo: item.posMasterNo ?? _null, + posMasterNoSuffix: item.posMasterNoSuffix ?? _null, + position: selectedPosition?.positionName ?? _null, + posType: selectedPosition?.posType?.posTypeName ?? _null, + posLevel: selectedPosition?.posLevel?.posLevelName ?? _null, + posExecutive: selectedPosition?.posExecutive?.posExecutiveName ?? _null, + profileId: _null, + rootDnaId: item.orgRoot?.ancestorDNA || _null, + child1DnaId: item.orgChild1?.ancestorDNA || _null, + child2DnaId: item.orgChild2?.ancestorDNA || _null, + child3DnaId: item.orgChild3?.ancestorDNA || _null, + child4DnaId: item.orgChild4?.ancestorDNA || _null, + createdUserId: "", + createdFullName: "system", + lastUpdateUserId: "", + lastUpdateFullName: "system", + createdAt: new Date(), + lastUpdatedAt: new Date(), + }); } } console.timeEnd("[AMQ] prepare_batch_data"); console.log( - `[AMQ] Prepared - posMasterAssignsToSave: ${posMasterAssignsToSave.length}, profilesToSave: ${profilesToSave.length}, posMasterUpdates: ${posMasterUpdates.length}, historyCreateIds: ${historyCreateIds.length}`, + `[AMQ] Prepared - posMasterAssignsToSave: ${posMasterAssignsToSave.length}, profilesToSave: ${profilesToSave.length}, posMasterUpdates: ${posMasterUpdates.length}, historyCreateIds: ${historyRowsToSave.length}`, ); // ===== BATCH EXECUTION: save ทีละ batch ===== @@ -835,6 +921,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { const child2Repository = manager.getRepository(OrgChild2); const child3Repository = manager.getRepository(OrgChild3); const child4Repository = manager.getRepository(OrgChild4); + const posMasterHistoryRepository = manager.getRepository(PosMasterHistory); const targetOrgRevision = await repoOrgRevision .createQueryBuilder("orgRevision") @@ -903,21 +990,45 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { // 6. Batch update posMasters console.time("[AMQ] batch_update_posMasters"); - for (const update of posMasterUpdates) { - await repoPosmaster.update(update.id, { - current_holderId: update.current_holderId, - next_holderId: null, - lastUpdateUserId, - lastUpdateFullName, - lastUpdatedAt, - }); + if (posMasterUpdates.length > 0) { + const chunks = chunkArray(posMasterUpdates, 500); + const posMasterTableName = repoPosmaster.metadata.tableName; + for (const chunk of chunks as typeof posMasterUpdates[]) { + const caseClauses = chunk.map(() => "WHEN ? THEN ?").join(" "); + const wherePlaceholders = chunk.map(() => "?").join(", "); + const params = chunk.flatMap((update: (typeof posMasterUpdates)[number]) => [ + update.id, + update.current_holderId ?? null, + ]); + + params.push( + lastUpdateUserId, + lastUpdateFullName, + lastUpdatedAt, + ...chunk.map((update: (typeof posMasterUpdates)[number]) => update.id), + ); + + await manager.query( + `UPDATE \`${posMasterTableName}\` + SET current_holderId = CASE id ${caseClauses} END, + next_holderId = NULL, + lastUpdateUserId = ?, + lastUpdateFullName = ?, + lastUpdatedAt = ? + WHERE id IN (${wherePlaceholders})`, + params, + ); + } } console.timeEnd("[AMQ] batch_update_posMasters"); // 7. Batch create history console.time("[AMQ] batch_create_history"); - for (const id of historyCreateIds) { - await CreatePosMasterHistoryOfficer(id, null, undefined, undefined, manager); + if (historyRowsToSave.length > 0) { + const chunks = chunkArray(historyRowsToSave, 500); + for (const chunk of chunks) { + await posMasterHistoryRepository.save(posMasterHistoryRepository.create(chunk)); + } } console.timeEnd("[AMQ] batch_create_history"); @@ -958,25 +1069,23 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { //cone tree console.time("[AMQ] query_old_org_structure"); //หา dna tree - const orgRoot = await orgRootRepository.find({ - where: { orgRevisionId: orgRevisionPublish.id }, - }); - - const orgChild1 = await child1Repository.find({ - where: { orgRevisionId: orgRevisionPublish.id }, - }); - - const orgChild2 = await child2Repository.find({ - where: { orgRevisionId: orgRevisionPublish.id }, - }); - - const orgChild3 = await child3Repository.find({ - where: { orgRevisionId: orgRevisionPublish.id }, - }); - - const orgChild4 = await child4Repository.find({ - where: { orgRevisionId: orgRevisionPublish.id }, - }); + const [orgRoot, orgChild1, orgChild2, orgChild3, orgChild4] = await Promise.all([ + orgRootRepository.find({ + where: { orgRevisionId: orgRevisionPublish.id }, + }), + child1Repository.find({ + where: { orgRevisionId: orgRevisionPublish.id }, + }), + child2Repository.find({ + where: { orgRevisionId: orgRevisionPublish.id }, + }), + child3Repository.find({ + where: { orgRevisionId: orgRevisionPublish.id }, + }), + child4Repository.find({ + where: { orgRevisionId: orgRevisionPublish.id }, + }), + ]); console.timeEnd("[AMQ] query_old_org_structure"); console.log( `[AMQ] Old structure - orgRoot: ${orgRoot.length}, orgChild1: ${orgChild1.length}, orgChild2: ${orgChild2.length}, orgChild3: ${orgChild3.length}, orgChild4: ${orgChild4.length}`, @@ -1106,6 +1215,78 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { ? x.id : x.ancestorDNA, })); + + const groupByParentId = ( + items: T[], + getParentId: (item: T) => string | null | undefined, + ) => { + const grouped = new Map(); + for (const item of items) { + const parentId = getParentId(item); + if (!parentId) { + continue; + } + const current = grouped.get(parentId); + if (current) { + current.push(item); + } else { + grouped.set(parentId, [item]); + } + } + return grouped; + }; + + const buildEmployeeNodeKey = (item: { + orgRootId?: string | null; + orgChild1Id?: string | null; + orgChild2Id?: string | null; + orgChild3Id?: string | null; + orgChild4Id?: string | null; + }) => { + if (item.orgChild4Id) return `child4:${item.orgChild4Id}`; + if (item.orgChild3Id && item.orgChild4Id == null) return `child3:${item.orgChild3Id}`; + if (item.orgChild2Id && item.orgChild3Id == null) return `child2:${item.orgChild2Id}`; + if (item.orgChild1Id && item.orgChild2Id == null) return `child1:${item.orgChild1Id}`; + if (item.orgRootId && item.orgChild1Id == null) return `root:${item.orgRootId}`; + return null; + }; + + const groupByEmployeeNode = < + T extends { + orgRootId?: string | null; + orgChild1Id?: string | null; + orgChild2Id?: string | null; + orgChild3Id?: string | null; + orgChild4Id?: string | null; + }, + >( + items: T[], + ) => { + const grouped = new Map(); + for (const item of items) { + const key = buildEmployeeNodeKey(item); + if (!key) { + continue; + } + const current = grouped.get(key); + if (current) { + current.push(item); + } else { + grouped.set(key, [item]); + } + } + return grouped; + }; + + const employeePosMasterByNode = groupByEmployeeNode(_orgemployeePosMaster); + const employeeTempPosMasterByNode = groupByEmployeeNode(_orgemployeeTempPosMaster); + const getNodeKey = (level: "root" | "child1" | "child2" | "child3" | "child4", id: string) => + `${level}:${id}`; + const orgChild1ByRoot = groupByParentId(orgChild1, (item) => item.orgRootId); + const orgChild2ByChild1 = groupByParentId(orgChild2, (item) => item.orgChild1Id); + const orgChild3ByChild2 = groupByParentId(orgChild3, (item) => item.orgChild2Id); + const orgChild4ByChild3 = groupByParentId(orgChild4, (item) => item.orgChild3Id); + await repoEmployeeTempPosmaster .createQueryBuilder() .insert() @@ -1124,9 +1305,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { const dataId = x.id; const matchedOrgRoot = findMatchedNodeByAncestorDNA(orgRootCurrent, x); - const filteredEmployeePosMaster = _orgemployeePosMaster.filter( - (x: EmployeePosMaster) => x.orgRootId == dataId && x.orgChild1Id == null, - ); + const filteredEmployeePosMaster = employeePosMasterByNode.get(getNodeKey("root", dataId)) ?? []; await Promise.all( filteredEmployeePosMaster.map(async (item: any) => { @@ -1164,8 +1343,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { ); await Promise.all( - _orgemployeeTempPosMaster - .filter((x: EmployeeTempPosMaster) => x.orgRootId == dataId && x.orgChild1Id == null) + (employeeTempPosMasterByNode.get(getNodeKey("root", dataId)) ?? []) .map(async (item: any) => { delete item.id; const employeeTempPosMaster = Object.assign(new EmployeeTempPosMaster(), item); @@ -1200,12 +1378,11 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { }), ); - for (const x of orgChild1.filter((item: OrgChild1) => item.orgRootId == dataId)) { + for (const x of orgChild1ByRoot.get(dataId) ?? []) { const data1Id = x.id; const matchedOrgChild1 = findMatchedNodeByAncestorDNA(orgChild1Current, x); await Promise.all( - _orgemployeePosMaster - .filter((x: EmployeePosMaster) => x.orgChild1Id == data1Id && x.orgChild2Id == null) + (employeePosMasterByNode.get(getNodeKey("child1", data1Id)) ?? []) .map(async (item: any) => { delete item.id; const employeePosMaster = Object.assign(new EmployeePosMaster(), item); @@ -1242,10 +1419,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { ); await Promise.all( - _orgemployeeTempPosMaster - .filter( - (x: EmployeeTempPosMaster) => x.orgChild1Id == data1Id && x.orgChild2Id == null, - ) + (employeeTempPosMasterByNode.get(getNodeKey("child1", data1Id)) ?? []) .map(async (item: any) => { delete item.id; const employeeTempPosMaster = Object.assign(new EmployeeTempPosMaster(), item); @@ -1281,12 +1455,11 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { }), ); - for (const x of orgChild2.filter((item: OrgChild2) => item.orgChild1Id == data1Id)) { + for (const x of orgChild2ByChild1.get(data1Id) ?? []) { const data2Id = x.id; const matchedOrgChild2 = findMatchedNodeByAncestorDNA(orgChild2Current, x); await Promise.all( - _orgemployeePosMaster - .filter((x: EmployeePosMaster) => x.orgChild2Id == data2Id && x.orgChild3Id == null) + (employeePosMasterByNode.get(getNodeKey("child2", data2Id)) ?? []) .map(async (item: any) => { delete item.id; const employeePosMaster = Object.assign(new EmployeePosMaster(), item); @@ -1324,10 +1497,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { ); await Promise.all( - _orgemployeeTempPosMaster - .filter( - (x: EmployeeTempPosMaster) => x.orgChild2Id == data2Id && x.orgChild3Id == null, - ) + (employeeTempPosMasterByNode.get(getNodeKey("child2", data2Id)) ?? []) .map(async (item: any) => { delete item.id; const employeeTempPosMaster = Object.assign(new EmployeeTempPosMaster(), item); @@ -1364,14 +1534,11 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { }), ); - for (const x of orgChild3.filter((item: OrgChild3) => item.orgChild2Id == data2Id)) { + for (const x of orgChild3ByChild2.get(data2Id) ?? []) { const data3Id = x.id; const matchedOrgChild3 = findMatchedNodeByAncestorDNA(orgChild3Current, x); await Promise.all( - _orgemployeePosMaster - .filter( - (x: EmployeePosMaster) => x.orgChild3Id == data3Id && x.orgChild4Id == null, - ) + (employeePosMasterByNode.get(getNodeKey("child3", data3Id)) ?? []) .map(async (item: any) => { delete item.id; const employeePosMaster = Object.assign(new EmployeePosMaster(), item); @@ -1410,10 +1577,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { ); await Promise.all( - _orgemployeeTempPosMaster - .filter( - (x: EmployeeTempPosMaster) => x.orgChild3Id == data3Id && x.orgChild4Id == null, - ) + (employeeTempPosMasterByNode.get(getNodeKey("child3", data3Id)) ?? []) .map(async (item: any) => { delete item.id; const employeeTempPosMaster = Object.assign(new EmployeeTempPosMaster(), item); @@ -1451,12 +1615,11 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { }), ); - for (const x of orgChild4.filter((item: OrgChild4) => item.orgChild3Id == data3Id)) { + for (const x of orgChild4ByChild3.get(data3Id) ?? []) { const data4Id = x.id; const matchedOrgChild4 = findMatchedNodeByAncestorDNA(orgChild4Current, x); await Promise.all( - _orgemployeePosMaster - .filter((x: EmployeePosMaster) => x.orgChild4Id == data4Id) + (employeePosMasterByNode.get(getNodeKey("child4", data4Id)) ?? []) .map(async (item: any) => { delete item.id; const employeePosMaster = Object.assign(new EmployeePosMaster(), item); @@ -1496,8 +1659,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { ); await Promise.all( - _orgemployeeTempPosMaster - .filter((x: EmployeeTempPosMaster) => x.orgChild4Id == data4Id) + (employeeTempPosMasterByNode.get(getNodeKey("child4", data4Id)) ?? []) .map(async (item: any) => { delete item.id; const employeeTempPosMaster = Object.assign( @@ -1544,22 +1706,42 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { } } - const employeePosMaster = await repoEmployeePosmaster.find({ - where: { orgRevisionId: orgRevisionDraft.id }, - relations: ["positions", "positions.posLevel", "positions.posType"], - }); + const [employeePosMaster, employeeTempPosMaster] = await Promise.all([ + repoEmployeePosmaster.find({ + where: { orgRevisionId: orgRevisionDraft.id }, + relations: ["positions", "positions.posLevel", "positions.posType"], + }), + repoEmployeeTempPosmaster.find({ + where: { orgRevisionId: orgRevisionDraft.id }, + relations: ["positions", "positions.posLevel", "positions.posType"], + }), + ]); + const profileEmployeeIds = Array.from( + new Set( + [...employeePosMaster, ...employeeTempPosMaster] + .map((item) => item.next_holderId) + .filter((profileId): profileId is string => !!profileId), + ), + ); + const profileEmployeeMap = new Map(); + if (profileEmployeeIds.length > 0) { + const profiles = await repoProfileEmployee.findBy({ + id: In(profileEmployeeIds), + }); + profiles.forEach((profile) => profileEmployeeMap.set(profile.id, profile)); + } + const updatedProfileEmployeeIds = new Set(); + for (const item of employeePosMaster) { if (item.next_holderId != null) { - const profile = await repoProfileEmployee.findOne({ - where: { id: item.next_holderId == null ? "" : item.next_holderId }, - }); + const profile = profileEmployeeMap.get(item.next_holderId); const position = await item.positions.find((x) => x.positionIsSelected == true); const _null: any = null; if (profile != null) { profile.posLevelId = position?.posLevelId ?? _null; profile.posTypeId = position?.posTypeId ?? _null; profile.position = position?.positionName ?? _null; - await repoProfileEmployee.save(profile); + updatedProfileEmployeeIds.add(profile.id); } } item.lastUpdateUserId = lastUpdateUserId; @@ -1567,22 +1749,16 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { item.lastUpdatedAt = lastUpdatedAt; await repoEmployeePosmaster.save(item); } - const employeeTempPosMaster = await repoEmployeeTempPosmaster.find({ - where: { orgRevisionId: orgRevisionDraft.id }, - relations: ["positions", "positions.posLevel", "positions.posType"], - }); for (const item of employeeTempPosMaster) { if (item.next_holderId != null) { - const profile = await repoProfileEmployee.findOne({ - where: { id: item.next_holderId == null ? "" : item.next_holderId }, - }); + const profile = profileEmployeeMap.get(item.next_holderId); const position = await item.positions.find((x) => x.positionIsSelected == true); const _null: any = null; if (profile != null) { profile.posLevelId = position?.posLevelId ?? _null; profile.posTypeId = position?.posTypeId ?? _null; profile.position = position?.positionName ?? _null; - await repoProfileEmployee.save(profile); + updatedProfileEmployeeIds.add(profile.id); } } item.lastUpdateUserId = lastUpdateUserId; @@ -1590,6 +1766,15 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { item.lastUpdatedAt = lastUpdatedAt; await repoEmployeeTempPosmaster.save(item); } + if (updatedProfileEmployeeIds.size > 0) { + const profilesToSave = Array.from(updatedProfileEmployeeIds) + .map((profileId) => profileEmployeeMap.get(profileId)) + .filter((profile): profile is ProfileEmployee => !!profile); + const chunks = chunkArray(profilesToSave, 200); + for (const chunk of chunks) { + await repoProfileEmployee.save(chunk); + } + } console.timeEnd("[AMQ] clone_org_structure"); console.time("[AMQ] save_revision_status");