From 7827e1925425b573554beec601eccbb3107f1726 Mon Sep 17 00:00:00 2001 From: waruneeauy Date: Fri, 1 May 2026 00:03:39 +0700 Subject: [PATCH] fix handler_org and remove retry --- src/services/PositionService.ts | 45 ++++++++++++++++++++------------- src/services/rabbitmq.ts | 14 +++++----- 2 files changed, 34 insertions(+), 25 deletions(-) diff --git a/src/services/PositionService.ts b/src/services/PositionService.ts index 7d274f86..cd3a0aca 100644 --- a/src/services/PositionService.ts +++ b/src/services/PositionService.ts @@ -433,31 +433,40 @@ export async function BatchUpdatePosMasters( ): Promise { if (updates.length === 0) return; - const repoPosmaster = manager.getRepository(PosMaster); const CHUNK_SIZE = 1000; - const chunks = chunkArray(updates, CHUNK_SIZE); for (const chunk of chunks) { - const ids = chunk.map((u: any) => u.id); - - await repoPosmaster - .createQueryBuilder() - .update(PosMaster) - .set({ - next_holderId: null, - lastUpdateUserId: chunk[0].lastUpdateUserId, - lastUpdateFullName: chunk[0].lastUpdateFullName, - lastUpdatedAt: chunk[0].lastUpdatedAt - }) - .where('id IN (:...ids)', { ids }) - .execute(); + // Build single bulk UPDATE query using CASE WHEN + const caseStatements: string[] = []; + const params: any[] = []; for (const update of chunk) { - await repoPosmaster.update(update.id, { - current_holderId: update.current_holderId - }); + caseStatements.push(`WHEN ? THEN ?`); + params.push(update.id, update.current_holderId); } + + // Build IN clause placeholders + const idPlaceholders = chunk.map(() => '?').join(','); + const ids = chunk.map((u: any) => u.id); + + // Add common params at the end + params.push( + chunk[0].lastUpdateUserId, + chunk[0].lastUpdateFullName, + chunk[0].lastUpdatedAt, + ...ids + ); + + await manager.query(` + UPDATE posMaster + SET current_holderId = CASE id ${caseStatements.join(' ')} END, + next_holderId = NULL, + lastUpdateUserId = ?, + lastUpdateFullName = ?, + lastUpdatedAt = ? + WHERE id IN (${idPlaceholders}) + `, params); } } diff --git a/src/services/rabbitmq.ts b/src/services/rabbitmq.ts index 6f95c4be..420b21e3 100644 --- a/src/services/rabbitmq.ts +++ b/src/services/rabbitmq.ts @@ -92,8 +92,6 @@ export async function init() { // createConsumer(queue2, channel, handler2); } -let retries = 0; - function createConsumer( //----> consumer queue: string, channel: amqp.Channel, @@ -103,13 +101,15 @@ function createConsumer( //----> consumer queue, async (msg) => { if (!msg) return; - if ((await handler(msg)) || retries++ >= 3) { - retries = 0; + try { + await handler(msg); console.log("[AMQ] Process Consumer success"); + } catch (error) { + console.log("[AMQ] Process Consumer failed"); + } finally { + // Always acknowledge - no retries return channel.ack(msg); } - console.log("[AMQ] Process Consumer failed"); - return await new Promise((resolve) => setTimeout(() => resolve(channel.nack(msg)), 3000)); }, { noAck: false }, ); @@ -1963,7 +1963,7 @@ async function handler_org(msg: amqp.ConsumeMessage): Promise { ).catch(console.error); } console.timeEnd('[AMQ] handler_org_total'); - return false; // ✅ Return false to prevent RabbitMQ retry + throw error; // ✅ Re-throw to be caught by createConsumer's try-catch } }