fix: skip binary on elasticsearch error in 400 range
This commit is contained in:
parent
7cc01f9531
commit
c633224b88
1 changed files with 32 additions and 9 deletions
|
|
@ -27,6 +27,8 @@ if (!DEFAULT_INDEX) throw Error("Default ElasticSearch index must be specified."
|
|||
// for failed queue that will come later
|
||||
const cachedBuffer: Record<string, Buffer> = {};
|
||||
const cachedMetadata: Record<string, { size: number; type: string }> = {};
|
||||
const cachedRecord: Record<string, StorageFile> = {};
|
||||
let errorKey: string[] = [];
|
||||
|
||||
export async function handler(key: string, event: string): Promise<boolean> {
|
||||
console.info(`[AMQ] Messages received - key: ${key}, event: ${event}`);
|
||||
|
|
@ -61,15 +63,19 @@ export async function handler(key: string, event: string): Promise<boolean> {
|
|||
|
||||
const rec = await popInfo(pathname);
|
||||
|
||||
console.info(`[AMQ] Key: ${key} - ${JSON.stringify(rec, null, 2) ?? "Not Found."}`);
|
||||
if (rec) cachedRecord[key] = rec;
|
||||
|
||||
const result = rec
|
||||
? await handleFoundRecord(rec, cachedBuffer[key], cachedMetadata[key])
|
||||
: await handleNotFoundRecord(pathname, cachedBuffer[key], cachedMetadata[key]);
|
||||
console.info(`[AMQ] Key: ${key} - ${JSON.stringify(cachedRecord[key] ?? "Not Found", null, 2)}`);
|
||||
|
||||
const result = cachedRecord[key]
|
||||
? await handleFoundRecord(cachedRecord[key], cachedBuffer[key], cachedMetadata[key], key)
|
||||
: await handleNotFoundRecord(pathname, cachedBuffer[key], cachedMetadata[key], key);
|
||||
|
||||
if (result) {
|
||||
delete cachedBuffer[key];
|
||||
delete cachedMetadata[key];
|
||||
delete cachedRecord[key];
|
||||
errorKey = errorKey.filter((v) => v !== key);
|
||||
}
|
||||
|
||||
return result;
|
||||
|
|
@ -96,7 +102,7 @@ async function popInfo(pathname: string) {
|
|||
return result.hits.hits[0]._source;
|
||||
}
|
||||
|
||||
if (result && result.hits.hits.length === 0) console.info("Index Not Found");
|
||||
if (result && result.hits.hits.length === 0) console.info("[AMQ] Index Not Found");
|
||||
|
||||
return false;
|
||||
}
|
||||
|
|
@ -126,10 +132,11 @@ async function handleNotFoundRecord(
|
|||
pathname: string,
|
||||
buffer: Buffer,
|
||||
stat: { size: number; type: string },
|
||||
key: string,
|
||||
) {
|
||||
const path = stripLeadingSlash(pathname.split("/").slice(0, -1).join("/") + "/");
|
||||
const filename = pathname.split("/").at(-1);
|
||||
const base64 = Buffer.from(buffer).toString("base64");
|
||||
const base64 = errorKey.includes(key) ? "" : Buffer.from(buffer).toString("base64");
|
||||
|
||||
const metadata = {
|
||||
pathname,
|
||||
|
|
@ -158,7 +165,13 @@ async function handleNotFoundRecord(
|
|||
document: { data: base64, ...metadata },
|
||||
refresh: "wait_for",
|
||||
})
|
||||
.catch((e) => console.error(e));
|
||||
.catch((e) => {
|
||||
if (e.meta.statusCode >= 400 && e.meta.statusCode < 500) {
|
||||
errorKey.push(key);
|
||||
console.log(cachedRecord);
|
||||
}
|
||||
console.error(e);
|
||||
});
|
||||
|
||||
if (!result) return false;
|
||||
|
||||
|
|
@ -173,6 +186,7 @@ async function handleFoundRecord(
|
|||
metadata: StorageFile,
|
||||
buffer: Buffer,
|
||||
stat: { size: number; type: string },
|
||||
key: string,
|
||||
) {
|
||||
metadata.fileSize = stat.size;
|
||||
metadata.fileType = stat.type;
|
||||
|
|
@ -182,10 +196,19 @@ async function handleFoundRecord(
|
|||
.index({
|
||||
pipeline: "attachment",
|
||||
index: DEFAULT_INDEX!,
|
||||
document: { data: Buffer.from(buffer).toString("base64"), ...metadata },
|
||||
document: {
|
||||
data: errorKey.includes(key) ? "" : Buffer.from(buffer).toString("base64"),
|
||||
...metadata,
|
||||
},
|
||||
refresh: "wait_for",
|
||||
})
|
||||
.catch((e) => console.error(e));
|
||||
.catch((e) => {
|
||||
if (e.meta.statusCode >= 400 && e.meta.statusCode < 500) {
|
||||
errorKey.push(key);
|
||||
console.log(cachedRecord);
|
||||
}
|
||||
console.error(e);
|
||||
});
|
||||
|
||||
if (!result) return false;
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue