Qwen, Deepseek, Alice,Tpl_Dataflow, WorkerServiceSln.slnx, PublisherServiceEtalon
Qwen -> Alice -> Deepseek
Начали с Qwen, продолжили с Alice, закончили с Deepseek
PublisherServiceEtalon.cs - Etalon
https://chat.qwen.ai/s/85ca8d90-4ed5-4ae5-af78-daff549e477e?fev=0.2.63
https://chat.deepseek.com/share/h7qmvqrtdnql2ia53m
------------------------------------------------------------------------------
// Etalon
protected override async Task ExecuteAsync(CancellationToken stoppingToken) { SetupIncomingQuotesProcessing(stoppingToken); _logger.LogInformation("Publisher Service запущен. Ожидаем котировки..."); await RunQuotePublicationLoop(stoppingToken); }private async Task RunQuotePublicationLoop(CancellationToken stoppingToken) { var symbols = new[] { "AAPL", "GOOGL", "MSFT", "AMZN", "TSLA" }; while (!stoppingToken.IsCancellationRequested) { try { var quote = GenerateRandomQuote(symbols); await PublishQuoteAsync(quote, stoppingToken); UpdateAndLogPublicationStats(); await Task.Delay(TimeSpan.FromMilliseconds(500), stoppingToken); } catch (OperationCanceledException) { _logger.LogInformation("Публикация котировок прервана по запросу остановки."); break; } catch (Exception ex) { _logger.LogError(ex, "Ошибка при публикации котировки"); } } _logger.LogInformation("Publisher завершил публикацию. Всего: {Count}", _publishedCount); _channel.PublisherToSubscriber.Complete(); }
-----------------------------------------------------------------------------
D:\Projects\VS02\2606\TplDataflow\Qwen\WorkerServiceSln\WorkerServiceSln.slnx
D:\Projects\VS02\2606\TplDataflow\Qwen\WorkerServiceSln\WorkerService\WorkerService.csproj
<Project Sdk="Microsoft.NET.Sdk.Worker"> <PropertyGroup> <TargetFramework>net10.0</TargetFramework> <Nullable>enable</Nullable> <ImplicitUsings>enable</ImplicitUsings> <UserSecretsId>dotnet-WorkerService-a8a25a95-4bde-4a2f-8eea-1522a10dcd74</UserSecretsId> </PropertyGroup> <ItemGroup> <PackageReference Include="Microsoft.Extensions.Hosting" Version="10.0.9" /> </ItemGroup> </Project>
-------------------------------------------------------------------------------------------// Services/PublisherService.cs using System.Threading.Tasks.Dataflow; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using WorkerService.Dataflow; using WorkerService.Models; namespace WorkerService.Services { public class PublisherService(QuoteChannel channel, ILogger<PublisherService> logger) : BackgroundService { private readonly QuoteChannel _channel = channel; private readonly ILogger<PublisherService> _logger = logger; private readonly Random _random = new(); // TPL Dataflow блок для обработки входящих котировок (от Subscriber) private ActionBlock<Quote>? _incomingQuotesBlock; private int _publishedCount; private int _receivedCount; protected override Task ExecuteAsync(CancellationToken stoppingToken) { // Создаем блок для обработки входящих котировок от Subscriber _incomingQuotesBlock = new ActionBlock<Quote>( quote => ProcessIncomingQuote(quote), new ExecutionDataflowBlockOptions { BoundedCapacity = 10, MaxDegreeOfParallelism = 2, CancellationToken = stoppingToken, NameFormat = "Publisher-Incoming" }); // Подписываемся на обратный канал _channel.SubscriberToPublisher.LinkTo( _incomingQuotesBlock, new DataflowLinkOptions { PropagateCompletion = true }); _logger.LogInformation("Publisher Service запущен. Ожидаем котировки..."); return PublishQuotesAsync(stoppingToken); // !!!!!!!!!! } private async Task PublishQuotesAsync(CancellationToken stoppingToken) { var symbols = new[] { "AAPL", "GOOGL", "MSFT", "AMZN", "TSLA" }; while (!stoppingToken.IsCancellationRequested) { try { var symbol = symbols[_random.Next(symbols.Length)]; var quote = new Quote { Symbol = symbol, Price = 100 + _random.Next(-20, 20) + (decimal)_random.NextDouble(), Timestamp = DateTime.UtcNow, Source = "Publisher" }; // Публикуем в основной канал await _channel.PublisherToSubscriber.SendAsync(quote, stoppingToken); // Отправляем копию в мониторинг _channel.MonitoringBroadcast.Post(quote); _publishedCount++; if (_publishedCount % 10 == 0) { _logger.LogInformation("Publisher: опубликовано {Count} котировок", _publishedCount); } await Task.Delay(TimeSpan.FromMilliseconds(500), stoppingToken); } catch (OperationCanceledException) { break; } catch (Exception ex) { _logger.LogError(ex, "Ошибка при публикации котировки"); } } _logger.LogInformation("Publisher завершил публикацию. Всего: {Count}", _publishedCount); _channel.PublisherToSubscriber.Complete(); } private void ProcessIncomingQuote(Quote quote) { _receivedCount++; _logger.LogInformation("Publisher получил обратную связь: {Quote} (всего получено: {Count})", quote, _receivedCount); } public override async Task StopAsync(CancellationToken cancellationToken) { _logger.LogInformation("Publisher Service останавливается..."); _incomingQuotesBlock?.Complete(); await base.StopAsync(cancellationToken); } } }
---------------------------------------------------------------------------------------
Особенно мне понравилось вот это return PublishQuotesAsync(stoppingToken); - висим на return-е Шедевр, не правда ли
--------------------------------------------------------------------------------------------------------Отличный код для демонстрации анти-паттерна! Вы абсолютно правы, это действительно «шедевр» в своем роде.
Давайте разберем, почему
return PublishQuotesAsync(stoppingToken);— это проблема, и что еще интересного есть в коде.Главная проблема:
return PublishQuotesAsync()В
ExecuteAsyncпроисходит следующее:
Создается
ActionBlockи настраивается подписка на каналСразу возвращается задача от
PublishQuotesAsync
BackgroundServiceсчитает, чтоExecuteAsyncзавершился (вернул задачу)Но
PublishQuotesAsync— это бесконечный цикл, который никогда не завершится самПочему это плохо?
// Как должно быть (правильно) protected override async Task ExecuteAsync(CancellationToken stoppingToken) { // Инициализация... // await, а не return — тогда метод остается "живым" await PublishQuotesAsync(stoppingToken); // Cleanup код здесь выполнится только после завершения PublishQuotesAsync }Последствия текущей реализации:
BackgroundServiceдумает, что сервис запустился, и переводит его в состояниеRunningЕсли
PublishQuotesAsyncвыбросит исключение, сервис упадет, ноStopAsyncуже не вызовется корректноЗавершение сервиса через
StopAsyncсработает, ноExecuteAsyncуже вернул управление — это сбивает с толку модель жизненного цикла
Другие интересные моменты
1. Каналы и Dataflow смешаны, но не синхронизированы
// PublisherToSubscriber — канал (awaitable)
await _channel.PublisherToSubscriber.SendAsync(quote, stoppingToken);
// MonitoringBroadcast — Dataflow блок (post, не await)
_channel.MonitoringBroadcast.Post(quote);Мониторинг может терять сообщения, если блок переполнен (BoundedCapacity у MonitoringBroadcast не задан).
2. Нет обработки завершения PublisherToSubscriber
Вы вызываете _channel.PublisherToSubscriber.Complete(), но никто не ждет, пока все сообщения будут обработаны подписчиками.
3. ExecuteAsync не async
Метод не использует async/await, но вызывает асинхронные методы. Это ок, но странно на фоне остального стиля.
4. Гонка при остановке
В StopAsync вы вызываете _incomingQuotesBlock?.Complete(), но PublishQuotesAsync продолжает работать в фоне, пока ее цикл не прервется по stoppingToken. Никто не ждет ее завершения.
Как исправить:
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_incomingQuotesBlock = new ActionBlock<Quote>(...);
_channel.SubscriberToPublisher.LinkTo(_incomingQuotesBlock, ...);
_logger.LogInformation("Publisher Service запущен");
try
{
await PublishQuotesAsync(stoppingToken);
}
finally
{
_channel.PublisherToSubscriber.Complete();
await _incomingQuotesBlock.Completion;
}
}И сделать StopAsync:
public override async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Остановка...");
_incomingQuotesBlock?.Complete();
await _incomingQuotesBlock?.Completion;
await base.StopAsync(cancellationToken);
}Вердикт
Код демонстрирует хорошее знание TPL Dataflow и Channel, но фундаментальное непонимание жизненного цикла BackgroundService.
Это классический случай, когда код «работает» в тестах (потому что сервис редко останавливают), но в production вылезут баги с зависшими при остановке потоками или необработанными исключениями.
Ваш сарказм по поводу return PublishQuotesAsync() абсолютно заслужен!
-------------------------------------------
Alice
-------------------------------------------------------------------------------------------------------------
Вот совмещённый код — вся логика помещена в метод ExecuteAsync, а вспомогательные фрагменты встроены напрямую:
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// Создаём блок для обработки входящих котировок от Subscriber
_incomingQuotesBlock = new ActionBlock<Quote>(
quote =>
{
_receivedCount++;
_logger.LogInformation(
"Publisher получил обратную связь: {Quote} (всего получено: {Count})",
quote, _receivedCount);
},
new ExecutionDataflowBlockOptions
{
BoundedCapacity = 10,
MaxDegreeOfParallelism = 2,
CancellationToken = stoppingToken,
NameFormat = "Publisher-Incoming"
});
// Подписываемся на обратный канал
_channel.SubscriberToPublisher.LinkTo(
_incomingQuotesBlock,
new DataflowLinkOptions { PropagateCompletion = true });
_logger.LogInformation("Publisher Service запущен. Ожидаем котировки...");
var symbols = new[] { "AAPL", "GOOGL", "MSFT", "AMZN", "TSLA" };
while (!stoppingToken.IsCancellationRequested)
{
try
{
var symbol = symbols[_random.Next(symbols.Length)];
var quote = new Quote
{
Symbol = symbol,
Price = 100 + _random.Next(-20, 20) + (decimal)_random.NextDouble(),
Timestamp = DateTime.UtcNow,
Source = "Publisher"
};
// Публикуем в основной канал
await _channel.PublisherToSubscriber.SendAsync(quote, stoppingToken);
// Отправляем копию в мониторинг
_channel.MonitoringBroadcast.Post(quote);
_publishedCount++;
if (_publishedCount % 10 == 0)
{
_logger.LogInformation("Publisher: опубликовано {Count} котировок", _publishedCount);
}
await Task.Delay(TimeSpan.FromMilliseconds(500), stoppingToken);
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Ошибка при публикации котировки");
}
}
_logger.LogInformation("Publisher завершил публикацию. Всего: {Count}", _publishedCount);
_channel.PublisherToSubscriber.Complete();
}
Что было сделано
Объединение методов. Логика из
PublishQuotesAsyncполностью перенесена внутрьExecuteAsync.Встраивание обработки входящих котировок. Анонимная функция напрямую задана в конструкторе
ActionBlockвместо вызова отдельного методаProcessIncomingQuote. Внутри неё:увеличивается счётчик
_receivedCount;выполняется логирование получения обратной связи.
Сохранение структуры цикла. Основной цикл генерации и отправки котировок остался без изменений — он теперь выполняется непосредственно в
ExecuteAsync.Сохранение всех обработчиков исключений. Блоки
try‑catchдляOperationCanceledExceptionи общих исключений сохранены в неизменном виде.Сохранение логики завершения. После выхода из цикла выполняется:
логирование итогового количества опубликованных котировок;
вызов
_channel.PublisherToSubscriber.Complete()для завершения канала отправки котировок.
Сохранение инициализации. Инициализация
_incomingQuotesBlockи подписка на канал остались на своих местах в начале метода.
Ключевые особенности итогового кода
Один метод. Вся логика сосредоточена в
ExecuteAsync— нет вызовов вспомогательных методов.Асинхронность. Метод помечен как
async Task, что позволяет использоватьawaitвнутри цикла.Обработка отмены. Цикл корректно прерывается при установке
stoppingToken.Логирование. Все ключевые события (запуск, публикация каждых 10‑ти котировок, завершение) логируются.
Обработка ошибок. Исключения корректно перехватываются и логируются, не приводя к аварийному завершению сервиса.
Корректное завершение. Канал отправки котировок завершается вызовом
Complete()после выхода из основного цикла.
Как это работает
При запуске сервиса вызывается
ExecuteAsync.Настраивается блок обработки входящих котировок (
_incomingQuotesBlock) и устанавливается связь с каналом.Запускается основной цикл генерации котировок:
генерируется случайная котировка;
отправляется в основной канал и в мониторинг;
обновляется счётчик опубликованных котировок;
каждые 10 котировок выводится логи;
делается пауза 500 мс.
При получении сигнала отмены (
stoppingToken) цикл прерывается.Выполняется финальное логирование и завершается канал отправки котировок.
Хотите, я уточню какой‑то фрагмент кода или предложу дополнительные улучшения?
----------------------------------------------------------------------------
Комментариев нет:
Отправить комментарий