Gniazdowanie równoległe.ForEach
W aplikacji metro muszę wykonać kilka połączeń WCF. Istnieje znaczna liczba połączeń do wykonania, więc muszę zrobić je w pętli równoległej. Problem polega na tym, że pętla równoległa kończy się przed zakończeniem wszystkich wywołań WCF.
Jak byś to przeformułował, żeby działało zgodnie z oczekiwaniami?
var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new System.Collections.Concurrent.BlockingCollection<Customer>();
Parallel.ForEach(ids, async i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(i);
customers.Add(cust);
});
foreach ( var customer in customers )
{
Console.WriteLine(customer.ID);
}
Console.ReadKey();
9 answers
Cała idea Parallel.ForEach()
polega na tym, że masz zbiór wątków i każdy wątek przetwarza część kolekcji. Jak zauważyłeś, to nie działa z async
-await
, gdzie chcesz zwolnić wątek na czas trwania połączenia asynchronicznego.
Można to" naprawić " blokując ForEach()
wątki, ale to pokonuje cały punkt async
-await
.
Możesz użyć TPL Dataflow zamiast Parallel.ForEach()
, który obsługuje asynchroniczne Task
s cóż.
W szczególności, Twój kod może być napisany za pomocą TransformBlock
, który przekształca każdy identyfikator w Customer
za pomocÄ… async
lambda. Blok ten może być skonfigurowany do wykonywania równolegle. Możesz połączyć ten blok z ActionBlock
, który zapisuje każdy Customer
do konsoli.
Po skonfigurowaniu sieci blokowej można Post()
każdy identyfikator do TransformBlock
.
W kodzie:
var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var getCustomerBlock = new TransformBlock<string, Customer>(
async i =>
{
ICustomerRepo repo = new CustomerRepo();
return await repo.GetCustomer(i);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
writeCustomerBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
foreach (var id in ids)
getCustomerBlock.Post(id);
getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();
Chociaż prawdopodobnie chcesz ograniczyć równoległość TransformBlock
do jakiejś małej stałej. Ponadto, można ograniczyć pojemność TransformBlock
i dodaj do niej elementy asynchronicznie używając SendAsync()
, na przykład, jeśli kolekcja jest zbyt duża.
Dodatkową korzyścią w porównaniu z Twoim kodem (jeśli zadziałał) jest to, że pisanie rozpocznie się natychmiast po zakończeniu pojedynczego elementu i nie będzie czekać, aż całe przetwarzanie zostanie zakończone.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-02-25 18:12:45
Odpowiedź Svicka jest (jak zwykle) doskonała.
Jednak uważam, że przepływ danych jest bardziej przydatny, gdy rzeczywiście masz duże ilości danych do przesłania. Lub gdy potrzebujesz kolejki zgodnej z async
.
W Twoim przypadku prostszym rozwiązaniem jest użycie równoległości w stylu async
:
var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customerTasks = ids.Select(i =>
{
ICustomerRepo repo = new CustomerRepo();
return repo.GetCustomer(i);
});
var customers = await Task.WhenAll(customerTasks);
foreach (var customer in customers)
{
Console.WriteLine(customer.ID);
}
Console.ReadKey();
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 10:31:28
Używanie przepływu danych, jak zasugerował svick, może być przesadą, a odpowiedź Stephena nie zapewnia środków do kontrolowania współbieżności operacji. Można to jednak osiągnąć dość prosto:
public static async Task RunWithMaxDegreeOfConcurrency<T>(
int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
var activeTasks = new List<Task>(maxDegreeOfConcurrency);
foreach (var task in collection.Select(taskFactory))
{
activeTasks.Add(task);
if (activeTasks.Count == maxDegreeOfConcurrency)
{
await Task.WhenAny(activeTasks.ToArray());
//observe exceptions here
activeTasks.RemoveAll(t => t.IsCompleted);
}
}
await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t =>
{
//observe exceptions in a manner consistent with the above
});
}
Wywołania ToArray()
mogą być zoptymalizowane przez użycie tablicy zamiast listy i zastąpienie ukończonych zadań, ale wątpię, by to miało znaczenie w większości scenariuszy. Przykładowe użycie na pytanie OP:
RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(i);
customers.Add(cust);
});
EDIT kolega SO user i TPL wiz Eli Arbel wskazał mi powiązany artykuł od Stephena Touba. Jak zwykle, jego realizacja jest zarówno elegancka, jak i skuteczna: {]}
public static Task ForEachAsync<T>(
this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await body(partition.Current).ContinueWith(t =>
{
//observe exceptions
});
}));
}
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 10:31:28
Możesz zaoszczędzić wysiłek dzięki nowemu pakietowi Asyncenumerator NuGet , który nie istniał 4 lata temu, gdy pytanie zostało pierwotnie opublikowane. Pozwala kontrolować stopień równoległości:
using System.Collections.Async;
...
await ids.ParallelForEachAsync(async i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(i);
customers.Add(cust);
},
maxDegreeOfParallelism: 10);
Zastrzeżenie: jestem autorem biblioteki AsyncEnumerator, która jest open source i licencjonowana na MIT, i zamieszczam tę wiadomość tylko po to, aby pomóc społeczności.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-02-24 18:00:10
Zawiń Parallel.Foreach
w Task.Run()
i zamiast await
Użyj słowa kluczowego [yourasyncmethod].Result
(musisz wykonać zadanie.Uruchom rzecz, aby nie blokować wątku UI)
CoÅ› takiego:
var yourForeachTask = Task.Run(() =>
{
Parallel.ForEach(ids, i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = repo.GetCustomer(i).Result;
customers.Add(cust);
});
});
await yourForeachTask;
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-07-13 09:29:44
Powinno to być dość wydajne i łatwiejsze niż uruchomienie całego przepływu danych TPL:
var customers = await ids.SelectAsync(async i =>
{
ICustomerRepo repo = new CustomerRepo();
return await repo.GetCustomer(i);
});
...
public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
var results = new List<TResult>();
var activeTasks = new HashSet<Task<TResult>>();
foreach (var item in source)
{
activeTasks.Add(selector(item));
if (activeTasks.Count >= maxDegreesOfParallelism)
{
var completed = await Task.WhenAny(activeTasks);
activeTasks.Remove(completed);
results.Add(completed.Result);
}
}
results.AddRange(await Task.WhenAll(activeTasks));
return results;
}
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-02-17 18:57:32
Po wprowadzeniu kilku metod pomocniczych, będziesz mógł uruchamiać równoległe zapytania za pomocą tego prostego sintax:
const int DegreeOfParallelism = 10;
IEnumerable<double> result = await Enumerable.Range(0, 1000000)
.Split(DegreeOfParallelism)
.SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false))
.ConfigureAwait(false);
To, co dzieje się tutaj, to dzielenie kolekcji źródeł na 10 części (.Split(DegreeOfParallelism)
), Następnie uruchamiamy 10 zadań, z których każde przetwarza swoje pozycje jeden po drugim (.SelectManyAsync(...)
) i Å‚Ä…czymy je z powrotem w jednÄ… listÄ™.
Warto wspomnieć, że istnieje prostsze podejście:
double[] result2 = await Enumerable.Range(0, 1000000)
.Select(async i => await CalculateAsync(i).ConfigureAwait(false))
.WhenAll()
.ConfigureAwait(false);
Ale potrzebuje : Jeśli masz zbyt duży zbiór źródłowy, będzie on]} dla każdego przedmiotu od razu, co może spowodować znaczące uderzenia wydajności.
Metody rozszerzenia użyte w powyższych przykładach wyglądają następująco:
public static class CollectionExtensions
{
/// <summary>
/// Splits collection into number of collections of nearly equal size.
/// </summary>
public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount)
{
if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount));
List<T> source = src.ToList();
var sourceIndex = 0;
for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++)
{
var list = new List<T>();
int itemsLeft = source.Count - targetIndex;
while (slicesCount * list.Count < itemsLeft)
{
list.Add(source[sourceIndex++]);
}
yield return list;
}
}
/// <summary>
/// Takes collection of collections, projects those in parallel and merges results.
/// </summary>
public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
this IEnumerable<IEnumerable<T>> source,
Func<T, Task<TResult>> func)
{
List<TResult>[] slices = await source
.Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false))
.WhenAll()
.ConfigureAwait(false);
return slices.SelectMany(s => s);
}
/// <summary>Runs selector and awaits results.</summary>
public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
{
List<TResult> result = new List<TResult>();
foreach (TSource source1 in source)
{
TResult result1 = await selector(source1).ConfigureAwait(false);
result.Add(result1);
}
return result;
}
/// <summary>Wraps tasks with Task.WhenAll.</summary>
public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source)
{
return Task.WhenAll<TResult>(source);
}
}
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-11-17 15:38:22
Jestem trochę spóźniony na imprezę, ale może warto rozważyć użycie GetAwaiter.GetResult (), aby uruchomić kod asynchroniczny w kontekście synchronizacji, ale tak jak paralled jak poniżej;
Parallel.ForEach(ids, i =>
{
ICustomerRepo repo = new CustomerRepo();
// Run this in thread which Parallel library occupied.
var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
customers.Add(cust);
});
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-11-30 16:30:38
Metoda rozszerzenia do tego, która wykorzystuje Semaforeslim, a także pozwala ustawić maksymalny stopień równoległości
/// <summary>
/// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
/// </summary>
/// <typeparam name="T">Type of IEnumerable</typeparam>
/// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
/// <param name="action">an async <see cref="Action" /> to execute</param>
/// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
/// Must be grater than 0</param>
/// <returns>A Task representing an async operation</returns>
/// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
public static async Task ForEachAsyncConcurrent<T>(
this IEnumerable<T> enumerable,
Func<T, Task> action,
int? maxDegreeOfParallelism = null)
{
if (maxDegreeOfParallelism.HasValue)
{
using (var semaphoreSlim = new SemaphoreSlim(
maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
{
var tasksWithThrottler = new List<Task>();
foreach (var item in enumerable)
{
// Increment the number of currently running tasks and wait if they are more than limit.
await semaphoreSlim.WaitAsync();
tasksWithThrottler.Add(Task.Run(async () =>
{
await action(item);
// action is completed, so decrement the number of currently running tasks
semaphoreSlim.Release();
}));
}
// Wait for all tasks to complete.
await Task.WhenAll(tasksWithThrottler.ToArray());
}
}
else
{
await Task.WhenAll(enumerable.Select(item => action(item)));
}
}
Przykładowe Użycie:
await enumerable.ForEachAsyncConcurrent(
async item =>
{
await SomeAsyncMethod(item);
},
5);
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-05-09 22:46:16