import { EhrFile } from "../interfaces/ehr-fs"; import esClient from "../elasticsearch"; import minioClient from "../storage"; // for failed queue that will come later const cachedBuffer: Record = {}; export async function handler(key: string): Promise { console.info(`[AMQ] Messages received - key: ${key}`); const [bucket, ...fragment] = key.split("/"); const pathname = fragment.join("/"); if (!cachedBuffer[key]) { const stream = await minioClient.getObject(bucket, pathname); const buffer = Buffer.concat(await stream.toArray()); cachedBuffer[key] = buffer; } const rec = await getInfo(pathname); const result = rec ? await handleFoundRecord(rec, cachedBuffer[key]) : await handleNotFoundRecord(pathname, cachedBuffer[key]); if (result) delete cachedBuffer[key]; return result; } // Get info and delete it from ElasticSearch to re-index async function getInfo(pathname: string) { const result = await esClient .search }>({ index: "my-test-index", query: { match: { pathname } }, }) .catch((e) => console.error(e)); // pathname is unique and should not have multiple record with same path if (result && result.hits.hits.length > 0 && result.hits.hits[0]._source) { await esClient .delete({ index: "my-test-index", id: result.hits.hits[0]._id, }) .catch((e) => console.error(e)); return result.hits.hits[0]._source; } if (result && result.hits.hits.length === 0) console.info("Index Not Found"); return false; } /** * Handle when record in elasticsearch cannot be found. * This will insert empty metadata. */ async function handleNotFoundRecord(pathname: string, buffer: Buffer) { const filename = pathname.split("/").at(-1); const base64 = Buffer.from(buffer).toString("base64"); const metadata = { pathname, fileName: filename ?? "n/a", // should not possible to fallback fileSize: Buffer.byteLength(buffer), fileType: "", title: "", description: "", category: [], keyword: [], upload: true, createdAt: new Date().toISOString(), createdBy: "n/a", updatedAt: new Date().toISOString(), updatedBy: "n/a", } satisfies Partial; const result = await esClient .index({ pipeline: "attachment", index: "my-test-index", document: { data: base64, ...metadata }, }) .catch((e) => console.error(e)); if (result) return true; return false; } async function handleFoundRecord(metadata: EhrFile, buffer: Buffer) { const result = await esClient .index({ pipeline: "attachment", index: "my-test-index", document: { data: Buffer.from(buffer).toString("base64"), ...metadata, upload: true }, }) .catch((e) => console.error(e)); if (result) return true; return false; }