แก้ปัญหา RabbitMQ ประมวลผลในคิวช้า และรอคิวนาน
Some checks failed
Build & Deploy Checkin Service / build (push) Failing after 40s

This commit is contained in:
Suphonchai Phoonsawat 2026-06-23 11:25:42 +07:00
parent ae417e4777
commit 5f678b2898
3 changed files with 139 additions and 24 deletions

View file

@ -18,6 +18,13 @@ var user = configuration["Rabbit:User"] ?? "";
var pass = configuration["Rabbit:Password"] ?? "";
var queue = configuration["Rabbit:Queue"] ?? "basic-queue";
// Concurrency & prefetch (configurable via appsettings.json)
var maxConcurrency = int.TryParse(configuration["MaxConcurrency"], out var c) && c > 0 ? c : 5;
var prefetchCount = ushort.TryParse(configuration["PrefetchCount"], out var p) && p > 0 ? p : (ushort)20;
var httpTimeoutSec = int.TryParse(configuration["HttpTimeoutSeconds"], out var t) && t > 0 ? t : 60;
WriteToConsole($"Config -> MaxConcurrency: {maxConcurrency}, PrefetchCount: {prefetchCount}, HttpTimeout: {httpTimeoutSec}s");
// create connection
var factory = new ConnectionFactory()
{
@ -32,39 +39,61 @@ using var channel = connection.CreateModel();
channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: null);
// Create a SINGLE static HttpClient instance to prevent socket exhaustion
using var httpClient = new HttpClient();
httpClient.Timeout = TimeSpan.FromSeconds(300); // 5 นาที
// Prefetch: RabbitMQ จะส่ง message หลายตัวมาที่ consumer พร้อมกัน (ลด network round-trip)
channel.BasicQos(prefetchSize: 0, prefetchCount: prefetchCount, global: false);
// HttpClient แบบ SocketsHttpHandler พร้อม connection pooling รองรับ concurrent requests
var socketsHandler = new SocketsHttpHandler
{
MaxConnectionsPerServer = maxConcurrency * 2,
PooledConnectionLifetime = TimeSpan.FromMinutes(2),
PooledConnectionIdleTimeout = TimeSpan.FromSeconds(30)
};
using var httpClient = new HttpClient(socketsHandler);
httpClient.Timeout = TimeSpan.FromSeconds(httpTimeoutSec);
// SemaphoreSlim คุมจำนวน message ที่ประมวลผลพร้อมกัน (เนื่องจาก API มีข้อจำกัดเรื่อง concurrency)
using var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += async (model, ea) =>
consumer.Received += (model, ea) =>
{
try
// รอ semaphore ก่อนเริ่มประมวลผล
semaphore.WaitAsync().ContinueWith(async _ =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
WriteToConsole($"Received message: {message}");
var success = await CallRestApi(message, httpClient, configuration);
if (success)
try
{
channel.BasicAck(ea.DeliveryTag, multiple: false);
WriteToConsole("Message processed successfully");
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
WriteToConsole($"Received message: {message}");
var success = await CallRestApi(message, httpClient, configuration);
if (success)
{
channel.BasicAck(ea.DeliveryTag, multiple: false);
WriteToConsole("Message processed successfully");
}
else
{
channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
WriteToConsole("Message processing failed - message rejected");
}
}
else
catch (Exception ex)
{
WriteToConsole($"Error processing message: {ex.Message}");
channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
WriteToConsole("Message processing failed - message rejected");
}
}
catch (Exception ex)
{
WriteToConsole($"Error processing message: {ex.Message}");
channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
}
finally
{
semaphore.Release();
}
}, TaskScheduler.Default).ConfigureAwait(false);
return Task.CompletedTask;
};
channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);