Gniazdowanie równoległe.ForEach

W aplikacji metro muszę wykonać kilka połączeń WCF. Istnieje znaczna liczba połączeń do wykonania, więc muszę zrobić je w pętli równoległej. Problem polega na tym, że pętla równoległa kończy się przed zakończeniem wszystkich wywołań WCF.

Jak byś to przeformułował, żeby działało zgodnie z oczekiwaniami?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}

Console.ReadKey();
Author: Paul Brinkley, 2012-07-19

9 answers

Cała idea Parallel.ForEach() polega na tym, że masz zbiór wątków i każdy wątek przetwarza część kolekcji. Jak zauważyłeś, to nie działa z async-await, gdzie chcesz zwolnić wątek na czas trwania połączenia asynchronicznego.

Można to" naprawić " blokując ForEach() wątki, ale to pokonuje cały punkt async-await.

Możesz użyć TPL Dataflow zamiast Parallel.ForEach(), który obsługuje asynchroniczne Tasks cóż.

W szczególności, Twój kod może być napisany za pomocą TransformBlock, który przekształca każdy identyfikator w Customer za pomocą async lambda. Blok ten może być skonfigurowany do wykonywania równolegle. Możesz połączyć ten blok z ActionBlock, który zapisuje każdy Customer do konsoli. Po skonfigurowaniu sieci blokowej można Post() każdy identyfikator do TransformBlock.

W kodzie:

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });

foreach (var id in ids)
    getCustomerBlock.Post(id);

getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();

Chociaż prawdopodobnie chcesz ograniczyć równoległość TransformBlock do jakiejś małej stałej. Ponadto, można ograniczyć pojemność TransformBlock i dodaj do niej elementy asynchronicznie używając SendAsync(), na przykład, jeśli kolekcja jest zbyt duża.

Dodatkową korzyścią w porównaniu z Twoim kodem (jeśli zadziałał) jest to, że pisanie rozpocznie się natychmiast po zakończeniu pojedynczego elementu i nie będzie czekać, aż całe przetwarzanie zostanie zakończone.

 119
Author: svick,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-02-25 18:12:45

Odpowiedź Svicka jest (jak zwykle) doskonała.

Jednak uważam, że przepływ danych jest bardziej przydatny, gdy rzeczywiście masz duże ilości danych do przesłania. Lub gdy potrzebujesz kolejki zgodnej z async.

W Twoim przypadku prostszym rozwiązaniem jest użycie równoległości w stylu async:

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var customerTasks = ids.Select(i =>
  {
    ICustomerRepo repo = new CustomerRepo();
    return repo.GetCustomer(i);
  });
var customers = await Task.WhenAll(customerTasks);

foreach (var customer in customers)
{
  Console.WriteLine(customer.ID);
}

Console.ReadKey();
 87
Author: Stephen Cleary,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 10:31:28

Używanie przepływu danych, jak zasugerował svick, może być przesadą, a odpowiedź Stephena nie zapewnia środków do kontrolowania współbieżności operacji. Można to jednak osiągnąć dość prosto:

public static async Task RunWithMaxDegreeOfConcurrency<T>(
     int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
    var activeTasks = new List<Task>(maxDegreeOfConcurrency);
    foreach (var task in collection.Select(taskFactory))
    {
        activeTasks.Add(task);
        if (activeTasks.Count == maxDegreeOfConcurrency)
        {
            await Task.WhenAny(activeTasks.ToArray());
            //observe exceptions here
            activeTasks.RemoveAll(t => t.IsCompleted); 
        }
    }
    await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t => 
    {
        //observe exceptions in a manner consistent with the above   
    });
}

Wywołania ToArray() mogą być zoptymalizowane przez użycie tablicy zamiast listy i zastąpienie ukończonych zadań, ale wątpię, by to miało znaczenie w większości scenariuszy. Przykładowe użycie na pytanie OP:

RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

EDIT kolega SO user i TPL wiz Eli Arbel wskazał mi powiązany artykuł od Stephena Touba. Jak zwykle, jego realizacja jest zarówno elegancka, jak i skuteczna: {]}

public static Task ForEachAsync<T>(
      this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current).ContinueWith(t => 
                          {
                              //observe exceptions
                          });

        })); 
}
 58
Author: Ohad Schneider,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 10:31:28

Możesz zaoszczędzić wysiłek dzięki nowemu pakietowi Asyncenumerator NuGet , który nie istniał 4 lata temu, gdy pytanie zostało pierwotnie opublikowane. Pozwala kontrolować stopień równoległości:

using System.Collections.Async;
...

await ids.ParallelForEachAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
},
maxDegreeOfParallelism: 10);

Zastrzeżenie: jestem autorem biblioteki AsyncEnumerator, która jest open source i licencjonowana na MIT, i zamieszczam tę wiadomość tylko po to, aby pomóc społeczności.

 16
