Streams¶
Redis streams are a new data-type, available in Redis 5.0, which provide a persistent, append-only log. Streams are a complex topic, so I strongly recommend reading the streams introduction.
I like to think of streams as having two modes of operation:
standalone-mode: streams act much like every other data-structure
consumer-groups: streams become stateful, with state such as “which messages were read?” and “who read what?” tracked within Redis.
Stream objects in walrus can be used standalone or within the context of a ConsumerGroup.
Standalone streams¶
In standalone mode, streams behave much like every other data-structure in Redis. By this, I mean that they act as a dumb container: you append items, you read them, you delete them – everything happens explicitly. Streams support the following operations:
Add a new item (XADD) - Stream.add()
Read a range of items (XRANGE) - Stream.range()
Read new messages, optionally blocking (XREAD) - Stream.read()
Delete one or more items (XDEL) - Stream.delete()
Get the length of the stream (XLEN) - Stream.length()
Trim the length to a given size (XTRIM) - Stream.trim()
Set the maximum allowable ID (XSETID) - Stream.set_id(); see the sketch after the trimming example below.
To get started with streams, we’ll create a Database instance and use it to instantiate a Stream:
from walrus import Database # A subclass of the redis-py Redis client.
db = Database()
stream = db.Stream('stream-a') # Create a new stream instance.
When adding data to a stream, Redis can automatically provide you with a unique timestamp-based identifier, which is almost always what you want. When a new message is added, the message id is returned:
msgid = stream.add({'message': 'hello, stream'})
print(msgid)
# Prints something like:
# b'1539008591844-0'
Message ids generated by Redis consist of a millisecond timestamp along with a
sequence number (for ordering messages that arrived on the same millisecond).
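Since the timestamp is embedded in the id, we can unpack it ourselves. A minimal sketch, reusing the msgid from above (the printed value will vary):
from datetime import datetime

# Split an id like b'1539008591844-0' into timestamp and sequence parts.
timestamp_ms, seq = msgid.split(b'-')
print(datetime.fromtimestamp(int(timestamp_ms) / 1000))
# Prints something like: 2018-10-08 09:23:11.844000
print(int(seq))
# 0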
Let’s add() a couple more items:
msgid2 = stream.add({'message': 'message 2'})
msgid3 = stream.add({'message': 'message 3'})
Ranges of records can be read using either the range() method or Python’s slice notation. The message ids provided as the range endpoints are inclusive when using the range API:
# Get messages 2 and newer:
messages = stream[msgid2:]
# messages contains:
[(b'1539008914283-0', {b'message': b'message 2'}),
(b'1539008918230-0', {b'message': b'message 3'})]
# We can use the "step" parameter to limit the number of records returned.
messages = stream[msgid::2]
# messages contains the first two messages:
[(b'1539008903588-0', {b'message': b'hello, stream'}),
(b'1539008914283-0', {b'message': b'message 2'})]
# Get all messages in stream:
messages = list(stream)
[(b'1539008903588-0', {b'message': b'hello, stream'}),
(b'1539008914283-0', {b'message': b'message 2'}),
(b'1539008918230-0', {b'message': b'message 3'})]
The size of streams can be managed by deleting messages by id, or by “trimming”
the stream, which removes the oldest messages. The desired size is specified
when issuing a trim()
operation, though, due to the internal
implementation of the stream data-structures, the size is considered
approximate by default.
# Adding and deleting a message:
msgid4 = stream.add({'message': 'delete me'})
del stream[msgid4]
# How many items are in the stream?
print(len(stream)) # Prints 3.
To see how trimming works, let’s create another stream and fill it with 1000 items, then request it to be trimmed to 10 items:
# Add 1000 items to "stream-2".
stream2 = db.Stream('stream-2')
for i in range(1000):
    stream2.add({'data': 'message-%s' % i})
# Trim stream-2 to (approximately) 10 most-recent messages.
nremoved = stream2.trim(10)
print(nremoved)
# 909
print(len(stream2))
# 91
# To trim to an exact number, specify `approximate=False`:
stream2.trim(10, approximate=False) # Returns 81.
print(len(stream2))
# 10
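The one operation from the list of standalone stream APIs not otherwise shown in these examples is Stream.set_id(). As a minimal sketch, reusing the stream and msgid3 from earlier:
# Set the stream's "last id" (XSETID). After this, attempting to add
# an entry with an explicit id lower than or equal to msgid3 will fail,
# while auto-generated ids will continue from this point.
stream.set_id(msgid3)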
The previous examples show how to add(), read a range() of messages, delete() messages, and manage the size using the trim() method. When processing a continuous stream of events, though, it may be desirable to block until messages are added. For this we can use the read() API, which supports blocking until messages become available.
# By default, calling `stream.read()` returns all messages in the stream:
stream.read()
# Returns:
[(b'1539008903588-0', {b'message': b'hello, stream'}),
(b'1539008914283-0', {b'message': b'message 2'}),
(b'1539008918230-0', {b'message': b'message 3'})]
We can pass a message id to read(), and unlike the slicing operations, this id is treated as the “last-read message” and acts as an exclusive lower-bound:
# Read any messages newer than msgid2.
stream.read(last_id=msgid2)
# Returns:
[(b'1539008918230-0', {b'message': b'message 3'})]
# This returns None since there are no messages newer than msgid3.
stream.read(last_id=msgid3)
We can make read() blocking by specifying the special id "$" and a block timeout in milliseconds. To block forever, use block=0.
# This will block for 2 seconds, after which an empty list is returned
# (provided no messages are added while waiting).
stream.read(block=2000, last_id='$')
While it’s possible to build consumers using these APIs, the client is still responsible for keeping track of the last-read message id and for implementing its own semantics for retrying failed messages, etc. As a minimal sketch, a standalone consumer loop might look like the following (process() is a hypothetical application callback):
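last_id = '$'  # Start at the end of the stream; read only new messages.
while True:
    # Block for up to 5 seconds waiting for new messages.
    messages = stream.read(block=5000, last_id=last_id)
    for msgid, data in (messages or []):
        process(msgid, data)  # Hypothetical application logic.
        last_id = msgid  # We must track the last-read id ourselves.

In the next section, we’ll see how consumer groups can greatly simplify building a stream processing pipeline.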
Consumer groups¶
In consumer-group mode, streams retain the behaviors of standalone mode, adding functionality which makes them stateful. What state is tracked?
Read any unseen messages (XREADGROUP) - ConsumerGroupStream.read()
List messages that were read, but not acknowledged (XPENDING) - ConsumerGroupStream.pending()
Acknowledge one or more pending messages (XACK) - ConsumerGroupStream.ack()
Claim one or more pending messages for re-processing (XCLAIM) - ConsumerGroupStream.claim()
ConsumerGroup objects provide the building-blocks for robust message processing pipelines or task queues. Ordinarily this kind of state would be managed by the client – having it in Redis means there is a single, unified interface (rather than one implementation per client, with all the bugs that likely entails). Furthermore, consumer group state is persisted in the RDB and replicated.
# Consumer groups require that a stream exist before the group can be
# created, so we have to add an empty message.
stream_keys = ['stream-a', 'stream-b', 'stream-c']
for stream in stream_keys:
    db.xadd(stream, {'data': ''})
# Create a consumer-group for streams a, b, and c. We will mark all
# messages as having been processed, so only messages added after the
# creation of the consumer-group will be read.
cg = db.consumer_group('cg-abc', stream_keys)
cg.create() # Create the consumer group.
cg.set_id('$')
To read from all the streams in a consumer group, we can use the ConsumerGroup.read() method. Since we marked all messages as read and have not added anything new since creating the consumer group, the return value is an empty list:
resp = cg.read()
# Returns an empty list:
[]
For convenience, walrus exposes the individual streams within a consumer group as attributes on the ConsumerGroup instance. Let’s add some messages to streams a, b, and c:
cg.stream_a.add({'message': 'new a'})
cg.stream_b.add({'message': 'new for b'})
for i in range(10):
    cg.stream_c.add({'message': 'c-%s' % i})
Now let’s try reading from the consumer group again. We’ll pass count=1 so that we read no more than one message from each stream in the group:
# Read up to one message from each stream in the group.
cg.read(count=1)
# Returns:
[('stream-a', [(b'1539023088125-0', {b'message': b'new a'})]),
('stream-b', [(b'1539023088125-0', {b'message': b'new for b'})]),
('stream-c', [(b'1539023088126-0', {b'message': b'c-0'})])]
We’ve now read all the unread messages from streams a and b, but stream c
still has messages. Calling read()
again will give us the next unread
message from stream c:
# Read up to 1 message from each stream in the group. Since
# we already read everything in streams a and b, we will only
# get the next unread message in stream c.
cg.read(count=1)
# Returns:
[('stream-c', [(b'1539023088126-1', {b'message': b'c-1'})])]
When using consumer groups, messages that are read need to be acknowledged.
Let’s look at the pending (read but unacknowledged) messages from
stream a using the pending()
method, which
returns a list of metadata about each unacknowledged message:
# We read one message from stream a, so we should see one pending message.
cg.stream_a.pending()
# Returns a list of dicts containing the message id, consumer name,
# time since delivery (in milliseconds), and delivery count:
[{'message_id': b'1539023088125-0', 'consumer': b'cg-abc.c1',
  'time_since_delivered': 22238, 'times_delivered': 1}]
To acknowledge receipt of a message and remove it from the pending list, use
the ack()
method on the consumer group stream:
# View the pending message list for stream a.
pending_list = cg.stream_a.pending()
msg_id = pending_list[0]['message_id']
# Acknowledge the message.
cg.stream_a.ack(msg_id)
# Returns number of pending messages successfully acknowledged:
1
Consumer groups have the concept of individual consumers. These might be workers in a process pool, for example. Note that the pending() method returned the consumer name as "cg-abc.c1". Walrus uses the consumer group name plus ".c1" as the default consumer name. To create another consumer within a given group, we can use the consumer() method:
# Create a second consumer within the consumer group.
cg2 = cg.consumer('cg-abc.c2')
Creating a new consumer within a consumer group does not affect the state of
the group itself. Calling read()
using our new
consumer will pick up from the last-read message, as you would expect:
# Read from our consumer group using the new consumer. Recall
# that we read all the messages from streams a and b, and the
# first two messages in stream c.
cg2.read(count=1)
# Returns:
[('stream-c', [(b'1539023088126-2', {b'message': b'c-2'})])]
If we look at the pending message status for stream c, we will see that the first and second messages were read by the consumer “cg-abc.c1” and the third message was read by our new consumer, “cg-abc.c2”:
# What messages have been read, but were not acknowledged, from stream c?
cg.stream_c.pending()
# Returns a list of dicts describing each pending message:
[{'message_id': b'1539023088126-0', 'consumer': b'cg-abc.c1',
  'time_since_delivered': 51329, 'times_delivered': 1},
 {'message_id': b'1539023088126-1', 'consumer': b'cg-abc.c1',
  'time_since_delivered': 43772, 'times_delivered': 1},
 {'message_id': b'1539023088126-2', 'consumer': b'cg-abc.c2',
  'time_since_delivered': 5966, 'times_delivered': 1}]
Consumers can claim()
pending messages, which
transfers ownership of the message and returns a list of (message id, data)
tuples to the caller:
# Unpack the pending messages into a couple variables.
mc1, mc2, mc3 = cg.stream_c.pending()
# Claim the first message for consumer 2:
cg2.stream_c.claim(mc1['message_id'])
# Returns a list of (message id, data) tuples for the claimed messages:
[(b'1539023088126-0', {b'message': b'c-0'})]
Re-inspecting the pending messages for stream c, we can see that the consumer for the first message has changed and the message age has been reset:
# What messages are pending in stream c?
cg.stream_c.pending()
# Returns:
[{'message_id': b'1539023088126-0', 'consumer': b'cg-abc.c2',
'time_since_delivered': 2168, 'times_delivered': 1},
{'message_id': b'1539023088126-1', 'consumer': b'cg-abc.c1',
'time_since_delivered': 47141, 'times_delivered': 1},
{'message_id': b'1539023088126-2', 'consumer': b'cg-abc.c2',
'time_since_delivered': 9335, 'times_delivered': 1}]
The individual streams within the consumer group support a number of useful APIs:
consumer_group.stream.ack(*id_list) - acknowledge one or more messages read from the given stream.
consumer_group.stream.add(data, id='*', maxlen=None, approximate=True) - add a new message to the stream. The maxlen parameter can be used to keep the stream from growing without bounds. If given, the approximate flag indicates whether the maxlen should be approximate or exact.
consumer_group.stream.claim(*id_list) - claim one or more pending messages.
consumer_group.stream.delete(*id_list) - delete one or more messages by id.
consumer_group.stream.pending(start='-', stop='+', count=1000) - get the list of unacknowledged messages in the stream. The start and stop parameters can be message ids, while the count parameter can be used to limit the number of results returned.
consumer_group.stream.read(count=None, block=None) - monitor the stream for new messages within the context of the consumer group. This method can be made to block by specifying a block timeout in milliseconds (or 0 to block forever).
consumer_group.stream.set_id(id='$') - set the id of the last-read message for the consumer group. Use the special id "$" to indicate all messages have been read, or "0-0" to mark all messages as unread.
consumer_group.stream.trim(count, approximate=True) - trim the stream to the given size.
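Putting these APIs together, a minimal sketch of a worker that processes messages from one stream in the group might look like this (handle_message() is a hypothetical callback; messages that raise an error are left unacknowledged, so they remain pending and can later be claim()-ed or retried):
while True:
    # Read up to 10 unseen messages, blocking for up to one second.
    for msgid, data in (cg.stream_a.read(count=10, block=1000) or []):
        try:
            handle_message(msgid, data)
        except Exception:
            continue  # Not acked: stays in the pending list for retry.
        cg.stream_a.ack(msgid)  # Processed successfully; acknowledge.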
TimeSeries¶
Redis automatically uses the millisecond timestamp plus a sequence number to
uniquely identify messages added to a stream. This makes streams a natural fit
for time-series data. To simplify working with streams as time-series in
Python, you can use the special TimeSeries
helper class, which acts
just like the ConsumerGroup
from the previous section with the
exception that it can translate between Python datetime
objects and message
ids automatically.
To get started, we’ll create a TimeSeries
instance, specifying the
stream keys, just like we did with ConsumerGroup
:
# Create a time-series consumer group named "demo-ts" for the
# streams s1 and s2.
ts = db.time_series('demo-ts', ['s1', 's2'])
# Add dummy data and create the consumer group.
db.xadd('s1', {'': ''}, id='0-1')
db.xadd('s2', {'': ''}, id='0-1')
ts.create()
ts.set_id('$') # Do not read the dummy items.
Let’s add some messages to the time-series, one for each day between January 1st and 10th, 2018:
from datetime import datetime, timedelta
date = datetime(2018, 1, 1)
for i in range(10):
    ts.s1.add({'message': 's1-%s' % date}, id=date)
    date += timedelta(days=1)
We can read messages from the stream using the familiar slicing API. For example, to read 3 messages starting at January 2nd, 2018:
ts.s1[datetime(2018, 1, 2)::3]
# Returns messages for Jan 2nd - 4th:
[<Message s1 1514872800000-0: {'message': 's1-2018-01-02 00:00:00'}>,
<Message s1 1514959200000-0: {'message': 's1-2018-01-03 00:00:00'}>,
<Message s1 1515045600000-0: {'message': 's1-2018-01-04 00:00:00'}>]
Note that the values returned are Message
objects. Message objects
provide some convenience functions, such as extracting timestamp and sequence
values from stream message ids:
for message in ts.s1[datetime(2018, 1, 1)::3]:
    print(message.stream, message.timestamp, message.sequence, message.data)
# Prints:
s1 2018-01-01 00:00:00 0 {'message': 's1-2018-01-01 00:00:00'}
s1 2018-01-02 00:00:00 0 {'message': 's1-2018-01-02 00:00:00'}
s1 2018-01-03 00:00:00 0 {'message': 's1-2018-01-03 00:00:00'}
Let’s add some messages to stream “s2” as well:
date = datetime(2018, 1, 1)
for i in range(5):
    ts.s2.add({'message': 's2-%s' % date}, id=date)
    date += timedelta(days=1)
One difference between TimeSeries and ConsumerGroup is what happens when reading from multiple streams. ConsumerGroup groups the returned messages by stream, as shown earlier; TimeSeries, however, returns a flat list of Message objects:
# Read up to 2 messages from each stream (s1 and s2):
messages = ts.read(count=2)
# "messages" is a list of messages from both streams:
[<Message s1 1514786400000-0: {'message': 's1-2018-01-01 00:00:00'}>,
<Message s2 1514786400000-0: {'message': 's2-2018-01-01 00:00:00'}>,
<Message s1 1514872800000-0: {'message': 's1-2018-01-02 00:00:00'}>,
<Message s2 1514872800000-0: {'message': 's2-2018-01-02 00:00:00'}>]
When inspecting pending messages within a TimeSeries, the message ids are unpacked into (datetime, seq) 2-tuples:
ts.s1.pending()
# Returns:
[((datetime.datetime(2018, 1, 1, 0, 0), 0), 'demo-ts.c1', 1578, 1),
 ((datetime.datetime(2018, 1, 2, 0, 0), 0), 'demo-ts.c1', 1578, 1)]
# Acknowledge the pending messages:
for msgts_seq, _, _, _ in ts.s1.pending():
    ts.s1.ack(msgts_seq)
We can set the last-read message id using a datetime:
ts.s1.set_id(datetime(2018, 1, 1))
# Next read will be 2018-01-02, ...
ts.s1.read(count=2)
# Returns:
[<Message s1 1514872800000-0: {'message': 's1-2018-01-02 00:00:00'}>,
<Message s1 1514959200000-0: {'message': 's1-2018-01-03 00:00:00'}>]
As with ConsumerGroup
, the TimeSeries
helper provides
stream-specific APIs for claiming unacknowledged messages, creating additional
consumers, etc.
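For example, a minimal sketch that creates a second consumer and claims the oldest pending message from s1 (the consumer name here is an assumption, following the naming convention shown earlier):
# Create a second consumer within the "demo-ts" group.
ts2 = ts.consumer('demo-ts.c2')

# Pending ids are (datetime, seq) 2-tuples; claim the oldest one
# so that ownership transfers to the new consumer.
pending = ts.s1.pending()
if pending:
    msgts_seq = pending[0][0]
    ts2.s1.claim(msgts_seq)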
Learning more¶
For more information, the following links may be helpful:
API docs for Stream, ConsumerGroup, ConsumerGroupStream, and TimeSeries.