Dławienie zadań asynchronicznych

Chciałbym uruchomić kilka zadań asynchronicznych, z limitem ilości zadań oczekujących na ukończenie w danym momencie.

Załóżmy, że masz 1000 adresów URL i chcesz mieć otwarte tylko 50 żądań na raz; ale gdy tylko jedno żądanie się zakończy, otwierasz połączenie z następnym adresem URL na liście. W ten sposób zawsze jest dokładnie 50 połączeń otwartych na raz, dopóki lista adresów URL nie zostanie wyczerpana.

Chcę również wykorzystać określoną liczbę wątków, jeśli możliwe.

Wymyśliłem metodę rozszerzenia, ThrottleTasksAsync która robi to, co chcę. Czy istnieje już prostsze rozwiązanie? Zakładam, że jest to powszechny scenariusz.

Użycie:

class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();

        Console.WriteLine("Press a key to exit...");
        Console.ReadKey(true);
    }
}

Oto kod:

static class IEnumerableExtensions
{
    public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
    {
        var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());

        var semaphore = new SemaphoreSlim(maxConcurrentTasks);

        // Run the throttler on a separate thread.
        var t = Task.Run(() =>
        {
            foreach (var item in enumerable)
            {
                // Wait for the semaphore
                semaphore.Wait();
                blockingQueue.Add(item);
            }

            blockingQueue.CompleteAdding();
        });

        var taskList = new List<Task<Result_T>>();

        Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
        _ =>
        {
            Enumerable_T item;

            if (blockingQueue.TryTake(out item, 100))
            {
                taskList.Add(
                    // Run the task
                    taskToRun(item)
                    .ContinueWith(tsk =>
                        {
                            // For effect
                            Thread.Sleep(2000);

                            // Release the semaphore
                            semaphore.Release();

                            return tsk.Result;
                        }
                    )
                );
            }
        });

        // Await all the tasks.
        return await Task.WhenAll(taskList);
    }

    static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
    {
        while (!condition()) yield return true;
    }
}

Metoda wykorzystuje BlockingCollection i SemaphoreSlim, aby to zadziałało. Przepustnica jest uruchamiana na jednym wątku, a wszystkie zadania asynchroniczne są uruchamiane na drugim wątku. Aby osiągnąć równoległość, dodałem parametr maxDegreeOfParallelism, który jest przekazywany do pętli Parallel.ForEach ponownie jako pętla while.

Stara wersja brzmiała:

foreach (var master = ...)
{
    var details = ...;
    Parallel.ForEach(details, detail => {
        // Process each detail record here
    }, new ParallelOptions { MaxDegreeOfParallelism = 15 });
    // Perform the final batch updates here
}

Ale Pula wątków szybko się wyczerpuje, a ty nie możesz async/await.

Bonus: Aby obejść problem w BlockingCollection gdzie wyjątek jest wrzucany w Take() kiedy wywołane jest CompleteAdding(), używam przeciążenia TryTake z timeoutem. Jeśli nie użyję timeoutu w TryTake, to pokonałoby to cel użycia BlockingCollection, ponieważ TryTake nie będzie blokować. Jest lepszy sposób? Najlepiej byłoby, gdyby TakeAsync metoda.

Author: Josh Wyant, 2014-03-19

3 answers

Zgodnie z sugestią, użyj TPL Dataflow.

A TransformBlock<TInput, TOutput> może tego szukasz.

Definiujesz MaxDegreeOfParallelism, aby ograniczyć liczbę łańcuchów, które można przekształcić (np. ile adresów URL można pobrać) równolegle. Następnie publikujesz adresy URL do bloku, a kiedy skończysz, mówisz blokowi, że skończyłeś dodawać elementy i pobierasz odpowiedzi.

var downloader = new TransformBlock<string, HttpResponse>(
        url => Download(url),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
    );

var buffer = new BufferBlock<HttpResponse>();
downloader.LinkTo(buffer);

foreach(var url in urls)
    downloader.Post(url);
    //or await downloader.SendAsync(url);

downloader.Complete();
await downloader.Completion;

IList<HttpResponse> responses;
if (buffer.TryReceiveAll(out responses))
{
    //process responses
}

Uwaga: TransformBlock bufory zarówno wejściowe, jak i wyjściowe. Dlaczego więc musimy powiązać go z BufferBlock?

Ponieważ TransformBlock nie zakończy się, dopóki wszystkie przedmioty (HttpResponse) nie zostaną skonsumowane i await downloader.Completion zawisną. Zamiast tego pozwalamy downloader przekazać wszystkie swoje dane wyjściowe do dedykowanego bloku bufora - następnie czekamy na zakończenie downloader i sprawdzamy blok bufora.

 50
Author: dcastro,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-02-10 19:42:31

Powiedzmy, że masz 1000 adresów URL i chcesz mieć tylko 50 zapytań otwartych na czas; ale gdy tylko jedno żądanie się zakończy, otwierasz połączenie do następnego adresu URL na liście. W ten sposób zawsze jest dokładnie 50 połączenia otwierają się na raz, dopóki lista adresów URL nie zostanie wyczerpana.

Następujące proste rozwiązanie pojawiło się wiele razy tutaj NA SO. Nie używa kodu blokującego i nie tworzy jawnie wątków, więc bardzo dobrze skaluje:

const int MAX_DOWNLOADS = 50;

static async Task DownloadAsync(string[] urls)
{
    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async url => 
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                Console.WriteLine(data);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks);
    }
}

Rzecz jest, przetwarzanie pobranych danych powinno odbywać się w innym potoku , z innym poziomem równoległości, zwłaszcza jeśli jest to przetwarzanie związane z CPU.