Author: Serge Semenov,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-02-24 18:00:10

Zawiń Parallel.Foreach w Task.Run() i zamiast await Użyj słowa kluczowego [yourasyncmethod].Result

(musisz wykonać zadanie.Uruchom rzecz, aby nie blokować wątku UI)

CoÅ› takiego:

var yourForeachTask = Task.Run(() =>
        {
            Parallel.ForEach(ids, i =>
            {
                ICustomerRepo repo = new CustomerRepo();
                var cust = repo.GetCustomer(i).Result;
                customers.Add(cust);
            });
        });
await yourForeachTask;
 9
Author: ofcoursedude,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-07-13 09:29:44

Powinno to być dość wydajne i łatwiejsze niż uruchomienie całego przepływu danych TPL:

var customers = await ids.SelectAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    return await repo.GetCustomer(i);
});

...

public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
    var results = new List<TResult>();

    var activeTasks = new HashSet<Task<TResult>>();
    foreach (var item in source)
    {
        activeTasks.Add(selector(item));
        if (activeTasks.Count >= maxDegreesOfParallelism)
        {
            var completed = await Task.WhenAny(activeTasks);
            activeTasks.Remove(completed);
            results.Add(completed.Result);
        }
    }

    results.AddRange(await Task.WhenAll(activeTasks));
    return results;
}
 6
Author: John Gietzen,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-02-17 18:57:32

Po wprowadzeniu kilku metod pomocniczych, będziesz mógł uruchamiać równoległe zapytania za pomocą tego prostego sintax:

const int DegreeOfParallelism = 10;
IEnumerable<double> result = await Enumerable.Range(0, 1000000)
    .Split(DegreeOfParallelism)
    .SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false))
    .ConfigureAwait(false);

To, co dzieje się tutaj, to dzielenie kolekcji źródeł na 10 części (.Split(DegreeOfParallelism)), Następnie uruchamiamy 10 zadań, z których każde przetwarza swoje pozycje jeden po drugim (.SelectManyAsync(...)) i łączymy je z powrotem w jedną listę.

Warto wspomnieć, że istnieje prostsze podejście:

double[] result2 = await Enumerable.Range(0, 1000000)
    .Select(async i => await CalculateAsync(i).ConfigureAwait(false))
    .WhenAll()
    .ConfigureAwait(false);

Ale potrzebuje : Jeśli masz zbyt duży zbiór źródłowy, będzie on]} dla każdego przedmiotu od razu, co może spowodować znaczące uderzenia wydajności.

Metody rozszerzenia użyte w powyższych przykładach wyglądają następująco:

public static class CollectionExtensions
{
    /// <summary>
    /// Splits collection into number of collections of nearly equal size.
    /// </summary>
    public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount)
    {
        if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount));

        List<T> source = src.ToList();
        var sourceIndex = 0;
        for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++)
        {
            var list = new List<T>();
            int itemsLeft = source.Count - targetIndex;
            while (slicesCount * list.Count < itemsLeft)
            {
                list.Add(source[sourceIndex++]);
            }

            yield return list;
        }
    }

    /// <summary>
    /// Takes collection of collections, projects those in parallel and merges results.
    /// </summary>
    public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
        this IEnumerable<IEnumerable<T>> source,
        Func<T, Task<TResult>> func)
    {
        List<TResult>[] slices = await source
            .Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false))
            .WhenAll()
            .ConfigureAwait(false);
        return slices.SelectMany(s => s);
    }

    /// <summary>Runs selector and awaits results.</summary>
    public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
    {
        List<TResult> result = new List<TResult>();
        foreach (TSource source1 in source)
        {
            TResult result1 = await selector(source1).ConfigureAwait(false);
            result.Add(result1);
        }
        return result;
    }

    /// <summary>Wraps tasks with Task.WhenAll.</summary>
    public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source)
    {
        return Task.WhenAll<TResult>(source);
    }
}
 2
Author: Vitaliy Ulantikov,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-11-17 15:38:22

Jestem trochę spóźniony na imprezę, ale może warto rozważyć użycie GetAwaiter.GetResult (), aby uruchomić kod asynchroniczny w kontekście synchronizacji, ale tak jak paralled jak poniżej;

 Parallel.ForEach(ids, i =>
{
    ICustomerRepo repo = new CustomerRepo();
    // Run this in thread which Parallel library occupied.
    var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
    customers.Add(cust);
});
 1
Author: Teoman shipahi,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-11-30 16:30:38

Metoda rozszerzenia do tego, która wykorzystuje Semaforeslim, a także pozwala ustawić maksymalny stopień równoległości

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item);

                        // action is completed, so decrement the number of currently running tasks
                        semaphoreSlim.Release();
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Przykładowe Użycie:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
 0
Author: Jay Shah,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-05-09 22:46:16