import { StorageFile } from "../interfaces/storage-fs";
import esClient from "../elasticsearch";
import minioClient from "../minio";
import * as io from "../lib/websocket";

// Disabled legacy helpers, kept for reference only.
// (The Thai message reads: "An error occurred in the file management system".)
//
// const MINIO_ERROR_MESSAGE = "เกิดข้อผิดพลาดกับระบบจัดการไฟล์";
// async function checkPathExist(bucket: string, path: string[]) {
//   if (path.filter(Boolean).length === 0) return true; // root does not contain any mark
//   return await checkFileExist(bucket, `${path.filter(Boolean).join("/")}/.keep`);
// }
//
// async function checkFileExist(bucket: string, pathname: string) {
//   return Boolean(
//     await minioClient.statObject(bucket, stripLeadingSlash(pathname)).catch((e) => {
//       if (e.code === "NotFound") return false;
//       console.error(`Storage Error: ${e}`);
//       throw new Error(MINIO_ERROR_MESSAGE);
//     }),
//   );
// }

// Resolve the index once and narrow it to `string`, so no call site needs a
// non-null assertion.
const configuredIndex = process.env.ELASTICSEARCH_INDEX;
if (!configuredIndex) throw Error("Default ElasticSearch index must be specified.");
const DEFAULT_INDEX: string = configuredIndex;

/** Size and content type of a stored object, as read from `statObject`. */
type ObjectStat = { size: number; type: string };

// In-memory state kept between attempts for the same key. An entry survives
// only while indexing keeps failing (intended for a failed-message queue that
// will come later) and is dropped as soon as the key is processed or deleted.
const cachedBuffer: Record<string, Buffer> = {};
const cachedMetadata: Record<string, ObjectStat> = {};
const cachedRecord: Record<string, StorageFile> = {};

// Keys whose last indexing attempt was rejected with a 4xx response. Such keys
// are retried with an empty `data` payload (see `encodePayload`).
let errorKey: string[] = [];

/**
 * Handles one bucket notification.
 *
 * @param key   Object key in the form `<bucket>/<path/inside/bucket>`.
 * @param event Event name; `"s3:ObjectRemoved:Delete"` removes the document,
 *              every other event (re-)indexes the object.
 * @returns `true` when the event was fully processed, `false` when it failed
 *          and the cached state was kept for a later attempt.
 * @throws Propagates errors raised by `statObject`.
 */
export async function handler(key: string, event: string): Promise<boolean> {
  console.info(`[AMQ] Messages received - key: ${key}, event: ${event}`);

  const [bucket, ...fragment] = key.split("/");
  const pathname = fragment.join("/");

  if (event === "s3:ObjectRemoved:Delete") {
    // Anything cached from an earlier failed attempt describes an object that
    // no longer exists; drop it so a later re-upload does not reuse it.
    forgetKey(key);
    return await ensureDelete(pathname);
  }

  if (!cachedBuffer[key]) {
    try {
      const stream = await minioClient.getObject(bucket, pathname);
      cachedBuffer[key] = Buffer.concat(await stream.toArray());
    } catch (e: unknown) {
      if (errorCode(e) === "NoSuchKey") {
        console.info(`[AMQ] Key: ${key} received but cannot be found.`);
        delete cachedBuffer[key];
        delete cachedMetadata[key];
        await ensureDelete(pathname);
        return true;
      }
      // Any other storage failure used to be swallowed here, leaving the
      // buffer undefined and crashing later inside the indexing step.
      console.error(`[AMQ] Key: ${key} - failed to read object from storage`, e);
      return false;
    }
  }

  if (!cachedMetadata[key]) {
    const stat = await minioClient.statObject(bucket, pathname);
    cachedMetadata[key] = { size: stat.size, type: stat.metaData["content-type"] };
  }

  // A record popped on an earlier failed attempt is already gone from
  // ElasticSearch, so the cached copy is only replaced when a fresh one exists.
  const rec = await popInfo(pathname);
  if (rec) cachedRecord[key] = rec;

  console.info(`[AMQ] Key: ${key} - ${JSON.stringify(cachedRecord[key] ?? "Not Found", null, 2)}`);

  const result = cachedRecord[key]
    ? await handleFoundRecord(cachedRecord[key], cachedBuffer[key], cachedMetadata[key], key)
    : await handleNotFoundRecord(pathname, cachedBuffer[key], cachedMetadata[key], key);

  if (result) forgetKey(key);

  return result;
}

/** Drops every piece of cached state associated with `key`. */
function forgetKey(key: string): void {
  delete cachedBuffer[key];
  delete cachedMetadata[key];
  delete cachedRecord[key];
  errorKey = errorKey.filter((v) => v !== key);
}

/** Reads the `code` property of a thrown value, if it has one. */
function errorCode(e: unknown): unknown {
  return typeof e === "object" && e !== null && "code" in e
    ? (e as { code?: unknown }).code
    : undefined;
}

/**
 * Tells whether a thrown value carries a 4xx `meta.statusCode`.
 * Values without `meta` (e.g. plain runtime errors) yield `false` instead of
 * throwing from inside the `.catch` handler.
 */
function isClientError(e: unknown): boolean {
  const status = (e as { meta?: { statusCode?: unknown } } | null | undefined)?.meta?.statusCode;
  return typeof status === "number" && status >= 400 && status < 500;
}

