Śpiący aktorzy?

Jaki jest najlepszy sposób na sen aktora? Mam aktorów skonfigurowanych jako agenci, którzy chcą utrzymywać różne części bazy danych (w tym uzyskiwanie danych z zewnętrznych źródeł). Z wielu powodów (w tym nie przeciążanie bazy danych lub komunikacji i ogólne Problemy z obciążeniem) chcę, aby aktorzy spali między każdą operacją. Patrzę na jakieś 10 obiektów aktora.

Aktorzy będą działać praktycznie w nieskończoność, ponieważ zawsze będą nowe dane wchodząc, lub siedząc w tabeli oczekując na rozpropagowanie do innych części bazy danych itp. Chodzi o to, aby baza danych była jak najbardziej kompletna w dowolnym momencie.

Mógłbym to zrobić z nieskończoną pętlą i uśpieniem na końcu każdej pętli, ale zgodnie z http://www.scala-lang.org/node/242 aktorzy używają puli wątków, która jest rozszerzana za każdym razem, gdy wszystkie wątki są zablokowane. Więc wyobrażam sobie wątek.sen u każdego aktora byłby złym pomysłem, tak jak marnowanie wątków niepotrzebnie.

Mógłbym mieć centralę z własną pętlą, która wysyła wiadomości do subskrybentów na zegarze (np. asynchroniczny zegar zdarzeń)?

Czy ktoś zrobił coś podobnego lub ma jakieś sugestie? Przepraszam za dodatkowe (być może zbędne) informacje.

Cheers

Joe

 16
Author: Joe, 2009-08-03

4 answers

Nie ma potrzeby jawnego uśpienia aktora: użycie loop i react dla każdego aktora oznacza, że podstawowa Pula wątków będzie miała wątki oczekujące, podczas gdy nie ma wiadomości dla aktorów do przetworzenia.

W przypadku, gdy chcesz zaplanować zdarzenia dla aktorów do przetworzenia, jest to dość łatwe przy użyciu jednowątkowego harmonogramu z narzędzi java.util.concurrent:

object Scheduler {
  import java.util.concurrent.Executors
  import scala.compat.Platform
  import java.util.concurrent.TimeUnit
  private lazy val sched = Executors.newSingleThreadScheduledExecutor();
  def schedule(f: => Unit, time: Long) {
    sched.schedule(new Runnable {
      def run = f
    }, time , TimeUnit.MILLISECONDS);
  }
}

Można to rozszerzyć o wykonywanie zadań okresowych i może być używany tak więc:

val execTime = //...  
Scheduler.schedule( { Actor.actor { target ! message }; () }, execTime)

Twój docelowy aktor będzie musiał zaimplementować odpowiednią pętlę react, aby przetworzyć daną wiadomość. Nie ma potrzeby, żebyś spał z aktorem.

 17
Author: oxbow_lakes,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-07-04 02:14:27

W pierwszej odpowiedzi Erlang miał rację, ale wygląda na to, że zniknął. Z aktorami Scali można łatwo zrobić ten sam trik podobny do Erlanga. Np. stwórzmy scheduler, który nie używa wątków:

import actors.{Actor,TIMEOUT}

def scheduler(time: Long)(f: => Unit) = {
  def fixedRateLoop {
    Actor.reactWithin(time) {
      case TIMEOUT => f; fixedRateLoop
      case 'stop => 
    }
  }
  Actor.actor(fixedRateLoop)
}

I przetestujmy to (zrobiłem to dobrze w Scali REPL) używając testowego Klienta:

case class Ping(t: Long)

import Actor._
val test = actor { loop {
  receiveWithin(3000) {
    case Ping(t) => println(t/1000)
    case TIMEOUT => println("TIMEOUT")
    case 'stop => exit
  }
} }

Uruchom scheduler:

import compat.Platform.currentTime
val sched = scheduler(2000) { test ! Ping(currentTime) }

I zobaczysz coś takiego

scala> 1249383399
1249383401
1249383403
1249383405
1249383407

Co oznacza, że nasz scheduler wysyła wiadomość co 2 sekundy zgodnie z oczekiwaniami. Zatrzymajmy scheduler:

sched ! 'stop

Klient testowy zacznie zgłaszać timeouty:

scala> TIMEOUT
TIMEOUT
TIMEOUT

Przestań też:

test ! 'stop
 21
Author: Alexander Azarov,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-09-02 13:27:09

Actorping (Licencja Apache) z lift-util ma schedule i scheduleAtFixedRate Source: ActorPing.scala

Od scaladoc:

Obiekt ActorPing ustawia aktora do ping-ed z daną wiadomością w określonych odstępach czasu. Metody schedule zwracają obiekt ScheduledFuture, który w razie potrzeby można anulować

 4
Author: user33994,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-08-07 10:04:19

Niestety są dwa błędy w odpowiedzi oxbow_lakes.

Jeden jest prostym błędem deklaracji (long time vs time: Long), ale drugi jest bardziej subtelny.

Oxbow_lakes deklaruje run jako

def run = actors.Scheduler.execute(f) 
To jednak prowadzi do znikania wiadomości od czasu do czasu. Oznacza to, że są zaplanowane, ale nigdy nie zostaną wysłane. Deklarowanie run jako
def run = f
Naprawiłem to dla mnie. Dokładnie tak samo było w Actorpingu lift-util.

Cały kod schedulera staje się:

object Scheduler {
    private lazy val sched = Executors.newSingleThreadedScheduledExecutor();
    def schedule(f: => Unit, time: Long) {
        sched.schedule(new Runnable {
          def run = f
        }, time - Platform.currentTime, TimeUnit.MILLISECONDS);
    }
}

Próbowałem edytować post oxbow_lakes, ale nie mogłem go zapisać (złamany?), nie mam jeszcze prawa komentować. Dlatego nowy post.

 2
Author: Gerd Riesselmann,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-11-11 21:16:43