feat: add rabbitmq handler (not implemented)
This commit is contained in:
parent
34c3f27418
commit
093eb7df85
2 changed files with 141 additions and 0 deletions
103
Services/server/src/rabbitmq/handler.ts
Normal file
103
Services/server/src/rabbitmq/handler.ts
Normal file
|
|
@ -0,0 +1,103 @@
|
|||
import { EhrFile } from "../interfaces/ehr-fs";
|
||||
import esClient from "../elasticsearch";
|
||||
import minioClient from "../storage";
|
||||
|
||||
const cachedBuffer: Record<string, Buffer> = {};
|
||||
|
||||
export async function handler(key: string): Promise<boolean> {
|
||||
console.info(`[AMQ] Messages received - key: ${key}`);
|
||||
|
||||
const [bucket, ...fragment] = key.split("/");
|
||||
const pathname = fragment.join("/");
|
||||
|
||||
if (!cachedBuffer[key]) {
|
||||
const stream = await minioClient.getObject(bucket, pathname);
|
||||
const buffer = Buffer.concat(await stream.toArray());
|
||||
cachedBuffer[key] = buffer;
|
||||
}
|
||||
|
||||
const rec = await getInfo(pathname);
|
||||
|
||||
const result = rec
|
||||
? await handleFoundRecord(rec, cachedBuffer[key])
|
||||
: await handleNotFoundRecord(pathname, cachedBuffer[key]);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// Get info and delete it from ElasticSearch to re-index
|
||||
async function getInfo(pathname: string) {
|
||||
const result = await esClient
|
||||
.search<EhrFile & { attachment?: Record<string, unknown> }>({
|
||||
index: "my-test-index",
|
||||
query: { match: { pathname } },
|
||||
})
|
||||
.catch((e) => console.error(e));
|
||||
|
||||
// pathname is unique and should not have multiple record with same path
|
||||
if (result && result.hits.hits.length > 0 && result.hits.hits[0]._source) {
|
||||
await esClient
|
||||
.delete({
|
||||
index: "my-test-index",
|
||||
id: result.hits.hits[0]._id,
|
||||
})
|
||||
.catch((e) => console.error(e));
|
||||
|
||||
return result.hits.hits[0]._source;
|
||||
}
|
||||
|
||||
if (result && result.hits.hits.length === 0) console.info("Index Not Found");
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle when record in elasticsearch cannot be found.
|
||||
* This will insert empty metadata.
|
||||
*/
|
||||
async function handleNotFoundRecord(pathname: string, buffer: Buffer) {
|
||||
const filename = pathname.split("/").at(-1);
|
||||
const base64 = Buffer.from(buffer).toString("base64");
|
||||
|
||||
const metadata = {
|
||||
pathname,
|
||||
fileName: filename ?? "n/a", // should not possible to fallback
|
||||
fileSize: Buffer.byteLength(buffer),
|
||||
fileType: "",
|
||||
title: "",
|
||||
description: "",
|
||||
category: [],
|
||||
keyword: [],
|
||||
upload: true,
|
||||
createdAt: new Date().toISOString(),
|
||||
createdBy: "n/a",
|
||||
updatedAt: new Date().toISOString(),
|
||||
updatedBy: "n/a",
|
||||
} satisfies Partial<EhrFile>;
|
||||
|
||||
const result = await esClient
|
||||
.index({
|
||||
pipeline: "attachment",
|
||||
index: "my-test-index",
|
||||
document: { data: base64, ...metadata },
|
||||
})
|
||||
.catch((e) => console.error(e));
|
||||
|
||||
if (result) return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
async function handleFoundRecord(metadata: EhrFile, buffer: Buffer) {
|
||||
const result = await esClient
|
||||
.index({
|
||||
pipeline: "attachment",
|
||||
index: "my-test-index",
|
||||
document: { data: Buffer.from(buffer).toString("base64"), ...metadata, upload: true },
|
||||
})
|
||||
.catch((e) => console.error(e));
|
||||
|
||||
if (result) return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
38
Services/server/src/rabbitmq/index.ts
Normal file
38
Services/server/src/rabbitmq/index.ts
Normal file
|
|
@ -0,0 +1,38 @@
|
|||
import amqp from "amqplib";
|
||||
|
||||
export async function init(cb: (key: string) => boolean | Promise<boolean>) {
|
||||
if (!process.env.AMQ_URL || !process.env.AMQ_QUEUE) return;
|
||||
|
||||
const { AMQ_URL: url, AMQ_QUEUE: queue } = process.env;
|
||||
|
||||
const connection = await amqp.connect(url);
|
||||
|
||||
const channel = await connection.createChannel();
|
||||
|
||||
channel.assertQueue(queue, { durable: true });
|
||||
channel.prefetch(1);
|
||||
|
||||
console.log("[RabbitMQ] Listening for message...");
|
||||
|
||||
channel.consume(
|
||||
queue,
|
||||
async (msg) => {
|
||||
if (!msg) return;
|
||||
|
||||
const parsed: Record<string, unknown> = JSON.parse(msg.content.toString());
|
||||
|
||||
if (typeof parsed.Key !== "string" || parsed.Key.includes(".keep")) return channel.ack(msg);
|
||||
|
||||
const key = parsed.Key;
|
||||
|
||||
if (await cb(key)) return channel.ack(msg);
|
||||
|
||||
return await new Promise((resolve) => setTimeout(() => resolve(channel.nack(msg)), 3000));
|
||||
},
|
||||
{ noAck: false },
|
||||
);
|
||||
}
|
||||
|
||||
export default {
|
||||
init,
|
||||
};
|
||||
Loading…
Add table
Add a link
Reference in a new issue