We'll again leverage the code that the official library provides in its documentation, as it is a fantastic example of how you can achieve concurrency within your own RxPY programs.
In this example, we'll use the rx.concurrency ThreadPoolScheduler class as our necessary cross-thread scheduler. We will then create three distinct observables that emit events at various intervals:
import multiprocessing
import random
import time
from threading import current_thread
from rx import Observable
from rx.concurrency import ThreadPoolScheduler
def processHeavyCalc(value, *, min_tenths=5, max_tenths=20):
    """Simulate a slow computation, then hand the input back unchanged.

    The call sleeps for a random whole number of tenths of a second drawn
    from ``[min_tenths, max_tenths]`` (inclusive), i.e. 0.5-2.0 seconds
    with the defaults.

    Args:
        value: Any object; it is returned as-is.
        min_tenths: Lower bound of the delay, in tenths of a second.
        max_tenths: Upper bound of the delay, in tenths of a second.

    Returns:
        The ``value`` argument, untouched.
    """
    # The bounds are keyword-only so a caller that passes extra positional
    # arguments can never overwrite them by accident.
    time.sleep(random.randint(min_tenths, max_tenths) * .1)
    return value
# Calculate the number of CPUs, then create a ThreadPoolScheduler with that
# number of threads. The scheduler is shared by the pipelines defined below.
optimal_thread_count = multiprocessing.cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count)
# Create Process 1: push five names through processHeavyCalc, with the
# subscription scheduled on pool_scheduler. Each item is printed together with
# the name of the thread that delivered it.
Observable.from_(["Alpha", "Beta", "Gamma", "Delta", "Epsilon"]) \
    .map(lambda s: processHeavyCalc(s)) \
    .subscribe_on(pool_scheduler) \
    .subscribe(on_next=lambda s: print("PROCESS 1: {0} {1}".format(current_thread().name, s)),
               on_error=lambda e: print(e),
               on_completed=lambda: print("PROCESS 1 done!"))
# Create Process 2: push an integer range through processHeavyCalc, with the
# subscription scheduled on pool_scheduler. Each item is printed together with
# the name of the thread that delivered it.
Observable.range(1, 10) \
    .map(lambda s: processHeavyCalc(s)) \
    .subscribe_on(pool_scheduler) \
    .subscribe(on_next=lambda i: print("PROCESS 2: {0} {1}".format(current_thread().name, i)),
               on_error=lambda e: print(e),
               on_completed=lambda: print("PROCESS 2 done!"))
# Create Process 3, which is infinite: every interval tick is scaled by 100,
# handed over to pool_scheduler via observe_on, run through processHeavyCalc,
# and printed with the name of the delivering thread. There is no on_completed
# handler; only values and errors are reported.
(
    Observable.interval(1000)
    .map(lambda tick: tick * 100)
    .observe_on(pool_scheduler)
    .map(lambda scaled: processHeavyCalc(scaled))
    .subscribe(
        on_next=lambda item: print(
            "PROCESS 3: {0} {1}".format(current_thread().name, item)
        ),
        on_error=lambda err: print(err),
    )
)
# Block the main thread until the user presses Enter (input() returns only
# once a full line has been read, whatever the prompt text says).
# NOTE(review): presumably this keeps the program alive so the subscriptions
# above can keep running on their scheduler threads — confirm.
input("Press any key to exit\n")