Streams

Redis streams are a new data-type, available as of Redis 5.0, providing a persistent, append-only log. Redis streams are a complex topic, so I strongly recommend reading the streams introduction.

I like to think of streams as having two modes of operation:

  • standalone-mode: streams act much like every other data-structure

  • consumer-groups: streams become stateful, with state such as “which messages were read?” and “who read what?” tracked within Redis.

Stream objects in walrus can be used standalone or within the context of a ConsumerGroup.

Standalone streams

In standalone mode, streams behave much like every other data-structure in Redis. By this, I mean that they act as a dumb container: you append items, you read them, you delete them – everything happens explicitly. Streams support operations for adding messages, reading them back individually or in ranges, deleting messages, and trimming the stream to a desired size.

To get started with streams, we’ll create a Database instance and use it to instantiate a Stream:

from walrus import Database  # A subclass of the redis-py Redis client.

db = Database()
stream = db.Stream('stream-a')  # Create a new stream instance.

When adding data to a stream, Redis can automatically provide you with a unique timestamp-based identifier, which is almost always what you want. When a new message is added, the message id is returned:

msgid = stream.add({'message': 'hello, stream'})
print(msgid)

# Prints something like:
# b'1539008903588-0'

Message ids generated by Redis consist of a millisecond timestamp along with a sequence number (for ordering messages that arrive in the same millisecond). For illustration, here is a minimal sketch of how an id breaks down (parse_id is a hypothetical helper, not part of walrus):
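
# A hypothetical helper (not part of walrus): split a message id into
# its millisecond timestamp and sequence number.
def parse_id(msgid):
    timestamp, _, seq = msgid.decode().partition('-')
    return int(timestamp), int(seq)

print(parse_id(b'1539008903588-0'))
# Prints: (1539008903588, 0)

Let’s add() a couple more items: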

msgid2 = stream.add({'message': 'message 2'})
msgid3 = stream.add({'message': 'message 3'})

Ranges of records can be read using either the range() method or Python’s slice notation. The message ids provided as the range endpoints are inclusive when using the range API:

# Get messages 2 and newer:
messages = stream[msgid2:]

# messages contains:
[(b'1539008914283-0', {b'message': b'message 2'}),
 (b'1539008918230-0', {b'message': b'message 3'})]

# We can use the "step" parameter to limit the number of records returned.
messages = stream[msgid::2]

# messages contains the first two messages:
[(b'1539008903588-0', {b'message': b'hello, stream'}),
 (b'1539008914283-0', {b'message': b'message 2'})]

# Get all messages in stream:
messages = list(stream)

# Returns:
[(b'1539008903588-0', {b'message': b'hello, stream'}),
 (b'1539008914283-0', {b'message': b'message 2'}),
 (b'1539008918230-0', {b'message': b'message 3'})]

The size of streams can be managed by deleting messages by id, or by “trimming” the stream, which removes the oldest messages. The desired size is specified when issuing a trim() operation, though, due to the internal implementation of the stream data-structure, the resulting size is considered approximate by default.

# Adding and deleting a message:
msgid4 = stream.add({'message': 'delete me'})
del stream[msgid4]

# How many items are in the stream?
print(len(stream))  # Prints 3.

To see how trimming works, let’s create another stream and fill it with 1000 items, then request it to be trimmed to 10 items:

# Add 1000 items to "stream-2".
stream2 = db.Stream('stream-2')
for i in range(1000):
    stream2.add({'data': 'message-%s' % i})

# Trim stream-2 to (approximately) 10 most-recent messages.
nremoved = stream2.trim(10)
print(nremoved)
# 909
print(len(stream2))
# 91

# To trim to an exact number, specify `approximate=False`:
stream2.trim(10, approximate=False)  # Returns 81.
print(len(stream2))
# 10
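
Rather than trimming after the fact, streams can also be capped as messages are written. Assuming walrus’s Stream.add() mirrors the consumer-group add() signature documented later in this section, a quick sketch:

# Cap the stream at (approximately) 10 entries at write-time by
# passing maxlen when adding a message.
stream2.add({'data': 'capped'}, maxlen=10, approximate=True)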

The previous examples show how to add(), read a range() of messages, delete() messages, and manage the size using the trim() method. When processing a continuous stream of events, though, it may be desirable to block until messages are added. For this we can use the read() API, which supports blocking until messages become available.

# By default, calling `stream.read()` returns all messages in the stream:
stream.read()

# Returns:
[(b'1539008903588-0', {b'message': b'hello, stream'}),
 (b'1539008914283-0', {b'message': b'message 2'}),
 (b'1539008918230-0', {b'message': b'message 3'})]

We can pass a message id to read(), and unlike the slicing operations, this id is considered the “last-read message” and acts as an exclusive lower-bound:

# Read any messages newer than msgid2.
stream.read(last_id=msgid2)

# Returns:
[(b'1539008918230-0', {b'message': b'message 3'})]

# This returns None since there are no messages newer than msgid3.
stream.read(last_id=msgid3)

We can make read() blocking by specifying the special id "$" along with a block time in milliseconds. To block forever, use block=0.

# This will block for 2 seconds, after which an empty list is returned
# (provided no messages are added while waiting).
stream.read(block=2000, last_id='$')
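
Putting blocking reads together with manual id tracking, a hand-rolled consumer might look like the following sketch (handle_message is a hypothetical processing function):

# Sketch of a standalone consumer loop. The caller must track the id of
# the last message it has seen and pass it back into read().
last_id = '$'  # Start with only newly-added messages.
while True:
    # Block for up to 5 seconds waiting for new messages.
    messages = stream.read(block=5000, last_id=last_id)
    for msgid, data in (messages or []):
        handle_message(data)  # Hypothetical processing function.
        last_id = msgid       # Remember our position in the stream.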

While it’s possible to build consumers using these APIs, the client is still responsible for keeping track of the last-read message id and implementing its own semantics for retrying failed messages, etc. In the next section, we’ll see how consumer groups can greatly simplify building a stream-processing pipeline.

Consumer groups

In consumer-group mode, streams retain the behaviors of standalone mode, adding functionality which makes them stateful. What state is tracked?

  • Read any unseen messages (XREADGROUP) - ConsumerGroupStream.read()

  • List messages that were read, but not acknowledged (XPENDING) - ConsumerGroupStream.pending()

  • Acknowledge one or more pending messages (XACK) - ConsumerGroupStream.ack()

  • Claim one or more pending messages for re-processing (XCLAIM) - ConsumerGroupStream.claim()

ConsumerGroup objects provide the building-blocks for robust message-processing pipelines or task queues. Ordinarily this state would have to be managed by the client – keeping it in Redis means there is a single, unified interface (rather than one per client implementation, with all the bugs that likely entails). Furthermore, consumer-group state is persisted along with the rest of the dataset and replicated.

# Consumer groups require that a stream exist before the group can be
# created, so we have to add an empty message.
stream_keys = ['stream-a', 'stream-b', 'stream-c']
for key in stream_keys:
    db.xadd(key, {'data': ''})

# Create a consumer-group for streams a, b, and c. We will mark all
# messages as having been processed, so only messages added after the
# creation of the consumer-group will be read.
cg = db.consumer_group('cg-abc', stream_keys)
cg.create()  # Create the consumer group.
cg.set_id('$')

To read from all the streams in a consumer group, we can use the ConsumerGroupStream.read() method. Since we marked all messages as read and have not added anything new since creating the consumer group, the return value is an empty list:

resp = cg.read()

# Returns an empty list:
[]

For convenience, walrus exposes the individual streams within a consumer group as attributes on the ConsumerGroup instance (hyphens in stream keys are translated to underscores, so "stream-a" becomes cg.stream_a). Let’s add some messages to streams a, b, and c:

cg.stream_a.add({'message': 'new a'})
cg.stream_b.add({'message': 'new for b'})
for i in range(10):
    cg.stream_c.add({'message': 'c-%s' % i})

Now let’s try reading from the consumer group again. We’ll pass count=1 so that we read no more than one message from each stream in the group:

# Read up to one message from each stream in the group.
cg.read(count=1)

# Returns:
[('stream-a', [(b'1539023088125-0', {b'message': b'new a'})]),
 ('stream-b', [(b'1539023088125-0', {b'message': b'new for b'})]),
 ('stream-c', [(b'1539023088126-0', {b'message': b'c-0'})])]

We’ve now read all the unread messages from streams a and b, but stream c still has messages. Calling read() again will give us the next unread message from stream c:

# Read up to 1 message from each stream in the group. Since
# we already read everything in streams a and b, we will only
# get the next unread message in stream c.
cg.read(count=1)

# Returns:
[('stream-c', [(b'1539023088126-1', {b'message': b'c-1'})])]

When using consumer groups, messages that are read need to be acknowledged. Let’s look at the pending (read but unacknowledged) messages from stream a using the pending() method, which returns a list of metadata about each unacknowledged message:

# We read one message from stream a, so we should see one pending message.
cg.stream_a.pending()

# Returns a list of dicts describing each pending message:
[{'message_id': b'1539023088125-0', 'consumer': b'cg-abc.c1',
  'time_since_delivered': 22238, 'times_delivered': 1}]

To acknowledge receipt of a message and remove it from the pending list, use the ack() method on the consumer group stream:

# View the pending message list for stream a.
pending_list = cg.stream_a.pending()
msg_id = pending_list[0]['message_id']

# Acknowledge the message.
cg.stream_a.ack(msg_id)

# Returns number of pending messages successfully acknowledged:
1
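
Combining read() and ack() gives the basic worker pattern. Here is a minimal sketch (handle_message is a hypothetical processing function):

# Sketch of a worker that drains one stream within the group: read a
# batch of messages, process each one, then acknowledge it.
def work(stream):
    for msgid, data in (stream.read(count=10) or []):
        handle_message(data)  # Hypothetical processing function.
        stream.ack(msgid)     # Remove the message from the pending list.

work(cg.stream_a)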

Consumer groups have the concept of individual consumers. These might be workers in a process pool, for example. Note that the pending() method returned the consumer name as "cg-abc.c1": walrus uses the consumer group name plus ".c1" as the default consumer name. To create another consumer within a given group, we can use the consumer() method:

# Create a second consumer within the consumer group.
cg2 = cg.consumer('cg-abc.c2')

Creating a new consumer within a consumer group does not affect the state of the group itself. Calling read() using our new consumer will pick up from the last-read message, as you would expect:

# Read from our consumer group using the new consumer. Recall
# that we read all the messages from streams a and b, and the
# first two messages in stream c.
cg2.read(count=1)

# Returns:
[('stream-c', [(b'1539023088126-2', {b'message': b'c-2'})])]

If we look at the pending message status for stream c, we will see that the first and second messages were read by the consumer “cg-abc.c1” and the third message was read by our new consumer, “cg-abc.c2”:

# What messages have been read, but were not acknowledged, from stream c?
cg.stream_c.pending()

# Returns a list of dicts describing each pending message:
[{'message_id': b'1539023088126-0', 'consumer': b'cg-abc.c1',
  'time_since_delivered': 51329, 'times_delivered': 1},
 {'message_id': b'1539023088126-1', 'consumer': b'cg-abc.c1',
  'time_since_delivered': 43772, 'times_delivered': 1},
 {'message_id': b'1539023088126-2', 'consumer': b'cg-abc.c2',
  'time_since_delivered': 5966, 'times_delivered': 1}]

Consumers can claim() pending messages, which transfers ownership of the message and returns a list of (message id, data) tuples to the caller:

# Unpack the pending messages into a couple variables.
mc1, mc2, mc3 = cg.stream_c.pending()

# Claim the first message for consumer 2:
cg2.stream_c.claim(mc1['message_id'])

# Returns a list of (message id, data) tuples for the claimed messages:
[(b'1539023088126-0', {b'message': b'c-0'})]

Re-inspecting the pending messages for stream c, we can see that the consumer for the first message has changed and the message age has been reset:

# What messages are pending in stream c?
cg.stream_c.pending()

# Returns:
[{'message_id': b'1539023088126-0', 'consumer': b'cg-abc.c2',
  'time_since_delivered': 2168, 'times_delivered': 1},
 {'message_id': b'1539023088126-1', 'consumer': b'cg-abc.c1',
  'time_since_delivered': 47141, 'times_delivered': 1},
 {'message_id': b'1539023088126-2', 'consumer': b'cg-abc.c2',
  'time_since_delivered': 9335, 'times_delivered': 1}]
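
A common use of claim() is recovering messages from a consumer that has crashed or stalled: scan the pending list and take over anything that has gone unacknowledged for too long. A minimal sketch:

# Sketch: transfer ownership of any message that has been pending for
# over 60 seconds to consumer 2 for re-processing.
for info in cg.stream_c.pending():
    if info['time_since_delivered'] > 60000:  # Age is in milliseconds.
        cg2.stream_c.claim(info['message_id'])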

The individual streams within the consumer group support a number of useful APIs:

  • consumer_group.stream.ack(*id_list) - acknowledge one or more messages read from the given stream.

  • consumer_group.stream.add(data, id='*', maxlen=None, approximate=True) - add a new message to the stream. The maxlen parameter can be used to keep the stream from growing without bounds; if it is given, the approximate flag indicates whether the stream should be trimmed approximately or exactly.

  • consumer_group.stream.claim(*id_list) - claim one or more pending messages.

  • consumer_group.stream.delete(*id_list) - delete one or more messages by ID.

  • consumer_group.stream.pending(start='-', stop='+', count=1000) - get the list of unacknowledged messages in the stream. The start and stop parameters can be message ids, while the count parameter can be used to limit the number of results returned.

  • consumer_group.stream.read(count=None, block=None) - monitor the stream for new messages within the context of the consumer group. This method can be made to block by specifying a block time in milliseconds (or 0 to block forever).

  • consumer_group.stream.set_id(id='$') - set the id of the last-read message for the consumer group. Use the special id "$" to indicate all messages have been read, or "0-0" to mark all messages as unread.

  • consumer_group.stream.trim(count, approximate=True) - trim the stream to the given size.
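
For example, set_id() can be used to rewind a stream within the group so that old messages are delivered again. A quick sketch:

# Rewind the group's pointer for stream-c and re-read from the start.
cg.stream_c.set_id('0-0')  # Mark all messages as unread.
cg.stream_c.read(count=3)  # Re-delivers the oldest messages.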

TimeSeries

Redis automatically uses the millisecond timestamp plus a sequence number to uniquely identify messages added to a stream. This makes streams a natural fit for time-series data. To simplify working with streams as time-series in Python, you can use the special TimeSeries helper class, which acts just like the ConsumerGroup from the previous section with the exception that it can translate between Python datetime objects and message ids automatically.

To get started, we’ll create a TimeSeries instance, specifying the stream keys, just like we did with ConsumerGroup:

# Create a time-series consumer group named "demo-ts" for the
# streams s1 and s2.
ts = db.time_series('demo-ts', ['s1', 's2'])

# Add dummy data and create the consumer group.
db.xadd('s1', {'': ''}, id='0-1')
db.xadd('s2', {'': ''}, id='0-1')
ts.create()
ts.set_id('$')  # Do not read the dummy items.

Let’s add some messages to the time-series, one for each day between January 1st and 10th, 2018:

from datetime import datetime, timedelta

date = datetime(2018, 1, 1)
for i in range(10):
    ts.s1.add({'message': 's1-%s' % date}, id=date)
    date += timedelta(days=1)

We can read messages from the stream using the familiar slicing API. For example, to read 3 messages starting at January 2nd, 2018:

ts.s1[datetime(2018, 1, 2)::3]

# Returns messages for Jan 2nd - 4th:
[<Message s1 1514872800000-0: {'message': 's1-2018-01-02 00:00:00'}>,
 <Message s1 1514959200000-0: {'message': 's1-2018-01-03 00:00:00'}>,
 <Message s1 1515045600000-0: {'message': 's1-2018-01-04 00:00:00'}>]

Note that the values returned are Message objects. Message objects provide some convenience functions, such as extracting timestamp and sequence values from stream message ids:

for message in ts.s1[datetime(2018, 1, 1)::3]:
    print(message.stream, message.timestamp, message.sequence, message.data)

# Prints:
s1 2018-01-01 00:00:00 0 {'message': 's1-2018-01-01 00:00:00'}
s1 2018-01-02 00:00:00 0 {'message': 's1-2018-01-02 00:00:00'}
s1 2018-01-03 00:00:00 0 {'message': 's1-2018-01-03 00:00:00'}

Let’s add some messages to stream “s2” as well:

date = datetime(2018, 1, 1)
for i in range(5):
    ts.s2.add({'message': 's2-%s' % date}, id=date)
    date += timedelta(days=1)

One difference between TimeSeries and ConsumerGroup is what happens when reading from multiple streams. ConsumerGroup returns a list of (stream key, messages) pairs, one for each stream containing unread messages. TimeSeries, however, returns a flat list of Message objects:

# Read up to 2 messages from each stream (s1 and s2):
messages = ts.read(count=2)

# "messages" is a list of messages from both streams:
[<Message s1 1514786400000-0: {'message': 's1-2018-01-01 00:00:00'}>,
 <Message s2 1514786400000-0: {'message': 's2-2018-01-01 00:00:00'}>,
 <Message s1 1514872800000-0: {'message': 's1-2018-01-02 00:00:00'}>,
 <Message s2 1514872800000-0: {'message': 's2-2018-01-02 00:00:00'}>]

When inspecting pending messages within a TimeSeries, the message ids are unpacked into (datetime, seq) 2-tuples:

ts.s1.pending()

# Returns:
[((datetime.datetime(2018, 1, 1, 0, 0), 0), 'demo-ts.c1', 1578, 1),
 ((datetime.datetime(2018, 1, 2, 0, 0), 0), 'demo-ts.c1', 1578, 1)]

# Acknowledge the pending messages:
for msgts_seq, _, _, _ in ts.s1.pending():
    ts.s1.ack(msgts_seq)

We can set the last-read message id using a datetime:

ts.s1.set_id(datetime(2018, 1, 1))

# Next read will be 2018-01-02, ...
ts.s1.read(count=2)

# Returns:
[<Message s1 1514872800000-0: {'message': 's1-2018-01-02 00:00:00'}>,
 <Message s1 1514959200000-0: {'message': 's1-2018-01-03 00:00:00'}>]

As with ConsumerGroup, the TimeSeries helper provides stream-specific APIs for claiming unacknowledged messages, creating additional consumers, etc.
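
For example, assuming the TimeSeries helper mirrors the ConsumerGroup consumer() and claim() APIs shown earlier, a second consumer might claim a pending message like so (a sketch; ids are given as (datetime, seq) tuples):

# Sketch: create a second consumer within the time-series group and
# claim a pending message by its (datetime, seq) id.
ts2 = ts.consumer('demo-ts.c2')
ts2.s1.claim((datetime(2018, 1, 3), 0))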

Learning more

For more information, the following links may be helpful:

  • Redis streams introduction: https://redis.io/topics/streams-intro

  • walrus documentation: https://walrus.readthedocs.io/