refactor: rabbitmq implement

This commit is contained in:
Methapon2001 2023-11-27 09:45:30 +07:00
parent 24350a11a4
commit 3fc70daed0
No known key found for this signature in database
GPG key ID: 849924FEF46BD132
12 changed files with 676 additions and 545 deletions

View file

@ -1,17 +1,25 @@
import { EhrFile } from "../interfaces/ehr-fs";
import esClient from "../elasticsearch";
import minioClient from "../storage";
import minioClient from "../minio";
const DEFAULT_INDEX = process.env.ELASTICSEARCH_INDEX;
if (!DEFAULT_INDEX) throw Error("Default ElasticSearch index must be specified.");
// for failed queue that will come later
const cachedBuffer: Record<string, Buffer> = {};
const cachedMetadata: Record<string, { size: number; type: string }> = {};
export async function handler(key: string): Promise<boolean> {
console.info(`[AMQ] Messages received - key: ${key}`);
export async function handler(key: string, event: string): Promise<boolean> {
console.info(`[AMQ] Messages received - key: ${key}, event: ${event}`);
const [bucket, ...fragment] = key.split("/");
const pathname = fragment.join("/");
if (event === "s3:ObjectRemoved:Delete") {
return await ensureDelete(pathname);
}
if (!cachedBuffer[key]) {
const stream = await minioClient.getObject(bucket, pathname);
const buffer = Buffer.concat(await stream.toArray());
@ -41,7 +49,7 @@ export async function handler(key: string): Promise<boolean> {
async function popInfo(pathname: string) {
const result = await esClient
.search<EhrFile & { attachment?: Record<string, unknown> }>({
index: "my-test-index",
index: DEFAULT_INDEX!,
query: { match: { pathname } },
})
.catch((e) => console.error(e));
@ -50,7 +58,7 @@ async function popInfo(pathname: string) {
if (result && result.hits.hits.length > 0 && result.hits.hits[0]._source) {
await esClient
.delete({
index: "my-test-index",
index: DEFAULT_INDEX!,
id: result.hits.hits[0]._id,
})
.catch((e) => console.error(e));
@ -63,6 +71,20 @@ async function popInfo(pathname: string) {
return false;
}
/**
* If there is data in database then delete it
*/
async function ensureDelete(pathname: string) {
await esClient
.deleteByQuery({
index: DEFAULT_INDEX!,
query: { match: { pathname } },
conflicts: "proceed",
})
.catch((e) => console.error(e));
return true;
}
/**
* Handle when record in elasticsearch cannot be found.
* This will insert empty metadata.
@ -94,7 +116,7 @@ async function handleNotFoundRecord(
const result = await esClient
.index({
pipeline: "attachment",
index: "my-test-index",
index: DEFAULT_INDEX!,
document: { data: base64, ...metadata },
})
.catch((e) => console.error(e));
@ -116,7 +138,7 @@ async function handleFoundRecord(
const result = await esClient
.index({
pipeline: "attachment",
index: "my-test-index",
index: DEFAULT_INDEX!,
document: { data: Buffer.from(buffer).toString("base64"), ...metadata },
})
.catch((e) => console.error(e));