176 lines
5.9 KiB
C#
176 lines
5.9 KiB
C#
using Microsoft.Extensions.Configuration;
|
|
using RabbitMQ.Client;
|
|
using RabbitMQ.Client.Events;
|
|
using System.Text;
|
|
using System.Text.Json;
|
|
using System.Text.Json.Serialization;
|
|
|
|
// Load appsettings.json from the process's current working directory. The file is mandatory,
// so Build() throws if it is missing. Note: the values below are read once at startup.
var configuration = new ConfigurationBuilder()
    .SetBasePath(Directory.GetCurrentDirectory())
    .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
    .Build();

WriteToConsole("Consumer Start!");

// RabbitMQ connection settings. A missing host/user/password becomes an empty string;
// a missing queue name falls back to "basic-queue".
string host = configuration["Rabbit:Host"] ?? string.Empty;
string user = configuration["Rabbit:User"] ?? string.Empty;
string pass = configuration["Rabbit:Password"] ?? string.Empty;
string queue = configuration["Rabbit:Queue"] ?? "basic-queue";

// Concurrency & prefetch (configurable via appsettings.json).
// Each setting keeps its default unless the configured value parses as a positive number.
int maxConcurrency = 5;
if (int.TryParse(configuration["MaxConcurrency"], out int configuredConcurrency) && configuredConcurrency > 0)
{
    maxConcurrency = configuredConcurrency;
}

ushort prefetchCount = 20;
if (ushort.TryParse(configuration["PrefetchCount"], out ushort configuredPrefetch) && configuredPrefetch > 0)
{
    prefetchCount = configuredPrefetch;
}

int httpTimeoutSec = 60;
if (int.TryParse(configuration["HttpTimeoutSeconds"], out int configuredTimeout) && configuredTimeout > 0)
{
    httpTimeoutSec = configuredTimeout;
}

WriteToConsole($"Config -> MaxConcurrency: {maxConcurrency}, PrefetchCount: {prefetchCount}, HttpTimeout: {httpTimeoutSec}s");
|
|
|
|
// Broker connection settings. DispatchConsumersAsync is switched on because the consumer
// attached to this channel is an AsyncEventingBasicConsumer.
ConnectionFactory factory = new()
{
    DispatchConsumersAsync = true,
    HostName = host,
    UserName = user,
    Password = pass,
};

// Both the connection and the channel are disposed when the program's top-level scope ends.
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();

// Declare the queue as durable, non-exclusive and not auto-deleted, with no extra arguments.
channel.QueueDeclare(
    queue: queue,
    durable: true,
    exclusive: false,
    autoDelete: false,
    arguments: null);

// Prefetch: RabbitMQ delivers several messages to the consumer at once (fewer network round-trips).
channel.BasicQos(
    prefetchSize: 0,
    prefetchCount: prefetchCount,
    global: false);
|
|
|
|
// HttpClient on top of SocketsHttpHandler with connection pooling, to support concurrent requests.
// The per-server connection cap is twice the configured processing concurrency.
var pooledHandler = new SocketsHttpHandler
{
    PooledConnectionIdleTimeout = TimeSpan.FromSeconds(30),
    PooledConnectionLifetime = TimeSpan.FromMinutes(2),
    MaxConnectionsPerServer = maxConcurrency * 2,
};

using var httpClient = new HttpClient(pooledHandler)
{
    Timeout = TimeSpan.FromSeconds(httpTimeoutSec),
};

// SemaphoreSlim caps how many messages are processed at the same time
// (the API has a concurrency limit).
using var semaphore = new SemaphoreSlim(initialCount: maxConcurrency, maxCount: maxConcurrency);
|
|
|
|
// Consumer whose handler hands every delivery to a background task, so several messages can be
// in flight at once; the semaphore bounds how many are processed concurrently.
var consumer = new AsyncEventingBasicConsumer(channel);

consumer.Received += (model, ea) =>
{
    // Copy the payload and delivery tag NOW. The processing below runs after this handler has
    // returned, and the client library only guarantees the delivery buffer while the handler is
    // running - reading ea.Body later could observe recycled memory.
    var payload = ea.Body.ToArray();
    var deliveryTag = ea.DeliveryTag;

    // Deliberately not awaited: returning immediately lets the dispatcher hand over the next
    // prefetched message. Every failure is handled inside the task, so nothing goes unobserved.
    _ = Task.Run(async () =>
    {
        // Wait for a free processing slot before doing any work.
        await semaphore.WaitAsync();
        try
        {
            var message = Encoding.UTF8.GetString(payload);

            WriteToConsole($"Received message: {message}");

            var success = await CallRestApi(message, httpClient, configuration);

            if (success)
            {
                // The channel is shared by all in-flight tasks; serialize access to it.
                lock (channel)
                {
                    channel.BasicAck(deliveryTag, multiple: false);
                }
                WriteToConsole("Message processed successfully");
            }
            else
            {
                // requeue: false - the broker drops or dead-letters the message instead of redelivering it.
                lock (channel)
                {
                    channel.BasicNack(deliveryTag, multiple: false, requeue: false);
                }
                WriteToConsole("Message processing failed - message rejected");
            }
        }
        catch (Exception ex)
        {
            WriteToConsole($"Error processing message: {ex.Message}");
            try
            {
                lock (channel)
                {
                    channel.BasicNack(deliveryTag, multiple: false, requeue: false);
                }
            }
            catch (Exception nackEx)
            {
                // The nack itself can fail (e.g. the channel is already closed); log it rather
                // than letting it escape from a fire-and-forget task.
                WriteToConsole($"Failed to reject message: {nackEx.Message}");
            }
        }
        finally
        {
            // Always free the slot, whatever the outcome.
            semaphore.Release();
        }
    });

    return Task.CompletedTask;
};
|
|
|
|
// Subscribe with manual acknowledgements (autoAck: false); the returned consumer tag is not needed.
_ = channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);

