Alice, MyBackgroundServicesApp.slnx, MessageBus
D:\Projects\VS\2605\Alice\MyBackgroundServicesApp\MyBackgroundServicesApp.slnx
D:\Projects\VS\2605\Alice\MyBackgroundServicesApp\MyBackgroundServicesApp\MyBackgroundServicesApp.csproj
Service1, Service2, Service3
----------------------------------------------------------
AliceProj_260606.txt - Notes
----------------------------------------------------------
// Bus/IMessageBus.cs

/// <summary>
/// Entry point for handing messages and commands to the bus.
/// </summary>
interface IMessageBus
{
    /// <summary>
    /// Publishes <paramref name="message"/> to the bus.
    /// </summary>
    /// <param name="message">The message to publish.</param>
    /// <param name="ct">Token used to cancel the operation.</param>
    /// <returns>A <see cref="ValueTask"/> that completes when the bus has accepted the message.</returns>
    ValueTask PublishAsync(IMessage message, CancellationToken ct = default);

    /// <summary>
    /// Sends <paramref name="command"/> to the bus.
    /// </summary>
    /// <param name="command">The command to send.</param>
    /// <param name="ct">Token used to cancel the operation.</param>
    /// <returns>A <see cref="ValueTask"/> that completes when the bus has accepted the command.</returns>
    ValueTask SendAsync(ICommand command, CancellationToken ct = default);
}
---------------------------
// Bus/MessageBus.cs
using System.Threading.Channels;

/// <summary>
/// <see cref="IMessageBus"/> implementation that forwards every message and
/// command into a single <see cref="Channel{T}"/> of <c>IMessage</c>.
/// </summary>
class MessageBus : IMessageBus
{
    private readonly Channel<IMessage> _channel;

    // NOTE(review): stored but not used anywhere in this class; kept so the
    // constructor contract stays unchanged.
    private readonly IServiceProvider _serviceProvider;

    /// <summary>
    /// Creates a bus that writes into <paramref name="channel"/>.
    /// </summary>
    /// <param name="serviceProvider">Service provider supplied by the container.</param>
    /// <param name="channel">Channel that receives every published message and sent command.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public MessageBus(IServiceProvider serviceProvider, Channel<IMessage> channel)
    {
        ArgumentNullException.ThrowIfNull(serviceProvider);
        ArgumentNullException.ThrowIfNull(channel);

        _serviceProvider = serviceProvider;
        _channel = channel;
    }

    /// <summary>
    /// Writes <paramref name="message"/> to the channel.
    /// </summary>
    /// <param name="message">The message to enqueue; must not be <c>null</c>.</param>
    /// <param name="ct">Token used to cancel the write.</param>
    /// <returns>The <see cref="ValueTask"/> returned by the channel writer.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is <c>null</c>.</exception>
    public ValueTask PublishAsync(IMessage message, CancellationToken ct = default)
    {
        // Reject null here: a null item in the channel would only fail later,
        // when the reader calls GetType() on it.
        ArgumentNullException.ThrowIfNull(message);

        return _channel.Writer.WriteAsync(message, ct);
    }

    /// <summary>
    /// Writes <paramref name="command"/> to the same channel used for messages.
    /// </summary>
    /// <param name="command">The command to enqueue; must not be <c>null</c>.</param>
    /// <param name="ct">Token used to cancel the write.</param>
    /// <returns>The <see cref="ValueTask"/> returned by the channel writer.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="command"/> is <c>null</c>.</exception>
    public ValueTask SendAsync(ICommand command, CancellationToken ct = default)
    {
        // Same reasoning as PublishAsync: fail at the caller, not in the reader.
        ArgumentNullException.ThrowIfNull(command);

        return _channel.Writer.WriteAsync(command, ct);
    }
}
-----------------------------
// Bus/MessageRouter.cs
using System.Reflection;
using System.Threading.Channels;

/// <summary>
/// Background service that drains the message channel and dispatches each
/// message to the <c>IHandler&lt;T&gt;</c> services registered for its runtime type.
/// </summary>
class MessageRouter : BackgroundService
{
    private readonly ChannelReader<IMessage> _reader;
    private readonly IServiceProvider _serviceProvider;

    /// <summary>
    /// Creates a router that reads from <paramref name="channel"/>.
    /// </summary>
    /// <param name="channel">Channel whose reader supplies the messages to route.</param>
    /// <param name="serviceProvider">Provider used to create a scope per message for handler resolution.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public MessageRouter(Channel<IMessage> channel, IServiceProvider serviceProvider)
    {
        ArgumentNullException.ThrowIfNull(channel);
        ArgumentNullException.ThrowIfNull(serviceProvider);

        _reader = channel.Reader;
        _serviceProvider = serviceProvider;
    }

    /// <summary>
    /// Reads messages from the channel and routes them one at a time until the
    /// channel completes or <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Token signalled when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        Console.WriteLine("MessageRouter started processing messages...");

        try
        {
            await foreach (var message in _reader.ReadAllAsync(stoppingToken))
            {
                // Stop before starting new work once shutdown has been requested.
                if (stoppingToken.IsCancellationRequested)
                {
                    break;
                }

                await RouteMessageAsync(message, stoppingToken);
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            Console.WriteLine("MessageRouter received shutdown signal, finishing current work...");
        }

        Console.WriteLine("MessageRouter finished processing all messages.");
    }

    /// <summary>
    /// Resolves the handlers registered for the runtime type of
    /// <paramref name="message"/> and runs them through <c>ExecutionManager</c>.
    /// </summary>
    /// <param name="message">The message to dispatch.</param>
    /// <param name="ct">Token forwarded to the handler execution.</param>
    private async Task RouteMessageAsync(IMessage message, CancellationToken ct)
    {
        var messageType = message.GetType();
        var handlerInterface = typeof(IHandler<>).MakeGenericType(messageType);

        // Resolve handlers from a per-message scope rather than the root provider:
        // scoped handlers become resolvable, and disposable transient handlers are
        // released when the scope ends instead of living until application shutdown.
        await using var scope = _serviceProvider.CreateAsyncScope();
        var handlers = scope.ServiceProvider.GetServices(handlerInterface);

        if (!handlers.Any())
        {
            Console.WriteLine($"No handlers found for message type: {messageType.Name}");
            return;
        }

        var executionMode = ExecutionMode.Parallel; // can be configured differently

        // Awaited before the scope is disposed, so handlers finish while their
        // scoped dependencies are still alive.
        await ExecutionManager.ExecuteHandlersAsync(
            handlers,
            message,
            executionMode,
            ct);
    }
}
Комментариев нет:
Отправить комментарий