fix handler_org and remove retry
This commit is contained in:
parent
ac6b487d66
commit
e01acde791
2 changed files with 34 additions and 25 deletions
|
|
@ -433,31 +433,40 @@ export async function BatchUpdatePosMasters(
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
if (updates.length === 0) return;
|
if (updates.length === 0) return;
|
||||||
|
|
||||||
const repoPosmaster = manager.getRepository(PosMaster);
|
|
||||||
const CHUNK_SIZE = 1000;
|
const CHUNK_SIZE = 1000;
|
||||||
|
|
||||||
const chunks = chunkArray(updates, CHUNK_SIZE);
|
const chunks = chunkArray(updates, CHUNK_SIZE);
|
||||||
|
|
||||||
for (const chunk of chunks) {
|
for (const chunk of chunks) {
|
||||||
const ids = chunk.map((u: any) => u.id);
|
// Build single bulk UPDATE query using CASE WHEN
|
||||||
|
const caseStatements: string[] = [];
|
||||||
await repoPosmaster
|
const params: any[] = [];
|
||||||
.createQueryBuilder()
|
|
||||||
.update(PosMaster)
|
|
||||||
.set({
|
|
||||||
next_holderId: null,
|
|
||||||
lastUpdateUserId: chunk[0].lastUpdateUserId,
|
|
||||||
lastUpdateFullName: chunk[0].lastUpdateFullName,
|
|
||||||
lastUpdatedAt: chunk[0].lastUpdatedAt
|
|
||||||
})
|
|
||||||
.where('id IN (:...ids)', { ids })
|
|
||||||
.execute();
|
|
||||||
|
|
||||||
for (const update of chunk) {
|
for (const update of chunk) {
|
||||||
await repoPosmaster.update(update.id, {
|
caseStatements.push(`WHEN ? THEN ?`);
|
||||||
current_holderId: update.current_holderId
|
params.push(update.id, update.current_holderId);
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Build IN clause placeholders
|
||||||
|
const idPlaceholders = chunk.map(() => '?').join(',');
|
||||||
|
const ids = chunk.map((u: any) => u.id);
|
||||||
|
|
||||||
|
// Add common params at the end
|
||||||
|
params.push(
|
||||||
|
chunk[0].lastUpdateUserId,
|
||||||
|
chunk[0].lastUpdateFullName,
|
||||||
|
chunk[0].lastUpdatedAt,
|
||||||
|
...ids
|
||||||
|
);
|
||||||
|
|
||||||
|
await manager.query(`
|
||||||
|
UPDATE posMaster
|
||||||
|
SET current_holderId = CASE id ${caseStatements.join(' ')} END,
|
||||||
|
next_holderId = NULL,
|
||||||
|
lastUpdateUserId = ?,
|
||||||
|
lastUpdateFullName = ?,
|
||||||
|
lastUpdatedAt = ?
|
||||||
|
WHERE id IN (${idPlaceholders})
|
||||||
|
`, params);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -92,8 +92,6 @@ export async function init() {
|
||||||
// createConsumer(queue2, channel, handler2);
|
// createConsumer(queue2, channel, handler2);
|
||||||
}
|
}
|
||||||
|
|
||||||
let retries = 0;
|
|
||||||
|
|
||||||
function createConsumer( //----> consumer
|
function createConsumer( //----> consumer
|
||||||
queue: string,
|
queue: string,
|
||||||
channel: amqp.Channel,
|
channel: amqp.Channel,
|
||||||
|
|
@ -103,13 +101,15 @@ function createConsumer( //----> consumer
|
||||||
queue,
|
queue,
|
||||||
async (msg) => {
|
async (msg) => {
|
||||||
if (!msg) return;
|
if (!msg) return;
|
||||||
if ((await handler(msg)) || retries++ >= 3) {
|
try {
|
||||||
retries = 0;
|
await handler(msg);
|
||||||
console.log("[AMQ] Process Consumer success");
|
console.log("[AMQ] Process Consumer success");
|
||||||
|
} catch (error) {
|
||||||
|
console.log("[AMQ] Process Consumer failed");
|
||||||
|
} finally {
|
||||||
|
// Always acknowledge - no retries
|
||||||
return channel.ack(msg);
|
return channel.ack(msg);
|
||||||
}
|
}
|
||||||
console.log("[AMQ] Process Consumer failed");
|
|
||||||
return await new Promise((resolve) => setTimeout(() => resolve(channel.nack(msg)), 3000));
|
|
||||||
},
|
},
|
||||||
{ noAck: false },
|
{ noAck: false },
|
||||||
);
|
);
|
||||||
|
|
@ -1963,7 +1963,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise<boolean> {
|
||||||
).catch(console.error);
|
).catch(console.error);
|
||||||
}
|
}
|
||||||
console.timeEnd('[AMQ] handler_org_total');
|
console.timeEnd('[AMQ] handler_org_total');
|
||||||
return false; // ✅ Return false to prevent RabbitMQ retry
|
throw error; // ✅ Re-throw to be caught by createConsumer's try-catch
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue