In this example, we'll extend the previous example and flesh it out a bit. We'll define a Subscriber class, which will subclass the Observer class. This class will implement the on_next(), on_completed(), and on_error() functions and simply print out any value passed to them.
We'll then use the same three_random_ints Observable that we defined in the previous example and publish these emissions using the .publish() function.
Below this, we'll subscribe three distinct subscribers to our Observable before calling the .connect() function, which defines that all our subscribers are ready so that they receive the same stream of emissions:
from rx import Observable, Observer
from random import randint
class Subscriber(Observer):
def __init__(self, ident):
self.id = ident
def on_next(self, value):
print("Subscriber: {} Received:
{}".format(self.id, value))
def on_completed(self):
print("Subscriber: {} Received
Events".format(self.id))
def on_error(self, error):
print("Error Occurred: {}".format(error))
three_emissions = Observable.range(1,3)
three_random_ints = three_emissions.map(lambda i:
randint(1, 10000)).publish()
three_random_ints.subscribe(Subscriber("Grant"))
three_random_ints.subscribe(Subscriber("Barry"))
three_random_ints.subscribe(Subscriber("Sophie"))
three_random_ints.connect()