Relacja między programami obsługi poleceń, agregatami, repozytorium i magazynem zdarzeń w CQRS

Chciałbym zrozumieć pewne szczegóły relacji między programami obsługi poleceń, agregatami, repozytorium i magazynem zdarzeń w systemach opartych na CQRS.

Co do tej pory zrozumiałem:

  • programy obsługi poleceń otrzymują polecenia z magistrali. Są one odpowiedzialne za załadowanie odpowiedniego agregatu z repozytorium i wywołanie logiki domeny na agregacie. Po zakończeniu usuwają polecenie z magistrali.
  • agregat zapewnia zachowanie i wewnętrzny stan. Państwo nigdy nie jest publiczne. Jedynym sposobem na zmianę stanu jest użycie zachowania. Metody, które modelują to zachowanie, tworzą zdarzenia z właściwości polecenia i stosują te zdarzenia do agregatu, które z kolei wywołują procedury obsługi zdarzeń, które odpowiednio ustawiają stan wewnętrzny.
  • repozytorium po prostu umożliwia ładowanie agregatów na danym ID i dodawanie nowych agregatów. Zasadniczo repozytorium łączy domenę ze sklepem zdarzeń.
  • the event store, last but not co najmniej, jest odpowiedzialny za przechowywanie zdarzeń do bazy danych (lub jakiegokolwiek magazynu używanego) i przeładowanie tych zdarzeń jako tzw. strumień zdarzeń.

Jak na razie dobrze. Teraz są pewne problemy, których jeszcze nie dostałem:

  • Jeśli funkcja obsługi poleceń ma wywoływać zachowanie na jeszcze istniejącym agregacie, wszystko jest dość proste. Funkcja obsługi poleceń pobiera odniesienie do repozytorium, wywołuje jego metodę loadById i zwracane jest agregat. Ale co robi obsługa poleceń, gdy nie ma jeszcze agregatu, ale trzeba go stworzyć? Z mojego zrozumienia agregat powinien być później odbudowany przy użyciu zdarzeń. Oznacza to, że tworzenie agregatu odbywa się w odpowiedzi na zdarzenie fooCreated. Ale aby móc przechowywać dowolne wydarzenie (w tym fooCreated), potrzebuję agregatu. Wygląda to dla mnie jak problem z kurczakiem i jajkiem: nie mogę utworzyć agregatu bez zdarzenia, ale jedynym komponentem, który powinien tworzyć zdarzenia, jest agregat. Więc w zasadzie to przychodzi do: jak tworzyć nowe Agregaty, kto co robi?
  • gdy agregat uruchamia Zdarzenie, wewnętrzny moduł obsługi zdarzeń odpowiada na to zdarzenie (Zwykle wywołując je za pomocą metody apply) i zmienia stan agregatu. Jak to wydarzenie jest przekazywane do repozytorium? Kto rozpoczyna akcję "Prześlij nowe zdarzenia do repozytorium / magazynu zdarzeń"? Sam agregat? Repozytorium oglądając agregat? Ktoś inny, kto jest zapisany na wewnętrzne wydarzenia? ...?
  • na koniec mam problem z poprawnym zrozumieniem pojęcia strumienia wydarzeń: w mojej wyobraźni jest to po prostu uporządkowana lista wydarzeń. Ważne jest to, że jest "uporządkowany". Czy to prawda?
Author: Dennis Traub, 2012-09-11

3 answers

Poniższy tekst jest oparty na moim własnym doświadczeniu i moich eksperymentach z różnymi frameworkami, takimi jak Lokad.CQRS, NCQRS itp. Jestem pewien, że jest wiele sposobów, by to załatwić. Zamieszczę to, co ma dla mnie największy sens.

1. Tworzenie Zbiorcze:

Za każdym razem, gdy obsługa poleceń potrzebuje agregatu, używa repozytorium. Repozytorium pobiera odpowiednią listę zdarzeń ze sklepu zdarzeń i wywołuje przeciążony konstruktor, wstrzykując zdarzenia

var stream = eventStore.LoadStream(id)
var User = new User(stream)

Jeśli agregat nie istniał wcześniej, strumień będzie pusty, a nowo utworzony obiekt będzie w oryginalnym stanie. Możesz się upewnić, że w tym stanie tylko kilka poleceń może ożywić agregat, np. User.Create().

2. Przechowywanie nowych zdarzeń

Obsługa poleceń odbywa się wewnątrz jednostki pracy. Podczas wykonywania polecenia każde wynikowe zdarzenie zostanie dodane do listy wewnątrz agregatu (User.Changes). Po zakończeniu realizacji, zmiany zostaną dodane do magazynu wydarzeń. W poniższym przykładzie dzieje się to w następującej linii:

store.AppendToStream(cmd.UserId, stream.Version, user.Changes)

3. Kolejność zdarzeń

Wyobraź sobie, co by się stało, gdyby dwa kolejne zdarzenia zostały powtórzone w złej kolejności.

Przykład

Postaram się zilustrować fragment pseudo-kodu (celowo zostawiłem obawy repozytorium wewnątrz Handlera poleceń, aby pokazać, co się stanie za sceny):

Usługa Aplikacji:

UserCommandHandler
    Handle(CreateUser cmd)
        stream = store.LoadStream(cmd.UserId)
        user = new User(stream.Events)
        user.Create(cmd.UserName, ...)
        store.AppendToStream(cmd.UserId, stream.Version, user.Changes)

    Handle(BlockUser cmd)
        stream = store.LoadStream(cmd.UserId)
        user = new User(stream.Events)
        user.Block(string reason)
        store.AppendToStream(cmd.UserId, stream.Version, user.Changes)

Agregat:

