суббота, 6 июня 2026 г.

Alice, MyBackgroundServicesApp.slnx, MessageBus

Alice, MyBackgroundServicesApp.slnx, MessageBus

D:\Projects\VS\2605\Alice\MyBackgroundServicesApp\MyBackgroundServicesApp.slnx

D:\Projects\VS\2605\Alice\MyBackgroundServicesApp\MyBackgroundServicesApp\MyBackgroundServicesApp.csproj

Service1, Service2, Service3

----------------------------------------------------------

AliceProj_260606.txt - Notes

----------------------------------------------------------

// Bus/IMessageBus.cs
/// <summary>
/// Contract for handing messages and commands over to the message bus.
/// </summary>
/// <remarks>
/// The interface itself does not define how publishing differs from sending.
/// The <c>MessageBus</c> implementation in this file writes both kinds of item
/// to the same channel.
/// </remarks>
interface IMessageBus
{
    /// <summary>Publishes <paramref name="message"/> to the bus.</summary>
    /// <param name="message">The message to publish.</param>
    /// <param name="ct">Token that can cancel the operation.</param>
    /// <returns>A <see cref="ValueTask"/> representing the asynchronous operation.</returns>
    ValueTask PublishAsync(IMessage message, CancellationToken ct = default);

    /// <summary>Sends <paramref name="command"/> to the bus.</summary>
    /// <param name="command">The command to send.</param>
    /// <param name="ct">Token that can cancel the operation.</param>
    /// <returns>A <see cref="ValueTask"/> representing the asynchronous operation.</returns>
    ValueTask SendAsync(ICommand command, CancellationToken ct = default);
}
---------------------------
// Bus/MessageBus.cs

using System.Threading.Channels;

/// <summary>
/// Channel-backed <see cref="IMessageBus"/>: every published message and every sent
/// command is written to the same <see cref="Channel{T}"/> of <see cref="IMessage"/>.
/// </summary>
class MessageBus : IMessageBus
{
    private readonly Channel<IMessage> _channel;

    // Kept because it is part of the constructor contract; nothing in this class reads it yet.
    private readonly IServiceProvider _serviceProvider;

    /// <summary>Creates a bus that writes into <paramref name="channel"/>.</summary>
    /// <param name="serviceProvider">Application service provider (stored, currently unused here).</param>
    /// <param name="channel">Channel that receives every message and command.</param>
    /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
    public MessageBus(IServiceProvider serviceProvider, Channel<IMessage> channel)
    {
        ArgumentNullException.ThrowIfNull(serviceProvider);
        ArgumentNullException.ThrowIfNull(channel);

        _serviceProvider = serviceProvider;
        _channel = channel;
    }

    /// <summary>Writes <paramref name="message"/> to the channel.</summary>
    /// <param name="message">The message to enqueue; must not be <c>null</c>.</param>
    /// <param name="ct">Token that can cancel the write.</param>
    /// <returns>The <see cref="ValueTask"/> returned by the channel writer.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is <c>null</c>.</exception>
    public ValueTask PublishAsync(IMessage message, CancellationToken ct = default)
    {
        // Fail at the call site: a null item would otherwise be enqueued and only
        // blow up later, when the channel reader tries to inspect it.
        ArgumentNullException.ThrowIfNull(message);

        return _channel.Writer.WriteAsync(message, ct);
    }

    /// <summary>Writes <paramref name="command"/> to the same channel used for messages.</summary>
    /// <param name="command">The command to enqueue; must not be <c>null</c>.</param>
    /// <param name="ct">Token that can cancel the write.</param>
    /// <returns>The <see cref="ValueTask"/> returned by the channel writer.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="command"/> is <c>null</c>.</exception>
    public ValueTask SendAsync(ICommand command, CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(command);

        // Relies on ICommand being convertible to IMessage (declared elsewhere in the project).
        return _channel.Writer.WriteAsync(command, ct);
    }
}
-----------------------------
// Bus/MessageRouter.cs

using System.Reflection;
using System.Threading.Channels;

/// <summary>
/// Background service that reads messages from the shared channel and passes each one
/// to the <c>IHandler&lt;T&gt;</c> services registered for the message's runtime type.
/// </summary>
class MessageRouter : BackgroundService
{
    private readonly ChannelReader<IMessage> _reader;
    private readonly IServiceProvider _serviceProvider;

    /// <summary>Creates a router that consumes <paramref name="channel"/>.</summary>
    /// <param name="channel">Channel whose reader side supplies the messages.</param>
    /// <param name="serviceProvider">Provider used to create a DI scope per message.</param>
    /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
    public MessageRouter(Channel<IMessage> channel, IServiceProvider serviceProvider)
    {
        ArgumentNullException.ThrowIfNull(channel);
        ArgumentNullException.ThrowIfNull(serviceProvider);

        _reader = channel.Reader;
        _serviceProvider = serviceProvider;
    }

    /// <summary>
    /// Reads messages until the channel completes or <paramref name="stoppingToken"/> is
    /// signalled, routing each message in turn.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        Console.WriteLine("MessageRouter started processing messages...");

        try
        {
            await foreach (var message in _reader.ReadAllAsync(stoppingToken))
            {
                // Do not start new work once shutdown has been requested.
                if (stoppingToken.IsCancellationRequested)
                {
                    break;
                }

                try
                {
                    await RouteMessageAsync(message, stoppingToken);
                }
                catch (Exception ex) when (ex is not OperationCanceledException || !stoppingToken.IsCancellationRequested)
                {
                    // One failing message must not end the routing loop for every later
                    // message. Shutdown cancellation is excluded by the filter so it still
                    // reaches the outer handler below.
                    Console.WriteLine($"MessageRouter failed to route message {message?.GetType().Name ?? "<null>"}: {ex}");
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            Console.WriteLine("MessageRouter received shutdown signal, finishing current work...");
        }

        Console.WriteLine("MessageRouter finished processing all messages.");
    }

    /// <summary>
    /// Resolves the handlers registered for the runtime type of <paramref name="message"/>
    /// and hands them to <c>ExecutionManager.ExecuteHandlersAsync</c>.
    /// </summary>
    /// <param name="message">The message to dispatch.</param>
    /// <param name="ct">Token forwarded to the handler execution.</param>
    private async Task RouteMessageAsync(IMessage message, CancellationToken ct)
    {
        var messageType = message.GetType();
        var handlerInterface = typeof(IHandler<>).MakeGenericType(messageType);

        // Resolve handlers from a per-message scope instead of the root provider:
        // scoped handlers get a proper lifetime, and disposable handlers are released
        // when the message is done rather than being held until application shutdown.
        // NOTE(review): assumes ExecuteHandlersAsync completes only after all handlers
        // have finished, so the scope is not disposed underneath them - confirm.
        await using var scope = _serviceProvider.CreateAsyncScope();
        var handlers = scope.ServiceProvider.GetServices(handlerInterface);

        if (!handlers.Any())
        {
            Console.WriteLine($"No handlers found for message type: {messageType.Name}");
            return;
        }

        var executionMode = ExecutionMode.Parallel; // could be configured differently

        await ExecutionManager.ExecuteHandlersAsync(
            handlers,
            message,
            executionMode,
            ct);
    }
}

Комментариев нет:

Отправить комментарий