Na przykład, prawdopodobnie chcesz mieć 4 wątki jednocześnie przetwarzające dane (Liczba rdzeni procesora) i do 50 oczekujących żądań o więcej danych (które w ogóle nie używają wątków). AFAICT, to nie jest to, co twój kod obecnie robi.

Tam TPL Dataflow lub Rx mogą przydają się jako preferowane rozwiązanie. Jednak z pewnością jest możliwe zaimplementowanie czegoś takiego za pomocą zwykłego TPL. Uwaga, jedynym kodem blokującym jest ten, który przetwarza dane wewnątrz Task.Run:

const int MAX_DOWNLOADS = 50;
const int MAX_PROCESSORS = 4;

// process data
class Processing
{
    SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_PROCESSORS);
    HashSet<Task> _pending = new HashSet<Task>();
    object _lock = new Object();

    async Task ProcessAsync(string data)
    {
        await _semaphore.WaitAsync();
        try
        {
            await Task.Run(() =>
            {
                // simuate work
                Thread.Sleep(1000);
                Console.WriteLine(data);
            });
        }
        finally
        {
            _semaphore.Release();
        }
    }

    public async void QueueItemAsync(string data)
    {
        var task = ProcessAsync(data);
        lock (_lock)
            _pending.Add(task);
        try
        {
            await task;
        }
        catch
        {
            if (!task.IsCanceled && !task.IsFaulted)
                throw; // not the task's exception, rethrow
            // don't remove faulted/cancelled tasks from the list
            return;
        }
        // remove successfully completed tasks from the list 
        lock (_lock)
            _pending.Remove(task);
    }

    public async Task WaitForCompleteAsync()
    {
        Task[] tasks;
        lock (_lock)
            tasks = _pending.ToArray();
        await Task.WhenAll(tasks);
    }
}

// download data
static async Task DownloadAsync(string[] urls)
{
    var processing = new Processing();

    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async (url) =>
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                // put the result on the processing pipeline
                processing.QueueItemAsync(data);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks.ToArray());
        await processing.WaitForCompleteAsync();
    }
}
 34
Author: noseratio,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-10-10 19:39:25

Zgodnie z życzeniem, oto kod, który wybrałem.

Praca jest ustawiana w konfiguracji master-detail, a każdy master jest przetwarzany jako wsad. Każda jednostka pracy jest ustawiona w kolejce w ten sposób:

var success = true;

// Start processing all the master records.
Master master;
while (null != (master = await StoredProcedures.ClaimRecordsAsync(...)))
{
    await masterBuffer.SendAsync(master);
}

// Finished sending master records
masterBuffer.Complete();

// Now, wait for all the batches to complete.
await batchAction.Completion;

return success;

Wzorce są buforowane pojedynczo, aby zapisać pracę dla innych procesów zewnętrznych. Szczegóły dla każdego mistrza są wysyłane do pracy za pośrednictwem masterTransform TransformManyBlock. A BatchedJoinBlock jest również tworzony w celu zebrania szczegółów w jednej partii.

Rzeczywista praca jest wykonywana w detailTransform TransformBlock, asynchronicznie, 150 na raz. BoundedCapacity jest ustawione na 300, aby upewnić się, że zbyt wiele wzorców nie zostanie buforowanych na początku łańcucha, a jednocześnie pozostawia miejsce na wystarczającą ilość szczegółowych rekordów do kolejkowania, aby umożliwić przetwarzanie 150 rekordów jednocześnie. Blok wysyła object do swoich celów, ponieważ jest filtrowany przez łącza w zależności od tego, czy jest to Detail czy Exception.

The batchAction ActionBlock zbiera Dane wyjściowe ze wszystkich partii i wykonuje masowe aktualizacje bazy danych, błąd logowanie itp. dla każdej partii.

Będzie kilka BatchedJoinBlock s, po jednym dla każdego mistrza. Ponieważ każda ISourceBlock jest wysyłana sekwencyjnie, a każda partia akceptuje tylko liczbę szczegółowych rekordów związanych z jednym wzorcem, partie będą przetwarzane w kolejności. Każdy blok wysyła tylko jedną grupę i jest odłączany po zakończeniu. Tylko ostatni blok wsadowy propaguje jego zakończenie do ostatecznego ActionBlock.

Sieć przepływu danych:

// The dataflow network
BufferBlock<Master> masterBuffer = null;
TransformManyBlock<Master, Detail> masterTransform = null;
TransformBlock<Detail, object> detailTransform = null;
ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;

// Buffer master records to enable efficient throttling.
masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });

// Sequentially transform master records into a stream of detail records.
masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>
{
    var records = await StoredProcedures.GetObjectsAsync(masterRecord);

    // Filter the master records based on some criteria here
    var filteredRecords = records;

    // Only propagate completion to the last batch
    var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;

    // Create a batch join block to encapsulate the results of the master record.
    var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });

    // Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block.
    var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
    var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
    var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });

    // Unlink batchjoinblock upon completion.
    // (the returned task does not need to be awaited, despite the warning.)
    batchjoinblock.Completion.ContinueWith(task =>
    {
        detailLink1.Dispose();
        detailLink2.Dispose();
        batchLink.Dispose();
    });

    return filteredRecords;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

// Process each detail record asynchronously, 150 at a time.
detailTransform = new TransformBlock<Detail, object>(async detail => {
    try
    {
        // Perform the action for each detail here asynchronously
        await DoSomethingAsync();

        return detail;
    }
    catch (Exception e)
    {
        success = false;
        return e;
    }

}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });

// Perform the proper action for each batch
batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>
{
    var details = batch.Item1.Cast<Detail>();
    var errors = batch.Item2.Cast<Exception>();

    // Do something with the batch here
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });
masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });
 3
Author: Josh Wyant,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-04-08 19:31:07