User
    created = false
    blocked = false

    Changes = new List<Event>

    ctor(eventStream)
        foreach (event in eventStream)
            this.Apply(event)

    Create(userName, ...)
        if (this.created) throw "User already exists"
        this.Apply(new UserCreated(...))

    Block(reason)
        if (!this.created) throw "No such user"
        if (this.blocked) throw "User is already blocked"
        this.Apply(new UserBlocked(...))

    Apply(userCreatedEvent)
        this.created = true
        this.Changes.Add(userCreatedEvent)

    Apply(userBlockedEvent)
        this.blocked = true
        this.Changes.Add(userBlockedEvent)

Update:

Na marginesie: odpowiedź Yves ' a przypomniała mi ciekawy artykuł Udi Dahan sprzed kilku lat:

 32
Author: Dennis Traub,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-07-16 10:04:14

Mała wariacja na temat Dennisa doskonała odpowiedź:

  • Kiedy mamy do czynienia z "kreatywnymi" przypadkami użycia (tzn. które powinny wydzielać nowe Agregaty), spróbuj znaleźć inny agregat lub fabrykę, do której możesz przenieść tę odpowiedzialność. Nie jest to sprzeczne z posiadaniem ctor, który bierze zdarzenia do nawodnienia (lub jakikolwiek inny mechanizm do nawodnienia dla tej sprawy). Czasami fabryka jest tylko metodą statyczną( dobrą do przechwytywania "kontekstu" / "intencji"), czasami jest to metoda instancji innej agregat (dobre miejsce dla dziedziczenia "danych"), czasami jest to jawny obiekt fabryczny (dobre miejsce dla "złożonej" logiki tworzenia).
  • lubię udostępniać jawną metodę GetChanges () na moim agregacie, która zwraca wewnętrzną listę jako tablicę. Jeśli mój agregat ma pozostać w pamięci poza jednym wykonaniem, dodaję również metodę AcceptChanges (), aby wskazać, że wewnętrzna lista powinna zostać wyczyszczona(zwykle wywoływana po spłukaniu rzeczy do magazynu zdarzeń). Możesz użyć albo pociągnięcia (GetChanges / Changes) lub push (think. Net event lub iobservable) model oparty tutaj. Wiele zależy od semantyki transakcyjnej, technologii, potrzeb itp ...
  • twój strumień zdarzeń jest połączoną listą. Każda rewizja (event/changeset) wskazująca na poprzednią (a.k. a. rodzica). Twój strumień zdarzeń to sekwencja zdarzeń/zmian, które miały miejsce w określonym agregacie. Zamówienie ma być zagwarantowane tylko w granicach agregatu.
 10
Author: Yves Reynhout,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-09-11 08:26:17

Prawie zgadzam się z Yves ' em reynhoutem i Dennisem traubem, ale chcę ci pokazać, jak to robię. Chcę pozbawić moje Agregaty odpowiedzialności za stosowanie zdarzeń na siebie lub ponowne nawodnienie się; w przeciwnym razie istnieje wiele duplikacji kodu: każdy konstruktor agregatów będzie wyglądał tak samo: {]}

UserAggregate:
    ctor(eventStream)
         foreach (event in eventStream)
            this.Apply(event)


OrderAggregate:
    ctor(eventStream)
         foreach (event in eventStream)
            this.Apply(event)


ProfileAggregate:
    ctor(eventStream)
         foreach (event in eventStream)
            this.Apply(event)
Obowiązki te można pozostawić dyspozytorowi. Polecenie jest obsługiwane bezpośrednio przez agregat.
Command dispatcher class

    dispatchCommand(command) method:
        newEvents = ConcurentProofFunctionCaller.executeFunctionUntilSucceeds(tryToDispatchCommand)
        EventDispatcher.dispatchEvents(newEvents)

    tryToDispatchCommand(command) method:
        aggregateClass = CommandSubscriber.getAggregateClassForCommand(command)
        aggregate = AggregateRepository.loadAggregate(aggregateClass, command.getAggregateId())
        newEvents = CommandApplier.applyCommandOnAggregate(aggregate, command)
        AggregateRepository.saveAggregate(command.getAggregateId(), aggregate, newEvents)

ConcurentProofFunctionCaller class

    executeFunctionUntilSucceeds(pureFunction) method:
        do this n times
            try
                call result=pureFunction()
                return result
            catch(ConcurentWriteException)
                continue
        throw TooManyRetries    

AggregateRepository class

     loadAggregate(aggregateClass, aggregateId) method:
         aggregate = new aggregateClass
         priorEvents = EventStore.loadEvents()
         this.applyEventsOnAggregate(aggregate, priorEvents)

     saveAggregate(aggregateId, aggregate, newEvents)
        this.applyEventsOnAggregate(aggregate, newEvents)
        EventStore.saveEventsForAggregate(aggregateId, newEvents, priorEvents.version)

SomeAggregate class
    handleCommand1(command1) method:
        return new SomeEvent or throw someException BUT don't change state!
    applySomeEvent(SomeEvent) method:
        changeStateSomehow() and not throw any exception and don't return anything!

Należy pamiętać, że jest to pseudo kod wyświetlany z aplikacji PHP; prawdziwy kod powinien mieć rzeczy wstrzykiwane i inne obowiązki refakturowane w innych klasach. Ideea jest utrzymanie agregatów jak najbardziej czyste i uniknąć powielania kodu.

Niektóre ważne aspekty dotyczące agregatów:

  1. obsługa poleceń nie powinna zmieniać stanu; generują zdarzenia lub throw exceptions
  2. event ma zastosowanie nie powinien rzucać żadnych WYJĄTKÓW i nie powinien niczego zwracać; zmieniają się tylko stan wewnętrzny

Open-source implementacja PHP tego można znaleźć tutaj .

 0
Author: Constantin Galbenu,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-03-17 06:19:39