Pauzowanie aktora w Akce

Mam w Akce aktora, który będzie przetwarzał wiadomości w celu stworzenia pewnych Bytów. Niektóre pola tych podmiotów są obliczane na podstawie stanu innych podmiotów w bazie danych w momencie tworzenia.

Chciałbym uniknąć tworzenia warunków Rasowych, w których przetwarzanie aktora przebiega szybciej niż baza danych jest w stanie utrzymać podmioty. Może to prowadzić do niespójności danych, jak:

  • aktor tworzy Foo i wysyła go do innych aktorów na dalsze przetwarzanie i zapisywanie
  • aktor jest proszony o stworzenie kolejnego Foo. Ponieważ pierwszy z nich nie jest jeszcze zapisany, nowy jest tworzony na podstawie starej zawartości DB, tworząc w ten sposób błędną Foo.

Teraz ta możliwość jest dość odległa, ponieważ tworzenie Foo s zostanie uruchomione ręcznie. Ale wciąż jest możliwe, że podwójne kliknięcie może powodować problemy przy dużym obciążeniu. A kto wie czy jutro Foo zostanie utworzone automatycznie.

Stąd, co ja need jest sposobem, aby powiedzieć aktorowi, aby poczekał, i wznowić swoje operacje dopiero po potwierdzeniu, że Foos zostały zapisane.

Czy jest sposób, aby umieścić aktora w stanie bezczynności i powiedzieć mu, aby po jakimś czasie wznowił działalność?

Zasadniczo chciałbym używać skrzynki pocztowej jako kolejki komunikatów i mieć kontrolę nad prędkością przetwarzania kolejki.

Author: Andrea, 2013-01-28

2 answers

Nie, Nie można zawiesić aktora: aktorzy zawsze pobierają wiadomości ze swojej skrzynki pocztowej tak szybko, jak to możliwe. To pozostawia tylko możliwość, że przychodzące żądania są przechowywane, aby być przetwarzane później:

class A(db: ActorRef) extends Actor with Stash {
  def receive = {
    case Request =>
      doWork()
      db ! Persist
      context.setReceiveTimeout(5.seconds)
      context.become({
        case Request        => stash()
        case Persisted      => context.unbecome(); unstashAll()
        case ReceiveTimeout => throw new TimeoutException("not persisted")
      }, discardOld = false)
  }
}

Należy pamiętać, że dostarczanie wiadomości nie jest gwarantowane (lub baza danych może być wyłączona) i dlatego zalecany jest limit czasu.

Podstawowy problem

Ten problem pojawia się głównie w tych przypadkach, które nie są dobrze dopasowane do modelu aktora i model domeny: aktor jest jednostką spójności, ale w Twoim przypadku użytkowania Twój spójny obraz wymaga aktualnej zewnętrznej jednostki (bazy danych), aby aktor postępował właściwie. Nie mogę polecić rozwiązania, nie wiedząc więcej o przypadku użycia, ale spróbuj przemodelować swój problem, biorąc to pod uwagę.

 24
Author: Roland Kuhn,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-01-28 17:05:53

Okazuje się, że wymaga to tylko kilku linijek. Jest to rozwiązanie, które wymyśliłem, które zgadza się z sugestią pagoda_5b:

class QueueingActor(nextActor: ActorRef) extends Actor with Stash {
  import QueueingActor._

  def receive = {
    case message =>
      context.become({
        case Resume =>
          unstashAll()
          context.unbecome()
        case _ => stash()
      })
      nextActor ! message
  }
}

object QueueingActor {
  case class Resume()
}
 6
Author: Andrea,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-01-28 16:25:12