diff --git a/Services/server/src/rabbitmq/handler.ts b/Services/server/src/rabbitmq/handler.ts new file mode 100644 index 0000000..298a130 --- /dev/null +++ b/Services/server/src/rabbitmq/handler.ts @@ -0,0 +1,103 @@ +import { EhrFile } from "../interfaces/ehr-fs"; +import esClient from "../elasticsearch"; +import minioClient from "../storage"; + +const cachedBuffer: Record = {}; + +export async function handler(key: string): Promise { + console.info(`[AMQ] Messages received - key: ${key}`); + + const [bucket, ...fragment] = key.split("/"); + const pathname = fragment.join("/"); + + if (!cachedBuffer[key]) { + const stream = await minioClient.getObject(bucket, pathname); + const buffer = Buffer.concat(await stream.toArray()); + cachedBuffer[key] = buffer; + } + + const rec = await getInfo(pathname); + + const result = rec + ? await handleFoundRecord(rec, cachedBuffer[key]) + : await handleNotFoundRecord(pathname, cachedBuffer[key]); + + return result; +} + +// Get info and delete it from ElasticSearch to re-index +async function getInfo(pathname: string) { + const result = await esClient + .search }>({ + index: "my-test-index", + query: { match: { pathname } }, + }) + .catch((e) => console.error(e)); + + // pathname is unique and should not have multiple record with same path + if (result && result.hits.hits.length > 0 && result.hits.hits[0]._source) { + await esClient + .delete({ + index: "my-test-index", + id: result.hits.hits[0]._id, + }) + .catch((e) => console.error(e)); + + return result.hits.hits[0]._source; + } + + if (result && result.hits.hits.length === 0) console.info("Index Not Found"); + + return false; +} + +/** + * Handle when record in elasticsearch cannot be found. + * This will insert empty metadata. + */ +async function handleNotFoundRecord(pathname: string, buffer: Buffer) { + const filename = pathname.split("/").at(-1); + const base64 = Buffer.from(buffer).toString("base64"); + + const metadata = { + pathname, + fileName: filename ?? "n/a", // should not possible to fallback + fileSize: Buffer.byteLength(buffer), + fileType: "", + title: "", + description: "", + category: [], + keyword: [], + upload: true, + createdAt: new Date().toISOString(), + createdBy: "n/a", + updatedAt: new Date().toISOString(), + updatedBy: "n/a", + } satisfies Partial; + + const result = await esClient + .index({ + pipeline: "attachment", + index: "my-test-index", + document: { data: base64, ...metadata }, + }) + .catch((e) => console.error(e)); + + if (result) return true; + + return false; +} + +async function handleFoundRecord(metadata: EhrFile, buffer: Buffer) { + const result = await esClient + .index({ + pipeline: "attachment", + index: "my-test-index", + document: { data: Buffer.from(buffer).toString("base64"), ...metadata, upload: true }, + }) + .catch((e) => console.error(e)); + + if (result) return true; + + return false; +} diff --git a/Services/server/src/rabbitmq/index.ts b/Services/server/src/rabbitmq/index.ts new file mode 100644 index 0000000..2115eaf --- /dev/null +++ b/Services/server/src/rabbitmq/index.ts @@ -0,0 +1,38 @@ +import amqp from "amqplib"; + +export async function init(cb: (key: string) => boolean | Promise) { + if (!process.env.AMQ_URL || !process.env.AMQ_QUEUE) return; + + const { AMQ_URL: url, AMQ_QUEUE: queue } = process.env; + + const connection = await amqp.connect(url); + + const channel = await connection.createChannel(); + + channel.assertQueue(queue, { durable: true }); + channel.prefetch(1); + + console.log("[RabbitMQ] Listening for message..."); + + channel.consume( + queue, + async (msg) => { + if (!msg) return; + + const parsed: Record = JSON.parse(msg.content.toString()); + + if (typeof parsed.Key !== "string" || parsed.Key.includes(".keep")) return channel.ack(msg); + + const key = parsed.Key; + + if (await cb(key)) return channel.ack(msg); + + return await new Promise((resolve) => setTimeout(() => resolve(channel.nack(msg)), 3000)); + }, + { noAck: false }, + ); +} + +export default { + init, +};