Przetwarzanie jednocześnie w Scali

Jako w mojej własnej odpowiedzi na moje pytanie , mam sytuację, w której przetwarzam dużą liczbę zdarzeń, które pojawiają się w kolejce. Każde zdarzenie jest obsługiwane w dokładnie taki sam sposób i każde even może być obsługiwane niezależnie od wszystkich innych zdarzeń.

Mój program wykorzystuje Framework współbieżności Scala i wiele procesów jest modelowanych jako Actor s. Ponieważ ActorS przetwarzają swoje wiadomości sekwencyjnie, nie są one dobrze dostosowane do tego szczególny problem (chociaż moi inni aktorzy wykonują czynności, które sekwencyjne). Ponieważ chcę, aby Scala "kontrolowała" całe tworzenie wątków (co, jak zakładam, ma na celu posiadanie systemu współbieżności), wydaje się, że mam 2 możliwości: {]}

  1. wysyłanie zdarzeń do puli procesorów zdarzeń, które kontroluję
  2. niech moje Actor przetworzy je jednocześnie przez jakiś inny mechanizm

Myślałem, że #1 neguje punkt użycia podsystemu actors: ilu aktorów procesorów powinienem utworzyć?To oczywiste pytanie. Te rzeczy są podobno Ukryte przede mną i rozwiązywane przez podsystem.

Moja odpowiedź brzmiała TAK:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //I want to be able to handle multiple events at the same time
        //create a new actor to handle it
        actor {
          //processing code here
          process(x)
        }
    }
  }
}

Czy jest lepsze podejście? Czy to nieprawda?

edit: prawdopodobnie lepszym podejściem jest:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //Pass processing to the underlying ForkJoin framework
        Scheduler.execute(process(e))
    }
  }
}
Author: Community, 2009-06-17

5 answers

To wygląda jak duplikat innego pytania. Więc powtórzę swoją odpowiedź

Aktorzy przetwarzają jedną wiadomość na raz. Klasyczny wzorzec przetwarzania wielu wiadomości polega na tym, aby jeden koordynator był frontowy dla puli podmiotów konsumenckich. Jeśli używasz Reacta, Pula odbiorców może być duża, ale nadal będzie używać tylko niewielkiej liczby wątków JVM. Oto przykład, w którym tworzę pulę 10 konsumentów i jednego koordynatora do ich front.
import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) => 
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes awhile
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)

val coordinator = actor {
  loop {
     react {
        case msg @ Request(sender, payload) =>
           consumers foreach {_ ! Ready(self)}
           react {
              // send the request to the first available consumer
              case Ready(consumer) => consumer ! msg
           }
         case Stop => 
           consumers foreach {_ ! Stop} 
           exit
     }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)

Ten kod testuje do sprawdź, który konsument jest dostępny i wyślij zapytanie do tego konsumenta. Alternatywy są po prostu losowo przypisać do konsumentów lub użyć round robin scheduler.

W zależności od tego, co robisz, możesz być lepiej obsługiwany z przyszłości Scali. Na przykład, jeśli naprawdę nie potrzebujesz aktorów, wszystkie powyższe maszyny mogą być napisane jako

import scala.actors.Futures._

def transform(payload : String) = {      
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))
 8
Author: James Iry,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-06-18 00:56:23

Jeśli zdarzenia mogą być obsługiwane niezależnie, dlaczego są w kolejce? Nie wiedząc nic więcej o swoim projekcie, wydaje się to niepotrzebnym krokiem. Jeśli możesz skomponować funkcję process z tym, co uruchamia te zdarzenia, możesz potencjalnie usunąć kolejkę.

Aktor to efekt współbieżny wyposażony w kolejkę. Jeśli chcesz przetwarzać wiele wiadomości jednocześnie, tak naprawdę nie chcesz aktora. Po prostu chcesz, aby funkcja (Any = > ()) była zaplanowana do wykonania w dogodnym czasie.

Powiedziawszy to, Twoje podejście jest rozsądne jeśli chcesz pozostać w bibliotece aktorów i jeśli kolejka zdarzeń nie jest pod twoją kontrolą.

Scalaz rozróżnia aktorów i efekty współbieżne. Podczas gdy jego Actor jest bardzo lekki, {[3] } jest lżejszy. Oto Twój kod z grubsza przetłumaczony na bibliotekę Scalaz:

val eventProcessor = effect (x => process x)
To z najnowszą głowicą bagażnika, jeszcze nie wypuszczoną.
 3
Author: Apocalisp,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-06-17 15:09:06

To brzmi jak prosty problem konsumenta / producenta. Użyłbym kolejki z pulą konsumentów. Prawdopodobnie mógłbyś napisać to za pomocą kilku linijek kodu używając Javy.util./ align = "left" /

 1
Author: jshen,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-06-17 15:21:28

Celem aktora (no cóż, jednym z nich) jest zapewnienie, że stan w aktorze może być dostępny tylko przez jeden wątek na raz. Jeśli przetwarzanie wiadomości nie zależy od dowolnego zmiennego stanu w programie actor, to prawdopodobnie bardziej odpowiednie byłoby wysłanie zadania do terminarza lub puli wątków do przetworzenia. Dodatkowa abstrakcja, którą zapewnia aktor, staje ci na drodze.

W Scali są wygodne metody.aktorzy.Scheduler do tego, możesz też użyć executora z Javy.util./ align = "left" /

 1
Author: Erik Engbrecht,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-06-17 17:22:53

Aktorzy są znacznie lżejsi niż wątki i jako taka inną opcją jest używanie obiektów aktora, takich jak obiekty uruchamialne, które są używane do przesyłania do puli wątków. Główną różnicą jest to, że nie musisz się martwić o ThreadPool - Pula wątków jest zarządzana przez framework actor i dotyczy głównie konfiguracji.

def submit(e: MyEvent) = actor {
  // no loop - the actor exits immediately after processing the first message
  react {
    case MyEvent(x) =>
      process(x)
  }
} ! e // immediately send the new actor a message

Następnie, aby przesłać wiadomość, powiedz tak:

submit(new MyEvent(x))

, co odpowiada

eventProcessor ! new MyEvent(x)
Z twojego pytania.

Testowałem to wzorzec pomyślnie z 1 milionem wiadomości wysłanych i odebranych w około 10 sekund na czterordzeniowym laptopie i7.

Mam nadzieję, że to pomoże.
 1
Author: Nikolay Botev,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-05-20 16:26:35