2 Commits
1.1.0 ... 1.1.1

View File

@@ -1,26 +1,32 @@
using System.Text.Json;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace TimetableDesigner.Backend.Events.OutboxPattern;
public class EventOutboxSender<TDbContext> : BackgroundService where TDbContext : DbContext, IEventOutboxDbContext
{
private readonly TDbContext _databaseContext;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly IEventQueuePublisher _eventQueuePublisher;
public EventOutboxSender(TDbContext databaseContext, IEventQueuePublisher eventQueuePublisher)
public EventOutboxSender(IServiceScopeFactory serviceScopeFactory, IEventQueuePublisher eventQueuePublisher)
{
_databaseContext = databaseContext;
_serviceScopeFactory = serviceScopeFactory;
_eventQueuePublisher = eventQueuePublisher;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
using (IServiceScope scope = _serviceScopeFactory.CreateScope())
using (TDbContext dbContext = scope.ServiceProvider.GetRequiredService<TDbContext>())
{
await dbContext.Database.EnsureCreatedAsync(stoppingToken);
while (!stoppingToken.IsCancellationRequested)
{
Event? eventData = await _databaseContext.Events.FirstOrDefaultAsync(x => x.LastRetryOn == null, stoppingToken)
?? await _databaseContext.Events.OrderBy(x => x.LastRetryOn).FirstOrDefaultAsync(stoppingToken);
Event? eventData = await dbContext.Events.FirstOrDefaultAsync(x => x.LastRetryOn == null, stoppingToken)
?? await dbContext.Events.OrderBy(x => x.LastRetryOn).FirstOrDefaultAsync(stoppingToken);
if (eventData is null)
{
@@ -34,7 +40,7 @@ public class EventOutboxSender<TDbContext> : BackgroundService where TDbContext
{
await _eventQueuePublisher.PublishAsync(eventData.Payload, payloadType);
_databaseContext.Events.Remove(eventData);
dbContext.Events.Remove(eventData);
}
catch
{
@@ -43,7 +49,8 @@ public class EventOutboxSender<TDbContext> : BackgroundService where TDbContext
}
finally
{
await _databaseContext.SaveChangesAsync(stoppingToken);
await dbContext.SaveChangesAsync(stoppingToken);
}
}
}
}