From 8f7c858cc562c9c8020733bf8657fff815b17fc9 Mon Sep 17 00:00:00 2001 From: Mateusz Skoczek Date: Thu, 29 Jan 2026 23:44:41 +0100 Subject: [PATCH] performance upgrade --- .../EventOutboxSender.cs | 51 ++++++++++++------- 1 file changed, 33 insertions(+), 18 deletions(-) diff --git a/TimetableDesigner.Backend.Events.OutboxPattern/EventOutboxSender.cs b/TimetableDesigner.Backend.Events.OutboxPattern/EventOutboxSender.cs index c9797d8..0cb5c83 100644 --- a/TimetableDesigner.Backend.Events.OutboxPattern/EventOutboxSender.cs +++ b/TimetableDesigner.Backend.Events.OutboxPattern/EventOutboxSender.cs @@ -1,5 +1,6 @@ using System.Text.Json; using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -7,17 +8,23 @@ namespace TimetableDesigner.Backend.Events.OutboxPattern; public class EventOutboxSender : BackgroundService where TDbContext : DbContext, IEventOutboxDbContext { + private readonly IConfiguration _configuration; private readonly IServiceScopeFactory _serviceScopeFactory; private readonly IEventQueuePublisher _eventQueuePublisher; - public EventOutboxSender(IServiceScopeFactory serviceScopeFactory, IEventQueuePublisher eventQueuePublisher) + public EventOutboxSender(IConfiguration configuration, IServiceScopeFactory serviceScopeFactory, IEventQueuePublisher eventQueuePublisher) { + _configuration = configuration; _serviceScopeFactory = serviceScopeFactory; _eventQueuePublisher = eventQueuePublisher; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { + IConfigurationSection section = _configuration.GetSection("Workers:EventOutboxSender"); + int delayBetweenQueries = section.GetValue("DelayBetweenEmptyQueries", 10); + int limitInSingleQuery = section.GetValue("LimitInSingleQuery", 50); + using (IServiceScope scope = _serviceScopeFactory.CreateScope()) using (TDbContext dbContext = scope.ServiceProvider.GetRequiredService()) { @@ -25,29 +32,37 @@ public class EventOutboxSender : BackgroundService where TDbContext while (!stoppingToken.IsCancellationRequested) { - Event? eventData = await dbContext.Events.FirstOrDefaultAsync(x => x.LastRetryOn == null, stoppingToken) - ?? await dbContext.Events.OrderBy(x => x.LastRetryOn).FirstOrDefaultAsync(stoppingToken); + IAsyncEnumerable? events = dbContext.Events + .OrderBy(x => x.LastRetryOn) + .Take(limitInSingleQuery) + .ToAsyncEnumerable(); - if (eventData is null) + bool any = false; + await foreach (Event eventData in events) { - continue; - } - - Type payloadType = Type.GetType(eventData.PayloadType)!; - JsonSerializer.Deserialize(eventData.Payload, payloadType); - - try - { - await _eventQueuePublisher.PublishAsync(eventData.Payload, payloadType); + any = true; - dbContext.Events.Remove(eventData); + Type payloadType = Type.GetType(eventData.PayloadType)!; + JsonSerializer.Deserialize(eventData.Payload, payloadType); + + try + { + await _eventQueuePublisher.PublishAsync(eventData.Payload, payloadType); + + dbContext.Events.Remove(eventData); + } + catch + { + eventData.LastRetryOn = DateTimeOffset.UtcNow; + eventData.RetryCount++; + } } - catch + + if (!any) { - eventData.LastRetryOn = DateTimeOffset.UtcNow; - eventData.RetryCount++; + await Task.Delay(TimeSpan.FromSeconds(delayBetweenQueries), stoppingToken); } - finally + else { await dbContext.SaveChangesAsync(stoppingToken); }