From 6c1e4a1e427a82d908cc14c8d17d83eb72688f4c Mon Sep 17 00:00:00 2001 From: waruneeauy Date: Tue, 5 May 2026 18:11:55 +0700 Subject: [PATCH] Optimize handler_org batch writes Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/services/rabbitmq.ts | 59 ++++++++++++++++++++++++++++++---------- 1 file changed, 45 insertions(+), 14 deletions(-) diff --git a/src/services/rabbitmq.ts b/src/services/rabbitmq.ts index 073042c9..990cd5fb 100644 --- a/src/services/rabbitmq.ts +++ b/src/services/rabbitmq.ts @@ -1035,6 +1035,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { // Clone oldposMasterAct console.time("[AMQ] clone_oldposMasterAct"); + const posMasterActRowsToInsert: Partial[] = []; for (const act of oldposMasterAct) { const parentDNA = act.posMaster?.ancestorDNA?.trim()?.toLowerCase() ?? ""; const childDNA = act.posMasterChild?.ancestorDNA?.trim()?.toLowerCase() ?? ""; @@ -1046,7 +1047,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { const { id, posMaster, posMasterChild, ...fields } = act; - const newAct = { + posMasterActRowsToInsert.push({ ...fields, posMasterId: newParentId, posMasterChildId: newChildId, @@ -1056,9 +1057,13 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { lastUpdatedAt: new Date(), lastUpdateFullName: user ? user.name : "system", lastUpdateUserId: user ? user.sub : "system", - }; - - await posMasterActRepository.save(newAct); + }); + } + if (posMasterActRowsToInsert.length > 0) { + const chunks = chunkArray(posMasterActRowsToInsert, 500); + for (const chunk of chunks) { + await posMasterActRepository.insert(chunk); + } } console.timeEnd("[AMQ] clone_oldposMasterAct"); @@ -1554,11 +1559,13 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { profiles.forEach((profile) => profileEmployeeMap.set(profile.id, profile)); } const updatedProfileEmployeeIds = new Set(); + const employeePosMasterIdsToTouch: string[] = []; + const employeeTempPosMasterIdsToTouch: string[] = []; for (const item of employeePosMaster) { if (item.next_holderId != null) { const profile = profileEmployeeMap.get(item.next_holderId); - const position = await item.positions.find((x) => x.positionIsSelected == true); + const position = item.positions.find((x) => x.positionIsSelected == true); const _null: any = null; if (profile != null) { profile.posLevelId = position?.posLevelId ?? _null; @@ -1567,15 +1574,12 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { updatedProfileEmployeeIds.add(profile.id); } } - item.lastUpdateUserId = lastUpdateUserId; - item.lastUpdateFullName = lastUpdateFullName; - item.lastUpdatedAt = lastUpdatedAt; - await repoEmployeePosmaster.save(item); + employeePosMasterIdsToTouch.push(item.id); } for (const item of employeeTempPosMaster) { if (item.next_holderId != null) { const profile = profileEmployeeMap.get(item.next_holderId); - const position = await item.positions.find((x) => x.positionIsSelected == true); + const position = item.positions.find((x) => x.positionIsSelected == true); const _null: any = null; if (profile != null) { profile.posLevelId = position?.posLevelId ?? _null; @@ -1584,10 +1588,37 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { updatedProfileEmployeeIds.add(profile.id); } } - item.lastUpdateUserId = lastUpdateUserId; - item.lastUpdateFullName = lastUpdateFullName; - item.lastUpdatedAt = lastUpdatedAt; - await repoEmployeeTempPosmaster.save(item); + employeeTempPosMasterIdsToTouch.push(item.id); + } + if (employeePosMasterIdsToTouch.length > 0) { + const chunks = chunkArray(employeePosMasterIdsToTouch, 500); + const employeePosMasterTableName = repoEmployeePosmaster.metadata.tableName; + for (const chunk of chunks) { + const wherePlaceholders = chunk.map(() => "?").join(", "); + await manager.query( + `UPDATE \`${employeePosMasterTableName}\` + SET lastUpdateUserId = ?, + lastUpdateFullName = ?, + lastUpdatedAt = ? + WHERE id IN (${wherePlaceholders})`, + [lastUpdateUserId, lastUpdateFullName, lastUpdatedAt, ...chunk], + ); + } + } + if (employeeTempPosMasterIdsToTouch.length > 0) { + const chunks = chunkArray(employeeTempPosMasterIdsToTouch, 500); + const employeeTempPosMasterTableName = repoEmployeeTempPosmaster.metadata.tableName; + for (const chunk of chunks) { + const wherePlaceholders = chunk.map(() => "?").join(", "); + await manager.query( + `UPDATE \`${employeeTempPosMasterTableName}\` + SET lastUpdateUserId = ?, + lastUpdateFullName = ?, + lastUpdatedAt = ? + WHERE id IN (${wherePlaceholders})`, + [lastUpdateUserId, lastUpdateFullName, lastUpdatedAt, ...chunk], + ); + } } if (updatedProfileEmployeeIds.size > 0) { const profilesToSave = Array.from(updatedProfileEmployeeIds)