Python Observer Pattern: Przykłady, Porady? [zamknięte]

zamknięte . To pytanie musi być bardziej skoncentrowane. Obecnie nie przyjmuje odpowiedzi.

Chcesz poprawić to pytanie? Update the question so it edytując ten post.

Zamknięte 2 lata temu .

Popraw to pytanie

Czy są jakieś przykładowe przykłady Gof Observer zaimplementowanego w Pythonie? Mam bitowy kod, który obecnie ma bity kodu debugowania splecione przez klasę klucza (obecnie generowanie komunikatów na stderr jeśli ustawiony jest magiczny env). Dodatkowo, klasa ma interfejs do stopniowego zwracania wyników, jak również do przechowywania ich (w pamięci) do przetwarzania końcowego. (Sama klasa jest menedżerem zadań do jednoczesnego wykonywania poleceń na zdalnych maszynach przez ssh).

Obecnie użycie klasy wygląda mniej więcej tak:

job = SSHJobMan(hostlist, cmd)
job.start()
while not job.done():
    for each in job.poll():
        incrementally_process(job.results[each])
        time.sleep(0.2) # or other more useful work
post_process(job.results)

Alernatywny model użycia to:

job = SSHJobMan(hostlist, cmd)
job.wait()  # implicitly performs a start()
process(job.results)

To wszystko działa dobrze dla obecnego narzędzia. Jednak brakuje elastyczność. Na przykład obecnie obsługuję krótki format wyjściowy lub pasek postępu jako wyniki przyrostowe, obsługuję również krótkie, kompletne i "scalone wiadomości" wyjścia dla funkcji post_process().

Chciałbym jednak obsługiwać wiele strumieni wyników / wyjść (pasek postępu do terminala, debugowanie i ostrzeżenia do pliku dziennika, wyjścia z udanych zadań do jednego pliku / katalogu,komunikaty o błędach i inne wyniki z nieudanych zadań do innego, itp.).

To brzmi jak sytuacja, która wymaga obserwatora ... niech instancje mojej klasy akceptują rejestrację z innych obiektów i oddzwaniają do nich z określonymi typami zdarzeń w miarę ich występowania.

Patrzę na PyPubSub ponieważ widziałem kilka odniesień do tego w tak powiązanych pytaniach. Nie jestem pewien, czy jestem gotowy, aby dodać zewnętrzną zależność do Mojego narzędzia, ale mogę zobaczyć wartość w użyciu ich interfejsu jako modelu dla mojego, jeśli to ma ułatwić innym korzystanie. (Projekt ma na celu zarówno samodzielne narzędzie wiersza poleceń, jak i klasa do pisania innych skryptów / narzędzi).

W skrócie wiem, jak robić to, co chcę ... ale jest wiele sposobów, aby to osiągnąć. Chcę sugestii, co najprawdopodobniej zadziała dla innych użytkowników kodu na dłuższą metę.

Sam kod znajduje się pod adresem: classh .

Author: riven, 2009-12-14

9 answers

Jednak brakuje mu elastyczności.

Cóż... właściwie to wygląda mi to na dobry projekt, jeśli asynchroniczne API jest tym, czego chcesz. Zazwyczaj tak jest. Może wystarczy przełączyć się ze stderr na Pythona logging moduł, który ma swój własny model publikowania/subskrybowania, co z Logger.addHandler() i tak dalej. Jeśli chcesz wspierać obserwatorów, radzę ci to uprościć. Naprawdę potrzebujesz tylko kilku linijek kodu.
class Event(object):
    pass

class Observable(object):
    def __init__(self):
        self.callbacks = []
    def subscribe(self, callback):
        self.callbacks.append(callback)
    def fire(self, **attrs):
        e = Event()
        e.source = self
        for k, v in attrs.iteritems():
            setattr(e, k, v)
        for fn in self.callbacks:
            fn(e)

Twoja klasa pracy podklasa can Observable. Gdy dzieje się coś interesującego, zadzwoń self.fire(type="progress", percent=50) lub tym podobne.

 55
Author: Jason Orendorff,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-12-18 03:01:00

Myślę, że ludzie w innych odpowiedziach przesadzają. W Pythonie można łatwo uzyskać zdarzenia za pomocą mniej niż 15 linii kodu.

