A library for composing asynchronous and event-based programs using observable collections and query operator functions in Python
This branch is work-in-progress.
FOR V 1.X PLEASE GO TO STABLE BRANCH
Reactive Extensions for Python (RxPY) is a set of libraries for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in Python. Using Rx, developers represent asynchronous data streams with Observables, query asynchronous data streams using operators, and parameterize concurrency in data/event streams using Schedulers.
Using Rx, you can represent multiple asynchronous data streams (that come from diverse sources, e.g., stock quote, Tweets, computer events, web service requests, etc.), and subscribe to the event stream using the Observer object. The Observable notifies the subscribed Observer instance whenever an event occurs. You can put various transformations in-between the source Observable and the consuming Observer as well.
Because Observable sequences are data streams, you can query them using standard LINQ-like query operators implemented by the Observable type. Thus you can filter, map, reduce, compose and perform time-based operations on multiple events easily by using these static LINQ operators. In addition, there are a number of other reactive stream specific operators that allow powerful queries to be written. Cancellation, exceptions, and synchronization are also handled gracefully by using the methods on the Observable object.
- Getting started with RxPY
- General Rx/ReactiveX tutorials
RxPY v3.x runs on Python 3.6 or above. To install RxPY:
pip3 install --pre rx
For Python 2.x you need to use version 1.6
pip install rx==1.6.1
Join the conversation on Slack!
The gracious folks at PySlackers have given us a home in the #rxpy Slack channel. Please join us there for questions, conversations, and all things related to RxPy.
To join, navigate the page above to receive an email invite. After signing up, join us in the #rxpy channel.
Please follow the community guidelines and terms of service.
An Observable
is the core type in ReactiveX. It serially pushes items, known as emissions, through a series of operators until it finally arrives at an Observer
, where they are consumed.
Push-based (rather than pull-based) iteration opens up powerful new possibilities to express code and concurrency much more quickly. Because an Observable
treats events as data and data as events, composing the two together becomes trivial.
There are many ways to create an Observable
that hands items to an Observer
. You can use a create()
factory and pass it a function that hands items to the Observer
. The Observer
implements on_next()
, on_completed()
, and on_error()
functions. The on_next()
is used to pass items. The on_completed()
will signal no more items are coming, and the on_error()
signals an error.
For instance, you can implement an Observer
with these three methods and simply print these events. Then the create()
can leverage a function that passes five strings to the Observer
by calling those events.
import rx
def push_five_strings(observer):
observer.on_next("Alpha")
observer.on_next("Beta")
observer.on_next("Gamma")
observer.on_next("Delta")
observer.on_next("Epsilon")
observer.on_completed()
class PrintObserver(rx.Observer):
def on_next(self, value):
print("Received {0}".format(value))
def on_completed(self):
print("Done!")
def on_error(self, error):
print("Error Occurred: {0}".format(error))
source = rx.create(push_five_strings)
source.subscribe(PrintObserver())
OUTPUT:
Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!
However, there are many Observable
factories for common sources of emissions. To simply push five items, we can rid the create()
and its backing function, and use of()
. This factory accepts an argument list, iterates each emission as an on_next()
, and then calls on_completed()
when iteration is complete. Therefore, we can simply pass these five Strings as arguments to it.
import rx
class PrintObserver(rx.Observer):
def on_next(self, value):
print("Received {0}".format(value))
def on_completed(self):
print("Done!")
def on_error(self, error):
print("Error Occurred: {0}".format(error))
source = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
source.subscribe(PrintObserver())
Most of the time you will not want to go through the verbosity of implementing your own Observer
. You can instead pass 1 to 3 lambda arguments to subscribe_()
specifying the on_next
, on_complete
, and on_error
actions.
import rx
source = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
source.subscribe(
on_next=lambda value: print("Received {0}".format(value)),
on_completed=lambda: print("Done!"),
on_error=lambda error: print("Error Occurred: {0}".format(error))
)
You do not have to specify all three events types. You can pick and choose which events you want to observe using the named arguments, or simply provide a single lambda for the on_next
. Typically in production, you will want to provide an on_error
so errors are explicitly handled by the subscriber.
import rx
source = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
source.subscribe(lambda value: print("Received {0}".format(value)))
OUTPUT:
Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
You can also derive new Observables using over 130 operators available in RxPY. Each operator will yield a new Observable
that transforms emissions from the source in some way. For example, we can map()
each String
to its length, then filter()
for lengths being at least 5. These will yield two separate Observables built off each other.
import rx
from rx import operators as ops
source = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
composed = source.pipe(
ops.map(lambda s: len(s)),
ops.filter(lambda i: i >= 5)
)
composed.subscribe(lambda value: print("Received {0}".format(value)))
OUTPUT:
Received 5
Received 5
Received 5
Received 7
Typically, you do not want to save Observables into intermediary variables for each operator, unless you want to have multiple subscribers at that point. Instead, you want to strive to inline and create an "Observable pipeline" of operations. That way your code is readable and tells a story much more easily.
import rx
from rx import operators as ops
rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
ops.map(lambda s: len(s)),
ops.filter(lambda i: i >= 5)
).subscribe(lambda value: print("Received {0}".format(value)))
On top of data, Observables can also emit events. By treating data and events the same way, you can do powerful compositions to make the two work together. Below, we have an Observable
that emits a consecutive integer every 1 second. This Observable
will run infinitely and never call on_complete
.
import rx
from rx import operators as ops
rx.interval(1.0).pipe(
ops.map(lambda i: "{0} Mississippi".format(i))
).subscribe(lambda s: print(s))
input("Press any key to quit\n")
OUTPUT:
0 Mississippi
1 Mississippi
2 Mississippi
3 Mississippi
4 Mississippi
5 Mississippi
6 Mississippi
...
Because interval()
operates on a separate thread (via the TimeoutScheduler
), we need to prevent the application from exiting prematurely before it has a chance to fire. We can use an input()
to stop the main thread until a key is pressed. Observables can be created for button events, requests, timers, and even live Twitter feeds.
Each Subscriber to an Observable
often will receive a separate stream of emissions. For instance, having two subscribers to this Observable
emitting three random integers will result in both subscribers getting different numbers.
import rx
from rx import operators as ops
from random import randint
three_emissions = rx.from_range(1, 3)
mapper = ops.map(lambda i: randint(1, 100000))
three_random_ints = three_emissions.pipe(mapper)
three_random_ints.subscribe(lambda i: print("Subscriber 1 Received: {0}".format(i)))
three_random_ints.subscribe(lambda i: print("Subscriber 2 Received: {0}".format(i)))
OUTPUT:
Subscriber 1 Received: 79262
Subscriber 1 Received: 20892
Subscriber 1 Received: 69197
Subscriber 2 Received: 66574
Subscriber 2 Received: 41177
Subscriber 2 Received: 47445
To force a specifc point in an Observable
chain to push the same emissions to all subscribers (rather than generating a separate stream of emissions for each subscriber), you can call publish()
to return a ConnectableObservable
. Then you can set up your subscribers and call connect()
when they are ready to receive the same stream of emissions.
from rx import from_range, operators as op
from random import randint
three_emissions = from_range(1, 3)
three_random_ints = three_emissions.pipe(
op.map(lambda i: randint(1, 100000)).publish()
)
three_random_ints.subscribe(lambda i: print("Subscriber 1 Received: {0}".format(i)))
three_random_ints.subscribe(lambda i: print("Subscriber 2 Received: {0}".format(i)))
three_random_ints.connect()
OUTPUT:
Subscriber 1 Received: 90994
Subscriber 2 Received: 90994
Subscriber 1 Received: 91213
Subscriber 2 Received: 91213
Subscriber 1 Received: 42335
Subscriber 2 Received: 42335
This takes a cold Observable
(which "replays" operations for each subscriber) and makes it hot by putting all Observers on the same stream of emissions which are broadcasted in live time. Be sure to have all your Observers set up before calling connect()
, as any tardy Observers that subscribe after connect()
is called will miss any previous emissions.
Another way to implement mutlicasting is to use the auto_connect()
operator on a ConnectableObservable
. This will start firing emissions the moment it gets a subscriber, and will continue to fire even as subscribers come and go. If you provide an integer argument, it will hold off firing until there are that many subscribers subscribed to it.
from random import randint
import rx
from rx import operators as ops
three_emissions = rx.range(1, 3)
three_random_ints = three_emissions.pipe(
ops.map(lambda i: randint(1, 100000)).publish().auto_connect(2)
)
three_random_ints.subscribe(lambda i: print("Subscriber 1 Received: {0}".format(i)))
three_random_ints.subscribe(lambda i: print("Subscriber 2 Received: {0}".format(i))) # second subscriber triggers firing
You can compose different Observables together using factories like merge()
, concat()
, zip()
, and Observable.combine_latest()
. Even if Observables are working on different threads (using the subscribe_on()
and observe_on()
operators), they will be combined safely. For instance, we can use zip()
to slow down emitting 5 Strings by zipping them with an Observable.interval()
. We will take one emission from each source and zip them into a tuple.
import rx
letters = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
intervals = rx.interval(1.0)
rx.zip(letters, intervals, lambda s, i: (s, i)) \
.subscribe(lambda t: print(t))
input("Press any key to quit\n")
OUTPUT:
('Alpha', 0)
('Beta', 1)
('Gamma', 2)
('Delta', 3)
('Epsilon', 4)
You can create Observables off of virtually anything, and it is often helpful to create API's and helper functions that return tailored Observables. For instance, you can create an Observable
off a SQL query using SQLAlchemy, and return a CUSTOMER
table record for a given customer ID. You can also use flat_map()
to map each emission to an Observable
and merge their emissions together into a single Observable
. This allows us to query for three different customers as shown below.
from sqlalchemy import create_engine, text
import rx
from rx import operators as ops
engine = create_engine('sqlite:///rexon_metals.db')
conn = engine.connect()
def customer_for_id(customer_id):
stmt = text("SELECT * FROM CUSTOMER WHERE CUSTOMER_ID = :id")
return Observable.from_(conn.execute(stmt, id=customer_id))
# Query customers with IDs 1, 3, and 5
rx.of(1, 3, 5).pipe(
ops.flat_map(lambda id: customer_for_id(id))
).subscribe(lambda r: print(r))
OUTPUT:
(1, 'LITE Industrial', 'Southwest', '729 Ravine Way', 'Irving', 'TX', 75014)
(3, 'Re-Barre Construction', 'Southwest', '9043 Windy Dr', 'Irving', 'TX', 75032)
(5, 'Marsh Lane Metal Works', 'Southeast', '9143 Marsh Ln', 'Avondale', 'LA', 79782)
To achieve concurrency, you use two operators: subscribe_on()
and observe_on()
. Both need a Scheduler
which provides a thread for each subscription to do work (see section on Schedulers below). The ThreadPoolScheduler
is a good choice to create a pool of reusable worker threads.
Keep in mind Python's GIL has the potential to undermine your concurrency performance, as it prevents multiple threads from accessing the same line of code simultaneously. Libraries like NumPy can mitigate this for parallel intensive computations as they free the GIL. RxPy may also minimize thread overlap to some degree. Just be sure to test your application with concurrency and ensure there is a performance gain.
The subscribe_on()
instructs the source Observable
at the start of the chain which scheduler to use (and it does not matter where you put this operator). The observe_on()
, however, will switch to a different Scheduler
at that point in the Observable
chain, effectively moving an emission from one thread to another. Some Observable
factories and operators, like Observable.interval()
and delay()
, already have a default Scheduler
and thus will ignore any subscribe_on()
you specify (although you can pass a Scheduler
usually as an argument).
Below, we run three different processes concurrently rather than sequentially using subscribe_on()
as well as an observe_on()
.
import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx import operators as ops
from rx.concurrency import ThreadPoolScheduler
def intense_calculation(value):
# sleep for a random short duration between 0.5 to 2.0 seconds to simulate a long-running calculation
time.sleep(random.randint(5, 20) * .1)
return value
# calculate number of CPU's, then create a ThreadPoolScheduler with that number of threads
optimal_thread_count = multiprocessing.cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count)
# Create Process 1
rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
ops.map(lambda s: intense_calculation(s)),
ops.subscribe_on(pool_scheduler)
).subscribe(
on_next=lambda s: print("PROCESS 1: {0} {1}".format(current_thread().name, s)),
on_error=lambda e: print(e),
on_completed=lambda: print("PROCESS 1 done!")
)
# Create Process 2
rx.range(1, 10).pipe(
ops.map(lambda s: intense_calculation(s)),
ops.subscribe_on(pool_scheduler)
).subscribe(
on_next=lambda i: print("PROCESS 2: {0} {1}".format(current_thread().name, i)),
on_error=lambda e: print(e), on_completed=lambda: print("PROCESS 2 done!")
)
# Create Process 3, which is infinite
rx.interval(1.0).pipe(
ops.map(lambda i: i * 100),
ops.observe_on(pool_scheduler),
ops.map(lambda s: intense_calculation(s))
).subscribe(
on_next=lambda i: print("PROCESS 3: {0} {1}".format(current_thread().name, i)),
on_error=lambda e: print(e)
)
input("Press any key to exit\n")
OUTPUT:
Press any key to exit
PROCESS 1: Thread-1 Alpha
PROCESS 2: Thread-2 1
PROCESS 3: Thread-4 0
PROCESS 2: Thread-2 2
PROCESS 1: Thread-1 Beta
PROCESS 3: Thread-7 100
PROCESS 3: Thread-7 200
PROCESS 2: Thread-2 3
PROCESS 1: Thread-1 Gamma
PROCESS 1: Thread-1 Delta
PROCESS 2: Thread-2 4
PROCESS 3: Thread-7 300
...
For more in-depth tutorials, check out Reactive Python for Data Science which is available on both the O'Reilly Store as well as O'Reilly Safari.
Disposables implements a context manager so you may use them in with
statements.
Observable sequences may be concatenated using +
, so you can write:
import rx
xs = rx.of(1,2,3)
ys = rx.of(4,5,6)
zs = xs + ys # Concatenate observables
Observable sequences may be sliced using [start:stop:step]
, so you can write:
import rx
xs = rx.of(1,2,3,4,5,6)
ys = xs[1:-1]
Observable sequences may be turned into an iterator so you can use generator expressions, or iterate over them (uses queueing and blocking).
from rx import of
xs = of(1,2,3,4,5,6)
ys = xs.to_blocking()
zs = (x*x for x in ys if x > 3)
for x in zs:
print(x)
RxPY is a fairly complete implementation of Rx v2.2 with more than 134 query operators, and over 1100 passing unit-tests. RxPY is mostly a direct port of RxJS, but also borrows a bit from RxNET and RxJava in terms of threading and blocking operators.
RxPY follows PEP 8, so all function and method names are lowercase with words separated by underscores as necessary to improve readability.
Thus .NET code such as:c# var group = source.GroupBy(i => i % 3);
need to be written with an _
in Python:python group = source.group_by(lambda i: i % 3)
With RxPY you should use named keyword arguments instead of positional arguments when an operator has multiple optional arguments. RxPY will not try to detect which arguments you are giving to the operator (or not).
from rx import timer
res = timer(5.0) # Yes
res = timer(5.0, 1.0) # Yes
res = timer(5.0, 1.0, Scheduler.timeout) # Yes
res = timer(, scheduler=Scheduler.timeout) # Yes, but must name
res = timer(5.0, Scheduler.timeout) # No, this is an error
Thus when an operator like timer()
has multiple optional arguments you should name your arguments. At least the arguments marked as optional.
In RxPY you can choose to run fully asynchronously or you may decide to schedule work and timeouts using threads.
For time and scheduler handing you will need to supply datetime for absolute time values and timedelta for relative time. You may also use int
to represent milliseconds.
RxPY also comes with batteries included, and has a number of Python specific mainloop schedulers to make it easier for you to use RxPY with your favorite Python framework.
ThreadPoolScheduler
to create a fixed sized pool of Schedulers.NewThreadScheduler
to create a new thread for each subscriptionAsyncIOScheduler
for use with AsyncIO. (requires Python 3.4 or trollius, a port ofasyncio
compatible with Python 2.6-3.5).EventLetEventScheduler
for use with Eventlet.IOLoopScheduler
for use with Tornado IOLoop. See theautocomplete and konamicode examples for how to use RxPY with your Tornado application.GEventScheduler
for use with GEvent. (Python 2.7 only).TwistedScheduler
for use with Twisted.TkinterScheduler
for use with Tkinter. See the timeflies example for how to use RxPY with your Tkinter application.PyGameScheduler
for use with PyGame. See the chess example for how to use RxPY with your PyGame application.QtScheduler
for use with PyQt4,PyQt5, and PySide. See the timeflies example for how to use RxPY with your Qt application.GtkScheduler
for use with Python GTK+ 3. See the timeflies example for how to use RxPY with your GTK+ application.WxScheduler
for use with wxPython. See the timeflies example for how to use RxPY with your wx application.
You can contribute by reviewing and sending feedback on code checkins, suggesting and trying out new features as they are implemented, register issues and help us verify fixes as they are checked in, as well as submit code fixes or code contributions of your own.
The main repository is at ReactiveX/RxPY. There are currently an outdated mirror at Reactive-Extensions/RxPy. Please register any issues to ReactiveX/RxPY/issues.
Please submit any pull requests against the master branch at ReactiveX/RxPY.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.