Merge pull request #2 from TimetableDesigner/dev

fix
This commit is contained in:
2026-01-28 23:54:02 +01:00
committed by GitHub
Unverified

View File

@@ -1,49 +1,56 @@
using System.Text.Json; using System.Text.Json;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
namespace TimetableDesigner.Backend.Events.OutboxPattern; namespace TimetableDesigner.Backend.Events.OutboxPattern;
public class EventOutboxSender<TDbContext> : BackgroundService where TDbContext : DbContext, IEventOutboxDbContext public class EventOutboxSender<TDbContext> : BackgroundService where TDbContext : DbContext, IEventOutboxDbContext
{ {
private readonly TDbContext _databaseContext; private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly IEventQueuePublisher _eventQueuePublisher; private readonly IEventQueuePublisher _eventQueuePublisher;
public EventOutboxSender(TDbContext databaseContext, IEventQueuePublisher eventQueuePublisher) public EventOutboxSender(IServiceScopeFactory serviceScopeFactory, IEventQueuePublisher eventQueuePublisher)
{ {
_databaseContext = databaseContext; _serviceScopeFactory = serviceScopeFactory;
_eventQueuePublisher = eventQueuePublisher; _eventQueuePublisher = eventQueuePublisher;
} }
protected override async Task ExecuteAsync(CancellationToken stoppingToken) protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{ {
while (!stoppingToken.IsCancellationRequested) using (IServiceScope scope = _serviceScopeFactory.CreateScope())
using (TDbContext dbContext = scope.ServiceProvider.GetRequiredService<TDbContext>())
{ {
Event? eventData = await _databaseContext.Events.FirstOrDefaultAsync(x => x.LastRetryOn == null, stoppingToken) await dbContext.Database.EnsureCreatedAsync(stoppingToken);
?? await _databaseContext.Events.OrderBy(x => x.LastRetryOn).FirstOrDefaultAsync(stoppingToken);
if (eventData is null) while (!stoppingToken.IsCancellationRequested)
{ {
continue; Event? eventData = await dbContext.Events.FirstOrDefaultAsync(x => x.LastRetryOn == null, stoppingToken)
} ?? await dbContext.Events.OrderBy(x => x.LastRetryOn).FirstOrDefaultAsync(stoppingToken);
Type payloadType = Type.GetType(eventData.PayloadType)!; if (eventData is null)
JsonSerializer.Deserialize(eventData.Payload, payloadType); {
continue;
}
try Type payloadType = Type.GetType(eventData.PayloadType)!;
{ JsonSerializer.Deserialize(eventData.Payload, payloadType);
await _eventQueuePublisher.PublishAsync(eventData.Payload, payloadType);
_databaseContext.Events.Remove(eventData); try
} {
catch await _eventQueuePublisher.PublishAsync(eventData.Payload, payloadType);
{
eventData.LastRetryOn = DateTimeOffset.UtcNow; dbContext.Events.Remove(eventData);
eventData.RetryCount++; }
} catch
finally {
{ eventData.LastRetryOn = DateTimeOffset.UtcNow;
await _databaseContext.SaveChangesAsync(stoppingToken); eventData.RetryCount++;
}
finally
{
await dbContext.SaveChangesAsync(stoppingToken);
}
} }
} }
} }