Ty prosty masz dwie klasy: Event i Observer. Każda klasa, która chce nasłuchać zdarzenia, musi dziedziczyć Observer i ustawić listen (observe) dla określonego zdarzenia. Gdy Event zostanie uruchomiona instancja i wywołana, wszyscy obserwatorzy nasłuchujący tego zdarzenia uruchomią określone wywołanie zwrotne funkcje.

class Observer():
    _observers = []
    def __init__(self):
        self._observers.append(self)
        self._observables = {}
    def observe(self, event_name, callback):
        self._observables[event_name] = callback


class Event():
    def __init__(self, name, data, autofire = True):
        self.name = name
        self.data = data
        if autofire:
            self.fire()
    def fire(self):
        for observer in Observer._observers:
            if self.name in observer._observables:
                observer._observables[self.name](self.data)

Przykład :

class Room(Observer):

    def __init__(self):
        print("Room is ready.")
        Observer.__init__(self) # Observer's init needs to be called
    def someone_arrived(self, who):
        print(who + " has arrived!")

room = Room()
room.observe('someone arrived',  room.someone_arrived)

Event('someone arrived', 'Lenard')

Wyjście:

Room is ready.
Lenard has arrived!
 26
Author: Pithikos,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-04-27 17:01:42

Jeszcze kilka podejść...

Przykład: moduł logowania

Może wystarczy przełączyć się ze stderr na Pythona logging moduł, który ma potężny model publikowania/subskrybowania.

Łatwo rozpocząć tworzenie zapisów dziennika.

# producer
import logging

log = logging.getLogger("myjobs")  # that's all the setup you need

class MyJob(object):
    def run(self):
        log.info("starting job")
        n = 10
        for i in range(n):
            log.info("%.1f%% done" % (100.0 * i / n))
        log.info("work complete")
Po stronie konsumenta jest trochę więcej pracy. Niestety Konfiguracja wyjścia loggera zajmuje 7 całych linii kodu. ;)
# consumer
import myjobs, sys, logging

if user_wants_log_output:
    ch = logging.StreamHandler(sys.stderr)
    ch.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    myjobs.log.addHandler(ch)
    myjobs.log.setLevel(logging.INFO)

myjobs.MyJob().run()

Z drugiej strony jest niesamowita ilość rzeczy w pakiecie logowania. Jeśli kiedykolwiek będziesz musiał wysłać dane dziennika do obracającego się zestawu plików, adresu e-mail i dziennika zdarzeń systemu Windows, możesz to zrobić.

Przykład: najprostszy możliwy obserwator

Ale nie musisz w ogóle korzystać z żadnej biblioteki. Niezwykle prostym sposobem wspierania obserwatorów jest wywołanie metody, która nic nie robi.

# producer
class MyJob(object):
    def on_progress(self, pct):
        """Called when progress is made. pct is the percent complete.
        By default this does nothing. The user may override this method
        or even just assign to it."""
        pass

    def run(self):
        n = 10
        for i in range(n):
            self.on_progress(100.0 * i / n)
        self.on_progress(100.0)

# consumer
import sys, myjobs
job = myjobs.MyJob()
job.on_progress = lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
job.run()

Czasami zamiast pisać lambdę, można po prostu powiedzieć job.on_progress = progressBar.update, co jest miłe.

To jest tak proste, jak to tylko możliwe. Jeden wadą jest to, że nie obsługuje naturalnie wielu słuchaczy subskrybujących te same wydarzenia.

Przykład: zdarzenia podobne do C #

Z odrobiną kodu wsparcia, możesz uzyskać zdarzenia podobne do C#w Pythonie. Oto kod:

# glue code
class event(object):
    def __init__(self, func):
        self.__doc__ = func.__doc__
        self._key = ' ' + func.__name__
    def __get__(self, obj, cls):
        try:
            return obj.__dict__[self._key]
        except KeyError, exc:
            be = obj.__dict__[self._key] = boundevent()
            return be

class boundevent(object):
    def __init__(self):
        self._fns = []
    def __iadd__(self, fn):
        self._fns.append(fn)
        return self
    def __isub__(self, fn):
        self._fns.remove(fn)
        return self
    def __call__(self, *args, **kwargs):
        for f in self._fns[:]:
            f(*args, **kwargs)

Producent deklaruje wydarzenie za pomocą dekoratora:

# producer
class MyJob(object):
    @event
    def progress(pct):
        """Called when progress is made. pct is the percent complete."""

    def run(self):
        n = 10
        for i in range(n+1):
            self.progress(100.0 * i / n)

