.. _streams:

.. py:module:: walrus

Streams
=======

`Redis streams `_ are a new data-type, available in Redis 5.0, that provide a
persistent, append-only log. Redis streams are a complex topic, so I strongly
recommend reading `the streams introduction `_.

I like to think of streams as having two modes of operation:

* standalone-mode: streams act much like every other data-structure.
* consumer-groups: streams become stateful, with state such as "which messages
  were read?" and "who read what?" tracked within Redis.

:py:class:`Stream` objects in walrus can be used standalone or within the
context of a :py:class:`ConsumerGroup`.

Standalone streams
------------------

In standalone mode, streams behave much like every other data-structure in
Redis. By this, I mean that they act as a dumb container: you append items,
you read them, you delete them -- everything happens explicitly. Streams
support the following operations:

* Add a new item (``XADD``) - :py:meth:`Stream.add`
* Read a range of items (``XRANGE``) - :py:meth:`Stream.range`
* Read new messages, optionally blocking (``XREAD``) - :py:meth:`Stream.read`
* Delete one or more items (``XDEL``) - :py:meth:`Stream.delete`
* Get the length of the stream (``XLEN``) - :py:meth:`Stream.length`
* Trim the length to a given size (``XTRIM``) - :py:meth:`Stream.trim`
* Set the stream's last ID (``XSETID``) - :py:meth:`Stream.set_id`

To get started with streams, we'll create a :py:class:`Database` instance and
use it to instantiate a :py:class:`Stream`:

.. code-block:: python

    from walrus import Database  # A subclass of the redis-py Redis client.

    db = Database()
    stream = db.Stream('stream-a')  # Create a new stream instance.

When adding data to a stream, Redis can automatically provide you with a
unique timestamp-based identifier, which is almost always what you want. When a
new message is added, the message id is returned:

.. code-block:: python

    msgid = stream.add({'message': 'hello streams'})
    print(msgid)

    # Prints something like:
    # b'1539008591844-0'

Message ids generated by Redis consist of a millisecond timestamp along with a
sequence number (for ordering messages that arrived on the same millisecond).
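Because an id is just ``<milliseconds>-<sequence>``, it can be split apart and
compared in plain Python. The sketch below uses a hypothetical helper,
``parse_message_id`` (not part of walrus), to show why ids should be compared
numerically rather than as raw bytes:

.. code-block:: python

    def parse_message_id(msgid):
        """Split a stream id such as b'1539008591844-0' into (ms, seq).

        Hypothetical helper for illustration; it is not part of walrus.
        """
        if isinstance(msgid, bytes):
            msgid = msgid.decode('ascii')
        ms, seq = msgid.split('-')
        return int(ms), int(seq)

    ids = [b'1539008591844-10', b'1539008591844-9', b'1539008591843-0']

    # Comparing raw bytes is lexicographic, so "-10" sorts before "-9".
    naive = sorted(ids)

    # Sorting on the parsed (ms, seq) tuple gives the order Redis uses.
    ordered = sorted(ids, key=parse_message_id)

Here ``ordered`` comes out as ``[b'1539008591843-0', b'1539008591844-9',
b'1539008591844-10']``, whereas ``naive`` puts the ``-10`` message ahead of the
``-9`` one.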
Let's :py:meth:`~Stream.add` a couple more items:

.. code-block:: python

    msgid2 = stream.add({'message': 'message 2'})
    msgid3 = stream.add({'message': 'message 3'})

Ranges of records can be read using either the :py:meth:`~Stream.range` method,
or using Python's slice notation. The message ids provided as the range
endpoints are inclusive when using the range API:

.. code-block:: python

    # Get messages 2 and newer:
    messages = stream[msgid2:]

    # messages contains:
    [(b'1539008914283-0', {b'message': b'message 2'}),
     (b'1539008918230-0', {b'message': b'message 3'})]

    # We can use the "step" parameter to limit the number of records returned.
    messages = stream[msgid::2]

    # messages contains the first two messages:
    [(b'1539008903588-0', {b'message': b'hello streams'}),
     (b'1539008914283-0', {b'message': b'message 2'})]

    # Get all messages in stream:
    messages = list(stream)

    # messages contains:
    [(b'1539008903588-0', {b'message': b'hello streams'}),
     (b'1539008914283-0', {b'message': b'message 2'}),
     (b'1539008918230-0', {b'message': b'message 3'})]

The size of streams can be managed by deleting messages by id, or by "trimming"
the stream, which removes the oldest messages. The desired size is passed to
:py:meth:`~Stream.trim`; by default, however, that size is treated as
approximate, because Redis trims whole nodes of the stream's internal
data-structure at a time.

.. code-block:: python

    # Adding and deleting a message:
    msgid4 = stream.add({'message': 'delete me'})
    del stream[msgid4]

    # How many items are in the stream?
    print(len(stream))  # Prints 3.
To see how trimming works, let's create another stream and fill it with 1000
items, then request it to be trimmed to 10 items:

.. code-block:: python

    # Add 1000 items to "stream-2".
    stream2 = db.Stream('stream-2')
    for i in range(1000):
        stream2.add({'data': 'message-%s' % i})

    # Trim stream-2 to (approximately) 10 most-recent messages.
    nremoved = stream2.trim(10)
    print(nremoved)  # 909
    print(len(stream2))  # 91

    # To trim to an exact number, specify `approximate=False`:
    stream2.trim(10, approximate=False)  # Returns 81.
    print(len(stream2))  # 10

The previous examples show how to :py:meth:`~Stream.add`, read a
:py:meth:`~Stream.range` of messages, :py:meth:`~Stream.delete` messages, and
manage the size using the :py:meth:`~Stream.trim` method. When processing a
continuous stream of events, though, it may be desirable to **block** until
messages are added. For this we can use the :py:meth:`~Stream.read` API, which
supports blocking until messages become available.

.. code-block:: python

    # By default, calling `stream.read()` returns all messages in the stream:
    stream.read()

    # Returns:
    [(b'1539008903588-0', {b'message': b'hello streams'}),
     (b'1539008914283-0', {b'message': b'message 2'}),
     (b'1539008918230-0', {b'message': b'message 3'})]

We can pass a message id to :py:meth:`~Stream.read`, and unlike the slicing
operations, this id is considered the "last-read message" and acts as an
**exclusive** lower-bound:

.. code-block:: python

    # Read any messages newer than msgid2.
    stream.read(last_id=msgid2)

    # Returns:
    [(b'1539008918230-0', {b'message': b'message 3'})]

    # This returns an empty list since there are no messages newer than msgid3.
    stream.read(last_id=msgid3)

We can make :py:meth:`~Stream.read` blocking by specifying a special id,
``"$"``, and a ``block`` timeout in milliseconds. To block forever, you can use
``block=0``.

.. code-block:: python

    # This will block for 2 seconds, after which an empty list is returned
    # (provided no messages are added while waiting).
    stream.read(block=2000, last_id='$')
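To make that bookkeeping concrete, here's a minimal sketch of a standalone
consumer built only on :py:meth:`~Stream.read`. The ``consume`` function and
its ``handle`` callback are hypothetical names, and the sketch assumes
``read()`` returns a list of ``(message id, data)`` tuples, or an empty result
when the block timeout expires, as in the examples above.

.. code-block:: python

    def consume(stream, handle, last_id='$', block=1000, max_polls=None):
        """Poll a walrus Stream, tracking the last-read message id ourselves.

        Hypothetical helper: `handle(msgid, data)` is called for each message.
        Returns the id of the last message handled so a caller could persist
        it and resume from there later.
        """
        polls = 0
        while max_polls is None or polls < max_polls:
            polls += 1
            # read() blocks for up to `block` ms; treat a falsy result as "no
            # new messages" so the loop simply polls again.
            for msgid, data in stream.read(block=block, last_id=last_id) or []:
                handle(msgid, data)
                last_id = msgid  # Redis does not remember this for us.
        return last_id

    # With the stream from the examples above (requires a Redis server):
    # consume(stream, lambda msgid, data: print(msgid, data), max_polls=3)

Passing ``'$'`` only on the first call matters: re-using ``'$'`` on every poll
would skip any messages added between two polls.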
While it's possible to build consumers using these APIs, the client is still
responsible for keeping track of the last-read message ID and coming up with
semantics for retrying failed messages, etc. In the next section, we'll see how
consumer groups can greatly simplify building a stream processing pipeline.

Consumer groups
---------------

In consumer-group mode, streams retain the behaviors of standalone mode, adding
functionality which makes them *stateful*: Redis tracks which messages have
been delivered to which consumer, and which of those have been acknowledged.
This state is used by the following operations:

* Read any unseen messages (``XREADGROUP``) - :py:meth:`ConsumerGroupStream.read`
* List messages that were read, but not acknowledged (``XPENDING``) - :py:meth:`ConsumerGroupStream.pending`
* Acknowledge one or more pending messages (``XACK``) - :py:meth:`ConsumerGroupStream.ack`
* Claim one or more pending messages for re-processing (``XCLAIM``) - :py:meth:`ConsumerGroupStream.claim`

:py:class:`ConsumerGroup` objects provide the building-blocks for robust
message processing pipelines or task queues. Ordinarily this type of stuff
would be implemented by the client -- having it in Redis means that we have a
single, unified implementation (rather than one per client, with all the bugs
that likely entails). Furthermore, consumer group state is persisted in the
RDB and replicated.

.. code-block:: python

    # Consumer groups require that a stream exist before the group can be
    # created, so we have to add an empty message.
    stream_keys = ['stream-a', 'stream-b', 'stream-c']
    for key in stream_keys:
        db.xadd(key, {'data': ''})

    # Create a consumer-group for streams a, b, and c. We will mark all
    # messages as having been processed, so only messages added after the
    # creation of the consumer-group will be read.
    cg = db.consumer_group('cg-abc', stream_keys)
    cg.create()  # Create the consumer group.
    cg.set_id('$')

To read from all the streams in a consumer group, we can use the
:py:meth:`ConsumerGroup.read` method.
Since we marked all messages as read and have not added anything new since
creating the consumer group, the return value is an empty list:

.. code-block:: python

    resp = cg.read()

    # Returns an empty list:
    []

For convenience, walrus exposes the individual streams within a consumer group
as attributes on the :py:class:`ConsumerGroup` instance. Let's add some
messages to streams *a*, *b*, and *c*:

.. code-block:: python

    cg.stream_a.add({'message': 'new a'})
    cg.stream_b.add({'message': 'new for b'})
    for i in range(10):
        cg.stream_c.add({'message': 'c-%s' % i})

Now let's try reading from the consumer group again. We'll pass ``count=1`` so
that we read no more than one message from each stream in the group:

.. code-block:: python

    # Read up to one message from each stream in the group.
    cg.read(count=1)

    # Returns:
    [('stream-a', [(b'1539023088125-0', {b'message': b'new a'})]),
     ('stream-b', [(b'1539023088125-0', {b'message': b'new for b'})]),
     ('stream-c', [(b'1539023088126-0', {b'message': b'c-0'})])]

We've now read all the unread messages from streams *a* and *b*, but stream *c*
still has messages. Calling ``read()`` again will give us the next unread
message from stream *c*:

.. code-block:: python

    # Read up to 1 message from each stream in the group. Since
    # we already read everything in streams a and b, we will only
    # get the next unread message in stream c.
    cg.read(count=1)

    # Returns:
    [('stream-c', [(b'1539023088126-1', {b'message': b'c-1'})])]

When using consumer groups, messages that are read need to be
**acknowledged**. Let's look at the **pending** (read but unacknowledged)
messages from stream *a* using the :py:meth:`~ConsumerGroupStream.pending`
method, which returns a list of metadata about each unacknowledged message:

.. code-block:: python

    # We read one message from stream a, so we should see one pending message.
    cg.stream_a.pending()

    # Returns a list of dicts, one per pending message:
    [{'message_id': b'1539023088125-0', 'consumer': b'cg-abc.c1',
      'time_since_delivered': 22238, 'times_delivered': 1}]

To acknowledge receipt of a message and remove it from the pending list, use
the :py:meth:`~ConsumerGroupStream.ack` method on the consumer group stream:

.. code-block:: python

    # View the pending message list for stream a.
    pending_list = cg.stream_a.pending()
    msg_id = pending_list[0]['message_id']

    # Acknowledge the message.
    cg.stream_a.ack(msg_id)

    # Returns number of pending messages successfully acknowledged:
    1

Consumer groups have the concept of individual **consumers**. These might be
workers in a process pool, for example. Note that the
:py:meth:`~ConsumerGroupStream.pending` method returned the consumer name as
``"cg-abc.c1"``. Walrus uses the consumer group name + ``".c1"`` as the default
consumer name. To create another consumer within a given group, we can use the
:py:meth:`~ConsumerGroup.consumer` method:

.. code-block:: python

    # Create a second consumer within the consumer group.
    cg2 = cg.consumer('cg-abc.c2')

Creating a new consumer within a consumer group does not affect the state of
the group itself. Calling :py:meth:`~ConsumerGroup.read` using our new consumer
will pick up from the last-read message, as you would expect:

.. code-block:: python

    # Read from our consumer group using the new consumer. Recall
    # that we read all the messages from streams a and b, and the
    # first two messages in stream c.
    cg2.read(count=1)

    # Returns:
    [('stream-c', [(b'1539023088126-2', {b'message': b'c-2'})])]
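Putting these pieces together, each worker in a pool could run a loop like the
sketch below, using its own consumer. This is a minimal illustration rather
than a walrus API: ``run_worker``, ``handle`` and ``ack`` are hypothetical
names, and the sketch assumes :py:meth:`ConsumerGroup.read` returns
``(stream, messages)`` pairs as shown above. Acknowledgement is passed in as a
callable so the sketch does not depend on how stream keys map to attributes;
redis-py's ``db.xack(stream_key, 'cg-abc', msgid)`` is one option.

.. code-block:: python

    def run_worker(consumer, handle, ack, count=10, block=1000,
                   max_batches=None):
        """Read from a consumer group, acking each message only after
        `handle` succeeds. Returns the (stream, msgid) pairs that failed."""
        failed = []
        batches = 0
        while max_batches is None or batches < max_batches:
            batches += 1
            resp = consumer.read(count=count, block=block) or []
            for stream_key, messages in resp:
                for msgid, data in messages:
                    try:
                        handle(stream_key, msgid, data)
                    except Exception:
                        # Leave the message unacknowledged: it stays in the
                        # pending list, where it can be inspected or claimed.
                        failed.append((stream_key, msgid))
                        continue
                    ack(stream_key, msgid)
        return failed

    # Example wiring for the second consumer (requires a Redis server):
    # run_worker(cg2, lambda key, msgid, data: print(key, data),
    #            lambda key, msgid: db.xack(key, 'cg-abc', msgid))

If the group is read with Redis's special ``>`` id (new messages only), a
failed message is not redelivered by later reads; it stays pending until some
consumer claims it.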
If we look at the pending message status for stream *c*, we will see that the
first and second messages were read by the consumer *"cg-abc.c1"* and the third
message was read by our new consumer, *"cg-abc.c2"*:

.. code-block:: python

    # What messages have been read, but were not acknowledged, from stream c?
    cg.stream_c.pending()

    # Returns a list of dicts (message id, consumer, message age, delivery count):
    [{'message_id': b'1539023088126-0', 'consumer': b'cg-abc.c1',
      'time_since_delivered': 51329, 'times_delivered': 1},
     {'message_id': b'1539023088126-1', 'consumer': b'cg-abc.c1',
      'time_since_delivered': 43772, 'times_delivered': 1},
     {'message_id': b'1539023088126-2', 'consumer': b'cg-abc.c2',
      'time_since_delivered': 5966, 'times_delivered': 1}]

Consumers can :py:meth:`~ConsumerGroupStream.claim` pending messages, which
transfers ownership of the message and returns a list of (message id, data)
tuples to the caller:

.. code-block:: python

    # Unpack the pending messages into a couple variables.
    mc1, mc2, mc3 = cg.stream_c.pending()

    # Claim the first message for consumer 2:
    cg2.stream_c.claim(mc1['message_id'])

    # Returns a list of (message id, data) tuples for the claimed messages:
    [(b'1539023088126-0', {b'message': b'c-0'})]

Re-inspecting the pending messages for stream *c*, we can see that the consumer
for the first message has changed, its message age has been reset, and its
delivery count has been incremented:

.. code-block:: python

    # What messages are pending in stream c?
    cg.stream_c.pending()

    # Returns:
    [{'message_id': b'1539023088126-0', 'consumer': b'cg-abc.c2',
      'time_since_delivered': 2168, 'times_delivered': 2},
     {'message_id': b'1539023088126-1', 'consumer': b'cg-abc.c1',
      'time_since_delivered': 47141, 'times_delivered': 1},
     {'message_id': b'1539023088126-2', 'consumer': b'cg-abc.c2',
      'time_since_delivered': 9335, 'times_delivered': 1}]
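In practice, a consumer usually claims only messages that have sat in the
pending list for a while, for example because the worker that read them
crashed. Below is a minimal sketch of that selection step, using a
hypothetical ``stale_message_ids`` helper over the dictionaries returned by
:py:meth:`~ConsumerGroupStream.pending`; the sample data mirrors the output
above.

.. code-block:: python

    def stale_message_ids(pending, min_idle_ms, exclude_consumer=None):
        """Ids of pending messages idle for at least `min_idle_ms`.

        Hypothetical helper; `pending` is a list of dicts shaped like the
        output of ConsumerGroupStream.pending().
        """
        return [
            entry['message_id']
            for entry in pending
            if entry['time_since_delivered'] >= min_idle_ms
            and entry['consumer'] != exclude_consumer
        ]

    pending = [
        {'message_id': b'1539023088126-0', 'consumer': b'cg-abc.c2',
         'time_since_delivered': 2168, 'times_delivered': 2},
        {'message_id': b'1539023088126-1', 'consumer': b'cg-abc.c1',
         'time_since_delivered': 47141, 'times_delivered': 1},
        {'message_id': b'1539023088126-2', 'consumer': b'cg-abc.c2',
         'time_since_delivered': 9335, 'times_delivered': 1},
    ]

    # Messages idle for 30+ seconds that consumer 2 does not already own.
    to_claim = stale_message_ids(pending, 30000, exclude_consumer=b'cg-abc.c2')

    # Against a live group this would be followed by something like:
    # if to_claim:
    #     cg2.stream_c.claim(*to_claim)

Between reading the pending list and claiming, another consumer may claim the
same message. Redis's ``XCLAIM`` accepts a minimum idle time that re-checks
this condition atomically; see the :py:meth:`~ConsumerGroupStream.claim` API
docs for whether walrus exposes it.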
The individual streams within the consumer group support a number of useful
APIs:

* ``consumer_group.stream.ack(*id_list)`` - acknowledge one or more messages
  read from the given stream.
* ``consumer_group.stream.add(data, id='*', maxlen=None, approximate=True)`` -
  add a new message to the stream. The ``maxlen`` parameter can be used to keep
  the stream from growing without bounds. When ``maxlen`` is given, the
  ``approximate`` flag indicates whether the length limit is applied
  approximately or exactly.
* ``consumer_group.stream.claim(*id_list)`` - claim one or more pending
  messages.
* ``consumer_group.stream.delete(*id_list)`` - delete one or more messages by
  ID.
* ``consumer_group.stream.pending(start='-', stop='+', count=1000)`` - get the
  list of unacknowledged messages in the stream. The ``start`` and ``stop``
  parameters can be message ids, while the ``count`` parameter can be used to
  limit the number of results returned.
* ``consumer_group.stream.read(count=None, block=None)`` - monitor the stream
  for new messages within the context of the consumer group. This method can
  be made to block by specifying a ``block`` timeout in milliseconds (or ``0``
  to block forever).
* ``consumer_group.stream.set_id(id='$')`` - set the id of the last-read
  message for the consumer group. Use the special id ``"$"`` to indicate all
  messages have been read, or ``"0-0"`` to mark all messages as unread.
* ``consumer_group.stream.trim(count, approximate=True)`` - trim the stream to
  the given size.

TimeSeries
----------

Redis automatically uses the millisecond timestamp plus a sequence number to
uniquely identify messages added to a stream. This makes streams a natural fit
for time-series data. To simplify working with streams as time-series in
Python, you can use the special :py:class:`TimeSeries` helper class, which acts
just like the :py:class:`ConsumerGroup` from the previous section with the
exception that it can translate between Python ``datetime`` objects and message
ids automatically.

To get started, we'll create a :py:class:`TimeSeries` instance, specifying the
stream keys, just like we did with :py:class:`ConsumerGroup`:

.. code-block:: python

    # Create a time-series consumer group named "demo-ts" for the
    # streams s1 and s2.
    ts = db.time_series('demo-ts', ['s1', 's2'])

    # Add dummy data and create the consumer group.
    db.xadd('s1', {'': ''}, id='0-1')
    db.xadd('s2', {'': ''}, id='0-1')
    ts.create()
    ts.set_id('$')  # Do not read the dummy items.

Let's add some messages to the time-series, one for each day between January
1st and 10th, 2018:

.. code-block:: python

    from datetime import datetime, timedelta

    date = datetime(2018, 1, 1)
    for i in range(10):
        ts.s1.add({'message': 's1-%s' % date}, id=date)
        date += timedelta(days=1)

We can read messages from the stream using the familiar slicing API. For
example, to read 3 messages starting at January 2nd, 2018:

.. code-block:: python

    ts.s1[datetime(2018, 1, 2)::3]

    # Returns a list of three Message objects, for Jan 2nd - 4th.

Note that the values returned are :py:class:`Message` objects. Message objects
provide some convenience functions, such as extracting timestamp and sequence
values from stream message ids:

.. code-block:: python

    for message in ts.s1[datetime(2018, 1, 1)::3]:
        print(message.stream, message.timestamp, message.sequence,
              message.data)

    # Prints:
    # s1 2018-01-01 00:00:00 0 {'message': 's1-2018-01-01 00:00:00'}
    # s1 2018-01-02 00:00:00 0 {'message': 's1-2018-01-02 00:00:00'}
    # s1 2018-01-03 00:00:00 0 {'message': 's1-2018-01-03 00:00:00'}

Let's add some messages to stream "s2" as well:

.. code-block:: python

    date = datetime(2018, 1, 1)
    for i in range(5):
        ts.s2.add({'message': 's2-%s' % date}, id=date)
        date += timedelta(days=1)

One difference between :py:class:`TimeSeries` and :py:class:`ConsumerGroup` is
what happens when reading from multiple streams. ConsumerGroup returns a list
of (stream, messages) pairs, one for each stream that had messages to return.
TimeSeries, however, returns a flat list of Message objects:

.. code-block:: python

    # Read up to 2 messages from each stream (s1 and s2):
    messages = ts.read(count=2)

    # "messages" is a flat list of four Message objects: the first two
    # messages from s1 and the first two from s2.
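:py:class:`TimeSeries` performs the datetime/id translation for you. As a rough
illustration of what that mapping involves, here's a standalone sketch with
hypothetical helpers ``datetime_to_id`` and ``id_to_datetime``. It assumes
naive datetimes represent UTC; walrus's own conversion may handle time zones
differently.

.. code-block:: python

    from datetime import datetime, timedelta, timezone

    EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

    def datetime_to_id(dt, seq=0):
        """Build a stream id '<ms>-<seq>'; naive datetimes are treated as UTC."""
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        # Integer division of timedeltas avoids float rounding errors.
        ms = (dt - EPOCH) // timedelta(milliseconds=1)
        return '%d-%d' % (ms, seq)

    def id_to_datetime(msgid):
        """Inverse of datetime_to_id: returns a (naive UTC datetime, seq) pair."""
        if isinstance(msgid, bytes):
            msgid = msgid.decode('ascii')
        ms, seq = msgid.split('-')
        dt = EPOCH + timedelta(milliseconds=int(ms))
        return dt.replace(tzinfo=None), int(seq)

    msgid = datetime_to_id(datetime(2018, 1, 1))  # '1514764800000-0'

Sub-millisecond precision is dropped by this mapping, and the sequence number
is what distinguishes several messages stored under the same timestamp.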
When inspecting pending messages within a :py:class:`TimeSeries`, the message
ids are unpacked into (datetime, seq) 2-tuples:

.. code-block:: python

    ts.s1.pending()

    # Returns:
    [((datetime.datetime(2018, 1, 1, 0, 0), 0), 'demo-ts.c1', 1578, 1),
     ((datetime.datetime(2018, 1, 2, 0, 0), 0), 'demo-ts.c1', 1578, 1)]

    # Acknowledge the pending messages:
    for msgts_seq, _, _, _ in ts.s1.pending():
        ts.s1.ack(msgts_seq)

We can set the last-read message id using a datetime:

.. code-block:: python

    ts.s1.set_id(datetime(2018, 1, 1))

    # Next read will be 2018-01-02, ...
    ts.s1.read(count=2)

    # Returns two Message objects, for 2018-01-02 and 2018-01-03.

As with :py:class:`ConsumerGroup`, the :py:class:`TimeSeries` helper provides
stream-specific APIs for claiming unacknowledged messages, creating additional
consumers, etc.

Learning more
-------------

For more information, the following links may be helpful:

* `Redis streams introduction `_.
* `Example multi-process task queue using walrus and streams `_.
* API docs for :py:class:`Stream`, :py:class:`ConsumerGroup`,
  :py:class:`ConsumerGroupStream` and :py:class:`TimeSeries`.