using Microsoft.Extensions.Configuration; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; using System.Text.Json; using System.Text.Json.Serialization; var basePath = Directory.GetCurrentDirectory(); var configuration = new ConfigurationBuilder() .SetBasePath(basePath) .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true) .Build(); WriteToConsole("Consumer Start!"); var host = configuration["Rabbit:Host"] ?? ""; var user = configuration["Rabbit:User"] ?? ""; var pass = configuration["Rabbit:Password"] ?? ""; var queue = configuration["Rabbit:Queue"] ?? "basic-queue"; // Concurrency & prefetch (configurable via appsettings.json) var maxConcurrency = int.TryParse(configuration["MaxConcurrency"], out var c) && c > 0 ? c : 5; var prefetchCount = ushort.TryParse(configuration["PrefetchCount"], out var p) && p > 0 ? p : (ushort)20; var httpTimeoutSec = int.TryParse(configuration["HttpTimeoutSeconds"], out var t) && t > 0 ? t : 60; WriteToConsole($"Config -> MaxConcurrency: {maxConcurrency}, PrefetchCount: {prefetchCount}, HttpTimeout: {httpTimeoutSec}s"); // create connection var factory = new ConnectionFactory() { HostName = host, UserName = user, Password = pass, DispatchConsumersAsync = true }; using var connection = factory.CreateConnection(); using var channel = connection.CreateModel(); channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: null); // Prefetch: RabbitMQ จะส่ง message หลายตัวมาที่ consumer พร้อมกัน (ลด network round-trip) channel.BasicQos(prefetchSize: 0, prefetchCount: prefetchCount, global: false); // HttpClient แบบ SocketsHttpHandler พร้อม connection pooling รองรับ concurrent requests var socketsHandler = new SocketsHttpHandler { MaxConnectionsPerServer = maxConcurrency * 2, PooledConnectionLifetime = TimeSpan.FromMinutes(2), PooledConnectionIdleTimeout = TimeSpan.FromSeconds(30) }; using var httpClient = new HttpClient(socketsHandler); httpClient.Timeout = TimeSpan.FromSeconds(httpTimeoutSec); // SemaphoreSlim คุมจำนวน message ที่ประมวลผลพร้อมกัน (เนื่องจาก API มีข้อจำกัดเรื่อง concurrency) using var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency); var consumer = new AsyncEventingBasicConsumer(channel); consumer.Received += (model, ea) => { // รอ semaphore ก่อนเริ่มประมวลผล semaphore.WaitAsync().ContinueWith(async _ => { try { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); WriteToConsole($"Received message: {message}"); var success = await CallRestApi(message, httpClient, configuration); if (success) { channel.BasicAck(ea.DeliveryTag, multiple: false); WriteToConsole("Message processed successfully"); } else { channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false); WriteToConsole("Message processing failed - message rejected"); } } catch (Exception ex) { WriteToConsole($"Error processing message: {ex.Message}"); channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false); } finally { semaphore.Release(); } }, TaskScheduler.Default).ConfigureAwait(false); return Task.CompletedTask; }; channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer); WriteToConsole("Consumer started. Waiting for messages..."); // Keep the application running await Task.Delay(-1); static void WriteToConsole(string message) { Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss} : {message}"); } static async Task CallRestApi(string requestData, HttpClient client, IConfiguration configuration) { try { var apiPath = $"{configuration["API"]}/leave/process-check-in"; var content = new StringContent(requestData, Encoding.UTF8, "application/json"); var response = await client.PostAsync(apiPath, content); if (response.IsSuccessStatusCode) { var responseContent = await response.Content.ReadAsStringAsync(); WriteToConsole($"API Success: {responseContent}"); return true; } else { var errorMessage = await response.Content.ReadAsStringAsync(); var res = JsonSerializer.Deserialize(errorMessage); WriteToConsole($"API Error ({response.StatusCode}): {res?.Message ?? errorMessage}"); return false; } } catch (HttpRequestException ex) { WriteToConsole($"HTTP Error: {ex.Message}"); return false; } catch (TaskCanceledException ex) { WriteToConsole($"Timeout: {ex.Message}"); return false; } catch (Exception ex) { WriteToConsole($"Unexpected Error: {ex.Message}"); return false; } } public class ResponseObject { [JsonPropertyName("status")] public int Status { get; set; } [JsonPropertyName("message")] public string? Message { get; set; } [JsonPropertyName("result")] public object? Result { get; set; } } public class CheckTimeDtoRB { public Guid? CheckInId { get; set; } public double Lat { get; set; } = 0; public double Lon { get; set; } = 0; public string POI { get; set; } = string.Empty; public bool IsLocation { get; set; } = true; public string? LocationName { get; set; } = string.Empty; public string? Remark { get; set; } = string.Empty; public Guid? UserId { get; set; } public DateTime? CurrentDate { get; set; } public string? CheckInFileName { get; set; } public byte[]? CheckInFileBytes { get; set; } }