Compare commits
6 Commits
@@ -11,9 +11,9 @@ public class Event
|
|||||||
public DateTimeOffset? LastRetryOn { get; set; }
|
public DateTimeOffset? LastRetryOn { get; set; }
|
||||||
public uint RetryCount { get; set; }
|
public uint RetryCount { get; set; }
|
||||||
|
|
||||||
public Event(object payload)
|
public static Event Create<T>(T payload) where T : class => new Event
|
||||||
{
|
{
|
||||||
Payload = JsonSerializer.Serialize(payload);
|
Payload = JsonSerializer.Serialize<T>(payload),
|
||||||
PayloadType = payload.GetType().FullName!;
|
PayloadType = payload.GetType().FullName!
|
||||||
}
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,49 +1,71 @@
|
|||||||
using System.Text.Json;
|
using System.Text.Json;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.Extensions.Configuration;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.Hosting;
|
using Microsoft.Extensions.Hosting;
|
||||||
|
|
||||||
namespace TimetableDesigner.Backend.Events.OutboxPattern;
|
namespace TimetableDesigner.Backend.Events.OutboxPattern;
|
||||||
|
|
||||||
public class EventOutboxSender<TDbContext> : BackgroundService where TDbContext : DbContext, IEventOutboxDbContext
|
public class EventOutboxSender<TDbContext> : BackgroundService where TDbContext : DbContext, IEventOutboxDbContext
|
||||||
{
|
{
|
||||||
private readonly TDbContext _databaseContext;
|
private readonly IConfiguration _configuration;
|
||||||
|
private readonly IServiceScopeFactory _serviceScopeFactory;
|
||||||
private readonly IEventQueuePublisher _eventQueuePublisher;
|
private readonly IEventQueuePublisher _eventQueuePublisher;
|
||||||
|
|
||||||
public EventOutboxSender(TDbContext databaseContext, IEventQueuePublisher eventQueuePublisher)
|
public EventOutboxSender(IConfiguration configuration, IServiceScopeFactory serviceScopeFactory, IEventQueuePublisher eventQueuePublisher)
|
||||||
{
|
{
|
||||||
_databaseContext = databaseContext;
|
_configuration = configuration;
|
||||||
|
_serviceScopeFactory = serviceScopeFactory;
|
||||||
_eventQueuePublisher = eventQueuePublisher;
|
_eventQueuePublisher = eventQueuePublisher;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||||
{
|
{
|
||||||
while (!stoppingToken.IsCancellationRequested)
|
IConfigurationSection section = _configuration.GetSection("Workers:EventOutboxSender");
|
||||||
|
int delayBetweenQueries = section.GetValue("DelayBetweenEmptyQueries", 10);
|
||||||
|
int limitInSingleQuery = section.GetValue("LimitInSingleQuery", 50);
|
||||||
|
|
||||||
|
using (IServiceScope scope = _serviceScopeFactory.CreateScope())
|
||||||
|
using (TDbContext dbContext = scope.ServiceProvider.GetRequiredService<TDbContext>())
|
||||||
{
|
{
|
||||||
Event? eventData = await _databaseContext.Events.FirstOrDefaultAsync(x => x.LastRetryOn == null, stoppingToken)
|
await dbContext.Database.EnsureCreatedAsync(stoppingToken);
|
||||||
?? await _databaseContext.Events.OrderBy(x => x.LastRetryOn).FirstOrDefaultAsync(stoppingToken);
|
|
||||||
|
|
||||||
if (eventData is null)
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
Type payloadType = Type.GetType(eventData.PayloadType)!;
|
while (!stoppingToken.IsCancellationRequested)
|
||||||
JsonSerializer.Deserialize(eventData.Payload, payloadType);
|
{
|
||||||
|
IAsyncEnumerable<Event>? events = dbContext.Events
|
||||||
|
.OrderBy(x => x.LastRetryOn)
|
||||||
|
.Take(limitInSingleQuery)
|
||||||
|
.ToAsyncEnumerable();
|
||||||
|
|
||||||
try
|
bool any = false;
|
||||||
{
|
await foreach (Event eventData in events)
|
||||||
await _eventQueuePublisher.PublishAsync(eventData.Payload, payloadType);
|
{
|
||||||
|
any = true;
|
||||||
|
|
||||||
_databaseContext.Events.Remove(eventData);
|
Type payloadType = Type.GetType(eventData.PayloadType)!;
|
||||||
}
|
JsonSerializer.Deserialize(eventData.Payload, payloadType);
|
||||||
catch
|
|
||||||
{
|
try
|
||||||
eventData.LastRetryOn = DateTimeOffset.UtcNow;
|
{
|
||||||
eventData.RetryCount++;
|
await _eventQueuePublisher.PublishAsync(eventData.Payload, payloadType);
|
||||||
}
|
|
||||||
finally
|
dbContext.Events.Remove(eventData);
|
||||||
{
|
}
|
||||||
await _databaseContext.SaveChangesAsync(stoppingToken);
|
catch
|
||||||
|
{
|
||||||
|
eventData.LastRetryOn = DateTimeOffset.UtcNow;
|
||||||
|
eventData.RetryCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!any)
|
||||||
|
{
|
||||||
|
await Task.Delay(TimeSpan.FromSeconds(delayBetweenQueries), stoppingToken);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
await dbContext.SaveChangesAsync(stoppingToken);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user