#consumer
import sys, myjobs
job = myjobs.MyJob()
job.progress += lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
job.run()

Działa to dokładnie tak, jak powyższy kod" simple observer", ale możesz dodać dowolną liczbę słuchaczy za pomocą +=. (W przeciwieństwie do C#, nie ma typów obsługi zdarzeń, nie masz new EventHandler(foo.bar) podczas subskrybowania zdarzenia i nie musisz sprawdzać czy nie ma null przed uruchomieniem zdarzenia. Podobnie jak C#, zdarzenia Nie squelch WYJĄTKÓW.)

Jak wybrać

Jeśli logging zrobi wszystko, czego potrzebujesz, użyj tego. W przeciwnym razie zrób najprostszą rzecz, która działa dla Ciebie. Najważniejsze jest to, że nie musisz brać na siebie dużej zewnętrznej zależności.

 15
Author: Jason Orendorff,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-12-18 04:59:42

Może implementacja, w której obiekty nie są utrzymywane przy życiu tylko dlatego, że coś obserwują? Poniżej znajduje się implementacja wzorca obserwatora o następujących cechach:

  1. użycie jest pythoniczne. Aby dodać obserwatora do metody bound .bar instancji foo, wystarczy wykonać foo.bar.addObserver(observer).
  2. Obserwatorzy nie są utrzymywani przy życiu z racji bycia obserwatorami. Innymi słowy, kod obserwatora nie używa mocnych odniesień.
  3. nie wymaga podklasowania (deskryptory ftw).
  4. może być używany z niezniszczalnymi typami.
  5. może być używany tyle razy, ile chcesz w jednej klasie.
  6. (bonus) od dziś kod istnieje w odpowiednim do pobrania, instalowalnym pakiecie na GitHubie .

Oto kod ( pakiet github lub Pakiet PyPI mają najbardziej aktualną implementację):

import weakref
import functools

class ObservableMethod(object):
    """
    A proxy for a bound method which can be observed.

    I behave like a bound method, but other bound methods can subscribe to be
    called whenever I am called.
    """

    def __init__(self, obj, func):
        self.func = func
        functools.update_wrapper(self, func)
        self.objectWeakRef = weakref.ref(obj)
        self.callbacks = {}  #observing object ID -> weak ref, methodNames

    def addObserver(self, boundMethod):
        """
        Register a bound method to observe this ObservableMethod.

        The observing method will be called whenever this ObservableMethod is
        called, and with the same arguments and keyword arguments. If a
        boundMethod has already been registered to as a callback, trying to add
        it again does nothing. In other words, there is no way to sign up an
        observer to be called back multiple times.
        """
        obj = boundMethod.__self__
        ID = id(obj)
        if ID in self.callbacks:
            s = self.callbacks[ID][1]
        else:
            wr = weakref.ref(obj, Cleanup(ID, self.callbacks))
            s = set()
            self.callbacks[ID] = (wr, s)
        s.add(boundMethod.__name__)

    def discardObserver(self, boundMethod):
        """
        Un-register a bound method.
        """
        obj = boundMethod.__self__
        if id(obj) in self.callbacks:
            self.callbacks[id(obj)][1].discard(boundMethod.__name__)

    def __call__(self, *arg, **kw):
        """
        Invoke the method which I proxy, and all of it's callbacks.

        The callbacks are called with the same *args and **kw as the main
        method.
        """
        result = self.func(self.objectWeakRef(), *arg, **kw)
        for ID in self.callbacks:
            wr, methodNames = self.callbacks[ID]
            obj = wr()
            for methodName in methodNames:
                getattr(obj, methodName)(*arg, **kw)
        return result

    @property
    def __self__(self):
        """
        Get a strong reference to the object owning this ObservableMethod

        This is needed so that ObservableMethod instances can observe other
        ObservableMethod instances.
        """
        return self.objectWeakRef()


class ObservableMethodDescriptor(object):

    def __init__(self, func):
        """
        To each instance of the class using this descriptor, I associate an
        ObservableMethod.
        """
        self.instances = {}  # Instance id -> (weak ref, Observablemethod)
        self._func = func

    def __get__(self, inst, cls):
        if inst is None:
            return self
        ID = id(inst)
        if ID in self.instances:
            wr, om = self.instances[ID]
            if not wr():
                msg = "Object id %d should have been cleaned up"%(ID,)
                raise RuntimeError(msg)
        else:
            wr = weakref.ref(inst, Cleanup(ID, self.instances))
            om = ObservableMethod(inst, self._func)
            self.instances[ID] = (wr, om)
        return om

    def __set__(self, inst, val):
        raise RuntimeError("Assigning to ObservableMethod not supported")


def event(func):
    return ObservableMethodDescriptor(func)


class Cleanup(object):
    """
    I manage remove elements from a dict whenever I'm called.

    Use me as a weakref.ref callback to remove an object's id from a dict
    when that object is garbage collected.
    """
    def __init__(self, key, d):
        self.key = key
        self.d = d

    def __call__(self, wr):
        del self.d[self.key]

Aby tego użyć, po prostu dekorujemy metody, które chcemy uczynić obserwowalnymi za pomocą @event. Oto przykład

class Foo(object):
    def __init__(self, name):
        self.name = name

    @event
    def bar(self):
        print("%s called bar"%(self.name,))

    def baz(self):
        print("%s called baz"%(self.name,))

a = Foo('a')
b = Foo('b')
a.bar.addObserver(b.bar)
a.bar()
 8
Author: DanielSank,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-05-27 00:43:51

From wikipedia :

from collections import defaultdict

class Observable (defaultdict):

  def __init__ (self):
      defaultdict.__init__(self, object)

  def emit (self, *args):
      '''Pass parameters to all observers and update states.'''
      for subscriber in self:
          response = subscriber(*args)
          self[subscriber] = response

  def subscribe (self, subscriber):
      '''Add a new subscriber to self.'''
      self[subscriber]

  def stat (self):
      '''Return a tuple containing the state of each observer.'''
      return tuple(self.values())

Obserwowalny jest używany w ten sposób.

myObservable = Observable ()

# subscribe some inlined functions.
# myObservable[lambda x, y: x * y] would also work here.
myObservable.subscribe(lambda x, y: x * y)
myObservable.subscribe(lambda x, y: float(x) / y)
myObservable.subscribe(lambda x, y: x + y)
myObservable.subscribe(lambda x, y: x - y)

# emit parameters to each observer
myObservable.emit(6, 2)

# get updated values
myObservable.stat()         # returns: (8, 3.0, 4, 12)
 4
Author: Ewan Todd,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-12-14 23:57:24

Bazując na odpowiedzi Jasona, zaimplementowałem przykład zdarzeń podobnych do C#jako pełnoprawny moduł Pythona zawierający dokumentację i testy. I love fancy pythonic stuff :)

Więc, jeśli chcesz jakieś gotowe rozwiązanie, możesz po prostu użyć kodu na GitHubie .

 4
Author: aepsil0n,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-08-02 17:16:23

Przykład: twisted log observers

Aby zarejestrować obserwatora yourCallable() (wywołanie, które akceptuje Słownik), aby odbierać wszystkie zdarzenia dziennika (oprócz innych obserwatorów):

twisted.python.log.addObserver(yourCallable)

Przykład: kompletny przykład producenta / konsumenta

Z listy dyskusyjnej Twisted-Python:

#!/usr/bin/env python
"""Serve as a sample implementation of a twisted producer/consumer
system, with a simple TCP server which asks the user how many random
integers they want, and it sends the result set back to the user, one
result per line."""

import random

from zope.interface import implements
from twisted.internet import interfaces, reactor
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver

class Producer:
    """Send back the requested number of random integers to the client."""
    implements(interfaces.IPushProducer)
    def __init__(self, proto, cnt):
        self._proto = proto
        self._goal = cnt
        self._produced = 0
        self._paused = False
    def pauseProducing(self):
        """When we've produced data too fast, pauseProducing() will be
called (reentrantly from within resumeProducing's transport.write
method, most likely), so set a flag that causes production to pause
temporarily."""
        self._paused = True
        print('pausing connection from %s' % (self._proto.transport.getPeer()))
    def resumeProducing(self):
        self._paused = False
        while not self._paused and self._produced < self._goal:
            next_int = random.randint(0, 10000)
            self._proto.transport.write('%d\r\n' % (next_int))
            self._produced += 1
        if self._produced == self._goal:
            self._proto.transport.unregisterProducer()
            self._proto.transport.loseConnection()
    def stopProducing(self):
        pass

class ServeRandom(LineReceiver):
    """Serve up random data."""
    def connectionMade(self):
        print('connection made from %s' % (self.transport.getPeer()))
        self.transport.write('how many random integers do you want?\r\n')
    def lineReceived(self, line):
        cnt = int(line.strip())
        producer = Producer(self, cnt)
        self.transport.registerProducer(producer, True)
        producer.resumeProducing()
    def connectionLost(self, reason):
        print('connection lost from %s' % (self.transport.getPeer()))
factory = Factory()
factory.protocol = ServeRandom
reactor.listenTCP(1234, factory)
print('listening on 1234...')
reactor.run()
 2
Author: jfs,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2009-12-18 01:30:03

A functional approach to observer design:

def add_listener(obj, method_name, listener):

    # Get any existing listeners
    listener_attr = method_name + '_listeners'
    listeners = getattr(obj, listener_attr, None)

    # If this is the first listener, then set up the method wrapper
    if not listeners:

        listeners = [listener]
        setattr(obj, listener_attr, listeners)

        # Get the object's method
        method = getattr(obj, method_name)

        @wraps(method)
        def method_wrapper(*args, **kwags):
            method(*args, **kwags)
            for l in listeners:
                l(obj, *args, **kwags) # Listener also has object argument

        # Replace the original method with the wrapper
        setattr(obj, method_name, method_wrapper)

    else:
        # Event is already set up, so just add another listener
        listeners.append(listener)


def remove_listener(obj, method_name, listener):

    # Get any existing listeners
    listener_attr = method_name + '_listeners'
    listeners = getattr(obj, listener_attr, None)

    if listeners:
        # Remove the listener
        next((listeners.pop(i)
              for i, l in enumerate(listeners)
              if l == listener),
             None)

        # If this was the last listener, then remove the method wrapper
        if not listeners:
            method = getattr(obj, method_name)
            delattr(obj, listener_attr)
            setattr(obj, method_name, method.__wrapped__)

Te metody mogą być następnie użyte do dodania słuchacza do dowolnej metody klasy. Na przykład:

class MyClass(object):

    def __init__(self, prop):
        self.prop = prop

    def some_method(self, num, string):
        print('method:', num, string)

def listener_method(obj, num, string):
    print('listener:', num, string, obj.prop)

my = MyClass('my_prop')

add_listener(my, 'some_method', listener_method)
my.some_method(42, 'with listener')

remove_listener(my, 'some_method', listener_method)
my.some_method(42, 'without listener')

A Wyjście To:

method: 42 with listener
listener: 42 with listener my_prop
method: 42 without listener
 1
Author: Dane White,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-12-18 19:31:47

OP pyta "czy są jakieś przykładowe przykłady Gof Observer zaimplementowane w Pythonie?" To jest an Przykład w Pythonie 3.7. Ta obserwowalna Klasa spełnia wymóg stworzenia relacji pomiędzy jednym obserwowalnym i wieloma obserwatorami pozostając niezależnymi od ich struktury.

from functools import partial
from dataclasses import dataclass, field
import sys
from typing import List, Callable


@dataclass
class Observable:
    observers: List[Callable] = field(default_factory=list)

    def register(self, observer: Callable):
        self.observers.append(observer)

    def deregister(self, observer: Callable):
        self.observers.remove(observer)

    def notify(self, *args, **kwargs):
        for observer in self.observers:
            observer(*args, **kwargs)


def usage_demo():
    observable = Observable()

    # Register two anonymous observers using lambda.
    observable.register(
        lambda *args, **kwargs: print(f'Observer 1 called with args={args}, kwargs={kwargs}'))
    observable.register(
        lambda *args, **kwargs: print(f'Observer 2 called with args={args}, kwargs={kwargs}'))

    # Create an observer function, register it, then deregister it.
    def callable_3():
        print('Observer 3 NOT called.')

    observable.register(callable_3)
    observable.deregister(callable_3)

    # Create a general purpose observer function and register four observers.
    def callable_x(*args, **kwargs):
        print(f'{args[0]} observer called with args={args}, kwargs={kwargs}')

    for gui_field in ['Form field 4', 'Form field 5', 'Form field 6', 'Form field 7']:
        observable.register(partial(callable_x, gui_field))

    observable.notify('test')


if __name__ == '__main__':
    sys.exit(usage_demo())
 1
Author: lemi57ssss,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-11-06 22:09:49