import { Controller, Post, Route, Security, Tags, Request } from "tsoa";
import { AppDataSource } from "../database/data-source";
import HttpSuccess from "../interfaces/http-success";
import HttpStatus from "../interfaces/http-status";
import HttpError from "../interfaces/http-error";
import { RequestWithUser } from "../middlewares/user";
import { MoreThanOrEqual } from "typeorm";
import { PosMaster } from "./../entities/PosMaster";
import axios from "axios";
import { KeycloakSyncController } from "./KeycloakSyncController";
import { EmployeePosMaster } from "./../entities/EmployeePosMaster";

/**
 * One element of the array sent to the leave service's `update-dna` endpoint.
 * `profileType` records which repository the row came from so the later
 * Keycloak sync can be issued per profile type.
 */
interface OrgUpdatePayload {
  profileId: string;
  rootDnaId: string | null;
  child1DnaId: string | null;
  child2DnaId: string | null;
  child3DnaId: string | null;
  child4DnaId: string | null;
  profileType: "PROFILE" | "PROFILE_EMPLOYEE";
}

/** Success/failure counters accumulated while syncing profiles to Keycloak. */
interface SyncCounts {
  total: number;
  success: number;
  failed: number;
}

/**
 * Read a strictly positive integer from `process.env[name]`.
 *
 * Falls back to `fallback` when the variable is unset or empty (same as the
 * previous `process.env.X || "default"` behaviour) and ALSO when it does not
 * parse to an integer >= 1. The latter matters because a batch size of 0 or a
 * negative number makes the chunking loop never terminate, and NaN makes it
 * emit a single empty chunk.
 *
 * @param name     Environment variable name.
 * @param fallback Value used when the variable is missing or invalid.
 * @returns The parsed value, or `fallback`.
 */
function readPositiveIntEnv(name: string, fallback: number): number {
  const raw = process.env[name];
  if (raw === undefined || raw === "") {
    return fallback;
  }

  // parseInt is kept (rather than Number) so values that worked before,
  // e.g. "100 ", still parse the same way.
  const parsed = parseInt(raw, 10);
  if (Number.isNaN(parsed) || parsed < 1) {
    console.warn(`cronjobUpdateOrg: Invalid ${name}="${raw}", falling back to ${fallback}`);
    return fallback;
  }
  return parsed;
}

/**
 * Endpoint intended to be triggered by a scheduler: pushes recently changed
 * position/org assignments to the leave service and then re-syncs the
 * affected profiles to Keycloak in batches.
 */
@Route("api/v1/org/script-profile-org")
@Tags("Keycloak Sync")
@Security("bearerAuth")
export class ScriptProfileOrgController extends Controller {
  private posMasterRepo = AppDataSource.getRepository(PosMaster);
  private employeePosMasterRepo = AppDataSource.getRepository(EmployeePosMaster);

  // Guard against overlapping runs. This is static on purpose:
  // NOTE(review): tsoa's generated route handlers normally construct a new
  // controller per request (unless an IoC container hands out a singleton), in
  // which case an instance field would always read `false` - confirm against
  // the generated routes. The guard is per-process only; it does not protect
  // against overlapping runs on other server instances.
  private static isRunning = false;

  // Configurable values (validated; invalid input falls back to the default)
  private readonly BATCH_SIZE = readPositiveIntEnv("CRONJOB_BATCH_SIZE", 100);
  private readonly UPDATE_WINDOW_HOURS = readPositiveIntEnv("CRONJOB_UPDATE_WINDOW_HOURS", 24);