WriteToConsole("Consumer started. Waiting for messages...");

// Keep the application running: an infinite delay never completes.
await Task.Delay(Timeout.Infinite);
|
|
|
|
// Writes one line to standard output: the local time as "yyyy-MM-dd HH:mm:ss",
// then " : ", then the given message.
static void WriteToConsole(string message)
{
    var timestamp = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
    Console.WriteLine(timestamp + " : " + message);
}
|
|
|
|
// Posts a queue message to "{API}/leave/process-check-in" as a JSON body.
//
// requestData   - raw message text, sent unchanged as the request body.
// client        - HttpClient used for the call.
// configuration - supplies the "API" base URL.
//
// Returns true when the API answers with a success status code. Returns false for any
// non-success status, HTTP failure, timeout or other exception; each is logged and none propagate.
static async Task<bool> CallRestApi(string requestData, HttpClient client, IConfiguration configuration)
{
    // Picks the "message" field out of a JSON error body. Falls back to the raw body when it is
    // empty, not JSON, or carries no message, so the log line never loses the response.
    static string DescribeError(string errorBody)
    {
        if (string.IsNullOrWhiteSpace(errorBody))
        {
            return errorBody;
        }

        try
        {
            return JsonSerializer.Deserialize<ResponseObject>(errorBody)?.Message ?? errorBody;
        }
        catch (JsonException)
        {
            // e.g. an HTML error page from a proxy or gateway.
            return errorBody;
        }
    }

    try
    {
        // Trim a trailing slash on the base URL so the path never contains "//".
        var apiPath = $"{configuration["API"]?.TrimEnd('/')}/leave/process-check-in";

        // Dispose the request content and the response once the call is finished.
        using var content = new StringContent(requestData, Encoding.UTF8, "application/json");
        using var response = await client.PostAsync(apiPath, content);

        var responseBody = await response.Content.ReadAsStringAsync();

        if (response.IsSuccessStatusCode)
        {
            WriteToConsole($"API Success: {responseBody}");
            return true;
        }

        WriteToConsole($"API Error ({response.StatusCode}): {DescribeError(responseBody)}");
        return false;
    }
    catch (HttpRequestException ex)
    {
        WriteToConsole($"HTTP Error: {ex.Message}");
        return false;
    }
    catch (TaskCanceledException ex)
    {
        // HttpClient reports an elapsed Timeout as a cancelled task.
        WriteToConsole($"Timeout: {ex.Message}");
        return false;
    }
    catch (Exception ex)
    {
        WriteToConsole($"Unexpected Error: {ex.Message}");
        return false;
    }
}
|
|
|
|
/// <summary>
/// JSON envelope that <c>CallRestApi</c> deserializes from an error response body.
/// Property names map to the lower-case JSON keys given in the attributes.
/// </summary>
public class ResponseObject
{
    /// <summary>Value of the JSON "status" property.</summary>
    [JsonPropertyName("status")] public int Status { get; set; }

    /// <summary>Value of the JSON "message" property; may be absent.</summary>
    [JsonPropertyName("message")] public string? Message { get; set; }

    /// <summary>Value of the JSON "result" property; untyped and may be absent.</summary>
    [JsonPropertyName("result")] public object? Result { get; set; }
}
|
|
|
|
/// <summary>
/// Check-in data transfer object.
/// NOTE(review): nothing in this file references this type; presumably it mirrors the
/// JSON shape of the queue messages - confirm before relying on it.
/// </summary>
public class CheckTimeDtoRB
{
    // Identifiers and timestamp; all optional.
    public Guid? CheckInId { get; set; }

    // Coordinates; both start at 0.
    public double Lat { get; set; }
    public double Lon { get; set; }

    // Location details. POI is non-nullable and starts as an empty string.
    public string POI { get; set; } = "";
    public bool IsLocation { get; set; } = true;
    public string? LocationName { get; set; } = "";
    public string? Remark { get; set; } = "";

    public Guid? UserId { get; set; }
    public DateTime? CurrentDate { get; set; }

    // Optional attachment: file name plus raw bytes.
    public string? CheckInFileName { get; set; }
    public byte[]? CheckInFileBytes { get; set; }
}
|