Compare commits
1 Commits
@@ -1,5 +1,6 @@
|
||||
using System.Text.Json;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
|
||||
@@ -7,17 +8,23 @@ namespace TimetableDesigner.Backend.Events.OutboxPattern;
|
||||
|
||||
public class EventOutboxSender<TDbContext> : BackgroundService where TDbContext : DbContext, IEventOutboxDbContext
|
||||
{
|
||||
private readonly IConfiguration _configuration;
|
||||
private readonly IServiceScopeFactory _serviceScopeFactory;
|
||||
private readonly IEventQueuePublisher _eventQueuePublisher;
|
||||
|
||||
public EventOutboxSender(IServiceScopeFactory serviceScopeFactory, IEventQueuePublisher eventQueuePublisher)
|
||||
public EventOutboxSender(IConfiguration configuration, IServiceScopeFactory serviceScopeFactory, IEventQueuePublisher eventQueuePublisher)
|
||||
{
|
||||
_configuration = configuration;
|
||||
_serviceScopeFactory = serviceScopeFactory;
|
||||
_eventQueuePublisher = eventQueuePublisher;
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
IConfigurationSection section = _configuration.GetSection("Workers:EventOutboxSender");
|
||||
int delayBetweenQueries = section.GetValue("DelayBetweenEmptyQueries", 10);
|
||||
int limitInSingleQuery = section.GetValue("LimitInSingleQuery", 50);
|
||||
|
||||
using (IServiceScope scope = _serviceScopeFactory.CreateScope())
|
||||
using (TDbContext dbContext = scope.ServiceProvider.GetRequiredService<TDbContext>())
|
||||
{
|
||||
@@ -25,29 +32,37 @@ public class EventOutboxSender<TDbContext> : BackgroundService where TDbContext
|
||||
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
Event? eventData = await dbContext.Events.FirstOrDefaultAsync(x => x.LastRetryOn == null, stoppingToken)
|
||||
?? await dbContext.Events.OrderBy(x => x.LastRetryOn).FirstOrDefaultAsync(stoppingToken);
|
||||
IAsyncEnumerable<Event>? events = dbContext.Events
|
||||
.OrderBy(x => x.LastRetryOn)
|
||||
.Take(limitInSingleQuery)
|
||||
.ToAsyncEnumerable();
|
||||
|
||||
if (eventData is null)
|
||||
bool any = false;
|
||||
await foreach (Event eventData in events)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
Type payloadType = Type.GetType(eventData.PayloadType)!;
|
||||
JsonSerializer.Deserialize(eventData.Payload, payloadType);
|
||||
|
||||
try
|
||||
{
|
||||
await _eventQueuePublisher.PublishAsync(eventData.Payload, payloadType);
|
||||
any = true;
|
||||
|
||||
dbContext.Events.Remove(eventData);
|
||||
Type payloadType = Type.GetType(eventData.PayloadType)!;
|
||||
JsonSerializer.Deserialize(eventData.Payload, payloadType);
|
||||
|
||||
try
|
||||
{
|
||||
await _eventQueuePublisher.PublishAsync(eventData.Payload, payloadType);
|
||||
|
||||
dbContext.Events.Remove(eventData);
|
||||
}
|
||||
catch
|
||||
{
|
||||
eventData.LastRetryOn = DateTimeOffset.UtcNow;
|
||||
eventData.RetryCount++;
|
||||
}
|
||||
}
|
||||
catch
|
||||
|
||||
if (!any)
|
||||
{
|
||||
eventData.LastRetryOn = DateTimeOffset.UtcNow;
|
||||
eventData.RetryCount++;
|
||||
await Task.Delay(TimeSpan.FromSeconds(delayBetweenQueries), stoppingToken);
|
||||
}
|
||||
finally
|
||||
else
|
||||
{
|
||||
await dbContext.SaveChangesAsync(stoppingToken);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user