Refactor message consumption to start after 8:10 AM and implement time checks for operating hours
All checks were successful
Build & Deploy Checkin Service / build (push) Successful in 1m20s
All checks were successful
Build & Deploy Checkin Service / build (push) Successful in 1m20s
This commit is contained in:
parent
6c8e79b1bc
commit
3532df32fd
1 changed files with 51 additions and 49 deletions
|
|
@ -13,9 +13,6 @@ var configuration = new ConfigurationBuilder()
|
|||
|
||||
WriteToConsole("Consumer Start!");
|
||||
|
||||
// Wait until 8:00 AM before starting to consume messages
|
||||
await WaitUntil8AM();
|
||||
|
||||
var host = configuration["Rabbit:Host"] ?? "";
|
||||
var user = configuration["Rabbit:User"] ?? "";
|
||||
var pass = configuration["Rabbit:Password"] ?? "";
|
||||
|
|
@ -24,52 +21,63 @@ var queue = configuration["Rabbit:Queue"] ?? "basic-queue";
|
|||
// create connection
|
||||
var factory = new ConnectionFactory()
|
||||
{
|
||||
//Uri = new Uri("amqp://admin:P@ssw0rd@192.168.4.11:5672")
|
||||
HostName = host,// หรือ hostname ของ RabbitMQ Server ที่คุณใช้
|
||||
UserName = user, // ใส่ชื่อผู้ใช้ของคุณ
|
||||
Password = pass // ใส่รหัสผ่านของคุณ
|
||||
HostName = host,
|
||||
UserName = user,
|
||||
Password = pass
|
||||
};
|
||||
|
||||
using var connection = factory.CreateConnection();
|
||||
using var channel = connection.CreateModel();
|
||||
|
||||
//channel.QueueDeclare(queue: "bma-checkin-queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
|
||||
channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: null);
|
||||
|
||||
var consumer = new EventingBasicConsumer(channel);
|
||||
string? consumerTag = null;
|
||||
bool isConsuming = false;
|
||||
|
||||
consumer.Received += async (model, ea) =>
|
||||
{
|
||||
var body = ea.Body.ToArray();
|
||||
var message = Encoding.UTF8.GetString(body);
|
||||
|
||||
// Double-check time before processing (safety check)
|
||||
if (!IsWithinOperatingHours())
|
||||
{
|
||||
WriteToConsole($"Message received outside operating hours. Requeuing message.");
|
||||
channel.BasicNack(ea.DeliveryTag, false, true); // Requeue the message
|
||||
return;
|
||||
}
|
||||
|
||||
await CallRestApi(message);
|
||||
|
||||
// convert string into object
|
||||
//var request = JsonConvert.DeserializeObject<CheckInRequest>(message);
|
||||
//using (var db = new ApplicationDbContext())
|
||||
//{
|
||||
// var item = new AttendantItem
|
||||
// {
|
||||
// Name = request.Name,
|
||||
// CheckInDateTime = request.CheckInDateTime,
|
||||
// };
|
||||
// db.AttendantItems.Add(item);
|
||||
// db.SaveChanges();
|
||||
|
||||
// WriteToConsole($"ได้รับคำขอจาก Queue: {message}");
|
||||
// WriteToConsole($"ตอบกลับจาก REST API: {JsonConvert.SerializeObject(item)}");
|
||||
//}
|
||||
|
||||
WriteToConsole($"ได้รับคำขอจาก Queue: {message}");
|
||||
//WriteToConsole($"ตอบกลับจาก REST API: {JsonConvert.SerializeObject(item)}");
|
||||
};
|
||||
|
||||
//channel.BasicConsume(queue: "bma-checkin-queue", autoAck: true, consumer: consumer);
|
||||
channel.BasicConsume(queue: queue, autoAck: true, consumer: consumer);
|
||||
// Monitor and control consumer based on time schedule
|
||||
using var timer = new PeriodicTimer(TimeSpan.FromMinutes(1));
|
||||
|
||||
//Console.WriteLine("\nPress 'Enter' to exit the process...");
|
||||
|
||||
await Task.Delay(-1);
|
||||
while (await timer.WaitForNextTickAsync())
|
||||
{
|
||||
var shouldBeConsuming = IsWithinOperatingHours();
|
||||
|
||||
if (shouldBeConsuming && !isConsuming)
|
||||
{
|
||||
// Start consuming
|
||||
consumerTag = channel.BasicConsume(queue: queue, autoAck: true, consumer: consumer);
|
||||
isConsuming = true;
|
||||
WriteToConsole($"✅ Started consuming messages at {GetCurrentBangkokTime():yyyy-MM-dd HH:mm:ss}");
|
||||
}
|
||||
else if (!shouldBeConsuming && isConsuming)
|
||||
{
|
||||
// Stop consuming
|
||||
if (consumerTag != null)
|
||||
{
|
||||
channel.BasicCancel(consumerTag);
|
||||
consumerTag = null;
|
||||
}
|
||||
isConsuming = false;
|
||||
WriteToConsole($"⏸️ Stopped consuming messages at {GetCurrentBangkokTime():yyyy-MM-dd HH:mm:ss}. Will resume after 08:10 tomorrow.");
|
||||
}
|
||||
}
|
||||
|
||||
static void WriteToConsole(string message)
|
||||
{
|
||||
|
|
@ -98,28 +106,22 @@ async Task CallRestApi(string requestData)
|
|||
}
|
||||
}
|
||||
|
||||
async Task WaitUntil8AM()
|
||||
DateTime GetCurrentBangkokTime()
|
||||
{
|
||||
// Get current time in Bangkok timezone
|
||||
var bangkokTimeZone = TimeZoneInfo.FindSystemTimeZoneById("SE Asia Standard Time");
|
||||
var currentTime = TimeZoneInfo.ConvertTimeFromUtc(DateTime.UtcNow, bangkokTimeZone);
|
||||
return TimeZoneInfo.ConvertTimeFromUtc(DateTime.UtcNow, bangkokTimeZone);
|
||||
}
|
||||
|
||||
bool IsWithinOperatingHours()
|
||||
{
|
||||
var currentTime = GetCurrentBangkokTime();
|
||||
var startTime = new TimeSpan(8, 10, 0); // 8:10 AM
|
||||
var endTime = new TimeSpan(23, 59, 59); // End of day
|
||||
|
||||
var targetTime = new DateTime(currentTime.Year, currentTime.Month, currentTime.Day, 8, 10, 0);
|
||||
var currentTimeOfDay = currentTime.TimeOfDay;
|
||||
|
||||
// If current time is already past 8:10 AM today, start immediately
|
||||
if (currentTime >= targetTime)
|
||||
{
|
||||
WriteToConsole($"Current time is {currentTime:HH:mm:ss}. Starting consumer immediately.");
|
||||
return;
|
||||
}
|
||||
|
||||
// Calculate time to wait
|
||||
var timeToWait = targetTime - currentTime;
|
||||
WriteToConsole($"Current time is {currentTime:HH:mm:ss}. Waiting until 08:10:00 to start consuming messages.");
|
||||
WriteToConsole($"Time to wait: {timeToWait.Hours} hours, {timeToWait.Minutes} minutes, {timeToWait.Seconds} seconds");
|
||||
|
||||
await Task.Delay(timeToWait);
|
||||
WriteToConsole("It's now 08:10:00. Starting to consume messages from queue.");
|
||||
// Consumer should only work from 8:10 AM to end of day
|
||||
return currentTimeOfDay >= startTime && currentTimeOfDay <= endTime;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue