From c633224b8851136ecb2a66d37e01d9608aa4f5bc Mon Sep 17 00:00:00 2001 From: Methapon2001 <61303214+Methapon2001@users.noreply.github.com> Date: Mon, 22 Jan 2024 20:07:51 +0700 Subject: [PATCH] fix: skip binary on elasticsearch error in 400 range --- Services/server/src/rabbitmq/handler.ts | 41 +++++++++++++++++++------ 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/Services/server/src/rabbitmq/handler.ts b/Services/server/src/rabbitmq/handler.ts index c179c4f..f6f4b44 100644 --- a/Services/server/src/rabbitmq/handler.ts +++ b/Services/server/src/rabbitmq/handler.ts @@ -27,6 +27,8 @@ if (!DEFAULT_INDEX) throw Error("Default ElasticSearch index must be specified." // for failed queue that will come later const cachedBuffer: Record = {}; const cachedMetadata: Record = {}; +const cachedRecord: Record = {}; +let errorKey: string[] = []; export async function handler(key: string, event: string): Promise { console.info(`[AMQ] Messages received - key: ${key}, event: ${event}`); @@ -61,15 +63,19 @@ export async function handler(key: string, event: string): Promise { const rec = await popInfo(pathname); - console.info(`[AMQ] Key: ${key} - ${JSON.stringify(rec, null, 2) ?? "Not Found."}`); + if (rec) cachedRecord[key] = rec; - const result = rec - ? await handleFoundRecord(rec, cachedBuffer[key], cachedMetadata[key]) - : await handleNotFoundRecord(pathname, cachedBuffer[key], cachedMetadata[key]); + console.info(`[AMQ] Key: ${key} - ${JSON.stringify(cachedRecord[key] ?? "Not Found", null, 2)}`); + + const result = cachedRecord[key] + ? await handleFoundRecord(cachedRecord[key], cachedBuffer[key], cachedMetadata[key], key) + : await handleNotFoundRecord(pathname, cachedBuffer[key], cachedMetadata[key], key); if (result) { delete cachedBuffer[key]; delete cachedMetadata[key]; + delete cachedRecord[key]; + errorKey = errorKey.filter((v) => v !== key); } return result; @@ -96,7 +102,7 @@ async function popInfo(pathname: string) { return result.hits.hits[0]._source; } - if (result && result.hits.hits.length === 0) console.info("Index Not Found"); + if (result && result.hits.hits.length === 0) console.info("[AMQ] Index Not Found"); return false; } @@ -126,10 +132,11 @@ async function handleNotFoundRecord( pathname: string, buffer: Buffer, stat: { size: number; type: string }, + key: string, ) { const path = stripLeadingSlash(pathname.split("/").slice(0, -1).join("/") + "/"); const filename = pathname.split("/").at(-1); - const base64 = Buffer.from(buffer).toString("base64"); + const base64 = errorKey.includes(key) ? "" : Buffer.from(buffer).toString("base64"); const metadata = { pathname, @@ -158,7 +165,13 @@ async function handleNotFoundRecord( document: { data: base64, ...metadata }, refresh: "wait_for", }) - .catch((e) => console.error(e)); + .catch((e) => { + if (e.meta.statusCode >= 400 && e.meta.statusCode < 500) { + errorKey.push(key); + console.log(cachedRecord); + } + console.error(e); + }); if (!result) return false; @@ -173,6 +186,7 @@ async function handleFoundRecord( metadata: StorageFile, buffer: Buffer, stat: { size: number; type: string }, + key: string, ) { metadata.fileSize = stat.size; metadata.fileType = stat.type; @@ -182,10 +196,19 @@ async function handleFoundRecord( .index({ pipeline: "attachment", index: DEFAULT_INDEX!, - document: { data: Buffer.from(buffer).toString("base64"), ...metadata }, + document: { + data: errorKey.includes(key) ? "" : Buffer.from(buffer).toString("base64"), + ...metadata, + }, refresh: "wait_for", }) - .catch((e) => console.error(e)); + .catch((e) => { + if (e.meta.statusCode >= 400 && e.meta.statusCode < 500) { + errorKey.push(key); + console.log(cachedRecord); + } + console.error(e); + }); if (!result) return false;