feat: cache metadata on requeue
This commit is contained in:
parent
9ca1213e50
commit
0a9f27a02e
1 changed files with 32 additions and 11 deletions
|
|
@ -4,6 +4,7 @@ import minioClient from "../storage";
|
|||
|
||||
// for failed queue that will come later
|
||||
const cachedBuffer: Record<string, Buffer> = {};
|
||||
const cachedMetadata: Record<string, { size: number; type: string }> = {};
|
||||
|
||||
export async function handler(key: string): Promise<boolean> {
|
||||
console.info(`[AMQ] Messages received - key: ${key}`);
|
||||
|
|
@ -17,19 +18,27 @@ export async function handler(key: string): Promise<boolean> {
|
|||
cachedBuffer[key] = buffer;
|
||||
}
|
||||
|
||||
const rec = await getInfo(pathname);
|
||||
if (!cachedMetadata[key]) {
|
||||
const stat = await minioClient.statObject(bucket, pathname);
|
||||
cachedMetadata[key] = { size: stat.size, type: stat.metaData["content-type"] };
|
||||
}
|
||||
|
||||
const rec = await popInfo(pathname);
|
||||
|
||||
const result = rec
|
||||
? await handleFoundRecord(rec, cachedBuffer[key])
|
||||
: await handleNotFoundRecord(pathname, cachedBuffer[key]);
|
||||
? await handleFoundRecord(rec, cachedBuffer[key], cachedMetadata[key])
|
||||
: await handleNotFoundRecord(pathname, cachedBuffer[key], cachedMetadata[key]);
|
||||
|
||||
if (result) delete cachedBuffer[key];
|
||||
if (result) {
|
||||
delete cachedBuffer[key];
|
||||
delete cachedMetadata[key];
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// Get info and delete it from ElasticSearch to re-index
|
||||
async function getInfo(pathname: string) {
|
||||
async function popInfo(pathname: string) {
|
||||
const result = await esClient
|
||||
.search<EhrFile & { attachment?: Record<string, unknown> }>({
|
||||
index: "my-test-index",
|
||||
|
|
@ -58,15 +67,19 @@ async function getInfo(pathname: string) {
|
|||
* Handle when record in elasticsearch cannot be found.
|
||||
* This will insert empty metadata.
|
||||
*/
|
||||
async function handleNotFoundRecord(pathname: string, buffer: Buffer) {
|
||||
async function handleNotFoundRecord(
|
||||
pathname: string,
|
||||
buffer: Buffer,
|
||||
stat: { size: number; type: string },
|
||||
) {
|
||||
const filename = pathname.split("/").at(-1);
|
||||
const base64 = Buffer.from(buffer).toString("base64");
|
||||
|
||||
const metadata = {
|
||||
pathname,
|
||||
fileName: filename ?? "n/a", // should not possible to fallback
|
||||
fileSize: Buffer.byteLength(buffer),
|
||||
fileType: "",
|
||||
fileName: filename ?? "n/a", // should not possible to fallback, just in case.
|
||||
fileSize: stat.size,
|
||||
fileType: stat.type,
|
||||
title: "",
|
||||
description: "",
|
||||
category: [],
|
||||
|
|
@ -91,12 +104,20 @@ async function handleNotFoundRecord(pathname: string, buffer: Buffer) {
|
|||
return false;
|
||||
}
|
||||
|
||||
async function handleFoundRecord(metadata: EhrFile, buffer: Buffer) {
|
||||
async function handleFoundRecord(
|
||||
metadata: EhrFile,
|
||||
buffer: Buffer,
|
||||
stat: { size: number; type: string },
|
||||
) {
|
||||
metadata.fileSize = stat.size;
|
||||
metadata.fileType = stat.type;
|
||||
metadata.upload = true;
|
||||
|
||||
const result = await esClient
|
||||
.index({
|
||||
pipeline: "attachment",
|
||||
index: "my-test-index",
|
||||
document: { data: Buffer.from(buffer).toString("base64"), ...metadata, upload: true },
|
||||
document: { data: Buffer.from(buffer).toString("base64"), ...metadata },
|
||||
})
|
||||
.catch((e) => console.error(e));
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue