Jak zaimplementować prosty wątek z ustaloną liczbą wątków roboczych

Szukam najprostszego, najprostszego sposobu na zaimplementowanie:

  • główny program tworzy instancję worker wątki do wykonania zadania.
  • tylko n zadania mogą być uruchomione jednocześnie.
  • Kiedy n zostanie osiągnięty, nie będzie więcej pracowników rozpoczynają się aż do running threads spada poniżej n.
Author: Raedwald, 2008-09-24

7 answers

Myślę, że Executors.newFixedThreadPool pasuje do Twoich wymagań. Istnieje wiele różnych sposobów korzystania z wynikowej usługi ExecutorService, w zależności od tego, czy chcesz, aby wynik został zwrócony do głównego wątku, Czy zadanie jest całkowicie niezależne i czy masz zbiór zadań do wykonania z góry, czy zadania są w kolejce w odpowiedzi na jakieś zdarzenie.

  Collection<YourTask> tasks = new ArrayList<YourTask>();
  YourTask yt1 = new YourTask();
  ...
  tasks.add(yt1);
  ...
  ExecutorService exec = Executors.newFixedThreadPool(5);
  List<Future<YourResultType>> results = exec.invokeAll(tasks);

Alternatywnie, jeśli masz nowe zadanie asynchroniczne do wykonania w odpowiedzi na niektóre event, prawdopodobnie chcesz po prostu użyć prostej metody execute(Runnable) ExecutorService.

 53
Author: erickson,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-07-20 16:56:46
/* Get an executor service that will run a maximum of 5 threads at a time: */
ExecutorService exec = Executors.newFixedThreadPool(5);
/* For all the 100 tasks to be done altogether... */
for (int i = 0; i < 100; i++) {
    /* ...execute the task to run concurrently as a runnable: */
    exec.execute(new Runnable() {
        public void run() {
            /* do the work to be done in its own thread */
            System.out.println("Running in: " + Thread.currentThread());
        }
    });
}
/* Tell the executor that after these 100 steps above, we will be done: */
exec.shutdown();
try {
    /* The tasks are now running concurrently. We wait until all work is done, 
     * with a timeout of 50 seconds: */
    boolean b = exec.awaitTermination(50, TimeUnit.SECONDS);
    /* If the execution timed out, false is returned: */
    System.out.println("All done: " + b);
} catch (InterruptedException e) { e.printStackTrace(); }
 22
Author: Fabian Steeg,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2008-09-24 09:54:02

Executors.newFixedThreadPool (int)

Executor executor = Executors.newFixedThreadPool(n);

Runnable runnable = new Runnable() {
 public void run() {
  // do your thing here
 }
}

executor.execute(runnable);
 5
Author: Matt,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-07-20 16:57:35

Użyj frameworka Executor; mianowicie newFixedThreadPool (N)

 2
Author: hazzen,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-07-20 17:00:13
  1. Jeśli kolejka zadań nie będzie nieograniczona, a zadania mogą być wykonywane w krótszych odstępach czasu, możesz użyć Executors.newFixedThreadPool(n); Jak sugerują eksperci.

    Jedyną wadą tego rozwiązania jest nieograniczona wielkość kolejki zadań. Nie masz nad tym kontroli. Ogromny stos w kolejce zadań pogorszy wydajność aplikacji i może spowodować brak pamięci w niektórych scenariuszach.

  2. Jeśli chcesz użyć ExecutorService i włączyć work stealing mechanizm, w którym bezczynność wątki robocze dziel obciążenie robocze z zajętych wątków roboczych, kradnąc zadania w kolejce zadań. Zwróci usługę executora typu ForkJoinPool.

    Public static ExecutorService newWorkStealingPool (Intel)

    Tworzy pulę wątków, która utrzymuje wystarczającą liczbę wątków do obsługi danego poziomu równoległości i może używać wielu kolejek w celu zmniejszenia konfliktu. Poziom równoległości odpowiada maksymalnej liczbie wątków aktywnie zaangażowanych lub dostępnych do zaangażowania w przetwarzanie zadań. Rzeczywista liczba wątków może dynamicznie rosnąć i kurczyć się. Grupa robocza nie daje żadnych gwarancji co do kolejności wykonania nadesłanych zadań.

  3. Preferuję ThreadPoolExecutor ze względu na elastyczność w interfejsach API do sterowania wieloma paratmetrami, które sterują wykonywaniem zadań flow.

    ThreadPoolExecutor(int corePoolSize, 
                           int maximumPoolSize, 
                           long keepAliveTime, 
                           TimeUnit unit, 
                           BlockingQueue<Runnable> workQueue, 
                           ThreadFactory threadFactory,
                           RejectedExecutionHandler handler)
    

W Twoim przypadku ustaw oba corePoolSize and maximumPoolSize as N. Tutaj możesz kontrolować rozmiar kolejki zadań, zdefiniować własną fabrykę wątków i Politykę obsługi odrzucania.

Spójrz na powiązane pytanie SE do dynamicznej kontroli wielkości puli:

Dynamiczna Pula Wątków

 1
Author: Ravindra babu,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 11:48:19

If you want to roll your own:

private static final int MAX_WORKERS = n;
private List<Worker> workers = new ArrayList<Worker>(MAX_WORKERS);

private boolean roomLeft() {
    synchronized (workers) {
        return (workers.size() < MAX_WORKERS);
    }
}

private void addWorker() {
    synchronized (workers) {
        workers.add(new Worker(this));
    }
}

public void removeWorker(Worker worker) {
    synchronized (workers) {
        workers.remove(worker);
    }
}

public Example() {
    while (true) {
        if (roomLeft()) {
            addWorker();
        } 
    }
}

Gdzie Worker jest twoją klasą, która rozszerza wątek. Każdy worker wywoła metodę removeWorker tej klasy, przekazując się jako parametr, po zakończeniu robienia tego.

Mając to na uwadze, Framework executora wygląda o wiele lepiej.

Edit: ktoś może wyjaśnić, dlaczego to jest takie złe, zamiast po prostu pomniejszać to?

 0
Author: rjohnston,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-03-27 04:55:01

Jak już wspomnieli inni tutaj, najlepszym rozwiązaniem jest utworzenie puli wątków z Executors class:

Jednakże, jeśli chcesz rzucić własną, ten kod powinien dać ci pomysł, jak postępować. Zasadniczo, po prostu dodaj każdy nowy wątek do grupy wątków i upewnij się, że nigdy nie masz więcej niż n aktywnych wątków w grupie: {]}

Task[] tasks = getTasks(); // array of tasks to complete
ThreadGroup group = new ThreadGroup();
int i=0;
while( i<tasks.length || group.activeCount()>0 ) {
    if( group.activeCount()<N && i<tasks.length ) {
        new TaskThread(group, tasks[i]).start();
        i++;
    } else {
        Thread.sleep(100);
    }
}
 0
Author: Eli Courtwright,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-10-03 13:53:47