Wstępna inicjalizacja puli wątków roboczych w celu ponownego użycia obiektów połączeń (sockets)
Muszę zbudować pulę workerów w Javie, gdzie każdy worker ma swoje własne podłączone Gniazdo; gdy wątek roboczy jest uruchomiony, używa gniazda, ale utrzymuje je otwarte do ponownego użycia później. Zdecydowaliśmy się na to podejście, ponieważ obciążenie związane z tworzeniem, podłączaniem i niszczeniem gniazd na zasadzie ad hoc wymagało zbyt dużego obciążenia, więc potrzebujemy metody, za pomocą której Pula pracowników jest wstępnie inicjalizowana z ich połączeniem z gniazdem, gotowa do podjęcia pracy przy zachowaniu bezpieczeństwa zasobów gniazd z innych wątków (gniazda nie są bezpieczne dla wątków), więc potrzebujemy czegoś podobnego...
public class SocketTask implements Runnable {
Socket socket;
public SocketTask(){
//create + connect socket here
}
public void run(){
//use socket here
}
}
Podczas uruchamiania aplikacji chcemy zainicjować workery i, miejmy nadzieję, połączenia z gniazdami...
MyWorkerPool pool = new MyWorkerPool();
for( int i = 0; i < 100; i++)
pool.addWorker( new WorkerThread());
Ponieważ praca jest wymagana przez aplikację, wysyłamy zadania do puli pracowników do natychmiastowej realizacji...
pool.queueWork( new SocketTask(..));
Aktualizacja kodu roboczego
Opierając się na pomocnych komentarzach Graya i jontejja, mam następujące kod działa...
SocketTask
public class SocketTask implements Runnable {
private String workDetails;
private static final ThreadLocal<Socket> threadLocal =
new ThreadLocal<Socket>(){
@Override
protected Socket initialValue(){
return new Socket();
}
};
public SocketTask(String details){
this.workDetails = details;
}
public void run(){
Socket s = getSocket(); //gets from threadlocal
//send data on socket based on workDetails, etc.
}
public static Socket getSocket(){
return threadLocal.get();
}
}
ExecutorService
ExecutorService threadPool =
Executors.newFixedThreadPool(5, Executors.defaultThreadFactory());
int tasks = 15;
for( int i = 1; i <= tasks; i++){
threadPool.execute(new SocketTask("foobar-" + i));
}
Podoba mi się to podejście z kilku powodów...
- Sockety są obiektami lokalnymi (poprzez ThreadLocal) dostępnymi dla uruchomionych zadań, eliminując problemy ze współbieżnością.
- gniazda są tworzone raz i pozostają otwarte, ponownie używane gdy nowe zadania stają w kolejce, eliminując obiekt gniazda Utwórz/zniszcz napowietrzne.
2 answers
Jednym z pomysłów byłoby umieszczenie Socket
s W BlockingQueue
. Następnie, gdy potrzebujesz Socket
twoje wątki mogą take()
z kolejki, a gdy zostaną zakończone z Socket
, put()
wrócą do kolejki.
public void run() {
Socket socket = socketQueue.take();
try {
// use the socket ...
} finally {
socketQueue.put(socket);
}
}
Ma to dodatkowe zalety:
- możesz wrócić do używania kodu
ExecutorService
. - można oddzielić komunikację z gniazdem od przetwarzania wyników.
- nie potrzebujesz korespondencji 1 do 1 do przetwarzania wątków i gniazd. Ale gniazdo komunikacja może być 98% pracy, więc może nie zysk.
- kiedy skończysz i twoje
ExecutorService
się skończy, możesz zamknąć swoje gniazda, usuwając je i zamykając.
To dodaje dodatkowy narzut innego BlockingQueue
, ale jeśli robisz Socket
komunikację, nie zauważysz tego.
Nie wierzymy, że ThreadFactory zaspokaja nasze potrzeby ...
Myślę, że możesz to zrobić, jeśli użyjesz thread-locals. Twój wątek factory utworzy wątek, który najpierw otworzy Gniazdo, zapisze je w wątku lokalnym, a następnie wywoła Runnable
arg, który wykonuje całą pracę z gniazdem, usuwając zadania z wewnętrznej kolejki ExecutorService
. Gdy to zrobisz, metoda arg.run()
zakończy się i możesz pobrać gniazdo z thread-local i zamknąć je.
Coś w tym stylu. Jest trochę brudny, ale powinieneś zrozumieć.
ExecutorService threadPool =
Executors.newFixedThreadPool(10,
new ThreadFactory() {
public Thread newThread(final Runnable r) {
Thread thread = new Thread(new Runnable() {
public void run() {
openSocketAndStoreInThreadLocal();
// our tasks would then get the socket from the thread-local
r.run();
getSocketFromThreadLocalAndCloseIt();
}
});
return thread;
}
}));
Aby Twoje zadania realizowały Runnable
i wyglądały następująco:
public SocketWorker implements Runnable {
private final ThreadLocal<Socket> threadLocal;
public SocketWorker(ThreadLocal<Socket> threadLocal) {
this.threadLocal = threadLocal;
}
public void run() {
Socket socket = threadLocal.get();
// use the socket ...
}
}
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-09-29 13:42:22
Myślę, że powinieneś użyć ThreadLocal
package com.stackoverflow.q16680096;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main
{
public static void main(String[] args)
{
ExecutorService pool = Executors.newCachedThreadPool();
int nrOfConcurrentUsers = 100;
for(int i = 0; i < nrOfConcurrentUsers; i++)
{
pool.submit(new InitSocketTask());
}
// do stuff...
pool.submit(new Task());
}
}
package com.stackoverflow.q16680096;
import java.net.Socket;
public class InitSocketTask implements Runnable
{
public void run()
{
Socket socket = SocketPool.get();
// Do initial setup here
}
}
package com.stackoverflow.q16680096;
import java.net.Socket;
public final class SocketPool
{
private static final ThreadLocal<Socket> SOCKETS = new ThreadLocal<Socket>(){
@Override
protected Socket initialValue()
{
return new Socket(); // Pass in suitable arguments here...
}
};
public static Socket get()
{
return SOCKETS.get();
}
}
package com.stackoverflow.q16680096;
import java.net.Socket;
public class Task implements Runnable
{
public void run()
{
Socket socket = SocketPool.get();
// Do stuff with socket...
}
}
Gdzie każdy wątek otrzymuje własne gniazdo.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-05-21 22:16:43