  /**
   * Run the org-update job once.
   *
   * Steps:
   *  1. Load PosMaster / EmployeePosMaster rows whose `lastUpdatedAt` falls
   *     inside the last `UPDATE_WINDOW_HOURS` hours and whose org revision is
   *     current.
   *  2. PUT the resulting payloads to the leave service (`update-dna`).
   *  3. Sync the affected profile IDs to Keycloak, per profile type, in
   *     batches of `BATCH_SIZE`. A failing batch is logged and counted as
   *     failed; it does not abort the remaining batches.
   *
   * @param request Authenticated request (not read by this method).
   * @returns HttpSuccess describing the outcome; `skipped: true` when another
   *          run is already in progress in this process.
   * @throws HttpError (500) when the query or the leave-service call fails.
   */
  @Post("update-org")
  public async cronjobUpdateOrg(@Request() request: RequestWithUser) {
    // Prevent concurrent runs
    if (ScriptProfileOrgController.isRunning) {
      console.log("cronjobUpdateOrg: Job already running, skipping this execution");
      return new HttpSuccess({
        message: "Job already running",
        skipped: true,
      });
    }

    ScriptProfileOrgController.isRunning = true;
    const startTime = Date.now();

    try {
      const windowStart = new Date(Date.now() - this.UPDATE_WINDOW_HOURS * 60 * 60 * 1000);

      console.log("cronjobUpdateOrg: Starting job", {
        windowHours: this.UPDATE_WINDOW_HOURS,
        windowStart: windowStart.toISOString(),
        batchSize: this.BATCH_SIZE,
      });

      // Query with optimized select - only fetch required fields
      const [posMasters, posMasterEmployee] = await Promise.all([
        this.posMasterRepo.find({
          where: {
            lastUpdatedAt: MoreThanOrEqual(windowStart),
            orgRevision: {
              orgRevisionIsCurrent: true,
            },
          },
          relations: [
            "orgRevision",
            "orgRoot",
            "orgChild1",
            "orgChild2",
            "orgChild3",
            "orgChild4",
            "current_holder",
          ],
          select: {
            id: true,
            current_holderId: true,
            lastUpdatedAt: true,
            orgRevision: { id: true },
            orgRoot: { ancestorDNA: true },
            orgChild1: { ancestorDNA: true },
            orgChild2: { ancestorDNA: true },
            orgChild3: { ancestorDNA: true },
            orgChild4: { ancestorDNA: true },
            current_holder: { id: true },
          },
        }),
        this.employeePosMasterRepo.find({
          where: {
            lastUpdatedAt: MoreThanOrEqual(windowStart),
            orgRevision: {
              orgRevisionIsCurrent: true,
            },
          },
          relations: [
            "orgRevision",
            "orgRoot",
            "orgChild1",
            "orgChild2",
            "orgChild3",
            "orgChild4",
            "current_holder",
          ],
          select: {
            id: true,
            current_holderId: true,
            lastUpdatedAt: true,
            orgRevision: { id: true },
            orgRoot: { ancestorDNA: true },
            orgChild1: { ancestorDNA: true },
            orgChild2: { ancestorDNA: true },
            orgChild3: { ancestorDNA: true },
            orgChild4: { ancestorDNA: true },
            current_holder: { id: true },
          },
        }),
      ]);

      console.log("cronjobUpdateOrg: Database query completed", {
        posMastersCount: posMasters.length,
        employeePosCount: posMasterEmployee.length,
        totalRecords: posMasters.length + posMasterEmployee.length,
      });

      // Build payloads with proper profile type tracking
      const payloads = this.buildPayloads(posMasters, posMasterEmployee);

      if (payloads.length === 0) {
        console.log("cronjobUpdateOrg: No records to process");
        return new HttpSuccess({
          message: "No records to process",
          processed: 0,
        });
      }

      // Update profile's org structure in leave service by calling API
      console.log("cronjobUpdateOrg: Calling leave service API", {
        payloadCount: payloads.length,
      });

      await axios.put(`${process.env.API_URL}/leave-beginning/schedule/update-dna`, payloads, {
        headers: {
          "Content-Type": "application/json",
          api_key: process.env.API_KEY,
        },
        timeout: 30000, // 30 second timeout
      });

      console.log("cronjobUpdateOrg: Leave service API call successful");

      // Group profile IDs by type for proper syncing
      const profileIdsByType = this.groupProfileIdsByType(payloads);

      // Sync to Keycloak with batching
      const keycloakSyncController = new KeycloakSyncController();
      const syncResults: {
        total: number;
        success: number;
        failed: number;
        byType: Record<string, SyncCounts>;
      } = {
        total: 0,
        success: 0,
        failed: 0,
        byType: {},
      };

      // Process each profile type separately
      for (const [profileType, profileIds] of Object.entries(profileIdsByType)) {
        console.log(`cronjobUpdateOrg: Syncing ${profileType} profiles`, {
          count: profileIds.length,
        });

        const batches = this.chunkArray(profileIds, this.BATCH_SIZE);
        const typeResult: SyncCounts = { total: profileIds.length, success: 0, failed: 0 };

        for (let i = 0; i < batches.length; i++) {
          const batch = batches[i];
          console.log(
            `cronjobUpdateOrg: Processing batch ${i + 1}/${batches.length} for ${profileType}`,
            {
              batchSize: batch.length,
              batchRange: `${i * this.BATCH_SIZE + 1}-${Math.min(
                (i + 1) * this.BATCH_SIZE,
                profileIds.length,
              )}`,
            },
          );

          try {
            // The result shape is owned by KeycloakSyncController and is not
            // visible here, so it is read loosely.
            // eslint-disable-next-line @typescript-eslint/no-explicit-any
            const batchResult: any = await keycloakSyncController.syncByProfileIds({
              profileIds: batch,
              profileType: profileType as "PROFILE" | "PROFILE_EMPLOYEE",
            });

            // Counters may sit directly on the result or under `.data`.
            // A null/undefined result throws on the property read below and
            // is therefore counted as a failed batch by the catch.
            const resultData = batchResult?.data || batchResult;
            typeResult.success += resultData.success || 0;
            typeResult.failed += resultData.failed || 0;

            console.log(`cronjobUpdateOrg: Batch ${i + 1}/${batches.length} completed`, {
              success: resultData.success || 0,
              failed: resultData.failed || 0,
            });
          } catch (error: unknown) {
            console.error(`cronjobUpdateOrg: Batch ${i + 1}/${batches.length} failed`, {
              error: error instanceof Error ? error.message : String(error),
              batchSize: batch.length,
            });
            // Count all profiles in failed batch as failed
            typeResult.failed += batch.length;
          }
        }

        syncResults.byType[profileType] = typeResult;
        syncResults.total += typeResult.total;
        syncResults.success += typeResult.success;
        syncResults.failed += typeResult.failed;
      }

      const duration = Date.now() - startTime;
      console.log("cronjobUpdateOrg: Job completed", {
        duration: `${duration}ms`,
        processed: payloads.length,
        syncResults,
      });

      return new HttpSuccess({
        message: "Update org completed",
        processed: payloads.length,
        syncResults,
        duration: `${duration}ms`,
      });
    } catch (error: unknown) {
      const duration = Date.now() - startTime;
      // Anything can be thrown; normalise so logging itself cannot throw.
      const failure = error instanceof Error ? error : new Error(String(error));
      console.error("cronjobUpdateOrg: Job failed", {
        duration: `${duration}ms`,
        error: failure.message,
        stack: failure.stack,
      });
      throw new HttpError(HttpStatus.INTERNAL_SERVER_ERROR, "Internal server error");
    } finally {
      // Always release the guard, on success and on failure alike.
      ScriptProfileOrgController.isRunning = false;
    }
  }

