In this example, we'll implement PrintObserver, which will subclass the rx.Observer class. This will implement the three necessary functions required: on_next(), on_completed(), and on_error(). Each of these three functions has an important role to play within our observers:
-
on_next(self, value): This is called whenever the observer receives a new event.
-
on_completed(self): This is called whenever our observable notifies it that it has completed its task.
-
on_error(self, error): This is called whenever we wish to handle error cases. Thankfully, in our simple example, we shouldn't have to worry about this.
This will be a relatively simple example that will just print out the values received when the on_next() function is called, but it should give you the basic idea of how to define your own observers.
Let's examine this code snippet that features a very simple RxPY observer and observable:
from rx import Observable, Observer
def push_five_strings(observer):
observer.on_next("Alpha")
observer.on_next("Beta")
observer.on_next("Gamma")
observer.on_next("Delta")
observer.on_next("Epsilon")
observer.on_completed()
class PrintObserver(Observer):
def on_next(self, value):
print("Received {0}".format(value))
def on_completed(self):
print("Done!")
def on_error(self, error):
print("Error Occurred: {0}".format(error))
source = Observable.create(push_five_strings)
source.subscribe(PrintObserver())
When we execute the preceding Python program, you should see that the five distinct strings are all printed out one after the other before the on_completed() function is called. This signals to our observer that there will be no further events, and the program then terminates:
$ python3.6 07_createObservable.py
Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!