fix sync to keycloak
This commit is contained in:
parent
81288f8db3
commit
76fc488d25
3 changed files with 303 additions and 10 deletions
|
|
@ -236,10 +236,31 @@ export class KeycloakSyncController extends Controller {
|
|||
*
|
||||
* @description Syncs profileId and orgRootDnaId to Keycloak for all users
|
||||
* that have a keycloak ID. Uses parallel processing for better performance.
|
||||
*
|
||||
* Features:
|
||||
* - Resume from checkpoint after failures (use resume=true)
|
||||
* - Automatic retry with exponential backoff
|
||||
* - Rate limiting to avoid overwhelming Keycloak
|
||||
* - Progress tracking and persistence
|
||||
*
|
||||
* @param resume - Resume from last checkpoint (default: false)
|
||||
* @param maxRetries - Maximum retry attempts for failed operations (default: 3)
|
||||
* @param rateLimit - Requests per second rate limit (default: 10)
|
||||
* @param clearProgress - Clear existing progress and start fresh (default: false)
|
||||
*/
|
||||
@Post("sync-all")
|
||||
async syncAll() {
|
||||
const result = await this.keycloakAttributeService.batchSyncUsers();
|
||||
async syncAll(
|
||||
@Query() resume: boolean = false,
|
||||
@Query() maxRetries: number = 3,
|
||||
@Query() rateLimit: number = 10,
|
||||
@Query() clearProgress: boolean = false,
|
||||
) {
|
||||
const result = await this.keycloakAttributeService.batchSyncUsers({
|
||||
resume,
|
||||
maxRetries,
|
||||
rateLimit,
|
||||
clearProgress,
|
||||
});
|
||||
|
||||
return new HttpSuccess({
|
||||
message: "Batch sync เสร็จสิ้น",
|
||||
|
|
@ -247,6 +268,7 @@ export class KeycloakSyncController extends Controller {
|
|||
success: result.success,
|
||||
failed: result.failed,
|
||||
details: result.details,
|
||||
resumed: result.resumed,
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,121 @@
|
|||
import { DecodedJwt, createDecoder } from "fast-jwt";
|
||||
|
||||
/**
|
||||
* RateLimiter
|
||||
* Limits the rate of API calls to avoid overwhelming the server
|
||||
*/
|
||||
export class RateLimiter {
|
||||
private requestsPerSecond: number;
|
||||
private requestTimes: number[] = [];
|
||||
|
||||
constructor(requestsPerSecond: number = 10) {
|
||||
this.requestsPerSecond = requestsPerSecond;
|
||||
}
|
||||
|
||||
/**
|
||||
* Throttle requests to stay within rate limit
|
||||
* Waits if rate limit would be exceeded
|
||||
*/
|
||||
async throttle(): Promise<void> {
|
||||
const now = Date.now();
|
||||
// Remove timestamps older than 1 second
|
||||
this.requestTimes = this.requestTimes.filter((t) => now - t < 1000);
|
||||
|
||||
if (this.requestTimes.length >= this.requestsPerSecond) {
|
||||
const oldestRequest = this.requestTimes[0];
|
||||
const waitTime = 1000 - (now - oldestRequest);
|
||||
if (waitTime > 0) {
|
||||
await new Promise((resolve) => setTimeout(resolve, waitTime));
|
||||
}
|
||||
}
|
||||
|
||||
this.requestTimes.push(Date.now());
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the rate limiter (e.g., after a long pause)
|
||||
*/
|
||||
reset(): void {
|
||||
this.requestTimes = [];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if an error is a network error (retryable)
|
||||
* @param error - Error to check
|
||||
* @returns true if error is network-related and retryable
|
||||
*/
|
||||
function isNetworkError(error: any): boolean {
|
||||
if (!error) return false;
|
||||
|
||||
// Check for fetch network errors
|
||||
if (error.name === "TypeError" && error.message.includes("fetch")) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Check for ECONNREFUSED, ETIMEDOUT, etc.
|
||||
if (error.code && ["ECONNREFUSED", "ETIMEDOUT", "ECONNRESET", "ENOTFOUND"].includes(error.code)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if an HTTP status code is retryable
|
||||
* @param status - HTTP status code
|
||||
* @returns true if status code indicates a temporary error
|
||||
*/
|
||||
function isRetryableStatus(status: number): boolean {
|
||||
// Retry on 5xx errors (server errors) and 429 (rate limit)
|
||||
return status >= 500 || status === 429;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retry wrapper with exponential backoff
|
||||
* Retries failed operations with increasing delay between attempts
|
||||
*
|
||||
* @param fn - Function to execute
|
||||
* @param maxRetries - Maximum number of retry attempts
|
||||
* @param baseDelay - Base delay in milliseconds (doubles each retry)
|
||||
* @returns Promise with result of fn
|
||||
*/
|
||||
export async function withRetry<T>(
|
||||
fn: () => Promise<T>,
|
||||
maxRetries: number = 3,
|
||||
baseDelay: number = 1000,
|
||||
): Promise<T> {
|
||||
let lastError: any;
|
||||
|
||||
for (let attempt = 0; attempt <= maxRetries; attempt++) {
|
||||
try {
|
||||
return await fn();
|
||||
} catch (error: any) {
|
||||
lastError = error;
|
||||
|
||||
// Check if error is retryable
|
||||
const isRetryable = isNetworkError(error) || isRetryableStatus(error?.status);
|
||||
|
||||
if (!isRetryable) {
|
||||
// Don't retry on permanent errors (4xx except 429)
|
||||
throw error;
|
||||
}
|
||||
|
||||
if (attempt < maxRetries) {
|
||||
// Calculate delay with exponential backoff
|
||||
const delay = baseDelay * Math.pow(2, attempt);
|
||||
console.log(
|
||||
`[withRetry] Attempt ${attempt + 1}/${maxRetries + 1} failed, retrying in ${delay}ms...`,
|
||||
error.message || error,
|
||||
);
|
||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
throw lastError;
|
||||
}
|
||||
|
||||
const KC_URL = process.env.KC_URL;
|
||||
const KC_REALMS = process.env.KC_REALMS;
|
||||
const KC_CLIENT_ID = process.env.KC_SERVICE_ACCOUNT_CLIENT_ID;
|
||||
|
|
|
|||
|
|
@ -13,8 +13,11 @@ import {
|
|||
getRoles,
|
||||
addUserRoles,
|
||||
getAllUsersPaginated,
|
||||
withRetry,
|
||||
RateLimiter,
|
||||
} from "../keycloak";
|
||||
import { OrgRevision } from "../entities/OrgRevision";
|
||||
import { SyncProgressManager, SyncProgressState } from "../utils/sync-progress";
|
||||
|
||||
export interface UserProfileAttributes {
|
||||
profileId: string | null;
|
||||
|
|
@ -532,24 +535,62 @@ export class KeycloakAttributeService {
|
|||
* Batch sync multiple users with unlimited count and parallel processing
|
||||
* Useful for initial sync or periodic updates
|
||||
*
|
||||
* @param options - Optional configuration (limit for testing, concurrency for parallel processing)
|
||||
* Features:
|
||||
* - Resume from checkpoint after failures
|
||||
* - Automatic retry with exponential backoff
|
||||
* - Rate limiting to avoid overwhelming Keycloak
|
||||
* - Progress tracking and persistence
|
||||
*
|
||||
* @param options - Optional configuration
|
||||
* @returns Object with success count and details
|
||||
*/
|
||||
async batchSyncUsers(options?: {
|
||||
limit?: number;
|
||||
concurrency?: number;
|
||||
}): Promise<{ total: number; success: number; failed: number; details: any[] }> {
|
||||
resume?: boolean; // Resume from last checkpoint
|
||||
maxRetries?: number; // Retry attempts for failed operations
|
||||
rateLimit?: number; // Requests per second
|
||||
clearProgress?: boolean; // Start fresh, ignore existing progress
|
||||
}): Promise<{ total: number; success: number; failed: number; details: any[]; resumed?: boolean }> {
|
||||
const limit = options?.limit;
|
||||
const concurrency = options?.concurrency ?? 5;
|
||||
const resume = options?.resume ?? false;
|
||||
const maxRetries = options?.maxRetries ?? 3;
|
||||
const rateLimit = options?.rateLimit ?? 10;
|
||||
const clearProgress = options?.clearProgress ?? false;
|
||||
|
||||
const result = {
|
||||
total: 0,
|
||||
success: 0,
|
||||
failed: 0,
|
||||
details: [] as any[],
|
||||
resumed: false,
|
||||
};
|
||||
|
||||
let progressState: SyncProgressState | null = null;
|
||||
let rateLimiter: RateLimiter | null = null;
|
||||
|
||||
try {
|
||||
// Handle progress file based on options
|
||||
if (clearProgress) {
|
||||
SyncProgressManager.clear();
|
||||
console.log("[batchSyncUsers] Cleared existing progress, starting fresh");
|
||||
}
|
||||
|
||||
// Load existing progress if resume is requested
|
||||
if (resume && !clearProgress) {
|
||||
progressState = SyncProgressManager.load();
|
||||
if (progressState) {
|
||||
result.resumed = true;
|
||||
console.log(
|
||||
`[batchSyncUsers] Resuming from checkpoint: ${progressState.lastSyncedIndex}/${progressState.totalProfiles}`,
|
||||
);
|
||||
SyncProgressManager.logProgress(progressState);
|
||||
} else {
|
||||
console.log("[batchSyncUsers] No existing progress found, starting fresh");
|
||||
}
|
||||
}
|
||||
|
||||
// Build query for profiles with keycloak IDs (ข้าราชการ)
|
||||
const profileQuery = this.profileRepo
|
||||
.createQueryBuilder("p")
|
||||
|
|
@ -581,15 +622,44 @@ export class KeycloakAttributeService {
|
|||
|
||||
result.total = allProfiles.length;
|
||||
|
||||
// Initialize or resume progress state
|
||||
if (!progressState) {
|
||||
const profileIds = allProfiles.map((p) => p.profile.id);
|
||||
progressState = SyncProgressManager.initialize(profileIds);
|
||||
SyncProgressManager.save(progressState);
|
||||
console.log(`[batchSyncUsers] Starting sync of ${profileIds.length} profiles`);
|
||||
}
|
||||
|
||||
// Initialize rate limiter if rate limiting is enabled
|
||||
if (rateLimit && rateLimit > 0) {
|
||||
rateLimiter = new RateLimiter(rateLimit);
|
||||
console.log(`[batchSyncUsers] Rate limiting enabled: ${rateLimit} requests/second`);
|
||||
}
|
||||
|
||||
// Determine starting index based on progress
|
||||
let startIndex = progressState.lastSyncedIndex;
|
||||
|
||||
// Process in parallel with concurrency limit
|
||||
const processedResults = await this.processInParallel(
|
||||
const processedResults = await this.processInParallelWithProgress(
|
||||
allProfiles,
|
||||
concurrency,
|
||||
async ({ profile, type }, _index) => {
|
||||
startIndex,
|
||||
async ({ profile, type }, index) => {
|
||||
// Apply rate limiting if enabled
|
||||
if (rateLimiter) {
|
||||
await rateLimiter.throttle();
|
||||
}
|
||||
|
||||
const keycloakUserId = profile.keycloak;
|
||||
|
||||
try {
|
||||
const success = await this.syncOnOrganizationChange(profile.id, type);
|
||||
// Wrap sync operation with retry logic
|
||||
const success = await withRetry(
|
||||
async () => this.syncOnOrganizationChange(profile.id, type),
|
||||
maxRetries,
|
||||
1000, // Base delay: 1 second
|
||||
);
|
||||
|
||||
if (success) {
|
||||
result.success++;
|
||||
return {
|
||||
|
|
@ -599,6 +669,13 @@ export class KeycloakAttributeService {
|
|||
};
|
||||
} else {
|
||||
result.failed++;
|
||||
// Add to failed profiles in progress state
|
||||
SyncProgressManager.addFailedProfile(
|
||||
progressState!,
|
||||
index,
|
||||
profile.id,
|
||||
"Sync returned false",
|
||||
);
|
||||
return {
|
||||
profileId: profile.id,
|
||||
keycloakUserId,
|
||||
|
|
@ -608,14 +685,30 @@ export class KeycloakAttributeService {
|
|||
}
|
||||
} catch (error: any) {
|
||||
result.failed++;
|
||||
// Add to failed profiles in progress state
|
||||
SyncProgressManager.addFailedProfile(
|
||||
progressState!,
|
||||
index,
|
||||
profile.id,
|
||||
error.message || String(error),
|
||||
);
|
||||
return {
|
||||
profileId: profile.id,
|
||||
keycloakUserId,
|
||||
status: "error",
|
||||
error: error.message,
|
||||
error: error.message || String(error),
|
||||
};
|
||||
}
|
||||
},
|
||||
progressState,
|
||||
(updatedState) => {
|
||||
// Save progress after each batch
|
||||
SyncProgressManager.save(updatedState);
|
||||
// Log progress every 50 items
|
||||
if (updatedState.lastSyncedIndex % 50 === 0 || updatedState.lastSyncedIndex === updatedState.totalProfiles) {
|
||||
SyncProgressManager.logProgress(updatedState);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// Separate results from errors
|
||||
|
|
@ -633,16 +726,78 @@ export class KeycloakAttributeService {
|
|||
}
|
||||
}
|
||||
|
||||
// Clear progress on successful completion
|
||||
SyncProgressManager.clear();
|
||||
|
||||
const elapsed = SyncProgressManager.formatElapsedTime(progressState.startTime);
|
||||
console.log(
|
||||
`Batch sync completed: total=${result.total}, success=${result.success}, failed=${result.failed}`,
|
||||
`[batchSyncUsers] Completed: total=${result.total}, success=${result.success}, failed=${result.failed}, elapsed=${elapsed}`,
|
||||
);
|
||||
|
||||
// Log failed profiles summary
|
||||
if (progressState.failedProfiles.length > 0) {
|
||||
console.log(
|
||||
`[batchSyncUsers] Failed profiles (${progressState.failedProfiles.length}):`,
|
||||
progressState.failedProfiles.map((f) => `${f.profileId}(${f.error})`).join(", "),
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error("Error in batch sync:", error);
|
||||
console.error("[batchSyncUsers] Error in batch sync:", error);
|
||||
// Save progress before throwing
|
||||
if (progressState) {
|
||||
SyncProgressManager.save(progressState);
|
||||
console.log("[batchSyncUsers] Progress saved. Use resume=true to continue.");
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Process items in parallel with concurrency limit and progress tracking
|
||||
* Extends processInParallel with progress state management
|
||||
*/
|
||||
private async processInParallelWithProgress<T, R>(
|
||||
items: T[],
|
||||
concurrencyLimit: number,
|
||||
startIndex: number,
|
||||
processor: (item: T, index: number) => Promise<R>,
|
||||
progressState: SyncProgressState,
|
||||
onProgress?: (state: SyncProgressState) => void,
|
||||
): Promise<Array<R | { error: any }>> {
|
||||
const results: Array<R | { error: any }> = [];
|
||||
|
||||
// Start from the saved checkpoint index
|
||||
for (let i = startIndex; i < items.length; i += concurrencyLimit) {
|
||||
const batch = items.slice(i, i + concurrencyLimit);
|
||||
|
||||
// Process batch in parallel with error handling
|
||||
const batchResults = await Promise.all(
|
||||
batch.map(async (item, batchIndex) => {
|
||||
const actualIndex = i + batchIndex;
|
||||
try {
|
||||
return await processor(item, actualIndex);
|
||||
} catch (error) {
|
||||
return { error };
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
||||
results.push(...batchResults);
|
||||
|
||||
// Update progress state
|
||||
progressState.lastSyncedIndex = Math.min(i + concurrencyLimit, items.length);
|
||||
|
||||
// Call progress callback
|
||||
if (onProgress) {
|
||||
onProgress(progressState);
|
||||
}
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current Keycloak attributes for a user
|
||||
*
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue