Add migration to create CheckInJobStatuses table for RMQ task control

- Introduced a new migration that creates the CheckInJobStatuses table.
- The table includes fields for tracking job statuses, timestamps, user information, and error messages.
- Supports various statuses such as PENDING, PROCESSING, COMPLETED, and FAILED.
This commit is contained in:
Suphonchai Phoonsawat 2026-01-20 10:49:13 +07:00
parent 3532df32fd
commit a463df5716
12 changed files with 2259 additions and 126 deletions

View file

@ -21,63 +21,52 @@ var queue = configuration["Rabbit:Queue"] ?? "basic-queue";
// create connection
var factory = new ConnectionFactory()
{
HostName = host,
UserName = user,
Password = pass
//Uri = new Uri("amqp://admin:P@ssw0rd@192.168.4.11:5672")
HostName = host,// หรือ hostname ของ RabbitMQ Server ที่คุณใช้
UserName = user, // ใส่ชื่อผู้ใช้ของคุณ
Password = pass // ใส่รหัสผ่านของคุณ
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
//channel.QueueDeclare(queue: "bma-checkin-queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
string? consumerTag = null;
bool isConsuming = false;
consumer.Received += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
// Double-check time before processing (safety check)
if (!IsWithinOperatingHours())
{
WriteToConsole($"Message received outside operating hours. Requeuing message.");
channel.BasicNack(ea.DeliveryTag, false, true); // Requeue the message
return;
}
await CallRestApi(message);
// convert string into object
//var request = JsonConvert.DeserializeObject<CheckInRequest>(message);
//using (var db = new ApplicationDbContext())
//{
// var item = new AttendantItem
// {
// Name = request.Name,
// CheckInDateTime = request.CheckInDateTime,
// };
// db.AttendantItems.Add(item);
// db.SaveChanges();
// WriteToConsole($"ได้รับคำขอจาก Queue: {message}");
// WriteToConsole($"ตอบกลับจาก REST API: {JsonConvert.SerializeObject(item)}");
//}
WriteToConsole($"ได้รับคำขอจาก Queue: {message}");
//WriteToConsole($"ตอบกลับจาก REST API: {JsonConvert.SerializeObject(item)}");
};
// Monitor and control consumer based on time schedule
using var timer = new PeriodicTimer(TimeSpan.FromMinutes(1));
//channel.BasicConsume(queue: "bma-checkin-queue", autoAck: true, consumer: consumer);
channel.BasicConsume(queue: queue, autoAck: true, consumer: consumer);
while (await timer.WaitForNextTickAsync())
{
var shouldBeConsuming = IsWithinOperatingHours();
if (shouldBeConsuming && !isConsuming)
{
// Start consuming
consumerTag = channel.BasicConsume(queue: queue, autoAck: true, consumer: consumer);
isConsuming = true;
WriteToConsole($"✅ Started consuming messages at {GetCurrentBangkokTime():yyyy-MM-dd HH:mm:ss}");
}
else if (!shouldBeConsuming && isConsuming)
{
// Stop consuming
if (consumerTag != null)
{
channel.BasicCancel(consumerTag);
consumerTag = null;
}
isConsuming = false;
WriteToConsole($"⏸️ Stopped consuming messages at {GetCurrentBangkokTime():yyyy-MM-dd HH:mm:ss}. Will resume after 08:10 tomorrow.");
}
}
//Console.WriteLine("\nPress 'Enter' to exit the process...");
await Task.Delay(-1);
static void WriteToConsole(string message)
{
@ -106,25 +95,6 @@ async Task CallRestApi(string requestData)
}
}
DateTime GetCurrentBangkokTime()
{
var bangkokTimeZone = TimeZoneInfo.FindSystemTimeZoneById("SE Asia Standard Time");
return TimeZoneInfo.ConvertTimeFromUtc(DateTime.UtcNow, bangkokTimeZone);
}
bool IsWithinOperatingHours()
{
var currentTime = GetCurrentBangkokTime();
var startTime = new TimeSpan(8, 10, 0); // 8:10 AM
var endTime = new TimeSpan(23, 59, 59); // End of day
var currentTimeOfDay = currentTime.TimeOfDay;
// Consumer should only work from 8:10 AM to end of day
return currentTimeOfDay >= startTime && currentTimeOfDay <= endTime;
}
public class ResponseObject
{
[JsonPropertyName("status")]