diff --git a/src/controllers/KeycloakSyncController.ts b/src/controllers/KeycloakSyncController.ts
index f24eee35..995fa3c0 100644
--- a/src/controllers/KeycloakSyncController.ts
+++ b/src/controllers/KeycloakSyncController.ts
@@ -236,10 +236,31 @@ export class KeycloakSyncController extends Controller {
    *
    * @description Syncs profileId and orgRootDnaId to Keycloak for all users
    * that have a keycloak ID. Uses parallel processing for better performance.
+   *
+   * Features:
+   * - Resume from checkpoint after failures (use resume=true)
+   * - Automatic retry with exponential backoff
+   * - Rate limiting to avoid overwhelming Keycloak
+   * - Progress tracking and persistence
+   *
+   * @param resume - Resume from last checkpoint (default: false)
+   * @param maxRetries - Maximum retry attempts for failed operations (default: 3)
+   * @param rateLimit - Requests per second rate limit (default: 10)
+   * @param clearProgress - Clear existing progress and start fresh (default: false)
    */
   @Post("sync-all")
-  async syncAll() {
-    const result = await this.keycloakAttributeService.batchSyncUsers();
+  async syncAll(
+    @Query() resume: boolean = false,
+    @Query() maxRetries: number = 3,
+    @Query() rateLimit: number = 10,
+    @Query() clearProgress: boolean = false,
+  ) {
+    const result = await this.keycloakAttributeService.batchSyncUsers({
+      resume,
+      maxRetries,
+      rateLimit,
+      clearProgress,
+    });
 
     return new HttpSuccess({
       message: "Batch sync เสร็จสิ้น",
@@ -247,6 +268,7 @@ export class KeycloakSyncController extends Controller {
       success: result.success,
       failed: result.failed,
       details: result.details,
+      resumed: result.resumed,
     });
   }
 
diff --git a/src/keycloak/index.ts b/src/keycloak/index.ts
index 90ab019c..b661450c 100644
--- a/src/keycloak/index.ts
+++ b/src/keycloak/index.ts
@@ -1,5 +1,142 @@
 import { DecodedJwt, createDecoder } from "fast-jwt";
 
+/**
+ * RateLimiter
+ * Limits the rate of API calls to avoid overwhelming the server.
+ *
+ * Keeps a sliding one-second window of request timestamps. Concurrent
+ * throttle() calls are queued and served one at a time, so callers that
+ * arrive together cannot all pass on the same stale view of the window.
+ */
+export class RateLimiter {
+  private requestsPerSecond: number;
+  private requestTimes: number[] = [];
+  // Tail of the promise chain that serializes concurrent throttle() calls
+  private queue: Promise<void> = Promise.resolve();
+
+  constructor(requestsPerSecond: number = 10) {
+    this.requestsPerSecond = requestsPerSecond;
+  }
+
+  /**
+   * Throttle requests to stay within rate limit
+   * Waits, in call order, until the current window has a free slot
+   */
+  async throttle(): Promise<void> {
+    const turn = this.queue.then(() => this.acquireSlot());
+    this.queue = turn;
+    return turn;
+  }
+
+  /**
+   * Wait for a free slot in the sliding window and record the request
+   */
+  private async acquireSlot(): Promise<void> {
+    // A non-positive or NaN limit can never be met by waiting: do not throttle
+    if (!(this.requestsPerSecond > 0)) return;
+
+    for (;;) {
+      const now = Date.now();
+      // Remove timestamps older than 1 second
+      this.requestTimes = this.requestTimes.filter((t) => now - t < 1000);
+
+      if (this.requestTimes.length < this.requestsPerSecond) break;
+
+      // Window is full: sleep until the oldest request expires, then re-check
+      const waitTime = 1000 - (now - this.requestTimes[0]);
+      await new Promise((resolve) => setTimeout(resolve, waitTime));
+    }
+
+    this.requestTimes.push(Date.now());
+  }
+
+  /**
+   * Reset the rate limiter (e.g., after a long pause)
+   */
+  reset(): void {
+    this.requestTimes = [];
+  }
+}
+
+/**
+ * Check if an error is a network error (retryable)
+ * @param error - Error to check
+ * @returns true if error is network-related and retryable
+ */
+function isNetworkError(error: any): boolean {
+  if (!error) return false;
+
+  // Check for fetch network errors (message may be missing on non-Error throwables)
+  if (error.name === "TypeError" && typeof error.message === "string" && error.message.includes("fetch")) {
+    return true;
+  }
+
+  // Check for ECONNREFUSED, ETIMEDOUT, etc.
+  if (error.code && ["ECONNREFUSED", "ETIMEDOUT", "ECONNRESET", "ENOTFOUND"].includes(error.code)) {
+    return true;
+  }
+
+  return false;
+}
+
+/**
+ * Check if an HTTP status code is retryable
+ * @param status - HTTP status code
+ * @returns true if status code indicates a temporary error
+ */
+function isRetryableStatus(status: number): boolean {
+  // Retry on 5xx errors (server errors) and 429 (rate limit)
+  return status >= 500 || status === 429;
+}
+
+/**
+ * Retry wrapper with exponential backoff
+ * Retries failed operations with increasing delay between attempts
+ *
+ * @param fn - Function to execute
+ * @param maxRetries - Maximum number of retry attempts (negative or NaN is treated as 0)
+ * @param baseDelay - Base delay in milliseconds (doubles each retry)
+ * @returns Promise with result of fn
+ * @throws The last error from fn once it is non-retryable or retries are exhausted
+ */
+export async function withRetry<T>(
+  fn: () => Promise<T>,
+  maxRetries: number = 3,
+  baseDelay: number = 1000,
+): Promise<T> {
+  // fn must run at least once, so never let the retry budget drop below zero
+  const retries = maxRetries > 0 ? Math.floor(maxRetries) : 0;
+  let lastError: any;
+
+  for (let attempt = 0; attempt <= retries; attempt++) {
+    try {
+      return await fn();
+    } catch (error: any) {
+      lastError = error;
+
+      // Check if error is retryable
+      const isRetryable = isNetworkError(error) || isRetryableStatus(error?.status);
+
+      if (!isRetryable) {
+        // Don't retry on permanent errors (4xx except 429)
+        throw error;
+      }
+
+      if (attempt < retries) {
+        // Calculate delay with exponential backoff
+        const delay = baseDelay * Math.pow(2, attempt);
+        console.log(
+          `[withRetry] Attempt ${attempt + 1}/${retries + 1} failed, retrying in ${delay}ms...`,
+          error.message || error,
+        );
+        await new Promise((resolve) => setTimeout(resolve, delay));
+      }
+    }
+  }
+
+  throw lastError;
+}
+
 const KC_URL = process.env.KC_URL;
 const KC_REALMS = process.env.KC_REALMS;
 const KC_CLIENT_ID = process.env.KC_SERVICE_ACCOUNT_CLIENT_ID;
diff --git a/src/services/KeycloakAttributeService.ts b/src/services/KeycloakAttributeService.ts
index 53cce4b4..2b3af9ab 100644
--- a/src/services/KeycloakAttributeService.ts
+++ b/src/services/KeycloakAttributeService.ts
@@ -13,8 +13,11 @@ import {
   getRoles,
   addUserRoles,
   getAllUsersPaginated,
+  withRetry,
+  RateLimiter,
 } from "../keycloak";
 import { OrgRevision } from "../entities/OrgRevision";
+import { SyncProgressManager, SyncProgressState } from "../utils/sync-progress";
 
 export interface UserProfileAttributes {
   profileId: string | null;
@@ -532,24 +535,62 @@ export class KeycloakAttributeService {
    * Batch sync multiple users with unlimited count and parallel processing
    * Useful for initial sync or periodic updates
    *
-   * @param options - Optional configuration (limit for testing, concurrency for parallel processing)
+   * Features:
+   * - Resume from checkpoint after failures
+   * - Automatic retry with exponential backoff
+   * - Rate limiting to avoid overwhelming Keycloak
+   * - Progress tracking and persistence
+   *
+   * @param options - Optional configuration
    * @returns Object with success count and details
    */
   async batchSyncUsers(options?: {
     limit?: number;
     concurrency?: number;
-  }): Promise<{ total: number; success: number; failed: number; details: any[] }> {
+    resume?: boolean; // Resume from last checkpoint
+    maxRetries?: number; // Retry attempts for failed operations
+    rateLimit?: number; // Requests per second
+    clearProgress?: boolean; // Start fresh, ignore existing progress
+  }): Promise<{ total: number; success: number; failed: number; details: any[]; resumed?: boolean }> {
     const limit = options?.limit;
     const concurrency = options?.concurrency ?? 5;
+    const resume = options?.resume ?? false;
+    const maxRetries = options?.maxRetries ?? 3;
+    const rateLimit = options?.rateLimit ?? 10;
+    const clearProgress = options?.clearProgress ?? false;
 
     const result = {
       total: 0,
       success: 0,
       failed: 0,
       details: [] as any[],
+      resumed: false,
     };
 
+    let progressState: SyncProgressState | null = null;
+    let rateLimiter: RateLimiter | null = null;
+
     try {
+      // Handle progress file based on options
+      if (clearProgress) {
+        SyncProgressManager.clear();
+        console.log("[batchSyncUsers] Cleared existing progress, starting fresh");
+      }
+
+      // Load existing progress if resume is requested
+      if (resume && !clearProgress) {
+        progressState = SyncProgressManager.load();
+        if (progressState) {
+          result.resumed = true;
+          console.log(
+            `[batchSyncUsers] Resuming from checkpoint: ${progressState.lastSyncedIndex}/${progressState.totalProfiles}`,
+          );
+          SyncProgressManager.logProgress(progressState);
+        } else {
+          console.log("[batchSyncUsers] No existing progress found, starting fresh");
+        }
+      }
+
       // Build query for profiles with keycloak IDs (ข้าราชการ)
       const profileQuery = this.profileRepo
         .createQueryBuilder("p")
@@ -581,15 +622,44 @@ export class KeycloakAttributeService {
 
       result.total = allProfiles.length;
 
+      // Initialize or resume progress state
+      if (!progressState) {
+        const profileIds = allProfiles.map((p) => p.profile.id);
+        progressState = SyncProgressManager.initialize(profileIds);
+        SyncProgressManager.save(progressState);
+        console.log(`[batchSyncUsers] Starting sync of ${profileIds.length} profiles`);
+      }
+
+      // Initialize rate limiter if rate limiting is enabled
+      if (rateLimit && rateLimit > 0) {
+        rateLimiter = new RateLimiter(rateLimit);
+        console.log(`[batchSyncUsers] Rate limiting enabled: ${rateLimit} requests/second`);
+      }
+
+      // Determine starting index based on progress
+      const startIndex = progressState.lastSyncedIndex;
+
       // Process in parallel with concurrency limit
-      const processedResults = await this.processInParallel(
+      const processedResults = await this.processInParallelWithProgress(
         allProfiles,
         concurrency,
-        async ({ profile, type }, _index) => {
+        startIndex,
+        async ({ profile, type }, index) => {
+          // Apply rate limiting if enabled
+          if (rateLimiter) {
+            await rateLimiter.throttle();
+          }
+
           const keycloakUserId = profile.keycloak;
 
           try {
-            const success = await this.syncOnOrganizationChange(profile.id, type);
+            // Wrap sync operation with retry logic
+            const success = await withRetry(
+              async () => this.syncOnOrganizationChange(profile.id, type),
+              maxRetries,
+              1000, // Base delay: 1 second
+            );
+
             if (success) {
               result.success++;
               return {
@@ -599,6 +669,13 @@ export class KeycloakAttributeService {
               };
             } else {
               result.failed++;
+              // Add to failed profiles in progress state
+              SyncProgressManager.addFailedProfile(
+                progressState!,
+                index,
+                profile.id,
+                "Sync returned false",
+              );
               return {
                 profileId: profile.id,
                 keycloakUserId,
@@ -608,14 +685,30 @@ export class KeycloakAttributeService {
             }
           } catch (error: any) {
             result.failed++;
+            // Add to failed profiles in progress state
+            SyncProgressManager.addFailedProfile(
+              progressState!,
+              index,
+              profile.id,
+              error.message || String(error),
+            );
             return {
               profileId: profile.id,
               keycloakUserId,
               status: "error",
-              error: error.message,
+              error: error.message || String(error),
             };
           }
         },
+        progressState,
+        (updatedState) => {
+          // Save progress after each batch
+          SyncProgressManager.save(updatedState);
+          // Log progress every 50 items
+          if (updatedState.lastSyncedIndex % 50 === 0 || updatedState.lastSyncedIndex === updatedState.totalProfiles) {
+            SyncProgressManager.logProgress(updatedState);
+          }
+        },
       );
 
       // Separate results from errors
@@ -633,16 +726,89 @@ export class KeycloakAttributeService {
         }
       }
 
+      // Clear progress on successful completion
+      SyncProgressManager.clear();
+
+      const elapsed = SyncProgressManager.formatElapsedTime(progressState.startTime);
       console.log(
-        `Batch sync completed: total=${result.total}, success=${result.success}, failed=${result.failed}`,
+        `[batchSyncUsers] Completed: total=${result.total}, success=${result.success}, failed=${result.failed}, elapsed=${elapsed}`,
       );
+
+      // Log failed profiles summary
+      if (progressState.failedProfiles.length > 0) {
+        console.log(
+          `[batchSyncUsers] Failed profiles (${progressState.failedProfiles.length}):`,
+          progressState.failedProfiles.map((f) => `${f.profileId}(${f.error})`).join(", "),
+        );
+      }
     } catch (error) {
-      console.error("Error in batch sync:", error);
+      console.error("[batchSyncUsers] Error in batch sync:", error);
+      // Save progress before throwing
+      if (progressState) {
+        SyncProgressManager.save(progressState);
+        console.log("[batchSyncUsers] Progress saved. Use resume=true to continue.");
+      }
+      throw error;
     }
 
     return result;
   }
 
+  /**
+   * Process items in parallel with concurrency limit and progress tracking
+   * Extends processInParallel with progress state management
+   *
+   * @param items - All items; those before startIndex are skipped
+   * @param concurrencyLimit - Batch size (values below 1 are treated as 1)
+   * @param startIndex - Index to start from (negative, NaN or missing is treated as 0)
+   * @param processor - Async handler called with each item and its index in items
+   * @param progressState - State whose lastSyncedIndex is advanced after every batch
+   * @param onProgress - Optional callback invoked with progressState after every batch
+   * @returns Results for the processed items, in order; a thrown error is captured as { error }
+   */
+  private async processInParallelWithProgress<T, R>(
+    items: T[],
+    concurrencyLimit: number,
+    startIndex: number,
+    processor: (item: T, index: number) => Promise<R>,
+    progressState: SyncProgressState,
+    onProgress?: (state: SyncProgressState) => void,
+  ): Promise<Array<R | { error: any }>> {
+    const results: Array<R | { error: any }> = [];
+    // A limit below 1 would never advance the loop; a bad checkpoint must not skip all work
+    const batchSize = concurrencyLimit >= 1 ? Math.floor(concurrencyLimit) : 1;
+    const firstIndex = startIndex > 0 ? Math.floor(startIndex) : 0;
+
+    // Start from the saved checkpoint index
+    for (let i = firstIndex; i < items.length; i += batchSize) {
+      const batch = items.slice(i, i + batchSize);
+
+      // Process batch in parallel with error handling
+      const batchResults = await Promise.all(
+        batch.map(async (item, batchIndex) => {
+          const actualIndex = i + batchIndex;
+          try {
+            return await processor(item, actualIndex);
+          } catch (error) {
+            return { error };
+          }
+        }),
+      );
+
+      results.push(...batchResults);
+
+      // Update progress state
+      progressState.lastSyncedIndex = Math.min(i + batchSize, items.length);
+
+      // Call progress callback
+      if (onProgress) {
+        onProgress(progressState);
+      }
+    }
+
+    return results;
+  }
+
   /**
    * Get current Keycloak attributes for a user
    *