import { StorageFile } from "../interfaces/storage-fs";
import esClient from "../elasticsearch";
import minioClient from "../minio";

const envIndex = process.env.ELASTICSEARCH_INDEX;

if (!envIndex) throw Error("Default ElasticSearch index must be specified.");

/** Elasticsearch index every operation in this module targets. */
const DEFAULT_INDEX: string = envIndex;

/** Size and content type of a stored object, as reported by the object store. */
type ObjectStat = { size: number; type: string };

// Per-key caches of downloaded content and object stats. Entries are kept
// when indexing fails so that a later retry (the failed queue that will come
// later) does not have to download the object again.
const cachedBuffer: Record<string, Buffer> = {};
const cachedMetadata: Record<string, ObjectStat> = {};

/** Tells whether an error thrown by the object store carries the code "NoSuchKey". */
function isNoSuchKey(e: unknown): boolean {
  return (
    typeof e === "object" &&
    e !== null &&
    (e as { code?: unknown }).code === "NoSuchKey"
  );
}

/**
 * Handle a storage event by synchronising the Elasticsearch index with the
 * object the event refers to.
 *
 * @param key - Object key in the form `<bucket>/<pathname>`.
 * @param event - Event name; `"s3:ObjectRemoved:Delete"` removes the indexed
 *   record, any other value (re-)indexes the object.
 * @returns `true` when the index was updated (or the object no longer exists),
 *   `false` when the record lookup or the indexing request failed.
 * @throws Rethrows any object-store error other than "NoSuchKey".
 */
export async function handler(key: string, event: string): Promise<boolean> {
  console.info(`[AMQ] Messages received - key: ${key}, event: ${event}`);

  const [bucket, ...fragment] = key.split("/");
  const pathname = fragment.join("/");

  if (event === "s3:ObjectRemoved:Delete") {
    return await ensureDelete(pathname);
  }

  let buffer = cachedBuffer[key];
  if (!buffer) {
    try {
      const stream = await minioClient.getObject(bucket, pathname);
      buffer = Buffer.concat(await stream.toArray());
      cachedBuffer[key] = buffer;
    } catch (e: unknown) {
      // Any other failure must surface here: continuing without content would
      // remove the existing record in popInfo() and then fail to re-index it.
      if (!isNoSuchKey(e)) throw e;

      // The object is gone, so make sure the index does not reference it.
      delete cachedBuffer[key];
      delete cachedMetadata[key];
      await ensureDelete(pathname);
      return true;
    }
  }

  let stat = cachedMetadata[key];
  if (!stat) {
    const objectStat = await minioClient.statObject(bucket, pathname);
    stat = { size: objectStat.size, type: objectStat.metaData["content-type"] };
    cachedMetadata[key] = stat;
  }

  let rec: StorageFile | null;
  try {
    rec = await popInfo(pathname);
  } catch (e: unknown) {
    // A failed lookup is not the same as "not found": indexing blank metadata
    // now could duplicate an existing record. Keep the caches and report failure.
    console.error(e);
    return false;
  }

  const result = rec
    ? await handleFoundRecord(rec, buffer, stat)
    : await handleNotFoundRecord(pathname, buffer, stat);

  if (result) {
    delete cachedBuffer[key];
    delete cachedMetadata[key];
  }

  return result;
}

/**
 * Get the record for `pathname` and delete it from ElasticSearch so that it
 * can be re-indexed.
 *
 * @returns The stored record, or `null` when there is no usable record.
 * @throws When the search request itself fails.
 */
async function popInfo(pathname: string): Promise<StorageFile | null> {
  const result = await esClient.search<StorageFile>({
    index: DEFAULT_INDEX,
    query: { match: { pathname } },
  });

  // pathname is unique and should not have multiple record with same path
  const hit = result.hits.hits[0];

  if (!hit) {
    console.info("Index Not Found");
    return null;
  }

  if (!hit._source) return null;

  try {
    await esClient.delete({ index: DEFAULT_INDEX, id: hit._id });
  } catch (e: unknown) {
    // Best effort: a failed delete is logged and the record is still returned.
    console.error(e);
  }

  return hit._source;
}

/**
 * If there is data in database then delete it.
 *
 * @returns Always `true`; a failed delete request is logged, not reported.
 */
async function ensureDelete(pathname: string): Promise<boolean> {
  try {
    await esClient.deleteByQuery({
      index: DEFAULT_INDEX,
      query: { match: { pathname } },
      conflicts: "proceed",
    });
  } catch (e: unknown) {
    console.error(e);
  }

  return true;
}

/**
 * Handle when record in elasticsearch cannot be found.
 * This will insert empty metadata.
 *
 * @returns `true` when the document was indexed, `false` when the request failed.
 */
async function handleNotFoundRecord(
  pathname: string,
  buffer: Buffer,
  stat: { size: number; type: string },
): Promise<boolean> {
  const segments = pathname.split("/");
  const path = segments.slice(0, -1).join("/") + "/";
  const filename = segments.at(-1);
  const now = new Date().toISOString();

  const metadata = {
    pathname,
    path,
    fileName: filename ?? "n/a", // should not possible to fallback, but just in case.
    fileSize: stat.size,
    fileType: stat.type,
    title: "",
    description: "",
    category: [],
    keyword: [],
    upload: true,
    createdAt: now,
    createdBy: "n/a",
    updatedAt: now,
    updatedBy: "n/a",
  } satisfies Partial<StorageFile>;

  try {
    await esClient.index({
      pipeline: "attachment",
      index: DEFAULT_INDEX,
      document: { data: buffer.toString("base64"), ...metadata },
    });
    return true;
  } catch (e: unknown) {
    console.error(e);
    return false;
  }
}

/**
 * Re-index an existing record with the current content, size and type of
 * the object, marking it as uploaded.
 *
 * @returns `true` when the document was indexed, `false` when the request failed.
 */
async function handleFoundRecord(
  metadata: StorageFile,
  buffer: Buffer,
  stat: { size: number; type: string },
): Promise<boolean> {
  const document = {
    ...metadata,
    fileSize: stat.size,
    fileType: stat.type,
    upload: true,
    // Placed last so the fresh content wins over any `data` field that the
    // stored record might still carry.
    data: buffer.toString("base64"),
  };

  try {
    await esClient.index({
      pipeline: "attachment",
      index: DEFAULT_INDEX,
      document,
    });
    return true;
  } catch (e: unknown) {
    console.error(e);
    return false;
  }
}