/** Remembers that indexing `key` was rejected with a 4xx response. */
function markRejected(key: string): void {
  if (!errorKey.includes(key)) errorKey.push(key);
}

/**
 * Produces the base64 `data` field sent through the `attachment` pipeline.
 * Keys previously rejected with a 4xx are sent with empty content —
 * presumably so the metadata can still be indexed when the content itself is
 * what the pipeline refused (TODO confirm).
 */
function encodePayload(buffer: Buffer, key: string): string {
  return errorKey.includes(key) ? "" : buffer.toString("base64");
}

/**
 * Gets the record for `pathname` and deletes it from ElasticSearch so it can
 * be re-indexed.
 *
 * @returns The stored document, or `false` when nothing was found or the
 *          search itself failed (the error is logged).
 */
async function popInfo(pathname: string): Promise<StorageFile | false> {
  const result = await esClient
    .search<StorageFile>({
      index: DEFAULT_INDEX,
      query: { match: { pathname } },
    })
    .catch((e) => console.error(e));

  const hits = result ? result.hits.hits : [];

  // pathname is unique and should not have multiple records with the same
  // path. `match` may still return near-matches when the field is analyzed, so
  // prefer the exact one and fall back to the top hit.
  const hit = hits.find((h) => h._source?.pathname === pathname) ?? hits[0];

  if (hit && hit._source) {
    // NOTE(review): a failed delete is only logged; the re-index that follows
    // would then leave two documents for the same pathname.
    await esClient
      .delete({
        index: DEFAULT_INDEX,
        id: hit._id,
      })
      .catch((e) => console.error(e));

    return hit._source;
  }

  if (result && hits.length === 0) console.info("[AMQ] Index Not Found");

  return false;
}

/**
 * If there is data in the database for `pathname`, delete it, then broadcast
 * a `FileDelete` event. Errors from ElasticSearch are logged, not thrown.
 *
 * NOTE(review): `match` on an analyzed field can also hit documents with a
 * similar pathname — confirm the mapping makes this an exact match.
 *
 * @returns Always `true`.
 */
async function ensureDelete(pathname: string): Promise<boolean> {
  await esClient
    .deleteByQuery({
      index: DEFAULT_INDEX,
      query: { match: { pathname } },
      conflicts: "proceed",
    })
    .catch((e) => console.error(e));

  io.getInstance()?.emit("FileDelete", { pathname });

  return true;
}

/**
 * Handle when the record in ElasticSearch cannot be found.
 * This will insert a document with empty descriptive metadata.
 *
 * @param pathname Object path inside the bucket.
 * @param buffer   Object content.
 * @param stat     Object size and content type.
 * @param key      Full `<bucket>/<path>` key, used for error bookkeeping.
 * @returns `true` when the document was indexed, `false` otherwise.
 */
async function handleNotFoundRecord(
  pathname: string,
  buffer: Buffer,
  stat: ObjectStat,
  key: string,
): Promise<boolean> {
  const segments = pathname.split("/");
  // Directory part with a trailing slash; empty string for root-level files.
  const path = stripLeadingSlash(segments.slice(0, -1).join("/") + "/");
  const filename = segments.at(-1);

  const base64 = encodePayload(buffer, key);

  const metadata = {
    pathname,
    path,
    fileName: filename ?? "n/a", // should not be possible to fall back, but just in case.
    fileSize: stat.size,
    fileType: stat.type,
    title: "",
    description: "",
    author: "",
    category: [],
    keyword: [],
    metadata: {},
    upload: true,
    hidden: false,
    createdAt: new Date().toISOString(),
    createdBy: "n/a",
    updatedAt: new Date().toISOString(),
    updatedBy: "n/a",
  } satisfies StorageFile;

  const result = await esClient
    .index({
      pipeline: "attachment",
      index: DEFAULT_INDEX,
      document: { data: base64, ...metadata },
      refresh: "wait_for",
    })
    .catch((e) => {
      if (isClientError(e)) markRejected(key);
      console.error(e);
    });

  if (!result) return false;

  if (!metadata.hidden) {
    io.getInstance()?.emit("FileUpload", metadata);
  }

  return true;
}

/**
 * Re-indexes an existing record with the object's current size and type and
 * marks it as uploaded.
 *
 * @param metadata Record previously stored in ElasticSearch; it is updated in
 *                 place.
 * @param buffer   Object content.
 * @param stat     Object size and content type.
 * @param key      Full `<bucket>/<path>` key, used for error bookkeeping.
 * @returns `true` when the document was indexed, `false` otherwise.
 */
async function handleFoundRecord(
  metadata: StorageFile,
  buffer: Buffer,
  stat: ObjectStat,
  key: string,
): Promise<boolean> {
  metadata.fileSize = stat.size;
  metadata.fileType = stat.type;
  metadata.upload = true;

  const result = await esClient
    .index({
      pipeline: "attachment",
      index: DEFAULT_INDEX,
      document: {
        data: encodePayload(buffer, key),
        ...metadata,
      },
      refresh: "wait_for",
    })
    .catch((e) => {
      if (isClientError(e)) {
        markRejected(key);
        console.log(cachedRecord);
      }
      console.error(e);
    });

  if (!result) return false;

  if (!metadata.hidden) {
    io.getInstance()?.emit("FileUpload", metadata);
  }

  return true;
}

/** Removes a single leading `/` from `str`, if present. */
function stripLeadingSlash(str: string): string {
  return str.replace(/^\//, "");
}