fixed error and not retry
This commit is contained in:
parent
3335c4f44c
commit
b5c75379ff
1 changed files with 273 additions and 88 deletions
|
|
@ -24,9 +24,27 @@ import { In, Not } from "typeorm";
|
|||
import { PosMasterAct } from "../entities/PosMasterAct";
|
||||
import { PermissionOrg } from "../entities/PermissionOrg";
|
||||
import { sendWebSocket } from "./webSocket";
|
||||
import { CreatePosMasterHistoryOfficer } from "./PositionService";
|
||||
import { PayloadSendNoti } from "../interfaces/utils";
|
||||
import { PermissionProfile } from "../entities/PermissionProfile";
|
||||
import { PosMasterHistory } from "../entities/PosMasterHistory";
|
||||
|
||||
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
|
||||
function scheduleReconnect() {
|
||||
if (reconnectTimer) {
|
||||
return;
|
||||
}
|
||||
|
||||
reconnectTimer = setTimeout(async () => {
|
||||
reconnectTimer = null;
|
||||
try {
|
||||
await init();
|
||||
} catch (error) {
|
||||
console.error("[AMQ] Reconnect failed:", error);
|
||||
scheduleReconnect();
|
||||
}
|
||||
}, 1000);
|
||||
}
|
||||
|
||||
export let sendToQueue: (payload: any) => void;
|
||||
export let sendToQueueOrg: (payload: any) => void;
|
||||
|
|
@ -55,10 +73,28 @@ export async function init() {
|
|||
|
||||
console.log(connection ? "[AMQ] Connection success" : "[AMQ] Connection failed");
|
||||
|
||||
connection.on("error", (error) => {
|
||||
console.error("[AMQ] Connection error:", error);
|
||||
});
|
||||
|
||||
connection.on("close", () => {
|
||||
console.error("[AMQ] Connection closed. Scheduling reconnect...");
|
||||
scheduleReconnect();
|
||||
});
|
||||
|
||||
const channel = await connection.createChannel(); //----> (1.4) create Channel
|
||||
|
||||
console.log(channel ? "[AMQ] Create channel success" : "[AMQ] Create channel failed");
|
||||
|
||||
channel.on("error", (error) => {
|
||||
console.error("[AMQ] Channel error:", error);
|
||||
});
|
||||
|
||||
channel.on("close", () => {
|
||||
console.error("[AMQ] Channel closed. Scheduling reconnect...");
|
||||
scheduleReconnect();
|
||||
});
|
||||
|
||||
channel.assertQueue(queue, { durable: true }), //----> (1.5) assert queue and set durable (if "true" save to disk on RabbitMQ)
|
||||
channel.assertQueue(queue_org, { durable: true }),
|
||||
channel.assertQueue(queue_org_draft, { durable: true }),
|
||||
|
|
@ -97,18 +133,26 @@ function createConsumer( //----> consumer
|
|||
channel: amqp.Channel,
|
||||
handler: (msg: amqp.ConsumeMessage) => Promise<boolean> | boolean,
|
||||
) {
|
||||
let retries = 0;
|
||||
channel.consume(
|
||||
queue,
|
||||
async (msg) => {
|
||||
if (!msg) return;
|
||||
if ((await handler(msg)) || retries++ >= 3) {
|
||||
retries = 0;
|
||||
console.log("[AMQ] Process Consumer success");
|
||||
try {
|
||||
if (await handler(msg)) {
|
||||
console.log("[AMQ] Process Consumer success");
|
||||
return channel.ack(msg);
|
||||
}
|
||||
console.error(`[AMQ] Process Consumer failed on queue ${queue}, acknowledging without retry`);
|
||||
return channel.ack(msg);
|
||||
} catch (error) {
|
||||
console.error(`[AMQ] Consumer processing error on queue ${queue}:`, error);
|
||||
try {
|
||||
console.error(`[AMQ] Acknowledging failed message on queue ${queue} without retry`);
|
||||
channel.ack(msg);
|
||||
} catch (channelError) {
|
||||
console.error(`[AMQ] Failed to ack/nack message on queue ${queue}:`, channelError);
|
||||
}
|
||||
}
|
||||
console.log("[AMQ] Process Consumer failed");
|
||||
return await new Promise((resolve) => setTimeout(() => resolve(channel.nack(msg)), 3000));
|
||||
},
|
||||
{ noAck: false },
|
||||
);
|
||||
|
|
@ -740,7 +784,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
// 3. เตรียม arrays สำหรับ batch operations
|
||||
const profilesToSave: Profile[] = [];
|
||||
const posMasterAssignsToSave: PosMasterAssign[] = [];
|
||||
const historyCreateIds: string[] = [];
|
||||
const historyRowsToSave: Partial<PosMasterHistory>[] = [];
|
||||
const posMasterUpdates: { id: string; current_holderId: string | null | undefined }[] = [];
|
||||
|
||||
// ===== LOOP: เก็บข้อมูลทั้งหมด =====
|
||||
|
|
@ -809,12 +853,54 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
const isHolderChanged = oldHolderId !== newHolderId;
|
||||
|
||||
if (isHolderChanged) {
|
||||
historyCreateIds.push(item.id);
|
||||
const nextHolderProfile =
|
||||
item.next_holderId != null && item.next_holderId !== ""
|
||||
? profilesMap.get(item.next_holderId)
|
||||
: null;
|
||||
const selectedPosition =
|
||||
item.positions.length > 0
|
||||
? item.positions.find((position) => position.positionIsSelected === true) ?? null
|
||||
: null;
|
||||
const shortName =
|
||||
[
|
||||
item.orgChild4?.orgChild4ShortName,
|
||||
item.orgChild3?.orgChild3ShortName,
|
||||
item.orgChild2?.orgChild2ShortName,
|
||||
item.orgChild1?.orgChild1ShortName,
|
||||
item.orgRoot?.orgRootShortName,
|
||||
].find((name) => typeof name === "string" && name.trim().length > 0) ?? _null;
|
||||
|
||||
historyRowsToSave.push({
|
||||
ancestorDNA: item.ancestorDNA ? item.ancestorDNA : _null,
|
||||
prefix: nextHolderProfile?.prefix || _null,
|
||||
firstName: nextHolderProfile?.firstName || _null,
|
||||
lastName: nextHolderProfile?.lastName || _null,
|
||||
shortName,
|
||||
posMasterNoPrefix: item.posMasterNoPrefix ?? _null,
|
||||
posMasterNo: item.posMasterNo ?? _null,
|
||||
posMasterNoSuffix: item.posMasterNoSuffix ?? _null,
|
||||
position: selectedPosition?.positionName ?? _null,
|
||||
posType: selectedPosition?.posType?.posTypeName ?? _null,
|
||||
posLevel: selectedPosition?.posLevel?.posLevelName ?? _null,
|
||||
posExecutive: selectedPosition?.posExecutive?.posExecutiveName ?? _null,
|
||||
profileId: _null,
|
||||
rootDnaId: item.orgRoot?.ancestorDNA || _null,
|
||||
child1DnaId: item.orgChild1?.ancestorDNA || _null,
|
||||
child2DnaId: item.orgChild2?.ancestorDNA || _null,
|
||||
child3DnaId: item.orgChild3?.ancestorDNA || _null,
|
||||
child4DnaId: item.orgChild4?.ancestorDNA || _null,
|
||||
createdUserId: "",
|
||||
createdFullName: "system",
|
||||
lastUpdateUserId: "",
|
||||
lastUpdateFullName: "system",
|
||||
createdAt: new Date(),
|
||||
lastUpdatedAt: new Date(),
|
||||
});
|
||||
}
|
||||
}
|
||||
console.timeEnd("[AMQ] prepare_batch_data");
|
||||
console.log(
|
||||
`[AMQ] Prepared - posMasterAssignsToSave: ${posMasterAssignsToSave.length}, profilesToSave: ${profilesToSave.length}, posMasterUpdates: ${posMasterUpdates.length}, historyCreateIds: ${historyCreateIds.length}`,
|
||||
`[AMQ] Prepared - posMasterAssignsToSave: ${posMasterAssignsToSave.length}, profilesToSave: ${profilesToSave.length}, posMasterUpdates: ${posMasterUpdates.length}, historyCreateIds: ${historyRowsToSave.length}`,
|
||||
);
|
||||
|
||||
// ===== BATCH EXECUTION: save ทีละ batch =====
|
||||
|
|
@ -835,6 +921,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
const child2Repository = manager.getRepository(OrgChild2);
|
||||
const child3Repository = manager.getRepository(OrgChild3);
|
||||
const child4Repository = manager.getRepository(OrgChild4);
|
||||
const posMasterHistoryRepository = manager.getRepository(PosMasterHistory);
|
||||
|
||||
const targetOrgRevision = await repoOrgRevision
|
||||
.createQueryBuilder("orgRevision")
|
||||
|
|
@ -903,21 +990,45 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
|
||||
// 6. Batch update posMasters
|
||||
console.time("[AMQ] batch_update_posMasters");
|
||||
for (const update of posMasterUpdates) {
|
||||
await repoPosmaster.update(update.id, {
|
||||
current_holderId: update.current_holderId,
|
||||
next_holderId: null,
|
||||
lastUpdateUserId,
|
||||
lastUpdateFullName,
|
||||
lastUpdatedAt,
|
||||
});
|
||||
if (posMasterUpdates.length > 0) {
|
||||
const chunks = chunkArray(posMasterUpdates, 500);
|
||||
const posMasterTableName = repoPosmaster.metadata.tableName;
|
||||
for (const chunk of chunks as typeof posMasterUpdates[]) {
|
||||
const caseClauses = chunk.map(() => "WHEN ? THEN ?").join(" ");
|
||||
const wherePlaceholders = chunk.map(() => "?").join(", ");
|
||||
const params = chunk.flatMap((update: (typeof posMasterUpdates)[number]) => [
|
||||
update.id,
|
||||
update.current_holderId ?? null,
|
||||
]);
|
||||
|
||||
params.push(
|
||||
lastUpdateUserId,
|
||||
lastUpdateFullName,
|
||||
lastUpdatedAt,
|
||||
...chunk.map((update: (typeof posMasterUpdates)[number]) => update.id),
|
||||
);
|
||||
|
||||
await manager.query(
|
||||
`UPDATE \`${posMasterTableName}\`
|
||||
SET current_holderId = CASE id ${caseClauses} END,
|
||||
next_holderId = NULL,
|
||||
lastUpdateUserId = ?,
|
||||
lastUpdateFullName = ?,
|
||||
lastUpdatedAt = ?
|
||||
WHERE id IN (${wherePlaceholders})`,
|
||||
params,
|
||||
);
|
||||
}
|
||||
}
|
||||
console.timeEnd("[AMQ] batch_update_posMasters");
|
||||
|
||||
// 7. Batch create history
|
||||
console.time("[AMQ] batch_create_history");
|
||||
for (const id of historyCreateIds) {
|
||||
await CreatePosMasterHistoryOfficer(id, null, undefined, undefined, manager);
|
||||
if (historyRowsToSave.length > 0) {
|
||||
const chunks = chunkArray(historyRowsToSave, 500);
|
||||
for (const chunk of chunks) {
|
||||
await posMasterHistoryRepository.save(posMasterHistoryRepository.create(chunk));
|
||||
}
|
||||
}
|
||||
console.timeEnd("[AMQ] batch_create_history");
|
||||
|
||||
|
|
@ -958,25 +1069,23 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
//cone tree
|
||||
console.time("[AMQ] query_old_org_structure");
|
||||
//หา dna tree
|
||||
const orgRoot = await orgRootRepository.find({
|
||||
where: { orgRevisionId: orgRevisionPublish.id },
|
||||
});
|
||||
|
||||
const orgChild1 = await child1Repository.find({
|
||||
where: { orgRevisionId: orgRevisionPublish.id },
|
||||
});
|
||||
|
||||
const orgChild2 = await child2Repository.find({
|
||||
where: { orgRevisionId: orgRevisionPublish.id },
|
||||
});
|
||||
|
||||
const orgChild3 = await child3Repository.find({
|
||||
where: { orgRevisionId: orgRevisionPublish.id },
|
||||
});
|
||||
|
||||
const orgChild4 = await child4Repository.find({
|
||||
where: { orgRevisionId: orgRevisionPublish.id },
|
||||
});
|
||||
const [orgRoot, orgChild1, orgChild2, orgChild3, orgChild4] = await Promise.all([
|
||||
orgRootRepository.find({
|
||||
where: { orgRevisionId: orgRevisionPublish.id },
|
||||
}),
|
||||
child1Repository.find({
|
||||
where: { orgRevisionId: orgRevisionPublish.id },
|
||||
}),
|
||||
child2Repository.find({
|
||||
where: { orgRevisionId: orgRevisionPublish.id },
|
||||
}),
|
||||
child3Repository.find({
|
||||
where: { orgRevisionId: orgRevisionPublish.id },
|
||||
}),
|
||||
child4Repository.find({
|
||||
where: { orgRevisionId: orgRevisionPublish.id },
|
||||
}),
|
||||
]);
|
||||
console.timeEnd("[AMQ] query_old_org_structure");
|
||||
console.log(
|
||||
`[AMQ] Old structure - orgRoot: ${orgRoot.length}, orgChild1: ${orgChild1.length}, orgChild2: ${orgChild2.length}, orgChild3: ${orgChild3.length}, orgChild4: ${orgChild4.length}`,
|
||||
|
|
@ -1106,6 +1215,78 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
? x.id
|
||||
: x.ancestorDNA,
|
||||
}));
|
||||
|
||||
const groupByParentId = <T extends { id: string }>(
|
||||
items: T[],
|
||||
getParentId: (item: T) => string | null | undefined,
|
||||
) => {
|
||||
const grouped = new Map<string, T[]>();
|
||||
for (const item of items) {
|
||||
const parentId = getParentId(item);
|
||||
if (!parentId) {
|
||||
continue;
|
||||
}
|
||||
const current = grouped.get(parentId);
|
||||
if (current) {
|
||||
current.push(item);
|
||||
} else {
|
||||
grouped.set(parentId, [item]);
|
||||
}
|
||||
}
|
||||
return grouped;
|
||||
};
|
||||
|
||||
const buildEmployeeNodeKey = (item: {
|
||||
orgRootId?: string | null;
|
||||
orgChild1Id?: string | null;
|
||||
orgChild2Id?: string | null;
|
||||
orgChild3Id?: string | null;
|
||||
orgChild4Id?: string | null;
|
||||
}) => {
|
||||
if (item.orgChild4Id) return `child4:${item.orgChild4Id}`;
|
||||
if (item.orgChild3Id && item.orgChild4Id == null) return `child3:${item.orgChild3Id}`;
|
||||
if (item.orgChild2Id && item.orgChild3Id == null) return `child2:${item.orgChild2Id}`;
|
||||
if (item.orgChild1Id && item.orgChild2Id == null) return `child1:${item.orgChild1Id}`;
|
||||
if (item.orgRootId && item.orgChild1Id == null) return `root:${item.orgRootId}`;
|
||||
return null;
|
||||
};
|
||||
|
||||
const groupByEmployeeNode = <
|
||||
T extends {
|
||||
orgRootId?: string | null;
|
||||
orgChild1Id?: string | null;
|
||||
orgChild2Id?: string | null;
|
||||
orgChild3Id?: string | null;
|
||||
orgChild4Id?: string | null;
|
||||
},
|
||||
>(
|
||||
items: T[],
|
||||
) => {
|
||||
const grouped = new Map<string, T[]>();
|
||||
for (const item of items) {
|
||||
const key = buildEmployeeNodeKey(item);
|
||||
if (!key) {
|
||||
continue;
|
||||
}
|
||||
const current = grouped.get(key);
|
||||
if (current) {
|
||||
current.push(item);
|
||||
} else {
|
||||
grouped.set(key, [item]);
|
||||
}
|
||||
}
|
||||
return grouped;
|
||||
};
|
||||
|
||||
const employeePosMasterByNode = groupByEmployeeNode(_orgemployeePosMaster);
|
||||
const employeeTempPosMasterByNode = groupByEmployeeNode(_orgemployeeTempPosMaster);
|
||||
const getNodeKey = (level: "root" | "child1" | "child2" | "child3" | "child4", id: string) =>
|
||||
`${level}:${id}`;
|
||||
const orgChild1ByRoot = groupByParentId(orgChild1, (item) => item.orgRootId);
|
||||
const orgChild2ByChild1 = groupByParentId(orgChild2, (item) => item.orgChild1Id);
|
||||
const orgChild3ByChild2 = groupByParentId(orgChild3, (item) => item.orgChild2Id);
|
||||
const orgChild4ByChild3 = groupByParentId(orgChild4, (item) => item.orgChild3Id);
|
||||
|
||||
await repoEmployeeTempPosmaster
|
||||
.createQueryBuilder()
|
||||
.insert()
|
||||
|
|
@ -1124,9 +1305,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
const dataId = x.id;
|
||||
const matchedOrgRoot = findMatchedNodeByAncestorDNA(orgRootCurrent, x);
|
||||
|
||||
const filteredEmployeePosMaster = _orgemployeePosMaster.filter(
|
||||
(x: EmployeePosMaster) => x.orgRootId == dataId && x.orgChild1Id == null,
|
||||
);
|
||||
const filteredEmployeePosMaster = employeePosMasterByNode.get(getNodeKey("root", dataId)) ?? [];
|
||||
|
||||
await Promise.all(
|
||||
filteredEmployeePosMaster.map(async (item: any) => {
|
||||
|
|
@ -1164,8 +1343,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
);
|
||||
|
||||
await Promise.all(
|
||||
_orgemployeeTempPosMaster
|
||||
.filter((x: EmployeeTempPosMaster) => x.orgRootId == dataId && x.orgChild1Id == null)
|
||||
(employeeTempPosMasterByNode.get(getNodeKey("root", dataId)) ?? [])
|
||||
.map(async (item: any) => {
|
||||
delete item.id;
|
||||
const employeeTempPosMaster = Object.assign(new EmployeeTempPosMaster(), item);
|
||||
|
|
@ -1200,12 +1378,11 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
}),
|
||||
);
|
||||
|
||||
for (const x of orgChild1.filter((item: OrgChild1) => item.orgRootId == dataId)) {
|
||||
for (const x of orgChild1ByRoot.get(dataId) ?? []) {
|
||||
const data1Id = x.id;
|
||||
const matchedOrgChild1 = findMatchedNodeByAncestorDNA(orgChild1Current, x);
|
||||
await Promise.all(
|
||||
_orgemployeePosMaster
|
||||
.filter((x: EmployeePosMaster) => x.orgChild1Id == data1Id && x.orgChild2Id == null)
|
||||
(employeePosMasterByNode.get(getNodeKey("child1", data1Id)) ?? [])
|
||||
.map(async (item: any) => {
|
||||
delete item.id;
|
||||
const employeePosMaster = Object.assign(new EmployeePosMaster(), item);
|
||||
|
|
@ -1242,10 +1419,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
);
|
||||
|
||||
await Promise.all(
|
||||
_orgemployeeTempPosMaster
|
||||
.filter(
|
||||
(x: EmployeeTempPosMaster) => x.orgChild1Id == data1Id && x.orgChild2Id == null,
|
||||
)
|
||||
(employeeTempPosMasterByNode.get(getNodeKey("child1", data1Id)) ?? [])
|
||||
.map(async (item: any) => {
|
||||
delete item.id;
|
||||
const employeeTempPosMaster = Object.assign(new EmployeeTempPosMaster(), item);
|
||||
|
|
@ -1281,12 +1455,11 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
}),
|
||||
);
|
||||
|
||||
for (const x of orgChild2.filter((item: OrgChild2) => item.orgChild1Id == data1Id)) {
|
||||
for (const x of orgChild2ByChild1.get(data1Id) ?? []) {
|
||||
const data2Id = x.id;
|
||||
const matchedOrgChild2 = findMatchedNodeByAncestorDNA(orgChild2Current, x);
|
||||
await Promise.all(
|
||||
_orgemployeePosMaster
|
||||
.filter((x: EmployeePosMaster) => x.orgChild2Id == data2Id && x.orgChild3Id == null)
|
||||
(employeePosMasterByNode.get(getNodeKey("child2", data2Id)) ?? [])
|
||||
.map(async (item: any) => {
|
||||
delete item.id;
|
||||
const employeePosMaster = Object.assign(new EmployeePosMaster(), item);
|
||||
|
|
@ -1324,10 +1497,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
);
|
||||
|
||||
await Promise.all(
|
||||
_orgemployeeTempPosMaster
|
||||
.filter(
|
||||
(x: EmployeeTempPosMaster) => x.orgChild2Id == data2Id && x.orgChild3Id == null,
|
||||
)
|
||||
(employeeTempPosMasterByNode.get(getNodeKey("child2", data2Id)) ?? [])
|
||||
.map(async (item: any) => {
|
||||
delete item.id;
|
||||
const employeeTempPosMaster = Object.assign(new EmployeeTempPosMaster(), item);
|
||||
|
|
@ -1364,14 +1534,11 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
}),
|
||||
);
|
||||
|
||||
for (const x of orgChild3.filter((item: OrgChild3) => item.orgChild2Id == data2Id)) {
|
||||
for (const x of orgChild3ByChild2.get(data2Id) ?? []) {
|
||||
const data3Id = x.id;
|
||||
const matchedOrgChild3 = findMatchedNodeByAncestorDNA(orgChild3Current, x);
|
||||
await Promise.all(
|
||||
_orgemployeePosMaster
|
||||
.filter(
|
||||
(x: EmployeePosMaster) => x.orgChild3Id == data3Id && x.orgChild4Id == null,
|
||||
)
|
||||
(employeePosMasterByNode.get(getNodeKey("child3", data3Id)) ?? [])
|
||||
.map(async (item: any) => {
|
||||
delete item.id;
|
||||
const employeePosMaster = Object.assign(new EmployeePosMaster(), item);
|
||||
|
|
@ -1410,10 +1577,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
);
|
||||
|
||||
await Promise.all(
|
||||
_orgemployeeTempPosMaster
|
||||
.filter(
|
||||
(x: EmployeeTempPosMaster) => x.orgChild3Id == data3Id && x.orgChild4Id == null,
|
||||
)
|
||||
(employeeTempPosMasterByNode.get(getNodeKey("child3", data3Id)) ?? [])
|
||||
.map(async (item: any) => {
|
||||
delete item.id;
|
||||
const employeeTempPosMaster = Object.assign(new EmployeeTempPosMaster(), item);
|
||||
|
|
@ -1451,12 +1615,11 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
}),
|
||||
);
|
||||
|
||||
for (const x of orgChild4.filter((item: OrgChild4) => item.orgChild3Id == data3Id)) {
|
||||
for (const x of orgChild4ByChild3.get(data3Id) ?? []) {
|
||||
const data4Id = x.id;
|
||||
const matchedOrgChild4 = findMatchedNodeByAncestorDNA(orgChild4Current, x);
|
||||
await Promise.all(
|
||||
_orgemployeePosMaster
|
||||
.filter((x: EmployeePosMaster) => x.orgChild4Id == data4Id)
|
||||
(employeePosMasterByNode.get(getNodeKey("child4", data4Id)) ?? [])
|
||||
.map(async (item: any) => {
|
||||
delete item.id;
|
||||
const employeePosMaster = Object.assign(new EmployeePosMaster(), item);
|
||||
|
|
@ -1496,8 +1659,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
);
|
||||
|
||||
await Promise.all(
|
||||
_orgemployeeTempPosMaster
|
||||
.filter((x: EmployeeTempPosMaster) => x.orgChild4Id == data4Id)
|
||||
(employeeTempPosMasterByNode.get(getNodeKey("child4", data4Id)) ?? [])
|
||||
.map(async (item: any) => {
|
||||
delete item.id;
|
||||
const employeeTempPosMaster = Object.assign(
|
||||
|
|
@ -1544,22 +1706,42 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
}
|
||||
}
|
||||
|
||||
const employeePosMaster = await repoEmployeePosmaster.find({
|
||||
where: { orgRevisionId: orgRevisionDraft.id },
|
||||
relations: ["positions", "positions.posLevel", "positions.posType"],
|
||||
});
|
||||
const [employeePosMaster, employeeTempPosMaster] = await Promise.all([
|
||||
repoEmployeePosmaster.find({
|
||||
where: { orgRevisionId: orgRevisionDraft.id },
|
||||
relations: ["positions", "positions.posLevel", "positions.posType"],
|
||||
}),
|
||||
repoEmployeeTempPosmaster.find({
|
||||
where: { orgRevisionId: orgRevisionDraft.id },
|
||||
relations: ["positions", "positions.posLevel", "positions.posType"],
|
||||
}),
|
||||
]);
|
||||
const profileEmployeeIds = Array.from(
|
||||
new Set(
|
||||
[...employeePosMaster, ...employeeTempPosMaster]
|
||||
.map((item) => item.next_holderId)
|
||||
.filter((profileId): profileId is string => !!profileId),
|
||||
),
|
||||
);
|
||||
const profileEmployeeMap = new Map<string, ProfileEmployee>();
|
||||
if (profileEmployeeIds.length > 0) {
|
||||
const profiles = await repoProfileEmployee.findBy({
|
||||
id: In(profileEmployeeIds),
|
||||
});
|
||||
profiles.forEach((profile) => profileEmployeeMap.set(profile.id, profile));
|
||||
}
|
||||
const updatedProfileEmployeeIds = new Set<string>();
|
||||
|
||||
for (const item of employeePosMaster) {
|
||||
if (item.next_holderId != null) {
|
||||
const profile = await repoProfileEmployee.findOne({
|
||||
where: { id: item.next_holderId == null ? "" : item.next_holderId },
|
||||
});
|
||||
const profile = profileEmployeeMap.get(item.next_holderId);
|
||||
const position = await item.positions.find((x) => x.positionIsSelected == true);
|
||||
const _null: any = null;
|
||||
if (profile != null) {
|
||||
profile.posLevelId = position?.posLevelId ?? _null;
|
||||
profile.posTypeId = position?.posTypeId ?? _null;
|
||||
profile.position = position?.positionName ?? _null;
|
||||
await repoProfileEmployee.save(profile);
|
||||
updatedProfileEmployeeIds.add(profile.id);
|
||||
}
|
||||
}
|
||||
item.lastUpdateUserId = lastUpdateUserId;
|
||||
|
|
@ -1567,22 +1749,16 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
item.lastUpdatedAt = lastUpdatedAt;
|
||||
await repoEmployeePosmaster.save(item);
|
||||
}
|
||||
const employeeTempPosMaster = await repoEmployeeTempPosmaster.find({
|
||||
where: { orgRevisionId: orgRevisionDraft.id },
|
||||
relations: ["positions", "positions.posLevel", "positions.posType"],
|
||||
});
|
||||
for (const item of employeeTempPosMaster) {
|
||||
if (item.next_holderId != null) {
|
||||
const profile = await repoProfileEmployee.findOne({
|
||||
where: { id: item.next_holderId == null ? "" : item.next_holderId },
|
||||
});
|
||||
const profile = profileEmployeeMap.get(item.next_holderId);
|
||||
const position = await item.positions.find((x) => x.positionIsSelected == true);
|
||||
const _null: any = null;
|
||||
if (profile != null) {
|
||||
profile.posLevelId = position?.posLevelId ?? _null;
|
||||
profile.posTypeId = position?.posTypeId ?? _null;
|
||||
profile.position = position?.positionName ?? _null;
|
||||
await repoProfileEmployee.save(profile);
|
||||
updatedProfileEmployeeIds.add(profile.id);
|
||||
}
|
||||
}
|
||||
item.lastUpdateUserId = lastUpdateUserId;
|
||||
|
|
@ -1590,6 +1766,15 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
|||
item.lastUpdatedAt = lastUpdatedAt;
|
||||
await repoEmployeeTempPosmaster.save(item);
|
||||
}
|
||||
if (updatedProfileEmployeeIds.size > 0) {
|
||||
const profilesToSave = Array.from(updatedProfileEmployeeIds)
|
||||
.map((profileId) => profileEmployeeMap.get(profileId))
|
||||
.filter((profile): profile is ProfileEmployee => !!profile);
|
||||
const chunks = chunkArray(profilesToSave, 200);
|
||||
for (const chunk of chunks) {
|
||||
await repoProfileEmployee.save(chunk);
|
||||
}
|
||||
}
|
||||
console.timeEnd("[AMQ] clone_org_structure");
|
||||
|
||||
console.time("[AMQ] save_revision_status");
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue