diff --git a/src/controllers/OrganizationController.ts b/src/controllers/OrganizationController.ts index ee8f3413..98077f8f 100644 --- a/src/controllers/OrganizationController.ts +++ b/src/controllers/OrganizationController.ts @@ -2532,11 +2532,16 @@ export class OrganizationController extends Controller { * Cronjob */ async cronjobRevision() { + console.log('[CronJob] cronjobRevision START'); + const startTime = Date.now(); + const today = new Date(); today.setUTCHours(0, 0, 0, 0); // Set time to the beginning of the day const tomorrow = new Date(today); tomorrow.setDate(tomorrow.getDate() + 1); + console.log(`[CronJob] Searching for draft revision with publishDate between ${today.toISOString()} and ${tomorrow.toISOString()}`); + const orgRevisionDraft = await this.orgRevisionRepository .createQueryBuilder("orgRevision") .where("orgRevision.orgRevisionIsDraft = true") @@ -2545,8 +2550,12 @@ export class OrganizationController extends Controller { .getOne(); if (!orgRevisionDraft) { + console.log('[CronJob] No draft revision found to publish'); return new HttpSuccess(); } + + console.log(`[CronJob] Found draft revision: ${orgRevisionDraft.id}, name: ${orgRevisionDraft.orgRevisionName}, publishDate: ${orgRevisionDraft.orgPublishDate}`); + // if (orgRevisionPublish) { // orgRevisionPublish.orgRevisionIsDraft = false; // orgRevisionPublish.orgRevisionIsCurrent = false; @@ -2575,7 +2584,10 @@ export class OrganizationController extends Controller { lastUpdatedAt: new Date(), }, }; + + console.log(`[CronJob] Sending to RabbitMQ queue - revisionId: ${orgRevisionDraft.id}`); sendToQueueOrg(msg); + console.log(`[CronJob] Sent to queue successfully - Total time: ${Date.now() - startTime}ms`); return new HttpSuccess(); } diff --git a/src/services/rabbitmq.ts b/src/services/rabbitmq.ts index fe70b580..31a4269d 100644 --- a/src/services/rabbitmq.ts +++ b/src/services/rabbitmq.ts @@ -497,6 +497,10 @@ async function handler_command_noti(msg: amqp.ConsumeMessage): Promise async function handler_org(msg: amqp.ConsumeMessage): Promise { //----> condition before process consume + console.time('[AMQ] handler_org_total'); + const startTime = Date.now(); + console.log(`[AMQ] handler_org START at ${new Date(startTime).toISOString()}`); + const repoPosmaster = AppDataSource.getRepository(PosMaster); const posMasterAssignRepository = AppDataSource.getRepository(PosMasterAssign); const posMasterActRepository = AppDataSource.getRepository(PosMasterAct); @@ -514,6 +518,8 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { const child4Repository = AppDataSource.getRepository(OrgChild4); const { data, token, user } = JSON.parse(msg.content.toString()); const { id, status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt } = data; + console.log(`[AMQ] Received message - revisionId: ${id}, status: ${status}`); + if (user) { sendWebSocket( "send-publish-org", @@ -524,6 +530,8 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { { userId: user?.sub }, ).catch(console.error); } + + console.time('[AMQ] query_revisions'); const orgRevisionPublish = await repoOrgRevision .createQueryBuilder("orgRevision") .where("orgRevision.orgRevisionIsDraft = false") @@ -535,19 +543,47 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { .where("orgRevision.orgRevisionIsDraft = true") .andWhere("orgRevision.orgRevisionIsCurrent = false") .getOne(); - if (orgRevisionPublish) { - //เข้าเงื่อนไขจะเปลี่ยนสถานะ orgRevisionPublish เป็นไม่ใช่ current และไม่เป็น daft - orgRevisionPublish.orgRevisionIsDraft = false; - orgRevisionPublish.orgRevisionIsCurrent = false; - await repoOrgRevision.save(orgRevisionPublish); + console.timeEnd('[AMQ] query_revisions'); + console.log(`[AMQ] orgRevisionPublish found: ${orgRevisionPublish ? orgRevisionPublish.id : 'null'}`); + console.log(`[AMQ] orgRevisionDraft found: ${orgRevisionDraft ? orgRevisionDraft.id : 'null'}`); + + // Validate: ต้องมี orgRevisionPublish เสมอสำหรับการเผยแพร่ + if (!orgRevisionPublish) { + console.error('[AMQ] Cannot publish: No current org revision found (isDraft=false, isCurrent=true)'); + if (user) { + sendWebSocket( + "send-publish-org", + { + success: false, + message: `ไม่พบข้อมูลโครงสร้างหน่วยงานปัจจุบัน ไม่สามารถเผยแพร่ได้`, + }, + { userId: user?.sub }, + ).catch(console.error); + } + return false; } - if (orgRevisionDraft) { - //เข้าเงื่อนไขจะเปลี่ยนสถานะ orgRevisionDraft เป็นไม่ใช่ daft และเป็น current - orgRevisionDraft.orgRevisionIsCurrent = true; - orgRevisionDraft.orgRevisionIsDraft = false; - await repoOrgRevision.save(orgRevisionDraft); + + // Validate: ต้องมี orgRevisionDraft ที่จะเผยแพร่ + if (!orgRevisionDraft) { + console.error('[AMQ] Cannot publish: No draft org revision found (isDraft=true, isCurrent=false)'); + if (user) { + sendWebSocket( + "send-publish-org", + { + success: false, + message: `ไม่พบข้อมูลโครงสร้างหน่วยงานแบบร่าง ไม่สามารถเผยแพร่ได้`, + }, + { userId: user?.sub }, + ).catch(console.error); + } + return false; } + + // NOTE: ย้ายการอัปเดตสถานะไปไว้หลังจากทำงานเสร็จทั้งหมด + // เพื่อป้องกันกรณี timeout/retry ทำให้สถานะเพี้ยน (ทุก row เป็น false,false) + try { + console.time('[AMQ] query_posMaster'); const posMaster = await repoPosmaster.find({ where: { orgRevisionId: id }, relations: [ @@ -562,23 +598,31 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { "positions.posExecutive", ], }); + console.timeEnd('[AMQ] query_posMaster'); + console.log(`[AMQ] posMaster count: ${posMaster.length}`); + console.time('[AMQ] query_old_data'); const oldPosMasters = await repoPosmaster.find({ where: { - orgRevisionId: orgRevisionPublish!.id, + orgRevisionId: orgRevisionPublish.id, }, select: ['id', 'current_holderId', 'ancestorDNA'] }); - // Task #2160 ดึง posMasterAssign ของ revision เดิม + // Task #2160 ดึง posMasterAssign ของ revision เดิม const oldposMasterAssigns = await posMasterAssignRepository.find({ relations: ["posMaster"], where: { posMaster: { - orgRevisionId: orgRevisionPublish!.id, + orgRevisionId: orgRevisionPublish.id, }, }, }); + console.timeEnd('[AMQ] query_old_data'); + console.log(`[AMQ] oldPosMasters count: ${oldPosMasters.length}`); + console.log(`[AMQ] oldposMasterAssigns count: ${oldposMasterAssigns.length}`); + + console.time('[AMQ] build_assignMap'); // สร้าง assignMap เอาไว้เก็บ posMasterAssign.ancestorDNA ของ revision เดิม const assignMap = new Map(); for (const posmasterAssign of oldposMasterAssigns) { @@ -592,19 +636,24 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { assignId: posmasterAssign.assignId }); } + console.timeEnd('[AMQ] build_assignMap'); + console.time('[AMQ] query_oldposMasterAct'); // ดึง posMasterAct ของ revision เดิม xxx const oldposMasterAct = await posMasterActRepository.find({ relations: ["posMaster", "posMasterChild"], where: { posMaster: { - orgRevisionId: orgRevisionPublish!.id, + orgRevisionId: orgRevisionPublish.id, }, }, }); + console.timeEnd('[AMQ] query_oldposMasterAct'); + console.log(`[AMQ] oldposMasterAct count: ${oldposMasterAct.length}`); type ActKey = string; // `${parentDNA}|${childDNA}` + console.time('[AMQ] build_maps'); const posMasterActMap = new Map(); for (const act of oldposMasterAct) { const parentDNA = act.posMaster?.ancestorDNA?.trim() ?? ''; @@ -629,10 +678,12 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { oldPosMasterMap.set(dna, oldPm); } } + console.timeEnd('[AMQ] build_maps'); const _null: any = null; // ===== BATCH PROCESSING: เตรียมข้อมูลก่อน loop ===== + console.time('[AMQ] prepare_batch_data'); // 1. รวบรวม profileIds ทั้งหมดที่ต้องอัพเดท const profileIds = posMaster .filter(item => item.next_holderId != null) @@ -647,6 +698,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { }); profiles.forEach(p => profilesMap.set(p.id, p)); } + console.log(`[AMQ] profiles to update: ${profilesMap.size}`); // 3. เตรียม arrays สำหรับ batch operations const profilesToSave: Profile[] = []; @@ -723,26 +775,33 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { historyCreateIds.push(item.id); } } + console.timeEnd('[AMQ] prepare_batch_data'); + console.log(`[AMQ] Prepared - posMasterAssignsToSave: ${posMasterAssignsToSave.length}, profilesToSave: ${profilesToSave.length}, posMasterUpdates: ${posMasterUpdates.length}, historyCreateIds: ${historyCreateIds.length}`); // ===== BATCH EXECUTION: save ทีละ batch ===== // 4. Batch save posMasterAssign (chunk 500) + console.time('[AMQ] batch_save_posMasterAssign'); if (posMasterAssignsToSave.length > 0) { const chunks = chunkArray(posMasterAssignsToSave, 500); for (const chunk of chunks) { await posMasterAssignRepository.save(chunk); } } + console.timeEnd('[AMQ] batch_save_posMasterAssign'); // 5. Batch save profiles (chunk 200) + console.time('[AMQ] batch_save_profiles'); if (profilesToSave.length > 0) { const chunks = chunkArray(profilesToSave, 200); for (const chunk of chunks) { await repoProfile.save(chunk); } } + console.timeEnd('[AMQ] batch_save_profiles'); // 6. Batch update posMasters + console.time('[AMQ] batch_update_posMasters'); for (const update of posMasterUpdates) { await repoPosmaster.update(update.id, { current_holderId: update.current_holderId, @@ -752,12 +811,17 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { lastUpdatedAt, }); } + console.timeEnd('[AMQ] batch_update_posMasters'); // 7. Batch create history + console.time('[AMQ] batch_create_history'); for (const id of historyCreateIds) { await CreatePosMasterHistoryOfficer(id, null); } + console.timeEnd('[AMQ] batch_create_history'); + // Clone oldposMasterAct + console.time('[AMQ] clone_oldposMasterAct'); for (const act of oldposMasterAct) { const parentDNA = act.posMaster?.ancestorDNA?.trim()?.toLowerCase() ?? ''; const childDNA = act.posMasterChild?.ancestorDNA?.trim()?.toLowerCase() ?? ''; @@ -783,20 +847,16 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { await posMasterActRepository.save(newAct); } + console.timeEnd('[AMQ] clone_oldposMasterAct'); if (orgRevisionPublish != null && orgRevisionDraft != null) { + console.time('[AMQ] clone_org_structure'); //new main revision const before = null; //ทุก orgRoot และ orgChild ข้างล่างนี้จะเป็นตัวเก่าที่ไม่ได้เป็น current revision //cone tree - // if ( - // orgRevisionPublish.typeDraft.toUpperCase() == "ORG" || - // orgRevisionPublish.typeDraft.toUpperCase() == "ORG_POSITION" || - // orgRevisionPublish.typeDraft.toUpperCase() == "ORG_POSITION_PERSON" || - // orgRevisionPublish.typeDraft.toUpperCase() == "ORG_POSITION_ROLE" || - // orgRevisionPublish.typeDraft.toUpperCase() == "ORG_POSITION_PERSON_ROLE" - // ) { + console.time('[AMQ] query_old_org_structure'); //หา dna tree const orgRoot = await orgRootRepository.find({ where: { orgRevisionId: orgRevisionPublish.id }, @@ -817,6 +877,9 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { const orgChild4 = await child4Repository.find({ where: { orgRevisionId: orgRevisionPublish.id }, }); + console.timeEnd('[AMQ] query_old_org_structure'); + console.log(`[AMQ] Old structure - orgRoot: ${orgRoot.length}, orgChild1: ${orgChild1.length}, orgChild2: ${orgChild2.length}, orgChild3: ${orgChild3.length}, orgChild4: ${orgChild4.length}`); + // Task #2172 ดึง orgRoot ของ revision ใหม่ const newRoots = await orgRootRepository.find({ where: { orgRevisionId: orgRevisionDraft.id }, @@ -825,6 +888,8 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { const newRootMap = new Map( newRoots.map(r => [r.ancestorDNA, r.id]) ); + + console.time('[AMQ] clone_permissionProfiles'); // ดึง permissionProfiles ของ revision เดิม const oldPermissionProfiles = await permissionProfilesRepository.find({ relations: ["orgRootTree"], @@ -857,12 +922,16 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { if (inserts.length > 0) { await permissionProfilesRepository.insert(inserts); } + console.timeEnd('[AMQ] clone_permissionProfiles'); //หา dna posmaster ถ้าไม่มีให้เอาตัวเองเป็น dna + console.time('[AMQ] query_employeePosMaster'); const orgemployeePosMaster = await repoEmployeePosmaster.find({ where: { orgRevisionId: orgRevisionPublish.id }, relations: ["positions"], }); + console.timeEnd('[AMQ] query_employeePosMaster'); + console.log(`[AMQ] orgemployeePosMaster count: ${orgemployeePosMaster.length}`); let _orgemployeePosMaster: EmployeePosMaster[]; // if ( @@ -892,6 +961,8 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { ? x.id : x.ancestorDNA, })); + + console.time('[AMQ] insert_employeePosMaster'); await repoEmployeePosmaster .createQueryBuilder() .insert() @@ -902,13 +973,17 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { overwrite: ["ancestorDNA"], }) .execute(); + console.timeEnd('[AMQ] insert_employeePosMaster'); // } //หา dna posmaster ถ้าไม่มีให้เอาตัวเองเป็น dna + console.time('[AMQ] query_employeeTempPosMaster'); const orgemployeeTempPosMaster = await repoEmployeeTempPosmaster.find({ where: { orgRevisionId: orgRevisionPublish.id }, relations: ["positions"], }); + console.timeEnd('[AMQ] query_employeeTempPosMaster'); + console.log(`[AMQ] orgemployeeTempPosMaster count: ${orgemployeeTempPosMaster.length}`); let _orgemployeeTempPosMaster: EmployeeTempPosMaster[]; // if ( @@ -937,8 +1012,12 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { .execute(); // } - //create org + //create org - forEach orgRoot (WARNING: async forEach without await) + console.time('[AMQ] forEach_orgRoot'); + console.log(`[AMQ] Starting forEach orgRoot loop (${orgRoot.length} items)`); + let processedOrgRoot = 0; orgRoot.forEach(async (x: any) => { + const itemStartTime = Date.now(); var dataId = x.id; const orgRootCurrent = await orgRootRepository.find({ @@ -965,9 +1044,11 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { // requestBody.typeDraft.toUpperCase() == "ORG_POSITION_PERSON_ROLE" // ) { //create employeePosmaster + const filteredEmployeePosMaster = _orgemployeePosMaster + .filter((x: EmployeePosMaster) => x.orgRootId == dataId && x.orgChild1Id == null); + await Promise.all( - _orgemployeePosMaster - .filter((x: EmployeePosMaster) => x.orgRootId == dataId && x.orgChild1Id == null) + filteredEmployeePosMaster .map(async (item: any) => { delete item.id; const employeePosMaster = Object.assign(new EmployeePosMaster(), item); @@ -1806,9 +1887,25 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { { userId: user?.sub }, ).catch(console.error); } + console.timeEnd('[AMQ] clone_org_structure'); + + // อัปเดตสถานะ orgRevision หลังจากทำงานเสร็จทั้งหมด + console.time('[AMQ] save_revision_status'); + orgRevisionPublish.orgRevisionIsDraft = false; + orgRevisionPublish.orgRevisionIsCurrent = false; + await repoOrgRevision.save(orgRevisionPublish); + + orgRevisionDraft.orgRevisionIsCurrent = true; + orgRevisionDraft.orgRevisionIsDraft = false; + await repoOrgRevision.save(orgRevisionDraft); + console.timeEnd('[AMQ] save_revision_status'); + + console.log(`[AMQ] handler_org SUCCESS - Total time: ${Date.now() - startTime}ms`); + console.timeEnd('[AMQ] handler_org_total'); return true; } catch (error) { - console.error(error); + const totalTime = Date.now() - startTime; + console.error(`[AMQ] handler_org ERROR after ${totalTime}ms:`, error); if (user) { sendWebSocket( "send-publish-org", @@ -1819,6 +1916,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { { userId: user?.sub }, ).catch(console.error); } + console.timeEnd('[AMQ] handler_org_total'); return false; } }