CheckQueueInProgress

This commit is contained in:
AdisakKanthawilang 2025-07-11 16:50:12 +07:00
parent 027cbe2814
commit cd6aa9b1a2
2 changed files with 124 additions and 61 deletions

View file

@ -28,7 +28,7 @@ import { Profile } from "../entities/Profile";
import { RequestWithUser } from "../middlewares/user";
import permission from "../interfaces/permission";
import { PermissionOrg } from "../entities/PermissionOrg";
import { setLogDataDiff } from "../interfaces/utils";
import { checkQueueInProgress, setLogDataDiff } from "../interfaces/utils";
import { sendToQueueOrg, sendToQueueOrgDraft } from "../services/rabbitmq";
import { PosMasterAssign } from "../entities/PosMasterAssign";
import { PosMasterAct } from "../entities/PosMasterAct";
@ -142,35 +142,52 @@ export class OrganizationController extends Controller {
@Body() requestBody: CreateOrgRevision,
@Request() request: RequestWithUser,
) {
//new main revision
const before = null;
const revision = Object.assign(new OrgRevision(), requestBody) as OrgRevision;
revision.orgRevisionIsDraft = true;
revision.orgRevisionIsCurrent = false;
revision.createdUserId = request.user.sub;
revision.createdFullName = request.user.name;
revision.lastUpdateUserId = request.user.sub;
revision.lastUpdateFullName = request.user.name;
revision.createdAt = new Date();
revision.lastUpdatedAt = new Date();
await this.orgRevisionRepository.save(revision, { data: request });
setLogDataDiff(request, { before, after: revision });
const msg = {
data: {
requestBody: requestBody,
request: request.user,
revision: revision,
},
user: request.user,
};
try {
// CheckQueueInProgress
// const [isBusyDraft, isBusyPublish] = await Promise.all([
// checkQueueInProgress(`${process.env.AMQ_QUEUE_ORG_DRAFT}`),
// checkQueueInProgress(`${process.env.AMQ_QUEUE_ORG}`),
// ]);
// console.log("✅ ตรวจสอบแล้ว Draft Busy:", isBusyDraft);
// console.log("✅ ตรวจสอบแล้ว Publish Busy:", isBusyPublish);
// if (isBusyDraft || isBusyPublish) {
// console.log("🚫 พบว่ามีงานอยู่ในคิว — error")
// throw new HttpError(
// HttpStatusCode.CONFLICT,
// "ไม่สามารถดำเนินการได้ หากกำลังเผยแพร่หรือสร้างแบบร่างโครงสร้างหน่วยงาน",
// );
// }
//new main revision
const before = null;
const revision = Object.assign(new OrgRevision(), requestBody) as OrgRevision;
revision.orgRevisionIsDraft = true;
revision.orgRevisionIsCurrent = false;
revision.createdUserId = request.user.sub;
revision.createdFullName = request.user.name;
revision.lastUpdateUserId = request.user.sub;
revision.lastUpdateFullName = request.user.name;
revision.createdAt = new Date();
revision.lastUpdatedAt = new Date();
await this.orgRevisionRepository.save(revision, { data: request });
setLogDataDiff(request, { before, after: revision });
const msg = {
data: {
requestBody: requestBody,
request: request.user,
revision: revision,
},
user: request.user,
};
await sendToQueueOrgDraft(msg);
return new HttpSuccess("Draft is being created... Processing in the background.");
} catch (error: any) {
if (error?.status && error?.message) {
return error;
}
return new HttpError(
HttpStatusCode.NOT_FOUND,
"Failed to process the draft. Please try again later.",
HttpStatusCode.INTERNAL_SERVER_ERROR,
"Failed to process the draft. Please try again later."
);
}
}
@ -3208,45 +3225,71 @@ export class OrganizationController extends Controller {
*/
@Get("get/publish")
async runPublish(@Request() request: RequestWithUser) {
const today = new Date();
today.setHours(0, 0, 0, 0); // Set time to the beginning of the day
const orgRevisionPublish = await this.orgRevisionRepository
.createQueryBuilder("orgRevision")
.where("orgRevision.orgRevisionIsDraft = false")
.andWhere("orgRevision.orgRevisionIsCurrent = true")
.getOne();
try{
// CheckQueueInProgress
// console.log("🚀 ตรวจสอบว่ามีงานอยู่ในคิว");
// const [isBusyDraft, isBusyPublish] = await Promise.all([
// checkQueueInProgress(`${process.env.AMQ_QUEUE_ORG_DRAFT}`),
// checkQueueInProgress(`${process.env.AMQ_QUEUE_ORG}`),
// ]);
// console.log("✅ ตรวจสอบแล้ว Draft Busy:", isBusyDraft);
// console.log("✅ ตรวจสอบแล้ว Publish Busy:", isBusyPublish);
// if (isBusyDraft || isBusyPublish) {
// console.log("🚫 พบว่ามีงานอยู่ในคิว — error")
// throw new HttpError(
// HttpStatusCode.CONFLICT,
// "ไม่สามารถดำเนินการได้ หากกำลังเผยแพร่หรือสร้างแบบร่างโครงสร้างหน่วยงาน",
// );
// }
const today = new Date();
today.setHours(0, 0, 0, 0); // Set time to the beginning of the day
const orgRevisionPublish = await this.orgRevisionRepository
.createQueryBuilder("orgRevision")
.where("orgRevision.orgRevisionIsDraft = false")
.andWhere("orgRevision.orgRevisionIsCurrent = true")
.getOne();
const orgRevisionDraft = await this.orgRevisionRepository
.createQueryBuilder("orgRevision")
.where("orgRevision.orgRevisionIsDraft = true")
.andWhere("orgRevision.orgRevisionIsCurrent = false")
// .andWhere("DATE(orgRevision.orgPublishDate) = :today", { today })
.getOne();
if (!orgRevisionDraft) {
const orgRevisionDraft = await this.orgRevisionRepository
.createQueryBuilder("orgRevision")
.where("orgRevision.orgRevisionIsDraft = true")
.andWhere("orgRevision.orgRevisionIsCurrent = false")
// .andWhere("DATE(orgRevision.orgPublishDate) = :today", { today })
.getOne();
if (!orgRevisionDraft) {
return new HttpSuccess();
// throw new HttpError(HttpStatusCode.NOT_FOUND, "ไม่มีข้อมูลเผยแพร่");
}
// if (orgRevisionPublish) {
// orgRevisionPublish.orgRevisionIsDraft = false;
// orgRevisionPublish.orgRevisionIsCurrent = false;
// await this.orgRevisionRepository.save(orgRevisionPublish);
// }
// orgRevisionDraft.orgRevisionIsCurrent = true;
// orgRevisionDraft.orgRevisionIsDraft = false;
// await this.orgRevisionRepository.save(orgRevisionDraft);
const msg = {
data: {
id: orgRevisionDraft.id,
status: "NOW",
lastUpdateUserId: request.user.sub,
lastUpdateFullName: request.user.name,
lastUpdatedAt: new Date(),
},
user: request.user,
token: request.headers["authorization"],
};
sendToQueueOrg(msg);
return new HttpSuccess();
// throw new HttpError(HttpStatusCode.NOT_FOUND, "ไม่มีข้อมูลเผยแพร่");
} catch (error: any) {
if (error?.status && error?.message) {
return error;
}
return new HttpError(
HttpStatusCode.INTERNAL_SERVER_ERROR,
"Failed to process the publish. Please try again later."
);
}
// if (orgRevisionPublish) {
// orgRevisionPublish.orgRevisionIsDraft = false;
// orgRevisionPublish.orgRevisionIsCurrent = false;
// await this.orgRevisionRepository.save(orgRevisionPublish);
// }
// orgRevisionDraft.orgRevisionIsCurrent = true;
// orgRevisionDraft.orgRevisionIsDraft = false;
// await this.orgRevisionRepository.save(orgRevisionDraft);
const msg = {
data: {
id: orgRevisionDraft.id,
status: "NOW",
lastUpdateUserId: request.user.sub,
lastUpdateFullName: request.user.name,
lastUpdatedAt: new Date(),
},
user: request.user,
token: request.headers["authorization"],
};
sendToQueueOrg(msg);
return new HttpSuccess();
}
/**

View file

@ -503,6 +503,26 @@ export function editLogSequence(req: RequestWithUser, index: number, data: LogSe
req.app.locals.logData.sequence[index] = data;
}
export async function checkQueueInProgress(queueName: string) {
// const axios = require('axios');
// console.log("Checking queue in progress");
// const res = await axios.get(`${process.env.RABBIT_API_URL}/api/queues/%2F/${queueName}`, {
// auth: { username: process.env.RABBIT_USER , password: process.env.RABBIT_PASS },
// });
// const q = res.data;
// console.log(`Queue "${queueName}" has:`);
// console.log(` - ${q.messages_ready} messages ready`);
// console.log(` - ${q.messages_unacknowledged} messages in progress (unacked)`);
// if (q.messages_unacknowledged > 0) {
// return true;
// }
// return false;
}
export function commandTypePath(commandCode: string): string | null {
switch (commandCode) {
case "C-PM-01":