fix handler_org error use temporary table
This commit is contained in:
parent
7827e19254
commit
acbfa3707a
2 changed files with 1378 additions and 1359 deletions
|
|
@ -433,40 +433,53 @@ export async function BatchUpdatePosMasters(
|
|||
): Promise<void> {
|
||||
if (updates.length === 0) return;
|
||||
|
||||
const CHUNK_SIZE = 1000;
|
||||
const CHUNK_SIZE = 5000;
|
||||
const chunks = chunkArray(updates, CHUNK_SIZE);
|
||||
|
||||
for (const chunk of chunks) {
|
||||
// Build single bulk UPDATE query using CASE WHEN
|
||||
const caseStatements: string[] = [];
|
||||
const params: any[] = [];
|
||||
// Create a temporary table for this batch
|
||||
const tempTableName = `temp_posmaster_update_${Date.now()}_${Math.random().toString(36).substring(7)}`;
|
||||
|
||||
for (const update of chunk) {
|
||||
caseStatements.push(`WHEN ? THEN ?`);
|
||||
params.push(update.id, update.current_holderId);
|
||||
try {
|
||||
// Create temporary table
|
||||
await manager.query(`
|
||||
CREATE TEMPORARY TABLE ${tempTableName} (
|
||||
id CHAR(36) PRIMARY KEY,
|
||||
current_holderId CHAR(36) NULL,
|
||||
lastUpdateUserId CHAR(36) NOT NULL,
|
||||
lastUpdateFullName VARCHAR(255) NOT NULL,
|
||||
lastUpdatedAt DATETIME NOT NULL
|
||||
) ENGINE=InnoDB
|
||||
`);
|
||||
|
||||
// Build insert query with proper parameter count
|
||||
const insertParams: any[] = [];
|
||||
const valuePlaceholders: string[] = [];
|
||||
for (const u of chunk) {
|
||||
valuePlaceholders.push('(?, ?, ?, ?, ?)');
|
||||
insertParams.push(u.id, u.current_holderId, u.lastUpdateUserId, u.lastUpdateFullName, u.lastUpdatedAt);
|
||||
}
|
||||
|
||||
// Build IN clause placeholders
|
||||
const idPlaceholders = chunk.map(() => '?').join(',');
|
||||
const ids = chunk.map((u: any) => u.id);
|
||||
|
||||
// Add common params at the end
|
||||
params.push(
|
||||
chunk[0].lastUpdateUserId,
|
||||
chunk[0].lastUpdateFullName,
|
||||
chunk[0].lastUpdatedAt,
|
||||
...ids
|
||||
);
|
||||
|
||||
// Bulk insert into temporary table
|
||||
await manager.query(`
|
||||
UPDATE posMaster
|
||||
SET current_holderId = CASE id ${caseStatements.join(' ')} END,
|
||||
next_holderId = NULL,
|
||||
lastUpdateUserId = ?,
|
||||
lastUpdateFullName = ?,
|
||||
lastUpdatedAt = ?
|
||||
WHERE id IN (${idPlaceholders})
|
||||
`, params);
|
||||
INSERT INTO ${tempTableName} (id, current_holderId, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt)
|
||||
VALUES ${valuePlaceholders.join(',')}
|
||||
`, insertParams);
|
||||
|
||||
// Update using JOIN with temporary table (very fast - single query per chunk)
|
||||
await manager.query(`
|
||||
UPDATE posMaster p
|
||||
INNER JOIN ${tempTableName} t ON p.id = t.id
|
||||
SET p.current_holderId = t.current_holderId,
|
||||
p.next_holderId = NULL,
|
||||
p.lastUpdateUserId = t.lastUpdateUserId,
|
||||
p.lastUpdateFullName = t.lastUpdateFullName,
|
||||
p.lastUpdatedAt = t.lastUpdatedAt
|
||||
`);
|
||||
} finally {
|
||||
// Drop temporary table
|
||||
await manager.query(`DROP TEMPORARY TABLE IF EXISTS ${tempTableName}`).catch(() => {});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -24,7 +24,12 @@ import { In, Not } from "typeorm";
|
|||
import { PosMasterAct } from "../entities/PosMasterAct";
|
||||
import { PermissionOrg } from "../entities/PermissionOrg";
|
||||
import { sendWebSocket } from "./webSocket";
|
||||
import { CreatePosMasterHistoryOfficer, BatchUpdatePosMasters, BatchCreatePosMasterHistoryOfficer, BatchHistoryOperation } from "./PositionService";
|
||||
import {
|
||||
CreatePosMasterHistoryOfficer,
|
||||
BatchUpdatePosMasters,
|
||||
BatchCreatePosMasterHistoryOfficer,
|
||||
BatchHistoryOperation,
|
||||
} from "./PositionService";
|
||||
import { PayloadSendNoti } from "../interfaces/utils";
|
||||
import { PermissionProfile } from "../entities/PermissionProfile";
|
||||
|
||||
|
|
@ -405,7 +410,7 @@ async function handler_command_noti(msg: amqp.ConsumeMessage): Promise<boolean>
|
|||
|
||||
try {
|
||||
let profilesNotiRequest: Promise<any> | undefined;
|
||||
if (!(["C-PM-10"].includes(command.commandType.code))) {
|
||||
if (!["C-PM-10"].includes(command.commandType.code)) {
|
||||
profilesNotiRequest = new CallAPI()
|
||||
.PostData(
|
||||
{ headers: { authorization: token } },
|
||||
|
|
@ -482,8 +487,7 @@ async function handler_command_noti(msg: amqp.ConsumeMessage): Promise<boolean>
|
|||
/*เฉพาะคำสั่ง C-PM-10 ให้ตัด profilesNotiRequest ที่ส่ง noti ครั้งแรกออก*/
|
||||
if (["C-PM-10"].includes(command.commandType.code)) {
|
||||
await Promise.all([profilesSendRequest]);
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
await Promise.all([profilesNotiRequest!, profilesSendRequest]);
|
||||
}
|
||||
|
||||
|
|
@ -497,7 +501,7 @@ async function handler_command_noti(msg: amqp.ConsumeMessage): Promise<boolean>
|
|||
|
||||
async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
||||
//----> condition before process consume
|
||||
console.time('[AMQ] handler_org_total');
|
||||
console.time("[AMQ] handler_org_total");
|
||||
const startTime = Date.now();
|
||||
console.log(`[AMQ] handler_org START at ${new Date(startTime).toISOString()}`);
|
||||
|
||||
|
|
@ -535,7 +539,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
).catch(console.error);
|
||||
}
|
||||
|
||||
console.time('[AMQ] query_revisions');
|
||||
console.time("[AMQ] query_revisions");
|
||||
const orgRevisionPublish = await repoOrgRevision
|
||||
.createQueryBuilder("orgRevision")
|
||||
.where("orgRevision.orgRevisionIsDraft = false")
|
||||
|
|
@ -547,13 +551,19 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
.where("orgRevision.orgRevisionIsDraft = true")
|
||||
.andWhere("orgRevision.orgRevisionIsCurrent = false")
|
||||
.getOne();
|
||||
console.timeEnd('[AMQ] query_revisions');
|
||||
console.log(`[AMQ] orgRevisionPublish found: ${orgRevisionPublish ? orgRevisionPublish.id : 'null'}`);
|
||||
console.log(`[AMQ] orgRevisionDraft found: ${orgRevisionDraft ? orgRevisionDraft.id : 'null'}`);
|
||||
console.timeEnd("[AMQ] query_revisions");
|
||||
console.log(
|
||||
`[AMQ] orgRevisionPublish found: ${orgRevisionPublish ? orgRevisionPublish.id : "null"}`,
|
||||
);
|
||||
console.log(
|
||||
`[AMQ] orgRevisionDraft found: ${orgRevisionDraft ? orgRevisionDraft.id : "null"}`,
|
||||
);
|
||||
|
||||
// Validate: ต้องมี orgRevisionPublish เสมอสำหรับการเผยแพร่
|
||||
if (!orgRevisionPublish) {
|
||||
console.error('[AMQ] Cannot publish: No current org revision found (isDraft=false, isCurrent=true)');
|
||||
console.error(
|
||||
"[AMQ] Cannot publish: No current org revision found (isDraft=false, isCurrent=true)",
|
||||
);
|
||||
if (user) {
|
||||
sendWebSocket(
|
||||
"send-publish-org",
|
||||
|
|
@ -569,7 +579,9 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
|
||||
// Validate: ต้องมี orgRevisionDraft ที่จะเผยแพร่
|
||||
if (!orgRevisionDraft) {
|
||||
console.error('[AMQ] Cannot publish: No draft org revision found (isDraft=true, isCurrent=false)');
|
||||
console.error(
|
||||
"[AMQ] Cannot publish: No draft org revision found (isDraft=true, isCurrent=false)",
|
||||
);
|
||||
if (user) {
|
||||
sendWebSocket(
|
||||
"send-publish-org",
|
||||
|
|
@ -586,7 +598,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
// NOTE: ย้ายการอัปเดตสถานะไปไว้หลังจากทำงานเสร็จทั้งหมด
|
||||
// เพื่อป้องกันกรณี timeout/retry ทำให้สถานะเพี้ยน (ทุก row เป็น false,false)
|
||||
|
||||
console.time('[AMQ] query_posMaster');
|
||||
console.time("[AMQ] query_posMaster");
|
||||
const POS_MASTER_PAGE_SIZE = 2000;
|
||||
let totalPosMastersProcessed = 0;
|
||||
let hasMoreRecords = true;
|
||||
|
|
@ -607,7 +619,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
"positions.posType",
|
||||
"positions.posExecutive",
|
||||
],
|
||||
order: { id: 'ASC' },
|
||||
order: { id: "ASC" },
|
||||
skip: skip,
|
||||
take: POS_MASTER_PAGE_SIZE,
|
||||
});
|
||||
|
|
@ -619,15 +631,15 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
|
||||
console.log(`[AMQ] Loaded posMaster page: ${totalPosMastersProcessed} records`);
|
||||
}
|
||||
console.timeEnd('[AMQ] query_posMaster');
|
||||
console.timeEnd("[AMQ] query_posMaster");
|
||||
console.log(`[AMQ] posMaster count: ${posMaster.length}`);
|
||||
|
||||
console.time('[AMQ] query_old_data');
|
||||
console.time("[AMQ] query_old_data");
|
||||
const oldPosMasters = await repoPosmaster.find({
|
||||
where: {
|
||||
orgRevisionId: orgRevisionPublish.id,
|
||||
},
|
||||
select: ['id', 'current_holderId', 'ancestorDNA']
|
||||
select: ["id", "current_holderId", "ancestorDNA"],
|
||||
});
|
||||
|
||||
// Task #2160 ดึง posMasterAssign ของ revision เดิม
|
||||
|
|
@ -639,11 +651,11 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
},
|
||||
},
|
||||
});
|
||||
console.timeEnd('[AMQ] query_old_data');
|
||||
console.timeEnd("[AMQ] query_old_data");
|
||||
console.log(`[AMQ] oldPosMasters count: ${oldPosMasters.length}`);
|
||||
console.log(`[AMQ] oldposMasterAssigns count: ${oldposMasterAssigns.length}`);
|
||||
|
||||
console.time('[AMQ] build_assignMap');
|
||||
console.time("[AMQ] build_assignMap");
|
||||
// สร้าง assignMap เอาไว้เก็บ posMasterAssign.ancestorDNA ของ revision เดิม
|
||||
const assignMap = new Map<string, PosMasterAssignDTO[]>();
|
||||
for (const posmasterAssign of oldposMasterAssigns) {
|
||||
|
|
@ -654,12 +666,12 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
assignMap.get(dna)!.push({
|
||||
id: posmasterAssign.id,
|
||||
posMasterId: posmasterAssign.posMasterId,
|
||||
assignId: posmasterAssign.assignId
|
||||
assignId: posmasterAssign.assignId,
|
||||
});
|
||||
}
|
||||
console.timeEnd('[AMQ] build_assignMap');
|
||||
console.timeEnd("[AMQ] build_assignMap");
|
||||
|
||||
console.time('[AMQ] query_oldposMasterAct');
|
||||
console.time("[AMQ] query_oldposMasterAct");
|
||||
// ดึง posMasterAct ของ revision เดิม xxx
|
||||
const oldposMasterAct = await posMasterActRepository.find({
|
||||
relations: ["posMaster", "posMasterChild"],
|
||||
|
|
@ -669,16 +681,16 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
},
|
||||
},
|
||||
});
|
||||
console.timeEnd('[AMQ] query_oldposMasterAct');
|
||||
console.timeEnd("[AMQ] query_oldposMasterAct");
|
||||
console.log(`[AMQ] oldposMasterAct count: ${oldposMasterAct.length}`);
|
||||
|
||||
type ActKey = string; // `${parentDNA}|${childDNA}`
|
||||
|
||||
console.time('[AMQ] build_maps');
|
||||
console.time("[AMQ] build_maps");
|
||||
const posMasterActMap = new Map<ActKey, PosMasterAct[]>();
|
||||
for (const act of oldposMasterAct) {
|
||||
const parentDNA = act.posMaster?.ancestorDNA?.trim() ?? '';
|
||||
const childDNA = act.posMasterChild?.ancestorDNA?.trim() ?? '';
|
||||
const parentDNA = act.posMaster?.ancestorDNA?.trim() ?? "";
|
||||
const childDNA = act.posMasterChild?.ancestorDNA?.trim() ?? "";
|
||||
const key = `${parentDNA}|${childDNA}`;
|
||||
|
||||
if (!posMasterActMap.has(key)) {
|
||||
|
|
@ -689,7 +701,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
|
||||
const posMasterIdMap = new Map<string, string>();
|
||||
for (const pm of posMaster) {
|
||||
posMasterIdMap.set(pm.ancestorDNA?.trim() ?? '', pm.id);
|
||||
posMasterIdMap.set(pm.ancestorDNA?.trim() ?? "", pm.id);
|
||||
}
|
||||
|
||||
const oldPosMasterMap = new Map<string, PosMaster>();
|
||||
|
|
@ -699,25 +711,25 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
oldPosMasterMap.set(dna, oldPm);
|
||||
}
|
||||
}
|
||||
console.timeEnd('[AMQ] build_maps');
|
||||
console.timeEnd("[AMQ] build_maps");
|
||||
|
||||
const _null: any = null;
|
||||
|
||||
// ===== BATCH PROCESSING: เตรียมข้อมูลก่อน loop =====
|
||||
console.time('[AMQ] prepare_batch_data');
|
||||
console.time("[AMQ] prepare_batch_data");
|
||||
// 1. รวบรวม profileIds ทั้งหมดที่ต้องอัพเดท
|
||||
const profileIds = posMaster
|
||||
.filter(item => item.next_holderId != null)
|
||||
.map(item => item.next_holderId!)
|
||||
.filter(id => id != null && id !== "");
|
||||
.filter((item) => item.next_holderId != null)
|
||||
.map((item) => item.next_holderId!)
|
||||
.filter((id) => id != null && id !== "");
|
||||
|
||||
// 2. Batch load profiles ทั้งหมดในครั้งเดียว (แก้ปัญหา N+1 Query)
|
||||
const profilesMap = new Map<string, Profile>();
|
||||
if (profileIds.length > 0) {
|
||||
const profiles = await repoProfile.findBy({
|
||||
id: In(profileIds)
|
||||
id: In(profileIds),
|
||||
});
|
||||
profiles.forEach(p => profilesMap.set(p.id, p));
|
||||
profiles.forEach((p) => profilesMap.set(p.id, p));
|
||||
}
|
||||
console.log(`[AMQ] profiles to update: ${profilesMap.size}`);
|
||||
|
||||
|
|
@ -745,7 +757,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
lastUpdatedAt: lastUpdatedAt,
|
||||
lastUpdateFullName: lastUpdateFullName,
|
||||
lastUpdateUserId: lastUpdateUserId,
|
||||
})
|
||||
}),
|
||||
);
|
||||
posMasterAssignsToSave.push(...newAssigns);
|
||||
}
|
||||
|
|
@ -796,33 +808,35 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
historyCreateIds.push(item.id);
|
||||
}
|
||||
}
|
||||
console.timeEnd('[AMQ] prepare_batch_data');
|
||||
console.log(`[AMQ] Prepared - posMasterAssignsToSave: ${posMasterAssignsToSave.length}, profilesToSave: ${profilesToSave.length}, posMasterUpdates: ${posMasterUpdates.length}, historyCreateIds: ${historyCreateIds.length}`);
|
||||
console.timeEnd("[AMQ] prepare_batch_data");
|
||||
console.log(
|
||||
`[AMQ] Prepared - posMasterAssignsToSave: ${posMasterAssignsToSave.length}, profilesToSave: ${profilesToSave.length}, posMasterUpdates: ${posMasterUpdates.length}, historyCreateIds: ${historyCreateIds.length}`,
|
||||
);
|
||||
|
||||
// ===== BATCH EXECUTION: save ทีละ batch =====
|
||||
|
||||
// 4. Batch save posMasterAssign (chunk 500)
|
||||
console.time('[AMQ] batch_save_posMasterAssign');
|
||||
console.time("[AMQ] batch_save_posMasterAssign");
|
||||
if (posMasterAssignsToSave.length > 0) {
|
||||
const chunks = chunkArray(posMasterAssignsToSave, 500);
|
||||
for (const chunk of chunks) {
|
||||
await posMasterAssignRepository.save(chunk);
|
||||
}
|
||||
}
|
||||
console.timeEnd('[AMQ] batch_save_posMasterAssign');
|
||||
console.timeEnd("[AMQ] batch_save_posMasterAssign");
|
||||
|
||||
// 5. Batch save profiles (chunk 200)
|
||||
console.time('[AMQ] batch_save_profiles');
|
||||
console.time("[AMQ] batch_save_profiles");
|
||||
if (profilesToSave.length > 0) {
|
||||
const chunks = chunkArray(profilesToSave, 200);
|
||||
for (const chunk of chunks) {
|
||||
await repoProfile.save(chunk);
|
||||
}
|
||||
}
|
||||
console.timeEnd('[AMQ] batch_save_profiles');
|
||||
console.timeEnd("[AMQ] batch_save_profiles");
|
||||
|
||||
// 6. Batch update posMasters
|
||||
console.time('[AMQ] batch_update_posMasters');
|
||||
console.time("[AMQ] batch_update_posMasters");
|
||||
|
||||
const posMasterUpdatesForBatch = posMasterUpdates.map((u: any) => ({
|
||||
id: u.id,
|
||||
|
|
@ -832,19 +846,16 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
lastUpdatedAt,
|
||||
}));
|
||||
|
||||
await BatchUpdatePosMasters(
|
||||
AppDataSource.manager,
|
||||
posMasterUpdatesForBatch
|
||||
);
|
||||
await BatchUpdatePosMasters(AppDataSource.manager, posMasterUpdatesForBatch);
|
||||
|
||||
console.timeEnd('[AMQ] batch_update_posMasters');
|
||||
console.timeEnd("[AMQ] batch_update_posMasters");
|
||||
|
||||
// 7. Batch create history
|
||||
console.time('[AMQ] batch_create_history');
|
||||
console.time("[AMQ] batch_create_history");
|
||||
|
||||
const historyOperations: BatchHistoryOperation[] = [];
|
||||
for (const id of historyCreateIds) {
|
||||
const pm = posMaster.find(p => p.id === id);
|
||||
const pm = posMaster.find((p) => p.id === id);
|
||||
if (pm) {
|
||||
historyOperations.push({
|
||||
posMasterId: id,
|
||||
|
|
@ -856,18 +867,15 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
}
|
||||
}
|
||||
|
||||
await BatchCreatePosMasterHistoryOfficer(
|
||||
AppDataSource.manager,
|
||||
historyOperations
|
||||
);
|
||||
await BatchCreatePosMasterHistoryOfficer(AppDataSource.manager, historyOperations);
|
||||
|
||||
console.timeEnd('[AMQ] batch_create_history');
|
||||
console.timeEnd("[AMQ] batch_create_history");
|
||||
|
||||
// Clone oldposMasterAct
|
||||
console.time('[AMQ] clone_oldposMasterAct');
|
||||
console.time("[AMQ] clone_oldposMasterAct");
|
||||
for (const act of oldposMasterAct) {
|
||||
const parentDNA = act.posMaster?.ancestorDNA?.trim()?.toLowerCase() ?? '';
|
||||
const childDNA = act.posMasterChild?.ancestorDNA?.trim()?.toLowerCase() ?? '';
|
||||
const parentDNA = act.posMaster?.ancestorDNA?.trim()?.toLowerCase() ?? "";
|
||||
const childDNA = act.posMasterChild?.ancestorDNA?.trim()?.toLowerCase() ?? "";
|
||||
|
||||
const newParentId = posMasterIdMap.get(parentDNA);
|
||||
const newChildId = posMasterIdMap.get(childDNA);
|
||||
|
|
@ -890,16 +898,16 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
|
||||
await posMasterActRepository.save(newAct);
|
||||
}
|
||||
console.timeEnd('[AMQ] clone_oldposMasterAct');
|
||||
console.timeEnd("[AMQ] clone_oldposMasterAct");
|
||||
|
||||
if (orgRevisionPublish != null && orgRevisionDraft != null) {
|
||||
console.time('[AMQ] clone_org_structure');
|
||||
console.time("[AMQ] clone_org_structure");
|
||||
//new main revision
|
||||
const before = null;
|
||||
|
||||
//ทุก orgRoot และ orgChild ข้างล่างนี้จะเป็นตัวเก่าที่ไม่ได้เป็น current revision
|
||||
//cone tree
|
||||
console.time('[AMQ] query_old_org_structure');
|
||||
console.time("[AMQ] query_old_org_structure");
|
||||
//หา dna tree
|
||||
const orgRoot = await orgRootRepository.find({
|
||||
where: { orgRevisionId: orgRevisionPublish.id },
|
||||
|
|
@ -920,27 +928,27 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
const orgChild4 = await child4Repository.find({
|
||||
where: { orgRevisionId: orgRevisionPublish.id },
|
||||
});
|
||||
console.timeEnd('[AMQ] query_old_org_structure');
|
||||
console.log(`[AMQ] Old structure - orgRoot: ${orgRoot.length}, orgChild1: ${orgChild1.length}, orgChild2: ${orgChild2.length}, orgChild3: ${orgChild3.length}, orgChild4: ${orgChild4.length}`);
|
||||
console.timeEnd("[AMQ] query_old_org_structure");
|
||||
console.log(
|
||||
`[AMQ] Old structure - orgRoot: ${orgRoot.length}, orgChild1: ${orgChild1.length}, orgChild2: ${orgChild2.length}, orgChild3: ${orgChild3.length}, orgChild4: ${orgChild4.length}`,
|
||||
);
|
||||
|
||||
// Task #2172 ดึง orgRoot ของ revision ใหม่
|
||||
const newRoots = await orgRootRepository.find({
|
||||
where: { orgRevisionId: orgRevisionDraft.id },
|
||||
});
|
||||
// สร้าง newRootMap เอาไว้เก็บ orgRoot.ancestorDNA ของ revision ใหม่
|
||||
const newRootMap = new Map(
|
||||
newRoots.map(r => [r.ancestorDNA, r.id])
|
||||
);
|
||||
const newRootMap = new Map(newRoots.map((r) => [r.ancestorDNA, r.id]));
|
||||
|
||||
console.time('[AMQ] clone_permissionProfiles');
|
||||
console.time("[AMQ] clone_permissionProfiles");
|
||||
// ดึง permissionProfiles ของ revision เดิม
|
||||
const oldPermissionProfiles = await permissionProfilesRepository.find({
|
||||
relations: ["orgRootTree"],
|
||||
where: {
|
||||
orgRootTree: {
|
||||
orgRevisionId: orgRevisionPublish.id,
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
});
|
||||
const inserts: any[] = [];
|
||||
for (const permiss of oldPermissionProfiles) {
|
||||
|
|
@ -965,15 +973,15 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
if (inserts.length > 0) {
|
||||
await permissionProfilesRepository.insert(inserts);
|
||||
}
|
||||
console.timeEnd('[AMQ] clone_permissionProfiles');
|
||||
console.timeEnd("[AMQ] clone_permissionProfiles");
|
||||
|
||||
//หา dna posmaster ถ้าไม่มีให้เอาตัวเองเป็น dna
|
||||
console.time('[AMQ] query_employeePosMaster');
|
||||
console.time("[AMQ] query_employeePosMaster");
|
||||
const orgemployeePosMaster = await repoEmployeePosmaster.find({
|
||||
where: { orgRevisionId: orgRevisionPublish.id },
|
||||
relations: ["positions"],
|
||||
});
|
||||
console.timeEnd('[AMQ] query_employeePosMaster');
|
||||
console.timeEnd("[AMQ] query_employeePosMaster");
|
||||
console.log(`[AMQ] orgemployeePosMaster count: ${orgemployeePosMaster.length}`);
|
||||
|
||||
let _orgemployeePosMaster: EmployeePosMaster[];
|
||||
|
|
@ -998,14 +1006,16 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
_orgemployeePosMaster = orgemployeePosMaster.map((x) => ({
|
||||
...x,
|
||||
current_holderId:
|
||||
x.current_holderId && validProfileIds.has(x.current_holderId) ? x.current_holderId : null,
|
||||
x.current_holderId && validProfileIds.has(x.current_holderId)
|
||||
? x.current_holderId
|
||||
: null,
|
||||
ancestorDNA:
|
||||
!x.ancestorDNA || x.ancestorDNA === "00000000-0000-0000-0000-000000000000"
|
||||
? x.id
|
||||
: x.ancestorDNA,
|
||||
}));
|
||||
|
||||
console.time('[AMQ] insert_employeePosMaster');
|
||||
console.time("[AMQ] insert_employeePosMaster");
|
||||
await repoEmployeePosmaster
|
||||
.createQueryBuilder()
|
||||
.insert()
|
||||
|
|
@ -1016,16 +1026,16 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
overwrite: ["ancestorDNA"],
|
||||
})
|
||||
.execute();
|
||||
console.timeEnd('[AMQ] insert_employeePosMaster');
|
||||
console.timeEnd("[AMQ] insert_employeePosMaster");
|
||||
|
||||
// }
|
||||
//หา dna posmaster ถ้าไม่มีให้เอาตัวเองเป็น dna
|
||||
console.time('[AMQ] query_employeeTempPosMaster');
|
||||
console.time("[AMQ] query_employeeTempPosMaster");
|
||||
const orgemployeeTempPosMaster = await repoEmployeeTempPosmaster.find({
|
||||
where: { orgRevisionId: orgRevisionPublish.id },
|
||||
relations: ["positions"],
|
||||
});
|
||||
console.timeEnd('[AMQ] query_employeeTempPosMaster');
|
||||
console.timeEnd("[AMQ] query_employeeTempPosMaster");
|
||||
console.log(`[AMQ] orgemployeeTempPosMaster count: ${orgemployeeTempPosMaster.length}`);
|
||||
|
||||
let _orgemployeeTempPosMaster: EmployeeTempPosMaster[];
|
||||
|
|
@ -1056,7 +1066,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
// }
|
||||
|
||||
//create org - forEach orgRoot (WARNING: async forEach without await)
|
||||
console.time('[AMQ] forEach_orgRoot');
|
||||
console.time("[AMQ] forEach_orgRoot");
|
||||
console.log(`[AMQ] Starting forEach orgRoot loop (${orgRoot.length} items)`);
|
||||
let processedOrgRoot = 0;
|
||||
orgRoot.forEach(async (x: any) => {
|
||||
|
|
@ -1087,12 +1097,12 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
// requestBody.typeDraft.toUpperCase() == "ORG_POSITION_PERSON_ROLE"
|
||||
// ) {
|
||||
//create employeePosmaster
|
||||
const filteredEmployeePosMaster = _orgemployeePosMaster
|
||||
.filter((x: EmployeePosMaster) => x.orgRootId == dataId && x.orgChild1Id == null);
|
||||
const filteredEmployeePosMaster = _orgemployeePosMaster.filter(
|
||||
(x: EmployeePosMaster) => x.orgRootId == dataId && x.orgChild1Id == null,
|
||||
);
|
||||
|
||||
await Promise.all(
|
||||
filteredEmployeePosMaster
|
||||
.map(async (item: any) => {
|
||||
filteredEmployeePosMaster.map(async (item: any) => {
|
||||
delete item.id;
|
||||
const employeePosMaster = Object.assign(new EmployeePosMaster(), item);
|
||||
employeePosMaster.positions = [];
|
||||
|
|
@ -1145,7 +1155,6 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
employeePosition.lastUpdatedAt = new Date();
|
||||
await employeePositionRepository.save(employeePosition);
|
||||
}
|
||||
|
||||
}),
|
||||
);
|
||||
//create employeeTempPosmaster
|
||||
|
|
@ -1205,7 +1214,6 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
employeePosition.lastUpdatedAt = new Date();
|
||||
await employeePositionRepository.save(employeePosition);
|
||||
}
|
||||
|
||||
}),
|
||||
);
|
||||
// }
|
||||
|
|
@ -1226,7 +1234,8 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
x.ancestorDNA === "00000000-0000-0000-0000-000000000000"
|
||||
) {
|
||||
return (
|
||||
i.ancestorDNA === null || i.ancestorDNA === "00000000-0000-0000-0000-000000000000"
|
||||
i.ancestorDNA === null ||
|
||||
i.ancestorDNA === "00000000-0000-0000-0000-000000000000"
|
||||
);
|
||||
}
|
||||
return i.ancestorDNA === x.ancestorDNA;
|
||||
|
|
@ -1241,7 +1250,9 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
//create employeePosmaster
|
||||
await Promise.all(
|
||||
_orgemployeePosMaster
|
||||
.filter((x: EmployeePosMaster) => x.orgChild1Id == data1Id && x.orgChild2Id == null)
|
||||
.filter(
|
||||
(x: EmployeePosMaster) => x.orgChild1Id == data1Id && x.orgChild2Id == null,
|
||||
)
|
||||
.map(async (item: any) => {
|
||||
delete item.id;
|
||||
// console.log("[in case Child1] orgChild1Id == data1Id");
|
||||
|
|
@ -1297,7 +1308,6 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
employeePosition.lastUpdatedAt = new Date();
|
||||
await employeePositionRepository.save(employeePosition);
|
||||
}
|
||||
|
||||
}),
|
||||
);
|
||||
// create employeeTempPosmaster
|
||||
|
|
@ -1360,7 +1370,6 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
employeePosition.lastUpdatedAt = new Date();
|
||||
await employeePositionRepository.save(employeePosition);
|
||||
}
|
||||
|
||||
}),
|
||||
);
|
||||
// }
|
||||
|
|
@ -1456,7 +1465,6 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
employeePosition.lastUpdatedAt = new Date();
|
||||
await employeePositionRepository.save(employeePosition);
|
||||
}
|
||||
|
||||
}),
|
||||
);
|
||||
// create employeeTempPosmaster
|
||||
|
|
@ -1524,7 +1532,6 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
employeePosition.lastUpdatedAt = new Date();
|
||||
await employeePositionRepository.save(employeePosition);
|
||||
}
|
||||
|
||||
}),
|
||||
);
|
||||
// }
|
||||
|
|
@ -1622,7 +1629,6 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
employeePosition.lastUpdatedAt = new Date();
|
||||
await employeePositionRepository.save(employeePosition);
|
||||
}
|
||||
|
||||
}),
|
||||
);
|
||||
// create employeeTempPosmaster
|
||||
|
|
@ -1691,7 +1697,6 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
employeePosition.lastUpdatedAt = new Date();
|
||||
await employeePositionRepository.save(employeePosition);
|
||||
}
|
||||
|
||||
}),
|
||||
);
|
||||
// }
|
||||
|
|
@ -1790,7 +1795,6 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
employeePosition.lastUpdatedAt = new Date();
|
||||
await employeePositionRepository.save(employeePosition);
|
||||
}
|
||||
|
||||
}),
|
||||
);
|
||||
//create employeeTempPosmaster
|
||||
|
|
@ -1857,7 +1861,6 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
employeePosition.lastUpdatedAt = new Date();
|
||||
await employeePositionRepository.save(employeePosition);
|
||||
}
|
||||
|
||||
}),
|
||||
);
|
||||
// }
|
||||
|
|
@ -1930,10 +1933,10 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
{ userId: user?.sub },
|
||||
).catch(console.error);
|
||||
}
|
||||
console.timeEnd('[AMQ] clone_org_structure');
|
||||
console.timeEnd("[AMQ] clone_org_structure");
|
||||
|
||||
// อัปเดตสถานะ orgRevision หลังจากทำงานเสร็จทั้งหมด
|
||||
console.time('[AMQ] save_revision_status');
|
||||
console.time("[AMQ] save_revision_status");
|
||||
orgRevisionPublish.orgRevisionIsDraft = false;
|
||||
orgRevisionPublish.orgRevisionIsCurrent = false;
|
||||
await repoOrgRevision.save(orgRevisionPublish);
|
||||
|
|
@ -1941,17 +1944,17 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
orgRevisionDraft.orgRevisionIsCurrent = true;
|
||||
orgRevisionDraft.orgRevisionIsDraft = false;
|
||||
await repoOrgRevision.save(orgRevisionDraft);
|
||||
console.timeEnd('[AMQ] save_revision_status');
|
||||
console.timeEnd("[AMQ] save_revision_status");
|
||||
|
||||
console.log(`[AMQ] handler_org SUCCESS - Total time: ${Date.now() - startTime}ms`);
|
||||
console.timeEnd('[AMQ] handler_org_total');
|
||||
console.timeEnd("[AMQ] handler_org_total");
|
||||
return true;
|
||||
}); // ✅ END TRANSACTION - All operations succeeded, data is committed
|
||||
} catch (error) {
|
||||
// ✅ TRANSACTION AUTOMATICALLY ROLLED BACK - No data was saved
|
||||
const totalTime = Date.now() - startTime;
|
||||
console.error(`[AMQ] handler_org ERROR after ${totalTime}ms:`, error);
|
||||
console.error('[AMQ] Transaction rolled back - all changes were undone');
|
||||
console.error("[AMQ] Transaction rolled back - all changes were undone");
|
||||
if (user) {
|
||||
sendWebSocket(
|
||||
"send-publish-org",
|
||||
|
|
@ -1962,7 +1965,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
{ userId: user?.sub },
|
||||
).catch(console.error);
|
||||
}
|
||||
console.timeEnd('[AMQ] handler_org_total');
|
||||
console.timeEnd("[AMQ] handler_org_total");
|
||||
throw error; // ✅ Re-throw to be caught by createConsumer's try-catch
|
||||
}
|
||||
}
|
||||
|
|
@ -2637,7 +2640,8 @@ async function handler_org_draft(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
});
|
||||
await posMasterAssignRepository.delete({ posMasterId: In(_posMasters.map((x) => x.id)) });
|
||||
await posMasterActRepository.delete({ posMasterId: In(_posMasters.map((x) => x.id)) }); //ใช้ posMasterId ของ revision: draft *แต่ยังไม่เจอช็อดไหนที่ใช้โครงสร้างแบบร่างในรักษาการแทน
|
||||
await posMasterActRepository.delete({ //ใช้ posMasterId ของ revision: draft *แต่ยังไม่เจอช็อดไหนที่ใช้โครงสร้างแบบร่างในรักษาการแทน
|
||||
await posMasterActRepository.delete({
|
||||
//ใช้ posMasterId ของ revision: draft *แต่ยังไม่เจอช็อดไหนที่ใช้โครงสร้างแบบร่างในรักษาการแทน
|
||||
posMasterChildId: In(_posMasters.map((x) => x.id)),
|
||||
});
|
||||
// await posMasterRepository.remove(_posMasters);
|
||||
|
|
@ -2665,24 +2669,26 @@ async function handler_org_draft(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
await child2Repository.delete({ orgRevisionId: In(_orgRevisions.map((x) => x.id)) });
|
||||
await child1Repository.delete({ orgRevisionId: In(_orgRevisions.map((x) => x.id)) });
|
||||
// Task #2160 อัพเดทหน้าที่จัดการโครงสร้างแบบร่าง
|
||||
if (["ORG", "ORG_POSITION", "ORG_POSITION_PERSON", "ORG_POSITION_ROLE", "ORG_POSITION_PERSON_ROLE"].includes(requestBody.typeDraft?.toUpperCase())) {
|
||||
if (
|
||||
[
|
||||
"ORG",
|
||||
"ORG_POSITION",
|
||||
"ORG_POSITION_PERSON",
|
||||
"ORG_POSITION_ROLE",
|
||||
"ORG_POSITION_PERSON_ROLE",
|
||||
].includes(requestBody.typeDraft?.toUpperCase())
|
||||
) {
|
||||
const _newRoots = await orgRootRepository.find({
|
||||
where: { orgRevisionId: revision.id }
|
||||
where: { orgRevisionId: revision.id },
|
||||
});
|
||||
const newRootMap = new Map(
|
||||
_newRoots.map(r => [r.ancestorDNA, r.id])
|
||||
);
|
||||
const newRootMap = new Map(_newRoots.map((r) => [r.ancestorDNA, r.id]));
|
||||
for (const oldRoot of _roots) {
|
||||
const newRootId = newRootMap.get(oldRoot.ancestorDNA);
|
||||
if (!newRootId) continue;
|
||||
// อัพเดท orgRootId ที่อยู่ภายใต้ orgRevision แบบร่างเดิมเป็นของ orgRevision แบบร่างใหม่
|
||||
await permissionOrgRepository.update(
|
||||
{ orgRootId: oldRoot.id },
|
||||
{ orgRootId: newRootId }
|
||||
);
|
||||
await permissionOrgRepository.update({ orgRootId: oldRoot.id }, { orgRootId: newRootId });
|
||||
}
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
await permissionOrgRepository.delete({
|
||||
orgRootId: In(_roots.map((x) => x.id)),
|
||||
});
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue