1 Commits
dev ... 1.1.0

2 changed files with 31 additions and 53 deletions

View File

@@ -11,9 +11,9 @@ public class Event
public DateTimeOffset? LastRetryOn { get; set; } public DateTimeOffset? LastRetryOn { get; set; }
public uint RetryCount { get; set; } public uint RetryCount { get; set; }
public static Event Create<T>(T payload) where T : class => new Event public Event(object payload)
{ {
Payload = JsonSerializer.Serialize<T>(payload), Payload = JsonSerializer.Serialize(payload);
PayloadType = payload.GetType().FullName! PayloadType = payload.GetType().FullName!;
}; }
} }

View File

@@ -1,71 +1,49 @@
using System.Text.Json; using System.Text.Json;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
namespace TimetableDesigner.Backend.Events.OutboxPattern; namespace TimetableDesigner.Backend.Events.OutboxPattern;
public class EventOutboxSender<TDbContext> : BackgroundService where TDbContext : DbContext, IEventOutboxDbContext public class EventOutboxSender<TDbContext> : BackgroundService where TDbContext : DbContext, IEventOutboxDbContext
{ {
private readonly IConfiguration _configuration; private readonly TDbContext _databaseContext;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly IEventQueuePublisher _eventQueuePublisher; private readonly IEventQueuePublisher _eventQueuePublisher;
public EventOutboxSender(IConfiguration configuration, IServiceScopeFactory serviceScopeFactory, IEventQueuePublisher eventQueuePublisher) public EventOutboxSender(TDbContext databaseContext, IEventQueuePublisher eventQueuePublisher)
{ {
_configuration = configuration; _databaseContext = databaseContext;
_serviceScopeFactory = serviceScopeFactory;
_eventQueuePublisher = eventQueuePublisher; _eventQueuePublisher = eventQueuePublisher;
} }
protected override async Task ExecuteAsync(CancellationToken stoppingToken) protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{ {
IConfigurationSection section = _configuration.GetSection("Workers:EventOutboxSender"); while (!stoppingToken.IsCancellationRequested)
int delayBetweenQueries = section.GetValue("DelayBetweenEmptyQueries", 10);
int limitInSingleQuery = section.GetValue("LimitInSingleQuery", 50);
using (IServiceScope scope = _serviceScopeFactory.CreateScope())
using (TDbContext dbContext = scope.ServiceProvider.GetRequiredService<TDbContext>())
{ {
await dbContext.Database.EnsureCreatedAsync(stoppingToken); Event? eventData = await _databaseContext.Events.FirstOrDefaultAsync(x => x.LastRetryOn == null, stoppingToken)
?? await _databaseContext.Events.OrderBy(x => x.LastRetryOn).FirstOrDefaultAsync(stoppingToken);
while (!stoppingToken.IsCancellationRequested)
if (eventData is null)
{ {
IAsyncEnumerable<Event>? events = dbContext.Events continue;
.OrderBy(x => x.LastRetryOn) }
.Take(limitInSingleQuery)
.ToAsyncEnumerable(); Type payloadType = Type.GetType(eventData.PayloadType)!;
JsonSerializer.Deserialize(eventData.Payload, payloadType);
bool any = false; try
await foreach (Event eventData in events) {
{ await _eventQueuePublisher.PublishAsync(eventData.Payload, payloadType);
any = true;
Type payloadType = Type.GetType(eventData.PayloadType)!; _databaseContext.Events.Remove(eventData);
JsonSerializer.Deserialize(eventData.Payload, payloadType); }
catch
try {
{ eventData.LastRetryOn = DateTimeOffset.UtcNow;
await _eventQueuePublisher.PublishAsync(eventData.Payload, payloadType); eventData.RetryCount++;
}
dbContext.Events.Remove(eventData); finally
} {
catch await _databaseContext.SaveChangesAsync(stoppingToken);
{
eventData.LastRetryOn = DateTimeOffset.UtcNow;
eventData.RetryCount++;
}
}
if (!any)
{
await Task.Delay(TimeSpan.FromSeconds(delayBetweenQueries), stoppingToken);
}
else
{
await dbContext.SaveChangesAsync(stoppingToken);
}
} }
} }
} }