Building a Delayed Message System with Redis and FastAPI
Recently, I had a chance to help a fellow software engineer with an interesting project – building a system that allows printing delayed messages using Redis. The requirements were simple:
1. Implement a POST API endpoint that accepts a string message and a relative delay in seconds (from now).
2. Implement an independent system that shows the messages on the screen at the right time, with one-second granularity.
3. Use Redis as your primary data store.
We can take different approaches, each with pros and cons, so I want to start by building a simple working prototype and then keep improving it as needed. Our programming language will be Python combined with the lightweight FastAPI framework.
Reading the requirements, we can see that an efficient solution will probably use a priority queue ordered by the timestamp at which each message must be shown. The closest thing Redis has to a priority queue is its sorted set (ZSET) data structure, which is implemented using a hash table and a skip list.
When we want to push a message, we will use the ZADD command and set the score (the attribute the set is sorted by) to the timestamp at which the message should be printed. This operation has O(log n) time complexity. Then, on the consumer side, we will use the ZRANGEBYSCORE command with a time filter to get the messages we need to print, and remove them from the set using the ZREM command. ZRANGEBYSCORE runs in O(log n + m), where m is the number of messages returned, and each ZREM removal costs O(log n).
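To make the priority-queue idea concrete without involving Redis, here is a minimal in-memory sketch built on Python's `heapq`. The function names `schedule` and `pop_due` are hypothetical and exist only for illustration; in the real system the sorted set plays the role of the heap.

```python
import heapq


def schedule(queue, message, run_at):
    """Push a message keyed by the timestamp at which it should be shown."""
    heapq.heappush(queue, (run_at, message))


def pop_due(queue, now):
    """Pop and return every message whose timestamp is <= now, earliest first."""
    due = []
    while queue and queue[0][0] <= now:
        _, message = heapq.heappop(queue)
        due.append(message)
    return due


queue = []
schedule(queue, "third", 30.0)
schedule(queue, "first", 10.0)
schedule(queue, "second", 20.0)

print(pop_due(queue, now=25.0))  # ['first', 'second']
print(pop_due(queue, now=25.0))  # []
```

Pushing corresponds to ZADD, and the "peek at the smallest timestamp, pop while it is due" loop corresponds to the ZRANGEBYSCORE plus ZREM pair.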
The API implementation looks like this:
```python
from datetime import datetime, timedelta, timezone

from fastapi import HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field

# `app`, `redis_client`, and `REDIS_KEY_NAME` are assumed to be defined
# elsewhere in the module.


class Message(BaseModel):
    message: str
    delay_sec: int = Field(..., ge=0)


@app.post("/messages")
async def post_message(msg: Message):
    try:
        # Calculate absolute time as a UNIX timestamp
        absolute_time = (
            datetime.now(timezone.utc) + timedelta(seconds=msg.delay_sec)
        ).timestamp()

        # Insert the message into Redis ZSET
        redis_client.zadd(REDIS_KEY_NAME, {msg.message: absolute_time})

        return JSONResponse(
            status_code=201,
            content={"scheduled_for": absolute_time},
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to add message: {e}")
```
In this example, we introduce a POST /messages endpoint which calculates the absolute timestamp in UTC of when the message should be printed, converts it to UNIX epoch time in seconds, and pushes it to the sorted set.
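Note that zadd() accepts optional arguments that define what happens when the member already exists in the set. By default, the score of the existing member is updated; with nx=True, only new members are added and existing ones are left untouched, while with xx=True, only existing members are updated and no new ones are added.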
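As one illustration of these options, the sketch below passes `nx=True` so that a message already in the set keeps its original schedule. The helper name `schedule_if_absent` is hypothetical, and `client` is assumed to be any object exposing redis-py's `zadd(name, mapping, nx=...)` method.

```python
def schedule_if_absent(client, key, message, run_at):
    """Add the message only if it is not already scheduled.

    With nx=True, ZADD never updates an existing member, and its reply
    counts the newly added members: 1 means "scheduled", 0 means
    "already present, left untouched".
    """
    added = client.zadd(key, {message: run_at}, nx=True)
    return added == 1
```

Whether silently keeping the first schedule is the right behaviour depends on the product requirements; the API could just as well report a conflict to the caller when this returns `False`.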
API calls can happen in parallel, meaning there will be concurrent calls to ZADD. Luckily, we don’t need to do anything extra here because Redis will handle the concurrency for us and ensure the integrity of the data structure. Internally, Redis queues every incoming request and then processes it sequentially in a single-threaded event loop.
Now, as discussed, the consumer side looks like this:
```python
import time
from datetime import datetime, timezone


def poll_messages():
    while True:
        current_time = datetime.now(timezone.utc).timestamp()

        # Fetch every message whose scheduled time has already passed
        messages = redis_client.zrangebyscore(
            REDIS_KEY_NAME, "-inf", current_time, withscores=True
        )

        for message, timestamp in messages:
            scheduled = datetime.fromtimestamp(timestamp, timezone.utc)
            print(f"{scheduled.strftime('%Y-%m-%d %H:%M:%S')}: {message}")
            redis_client.zrem(REDIS_KEY_NAME, message)

        time.sleep(1)
```
In a loop that runs once a second, we poll Redis for messages whose timestamp is at or before the current time, then print them and delete them from the set.
At this stage, we have a good prototype with decent performance, as both the producer and the consumer run in time logarithmic in the size of the set (1M messages ~ 20 operations, 100M messages ~ 27 operations). The consumer additionally pays for each message it actually returns and removes.
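The operation counts quoted above are simply base-2 logarithms of the set size. A quick sketch to reproduce them follows; the helper name `approx_steps` is made up for this example, and since a skip list is probabilistic, the result is an order-of-magnitude estimate rather than an exact count.

```python
import math


def approx_steps(n):
    """Rough number of comparison steps for a set of n members (log2 of n)."""
    return round(math.log2(n), 1)


for n in (1_000_000, 100_000_000):
    print(f"{n:,} messages -> about {approx_steps(n)} steps")
```

Growing the set a hundredfold adds fewer than seven steps, which is why a single sorted set goes a long way before sharding becomes necessary.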
If we want to optimize further, we can “shard” our set into buckets (let’s say 10 minutes each) to reduce the overall data size of each bucket, resulting in even lower latency on all sides. However, it does add a bit of complexity, as now you need to ensure you have consumed a bucket before moving to the next one.
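A minimal sketch of how such bucketing could map a timestamp to a sorted-set key is shown below. The key prefix `delayed_messages` and the helper name `bucket_key` are assumptions made for illustration, not part of the original code.

```python
BUCKET_SECONDS = 600  # 10-minute buckets, as in the example above


def bucket_key(timestamp, prefix="delayed_messages"):
    """Return the sorted-set key of the bucket that contains `timestamp`."""
    bucket_start = int(timestamp // BUCKET_SECONDS) * BUCKET_SECONDS
    return f"{prefix}:{bucket_start}"


print(bucket_key(599.9))  # delayed_messages:0
print(bucket_key(600))    # delayed_messages:600
```

The producer would call ZADD on `bucket_key(absolute_time)`, while the consumer would poll the bucket for the current time and keep checking the previous bucket until it is empty.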
Now, what will happen if we run multiple pollers? Here’s where the fun begins. The poller function calls ZRANGEBYSCORE and then ZREM without any atomicity or locking around them, meaning that the same message may be printed by more than one poller. Also, one poller may try to remove a message that another poller has already removed.
We don’t need to worry about removing a non-existing message as it is handled on the Redis side and won’t return any failure, but how can we change our code so that the same message is not printed more than once?
As an option, we may write a simple Lua script that runs ZRANGEBYSCORE, then ZREM, and then returns the messages it read, and execute it directly from Python (using redis_client.eval). The script runs atomically due to the single-threaded nature of Redis. That will solve the problem of printing the same message twice but will introduce a new one: losing messages. Imagine one of your pollers got a bunch of messages to print and crashed before it could finish. These messages were already deleted from Redis; hence, they won’t be retried and will be lost forever.
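A minimal sketch of that atomic read-and-delete script could look like the following. The names `POP_DUE_LUA` and `pop_due_messages` are hypothetical, and `client` is assumed to be a redis-py client (or anything exposing the same `eval(script, numkeys, *keys_and_args)` method).

```python
POP_DUE_LUA = """
local due = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
for _, member in ipairs(due) do
    redis.call("ZREM", KEYS[1], member)
end
return due
"""


def pop_due_messages(client, key, now):
    """Atomically read and delete every message due at or before `now`.

    Redis runs the whole script as one unit, so two pollers can never
    receive the same member. Anything returned here is already gone from
    Redis, which is exactly why a crash after this call loses messages.
    """
    return client.eval(POP_DUE_LUA, 1, key, now)
```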
Another approach would be to use Redis Streams. Redis Streams is a robust data structure designed to handle real-time event-driven data. It allows producers to append messages to a stream, which acts as a log of events and enables consumers to read these messages in a reliable and ordered manner. Streams support features like message IDs, consumer groups for distributed processing, and the ability to track pending messages for acknowledgment.
So basically, we can take the messages we read with ZRANGEBYSCORE, push them to a stream, and then have a bunch of consumers listening to the stream, getting the messages and acknowledging them. Within a consumer group, each entry is delivered to only one consumer: once read, it becomes “pending” for that consumer and is not handed to the others. Acknowledging an entry with XACK removes it from the group's PEL (Pending Entries List); the entry itself stays in the stream until it is deleted with XDEL or trimmed (for example, with XTRIM). If a consumer crashes and doesn’t acknowledge a message, it remains in the PEL associated with the consumer group. Then, using the XPENDING command, we can see the messages that weren’t acknowledged and decide what to do: either reassign them to another consumer using XCLAIM, or print and XACK them from within the same consumer group.
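One possible shape for the XPENDING plus XCLAIM step is sketched below. The helper name `reclaim_stale` and the 30-second idle threshold are arbitrary assumptions, and `client` is assumed to expose redis-py's `xpending_range` and `xclaim` methods.

```python
def reclaim_stale(client, stream, group, consumer, min_idle_ms=30_000, count=10):
    """Take over entries that were read by some consumer but never acknowledged."""
    # List up to `count` pending entries of the whole group
    pending = client.xpending_range(stream, group, min="-", max="+", count=count)

    # Keep only the entries that have been idle long enough to look abandoned
    stale_ids = [
        entry["message_id"]
        for entry in pending
        if entry["time_since_delivered"] >= min_idle_ms
    ]
    if not stale_ids:
        return []

    # XCLAIM re-checks the idle time on the server, so two rescuers racing
    # for the same entry will not both receive it
    return client.xclaim(stream, group, consumer, min_idle_ms, stale_ids)
```

The caller would then print each returned `(message_id, fields)` pair and XACK it, the same way it handles freshly read entries.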
Note: It won’t guarantee 100% success in preventing printing the same message twice (as the consumer may crash after printing the message but before acknowledging it), but it will significantly reduce the chances of it happening.
So, to recap, we would use the Redis Sorted Set to act like a priority queue, helping us get the messages we need to print at the right time. Then, we would use Redis streams to ensure the same message wouldn’t be printed more than once.
The producer code (API implementation) won’t change and will continue pushing the received messages to the sorted set. The consumer, on the other hand, will now have two threads instead of one. The first will poll the sorted set for due messages and push them to the stream. The second will get messages from the stream, print them, and acknowledge them. I won’t implement the reclaiming of messages from the PEL here, and I will leave it as a good exercise for you. The new consumer code looks like this:
```python
import time
from datetime import datetime, timezone


# Thread 1: move due messages from the sorted set to the stream
def poll_messages():
    lua_script = """
    local sorted_set_key = KEYS[1]
    local stream_key = KEYS[2]
    local current_time = ARGV[1]

    -- Get messages ready for processing
    local messages = redis.call("ZRANGEBYSCORE", sorted_set_key, "-inf", current_time, "WITHSCORES")

    for i = 1, #messages, 2 do
        local message = messages[i]
        redis.call("XADD", stream_key, "*", "message", message)
        redis.call("ZREM", sorted_set_key, message)
    end

    return #messages / 2
    """
    while True:
        current_time = datetime.now(timezone.utc).timestamp()
        redis_client.eval(lua_script, 2, SORTED_SET_KEY, STREAM_KEY, current_time)
        time.sleep(1)


# Thread 2: print and acknowledge messages from the stream
def process_stream():
    while True:
        messages = redis_client.xreadgroup(
            CONSUMER_GROUP, "consumer", {STREAM_KEY: ">"}, count=10
        )
        for _, entries in messages:
            for message_id, fields in entries:
                # String field names assume a client created with decode_responses=True
                message = fields.get("message")
                print(f"{datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')}: {message}")
                redis_client.xack(STREAM_KEY, CONSUMER_GROUP, message_id)
        time.sleep(1)
```
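The consumer above relies on the consumer group already existing, since XREADGROUP fails with a NOGROUP error otherwise. The complete code may well handle this already; as a sketch, a startup helper could look like the one below. The name `ensure_consumer_group` is hypothetical, and `client` is assumed to expose redis-py's `xgroup_create` method.

```python
def ensure_consumer_group(client, stream, group):
    """Create the consumer group (and the stream) if missing.

    Returns True when the group was created, False when it already existed.
    """
    try:
        # id="0" lets the group see entries that are already in the stream;
        # mkstream=True creates the stream when it does not exist yet.
        client.xgroup_create(stream, group, id="0", mkstream=True)
        return True
    except Exception as exc:
        # Redis replies with a BUSYGROUP error when the group already exists
        if "BUSYGROUP" in str(exc):
            return False
        raise
```

Calling it once before starting the two threads makes the consumer safe to restart.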
The complete code can be found here, including a small README file with instructions on running it and a small producer script that can generate messages to test the whole system.
While playing around with it, you may notice that while a message exists in the sorted set, you cannot schedule another message with the exact same text. The API will return a success, BUT the sorted set will deduplicate it: the message text is used as the set member, so the second ZADD only updates the score of the existing entry. I am also leaving it as a challenge for you to fix. A slight hint is that you can make the sorted set members unique by adding a random UUID prefix to each message when you push it and then removing it on the consumer side before printing it. Other optimizations can also be made.
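If you want to check your answer to the hint above, one possible sketch of the prefixing helpers follows. The names `wrap_message` and `unwrap_message` and the `:` separator are assumptions chosen for illustration.

```python
import uuid

SEPARATOR = ":"  # a UUID string never contains a colon, so the first one is ours


def wrap_message(message):
    """Prefix the message with a random UUID so equal texts become distinct members."""
    return f"{uuid.uuid4()}{SEPARATOR}{message}"


def unwrap_message(member):
    """Strip the UUID prefix; split only once so colons in the text survive."""
    _, message = member.split(SEPARATOR, 1)
    return message
```

The producer would store `wrap_message(msg.message)` in the sorted set, and the consumer would call `unwrap_message` right before printing.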
– Alexander.