diff --git a/src/services/rabbitmq.ts b/src/services/rabbitmq.ts index 990cd5fb..c00b8150 100644 --- a/src/services/rabbitmq.ts +++ b/src/services/rabbitmq.ts @@ -1,5 +1,6 @@ import { randomUUID } from "crypto"; import amqp from "amqplib"; +import { promisify } from "util"; import { AppDataSource } from "../database/data-source"; import { Command } from "../entities/Command"; import { chunkArray, commandTypePath } from "../interfaces/utils"; @@ -29,6 +30,10 @@ import { PayloadSendNoti } from "../interfaces/utils"; import { PermissionProfile } from "../entities/PermissionProfile"; import { PosMasterHistory } from "../entities/PosMasterHistory"; +const redis = require("redis"); +const REDIS_HOST = process.env.REDIS_HOST; +const REDIS_PORT = process.env.REDIS_PORT; + let reconnectTimer: ReturnType | null = null; function scheduleReconnect() { @@ -143,7 +148,9 @@ function createConsumer( //----> consumer console.log("[AMQ] Process Consumer success"); return channel.ack(msg); } - console.error(`[AMQ] Process Consumer failed on queue ${queue}, acknowledging without retry`); + console.error( + `[AMQ] Process Consumer failed on queue ${queue}, acknowledging without retry`, + ); return channel.ack(msg); } catch (error) { console.error(`[AMQ] Consumer processing error on queue ${queue}:`, error); @@ -547,19 +554,19 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { const repoPosmaster = AppDataSource.getRepository(PosMaster); const posMasterAssignRepository = AppDataSource.getRepository(PosMasterAssign); const posMasterActRepository = AppDataSource.getRepository(PosMasterAct); - const permissionProfilesRepository = AppDataSource.getRepository(PermissionProfile); - const repoEmployeePosmaster = AppDataSource.getRepository(EmployeePosMaster); - const repoEmployeeTempPosmaster = AppDataSource.getRepository(EmployeeTempPosMaster); + // const permissionProfilesRepository = AppDataSource.getRepository(PermissionProfile); + // const repoEmployeePosmaster = AppDataSource.getRepository(EmployeePosMaster); + // const repoEmployeeTempPosmaster = AppDataSource.getRepository(EmployeeTempPosMaster); const repoProfile = AppDataSource.getRepository(Profile); - const repoProfileEmployee = AppDataSource.getRepository(ProfileEmployee); - const employeePositionRepository = AppDataSource.getRepository(EmployeePosition); + // const repoProfileEmployee = AppDataSource.getRepository(ProfileEmployee); + // const employeePositionRepository = AppDataSource.getRepository(EmployeePosition); const repoOrgRevision = AppDataSource.getRepository(OrgRevision); - const orgRootRepository = AppDataSource.getRepository(OrgRoot); - const child1Repository = AppDataSource.getRepository(OrgChild1); - const child2Repository = AppDataSource.getRepository(OrgChild2); - const child3Repository = AppDataSource.getRepository(OrgChild3); - const child4Repository = AppDataSource.getRepository(OrgChild4); - const { data, token, user } = JSON.parse(msg.content.toString()); + // const orgRootRepository = AppDataSource.getRepository(OrgRoot); + // const child1Repository = AppDataSource.getRepository(OrgChild1); + // const child2Repository = AppDataSource.getRepository(OrgChild2); + // const child3Repository = AppDataSource.getRepository(OrgChild3); + // const child4Repository = AppDataSource.getRepository(OrgChild4); + const { data, user } = JSON.parse(msg.content.toString()); const { id, status, lastUpdateUserId, lastUpdateFullName, lastUpdatedAt } = data; console.log(`[AMQ] Received message - revisionId: ${id}, status: ${status}`); @@ -994,7 +1001,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { if (posMasterUpdates.length > 0) { const chunks = chunkArray(posMasterUpdates, 500); const posMasterTableName = repoPosmaster.metadata.tableName; - for (const chunk of chunks as typeof posMasterUpdates[]) { + for (const chunk of chunks as (typeof posMasterUpdates)[]) { const caseClauses = chunk.map(() => "WHEN ? THEN ?").join(" "); const wherePlaceholders = chunk.map(() => "?").join(", "); const params = chunk.flatMap((update: (typeof posMasterUpdates)[number]) => [ @@ -1207,13 +1214,15 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { ? x.id : x.ancestorDNA, })); - const _orgemployeeTempPosMaster: EmployeeTempPosMaster[] = orgemployeeTempPosMaster.map((x) => ({ - ...x, - ancestorDNA: - x.ancestorDNA == null || x.ancestorDNA == "00000000-0000-0000-0000-000000000000" - ? x.id - : x.ancestorDNA, - })); + const _orgemployeeTempPosMaster: EmployeeTempPosMaster[] = orgemployeeTempPosMaster.map( + (x) => ({ + ...x, + ancestorDNA: + x.ancestorDNA == null || x.ancestorDNA == "00000000-0000-0000-0000-000000000000" + ? x.id + : x.ancestorDNA, + }), + ); console.time("[AMQ] insert_employeePosMaster"); await repoEmployeePosmaster @@ -1316,7 +1325,10 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { lastUpdateFullName: "System Administrator", lastUpdatedAt: timestamp, }); - const buildColumnData = (repository: Repository, source: T): Partial => { + const buildColumnData = ( + repository: Repository, + source: T, + ): Partial => { const row = {} as Partial; const target = row as Record; const sourceRecord = source as Record; @@ -1363,8 +1375,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { ...buildColumnData(employeePositionRepository, position), id: randomUUID(), posMasterId: positionParentKey === "posMasterId" ? parentId : undefined, - posMasterTempId: - positionParentKey === "posMasterTempId" ? parentId : undefined, + posMasterTempId: positionParentKey === "posMasterTempId" ? parentId : undefined, ...buildAuditFields(positionTimestamp), }); } @@ -1409,7 +1420,8 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { const dataId = x.id; const matchedOrgRoot = findMatchedNodeByAncestorDNA(orgRootCurrent, x); - const filteredEmployeePosMaster = employeePosMasterByNode.get(getNodeKey("root", dataId)) ?? []; + const filteredEmployeePosMaster = + employeePosMasterByNode.get(getNodeKey("root", dataId)) ?? []; await cloneEmployeeNodeBatch( filteredEmployeePosMaster, @@ -1664,6 +1676,8 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { console.log(`[AMQ] handler_org SUCCESS - Total time: ${Date.now() - startTime}ms`); console.timeEnd("[AMQ] handler_org_total"); + + await clearMenuAndRoleCache(); return true; } catch (error) { const totalTime = Date.now() - startTime; @@ -1683,6 +1697,32 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { } } +async function clearMenuAndRoleCache(): Promise { + const redisClient = redis.createClient({ + host: REDIS_HOST, + port: REDIS_PORT, + }); + + const keysAsync = promisify(redisClient.keys).bind(redisClient); + const delAsync = promisify(redisClient.del).bind(redisClient); + + try { + const menuKeys = await keysAsync("menu_*"); + if (menuKeys.length > 0) { + await delAsync(...menuKeys); + console.log(`[AMQ] Cleared ${menuKeys.length} menu cache keys`); + } + + const roleKeys = await keysAsync("role_*"); + if (roleKeys.length > 0) { + await delAsync(...roleKeys); + console.log(`[AMQ] Cleared ${roleKeys.length} role cache keys`); + } + } finally { + redisClient.quit(); + } +} + async function handler_org_draft(msg: amqp.ConsumeMessage): Promise { const { data, token, user } = JSON.parse(msg.content.toString()); const { requestBody, request, revision } = data;