Wstępna inicjalizacja puli wątków roboczych w celu ponownego użycia obiektów połączeń (sockets)

Muszę zbudować pulę workerów w Javie, gdzie każdy worker ma swoje własne podłączone Gniazdo; gdy wątek roboczy jest uruchomiony, używa gniazda, ale utrzymuje je otwarte do ponownego użycia później. Zdecydowaliśmy się na to podejście, ponieważ obciążenie związane z tworzeniem, podłączaniem i niszczeniem gniazd na zasadzie ad hoc wymagało zbyt dużego obciążenia, więc potrzebujemy metody, za pomocą której Pula pracowników jest wstępnie inicjalizowana z ich połączeniem z gniazdem, gotowa do podjęcia pracy przy zachowaniu bezpieczeństwa zasobów gniazd z innych wątków (gniazda nie są bezpieczne dla wątków), więc potrzebujemy czegoś podobnego...

public class SocketTask implements Runnable {
  Socket socket;
  public SocketTask(){
    //create + connect socket here
  }

  public void run(){
    //use socket here
  }

}

Podczas uruchamiania aplikacji chcemy zainicjować workery i, miejmy nadzieję, połączenia z gniazdami...

MyWorkerPool pool = new MyWorkerPool();
for( int i = 0; i < 100; i++)
   pool.addWorker( new WorkerThread());

Ponieważ praca jest wymagana przez aplikację, wysyłamy zadania do puli pracowników do natychmiastowej realizacji...

pool.queueWork( new SocketTask(..));


Aktualizacja kodu roboczego
Opierając się na pomocnych komentarzach Graya i jontejja, mam następujące kod działa...

SocketTask

public class SocketTask implements Runnable {
    private String workDetails;
    private static final ThreadLocal<Socket> threadLocal = 
           new ThreadLocal<Socket>(){
        @Override
        protected Socket initialValue(){
            return new Socket();
        }           
    };

    public SocketTask(String details){              
        this.workDetails = details;
    }

    public void run(){      
        Socket s = getSocket(); //gets from threadlocal
        //send data on socket based on workDetails, etc.
    }

    public static Socket getSocket(){
        return threadLocal.get();
    }
}

ExecutorService

ExecutorService threadPool = 
    Executors.newFixedThreadPool(5, Executors.defaultThreadFactory());

    int tasks = 15;  
    for( int i = 1; i <= tasks; i++){
        threadPool.execute(new SocketTask("foobar-" + i));
    }   
Podoba mi się to podejście z kilku powodów...
  • Sockety są obiektami lokalnymi (poprzez ThreadLocal) dostępnymi dla uruchomionych zadań, eliminując problemy ze współbieżnością.
  • gniazda są tworzone raz i pozostają otwarte, ponownie używane gdy nowe zadania stają w kolejce, eliminując obiekt gniazda Utwórz/zniszcz napowietrzne.
Author: raffian, 2013-05-22

2 answers

Jednym z pomysłów byłoby umieszczenie Socket s W BlockingQueue. Następnie, gdy potrzebujesz Socket twoje wątki mogą take() z kolejki, a gdy zostaną zakończone z Socket, put() wrócą do kolejki.

public void run() {
    Socket socket = socketQueue.take();
    try {
       // use the socket ...
    } finally {
       socketQueue.put(socket);
    }
}

Ma to dodatkowe zalety:

  • możesz wrócić do używania kodu ExecutorService.
  • można oddzielić komunikację z gniazdem od przetwarzania wyników.
  • nie potrzebujesz korespondencji 1 do 1 do przetwarzania wątków i gniazd. Ale gniazdo komunikacja może być 98% pracy, więc może nie zysk.
  • kiedy skończysz i twoje ExecutorService się skończy, możesz zamknąć swoje gniazda, usuwając je i zamykając.

To dodaje dodatkowy narzut innego BlockingQueue, ale jeśli robisz Socket komunikację, nie zauważysz tego.

Nie wierzymy, że ThreadFactory zaspokaja nasze potrzeby ...

Myślę, że możesz to zrobić, jeśli użyjesz thread-locals. Twój wątek factory utworzy wątek, który najpierw otworzy Gniazdo, zapisze je w wątku lokalnym, a następnie wywoła Runnable arg, który wykonuje całą pracę z gniazdem, usuwając zadania z wewnętrznej kolejki ExecutorService. Gdy to zrobisz, metoda arg.run() zakończy się i możesz pobrać gniazdo z thread-local i zamknąć je.

Coś w tym stylu. Jest trochę brudny, ale powinieneś zrozumieć.

ExecutorService threadPool =
    Executors.newFixedThreadPool(10,
      new ThreadFactory() {
        public Thread newThread(final Runnable r) {
            Thread thread = new Thread(new Runnable() {
                public void run() {
                    openSocketAndStoreInThreadLocal();
                    // our tasks would then get the socket from the thread-local
                    r.run();
                    getSocketFromThreadLocalAndCloseIt();
                }
            });
            return thread;
        }
      }));

Aby Twoje zadania realizowały Runnable i wyglądały następująco:

public SocketWorker implements Runnable {
    private final ThreadLocal<Socket> threadLocal;
    public SocketWorker(ThreadLocal<Socket> threadLocal) {
       this.threadLocal = threadLocal;
    }
    public void run() {
        Socket socket = threadLocal.get();
        // use the socket ...
    }
}
 10
Author: Gray,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-09-29 13:42:22

Myślę, że powinieneś użyć ThreadLocal

package com.stackoverflow.q16680096;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main
{
    public static void main(String[] args)
    {
        ExecutorService pool = Executors.newCachedThreadPool();
        int nrOfConcurrentUsers = 100;
        for(int i = 0; i < nrOfConcurrentUsers; i++)
        {
            pool.submit(new InitSocketTask());
        }

        // do stuff...

        pool.submit(new Task());
    }
}

package com.stackoverflow.q16680096;

import java.net.Socket;

public class InitSocketTask implements Runnable
{
    public void run()
    {
        Socket socket = SocketPool.get();
        // Do initial setup here
    }

}

package com.stackoverflow.q16680096;

import java.net.Socket;

public final class SocketPool
{
    private static final ThreadLocal<Socket> SOCKETS = new ThreadLocal<Socket>(){
        @Override
        protected Socket initialValue()
        {
            return new Socket(); // Pass in suitable arguments here...
        }
    };

    public static Socket get()
    {
        return SOCKETS.get();
    }
}

package com.stackoverflow.q16680096;

import java.net.Socket;

public class Task implements Runnable
{
    public void run()
    {
        Socket socket = SocketPool.get();
        // Do stuff with socket...
    }
}

Gdzie każdy wątek otrzymuje własne gniazdo.

 5
Author: jontejj,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-05-21 22:16:43