  /**
   * Build payloads from PosMaster and EmployeePosMaster records.
   * Rows without a current holder are skipped. PosMaster rows are tagged
   * "PROFILE", EmployeePosMaster rows "PROFILE_EMPLOYEE". A missing org level
   * (or an empty ancestorDNA) is sent as `null`.
   *
   * @param posMasters        Rows that produce "PROFILE" payloads.
   * @param posMasterEmployee Rows that produce "PROFILE_EMPLOYEE" payloads.
   * @returns Payloads in input order, PosMaster rows first; not deduplicated.
   */
  private buildPayloads(
    posMasters: PosMaster[],
    posMasterEmployee: EmployeePosMaster[],
  ): OrgUpdatePayload[] {
    const payloads: OrgUpdatePayload[] = [];

    // Process PosMaster records (PROFILE type)
    for (const posMaster of posMasters) {
      if (posMaster.current_holder && posMaster.current_holderId) {
        payloads.push({
          profileId: posMaster.current_holderId,
          rootDnaId: posMaster.orgRoot?.ancestorDNA || null,
          child1DnaId: posMaster.orgChild1?.ancestorDNA || null,
          child2DnaId: posMaster.orgChild2?.ancestorDNA || null,
          child3DnaId: posMaster.orgChild3?.ancestorDNA || null,
          child4DnaId: posMaster.orgChild4?.ancestorDNA || null,
          profileType: "PROFILE",
        });
      }
    }

    // Process EmployeePosMaster records (PROFILE_EMPLOYEE type)
    for (const employeePos of posMasterEmployee) {
      if (employeePos.current_holder && employeePos.current_holderId) {
        payloads.push({
          profileId: employeePos.current_holderId,
          rootDnaId: employeePos.orgRoot?.ancestorDNA || null,
          child1DnaId: employeePos.orgChild1?.ancestorDNA || null,
          child2DnaId: employeePos.orgChild2?.ancestorDNA || null,
          child3DnaId: employeePos.orgChild3?.ancestorDNA || null,
          child4DnaId: employeePos.orgChild4?.ancestorDNA || null,
          profileType: "PROFILE_EMPLOYEE",
        });
      }
    }

    return payloads;
  }

  /**
   * Group profile IDs by their type for separate Keycloak sync calls.
   *
   * @param payloads Payloads produced by `buildPayloads`.
   * @returns Map from profile type to its unique profile IDs in first-seen
   *          order. Types with no IDs are omitted.
   */
  private groupProfileIdsByType(payloads: OrgUpdatePayload[]): Record<string, string[]> {
    const grouped: Record<OrgUpdatePayload["profileType"], string[]> = {
      PROFILE: [],
      PROFILE_EMPLOYEE: [],
    };

    for (const payload of payloads) {
      grouped[payload.profileType].push(payload.profileId);
    }

    // Remove empty groups and deduplicate IDs within each group
    const result: Record<string, string[]> = {};
    for (const [type, ids] of Object.entries(grouped)) {
      if (ids.length > 0) {
        // A Set iterates in insertion order, so order is preserved
        result[type] = Array.from(new Set(ids));
      }
    }

    return result;
  }

  /**
   * Split an array into consecutive chunks of at most `chunkSize` elements.
   *
   * @param array     Items to split; not mutated.
   * @param chunkSize Maximum chunk length; must be an integer >= 1.
   * @returns The chunks in order (empty array for empty input).
   * @throws RangeError when `chunkSize` is not a positive integer, because
   *         the loop below would otherwise never advance (0 / negative) or
   *         would yield one empty chunk (NaN).
   */
  private chunkArray<T>(array: T[], chunkSize: number): T[][] {
    if (!Number.isInteger(chunkSize) || chunkSize < 1) {
      throw new RangeError(`chunkSize must be a positive integer, got ${chunkSize}`);
    }

    const chunks: T[][] = [];
    for (let i = 0; i < array.length; i += chunkSize) {
      chunks.push(array.slice(i, i + chunkSize));
    }
    return chunks;
  }
}