Introduction
Server-Sent Events (SSE) is an HTML5 API for implementing real-time web applications: applications in which the server pushes data to the client as it becomes available, without the client explicitly requesting it. Normally, the client has to request data from the server, but in real-time web apps such as chatrooms and online trading platforms, the server sends regular updates on its own. HTML5 also introduced WebSockets, but this article will focus on SSE.
Overview
In a Server-Sent Events enabled application, the client initiates a request to the server, just as in a traditional HTTP exchange, and the server responds normally, except that now the response body never ends: it is infinite. The server keeps the underlying TCP connection open (it is long-lived), and when it has updates to send, it simply writes them to the response stream, where they are immediately received by the client.
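To make this concrete, the request that opens the stream is a perfectly ordinary HTTP request; browsers simply announce, via the Accept header, that they expect an event stream. A sketch, with an arbitrary path:

GET /path/to/stream-url HTTP/1.1
Host: example.com
Accept: text/event-stream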
An obvious advantage is that the server can now send updates instantly, without the client’s explicit request. However, SSE is unidirectional: after the initial request, only the server can send data. This limitation may not suit all applications. (The client can of course still send data to the server with a traditional HTTP request, but that has to occur over a different TCP connection.)
That being said, the data flow in many real-time applications is asymmetric: most of it is sent by the server, while clients mostly listen and send only occasionally. Even in a chatroom with many users and public rooms, most of the traffic consists of messages being fanned out from the server to clients. If the volume of data going from the server to clients considerably exceeds the volume in the other direction, SSE may still be a good choice.
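As a sketch of this asymmetric setup, the following client listens on an SSE stream while sending its occasional updates over separate, ordinary HTTP requests (the /updates and /messages endpoints are hypothetical):

// listen for updates pushed by the server over SSE
const source = new EventSource("/updates");
source.onmessage = (event) => console.log(event.data);

// send occasional client data over a separate, ordinary HTTP request
async function sendMessage(text) {
  await fetch("/messages", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message: text }),
  });
}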
Message Format
Before discussing the message format in detail, let’s take a look at an example of an HTTP response that streams Server-Sent Events. In this example, the server simply sends an update every second, containing an id that increases monotonically and a data field that carries the update as a string with the current time. After five seconds we manually abort the stream.
HTTP/1.1 200 OK
date: Wed, 29 Mar 2023 10:27:58 GMT
server: uvicorn
cache-control: no-cache
connection: keep-alive
x-accel-buffering: no
content-type: text/event-stream; charset=utf-8
Transfer-Encoding: chunked
id: 1
data: An update from the server at 10:27:59.

id: 2
data: An update from the server at 10:28:00.

id: 3
data: An update from the server at 10:28:01.

id: 4
data: An update from the server at 10:28:02.

id: 5
data: An update from the server at 10:28:03.
While the contents of the stream are not interesting, the dynamics are: the stream never ends (unless we abort it); it continuously increments the id and sends an update to the client every second. Let’s examine the message format in a bit more detail. Each event has (or should have) a unique id:
id: 5
data: An update from the server at 10:28:03.
These ids allow the client to keep track of which messages it has seen. If the connection gets dropped, the client will reestablish it and send a new HTTP request containing a header named Last-Event-ID, set to the largest id the client has received so far. The server can then simply resume the stream starting from the appropriate id.
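Such a reconnection request looks just like the initial request shown earlier, with the extra header added:

GET /path/to/stream-url HTTP/1.1
Host: example.com
Accept: text/event-stream
Last-Event-ID: 5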
Further, events can have names. For instance:
id: 1
event: new user
data: Sven

id: 2
event: message
data: {user: 'Sven', message: 'Hi!'}
In this example, we can separate an event that signals the arrival of a new user to the chatroom from an event that signals that a user sent a message. Tagging messages with event names simplifies processing on the client, as we’ll see next.
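One more wire-format detail worth knowing: an event may span several data: lines, which the client joins with newlines; consecutive events are separated by a blank line; and an optional retry: field tells the client how many milliseconds to wait before reconnecting. For instance:

id: 3
event: message
retry: 5000
data: first line of the payload
data: second line of the payload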
Client-side JavaScript API
Server-Sent Events offer a straightforward JavaScript API. All we need to do is subscribe to events using the EventSource object.
const source = new EventSource("/path/to/stream-url");

// called once the connection is established
source.onopen = function (event) { ... };

// called when an error occurs, e.g. the connection drops
source.onerror = function (event) { ... };

// called for every event tagged with a matching event name
source.addEventListener("event_name", function (event) {
  processEventName(event.data);
});

// called for every event without an explicit event name
source.onmessage = function (event) {
  console.log(event.lastEventId, event.data);
};
In case the client gets disconnected, the EventSource object will reconnect automatically and set the Last-Event-ID header to the largest id it has seen so far, which allows the server to resume the stream appropriately.
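The client can also end the session itself; a closed EventSource never reconnects.

// permanently close the stream; the browser will not attempt to reconnect
source.close();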
Server-Side: Using an appropriate web application stack
To properly leverage Server-Sent Events, the web application must be able to generate an endless stream of data, an infinite page of contents if you will; once open, it has to run for as long as the client stays connected. And while the way this is achieved depends on the choice of the entire web application stack (the programming language, the web framework, the run-time environment), it is often implemented with an infinite loop (using a while True idiom), some sort of event loop, or something similar.
However, if you use a web application stack with a synchronous one-process-per-request model (typical in a LAMP set-up), you will run into serious performance issues as the number of clients grows: if every client gets its own process to handle its request, and an SSE request is long-lived (it lasts for as long as the client stays connected to the stream), your server will eventually run out of available processes.
So to properly use SSE, your web application stack has to support long-lived connections. Generally speaking, this is more efficiently achieved with asynchronous web frameworks, since they can handle many concurrent I/O operations. Examples of such asynchronous frameworks in Python are Starlette, FastAPI, Tornado, and aiohttp.
Server-Side: A simple example with FastAPI
For starters, let’s build a simple “Hello World” application that generates an endless stream of synthetic events like the ones introduced in the example above.
We’ll use the FastAPI asynchronous Python web framework, the sse-starlette library, and the uvicorn asynchronous web server.
First, let’s install all dependencies; here we’re pinning them to specific versions, since these are confirmed to work together.
pip install asyncio==3.4.3 fastapi==0.95.0 sse-starlette==1.3.3 starlette==0.26.1 uvicorn==0.21.1
Next, let’s code a very basic FastAPI endpoint that serves an endless stream of Server-Sent Events.
from fastapi import FastAPI, Request
from sse_starlette import EventSourceResponse
import asyncio
import time

app = FastAPI()

@app.get("/")
async def sse(req: Request):
    return EventSourceResponse(generate_events())

async def generate_events():
    i = 0
    while True:
        # simulate time between events
        await asyncio.sleep(1)
        # the contents of the new event
        i += 1
        msg = "An update from the server at %s." % time.strftime("%H:%M:%S", time.localtime())
        # send the event to the client with yield: EventSourceResponse requires the
        # event be returned as a dictionary, where keys (id, data, and event)
        # correspond to SSE properties
        yield {
            "id": i,
            "data": msg
        }
Assuming you save this to a file named hwsse.py, start the web application with the following command:
uvicorn hwsse:app --port 8080
Finally, call the root endpoint / with cURL and watch events as they are being generated; the -N flag disables buffering, so the contents are displayed as they arrive.
curl -N -i localhost:8080/
HTTP/1.1 200 OK
date: Thu, 30 Mar 2023 08:05:40 GMT
server: uvicorn
cache-control: no-cache
connection: keep-alive
x-accel-buffering: no
content-type: text/event-stream; charset=utf-8
Transfer-Encoding: chunked
id: 1
data: An update from the server at 10:05:41.

id: 2
data: An update from the server at 10:05:42.

id: 3
data: An update from the server at 10:05:43.

id: 4
data: An update from the server at 10:05:44.

id: 5
data: An update from the server at 10:05:45.
Once you’ve seen enough, close the cURL stream with Ctrl-C.
A more involved example using a database
To complete our tour of Server-Sent Events, let’s build a more realistic, but still simple, application that allows all connected clients to receive updates whenever some event on the server occurs.
We’ll build a simple TODO application; TODO items will be stored in a database, and whenever an item is added, a Server-Sent Event will be delivered to all connected clients. Since this is a slightly more involved example, we’ll make use of Docker to spin up the required services.
The database
TODO items will be stored in a PostgreSQL database. The database structure is straightforward and is presented in the following schema; save the contents to a file named init.sql.
CREATE TABLE todos (
    id SERIAL PRIMARY KEY,
    description TEXT NOT NULL,
    completed BOOLEAN NOT NULL DEFAULT FALSE
);

CREATE OR REPLACE FUNCTION notify_todo_insert() RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify('todo_inserted', '{ "id": ' || NEW.id::text || ', "description": "' || NEW.description::text || '", "completed": ' || NEW.completed::text || '}');
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER todo_insert_trigger AFTER INSERT ON todos
    FOR EACH ROW EXECUTE FUNCTION notify_todo_insert();
The todos table contains three columns with straightforward semantics. However, we also create a custom function, notify_todo_insert(), and a trigger that calls the function whenever a new entry is added to the todos table.
The notify_todo_insert() function makes use of PostgreSQL’s LISTEN and NOTIFY functionality: it sends a notification on a channel, in our case todo_inserted, announcing that a new row has been added to the table, together with the contents of the new row encoded as a JSON message.
Now, all we need to do in our application is listen for PostgreSQL notifications on the todo_inserted channel and, whenever we receive one, forward the message to the SSE clients.
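You can observe this mechanism directly in psql, independently of the web application; the printout sketched in the comments below is approximate, and the process ID is, of course, arbitrary.

-- subscribe to the channel
LISTEN todo_inserted;

-- insert a row; the trigger fires and notifies all listeners
INSERT INTO todos (description) VALUES ('Buy milk');

-- psql then prints something along the lines of:
-- Asynchronous notification "todo_inserted" with payload
-- "{ "id": 1, "description": "Buy milk", "completed": false}"
-- received from server process with PID 4242.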
The web application
Again, we’ll make use of FastAPI, Starlette, asyncio, and uvicorn to implement the web application, along with asyncpg for database access and tenacity for retrying. Here’s the main part of the application; save it to a file named main.py.
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse
from sse_starlette import EventSourceResponse
import asyncpg
import asyncio
import tenacity

app = FastAPI()

@app.on_event("startup")
@tenacity.retry(
    wait=tenacity.wait_exponential(),
    stop=tenacity.stop_after_attempt(5))
async def startup():
    app.db_pool = await asyncpg.create_pool(
        user="postgres",
        password="postgres",
        database="todos",
        host="db")

@app.on_event("shutdown")
async def shutdown():
    await app.db_pool.close()

@app.post("/todos", status_code=201)
async def create_todo(request: Request):
    try:
        form = await request.json()
        description = form['description']
        completed = form['completed']
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
    async with app.db_pool.acquire() as conn:
        id_ = await conn.fetchval(
            "INSERT INTO todos (description, completed) VALUES ($1, $2) RETURNING id",
            description, completed)
    return {
        "id": id_,
        "description": description,
        "completed": completed
    }

@app.get("/todos")
async def get_todos():
    async with app.db_pool.acquire() as conn:
        todos = await conn.fetch("SELECT * FROM todos")
    return [dict(todo) for todo in todos]

@app.get("/")
async def root():
    return HTMLResponse("""
        <!DOCTYPE html>
        <html lang="en">
        <title>Server-Sent Events with Fast API and PostgreSQL</title>
        <h1>Realtime TODO watch</h1>
        <ul id="todos"> </ul>
        <script>
        document.addEventListener("DOMContentLoaded", () => {
            const source = new EventSource("http://localhost:9090/live-todos");
            const ul = document.querySelector("#todos");
            source.addEventListener('todo', event => {
                const todo = JSON.parse(event.data);
                const li = document.createElement("li");
                if (todo.completed) {
                    li.innerText = String.fromCodePoint(0x1F5F5);
                } else {
                    li.innerText = String.fromCodePoint(0x2610);
                }
                li.innerText += " " + todo.description;
                ul.appendChild(li);
            }, false);
            source.addEventListener('error', event => {
                console.log("Failed to connect to event stream.");
            }, false);
        });
        </script>
    """)
There is a lot going on here. Let’s break it down.
- Methods startup() and shutdown() are invoked at application start and end, respectively. The first sets up a pool of connections to the database, and the latter tears it down. The tenacity decorator around the startup() method ensures the method is retried a few times before giving up: during start-up it may happen that connections cannot be established immediately, so a few retries make sense.
- Method create_todo(request) is mapped to the URL /todos using the POST method. It expects a JSON payload that contains two attributes, a string description and a boolean completed, and, if they are provided, inserts a new TODO item into the database.
- Method get_todos() is also mapped to the URL /todos, but it uses the GET method. It simply returns the list of all TODO items in the database (see the example call right after this list). Methods create_todo(request) and get_todos() are RESTful endpoints and contain nothing specific to Server-Sent Events.
- Method root() is mapped to the URL / using the GET method. It returns a simple HTML page containing a minimal SSE client in JavaScript. Every time a new SSE event is received, a new li element is created and added to the ul list. In production, you’d probably use a front-end framework such as React, Vue, or similar to handle such events.
All we are missing now is the SSE endpoint that sends a message to the browser whenever PostgreSQL tells the app that a new row has been added. This is achieved in the live_todos() function, which also goes into main.py.
@app.get("/live-todos")
async def live_todos():
    queue = asyncio.Queue()

    async def receive_pg_notification(conn, pid, channel, payload):
        # Receives updates from the DB and forwards them to the queue
        await queue.put(payload)

    async def generate_stream():
        async with app.db_pool.acquire() as conn:  # gets a DB connection
            await conn.add_listener("todo_inserted", receive_pg_notification)  # listens for DB updates
            try:
                while True:
                    payload = await queue.get()  # receives an update from the queue
                    yield dict(data=payload, event="todo")  # sends it to the browser
            except asyncio.CancelledError as e:
                # when the browser disconnects, cancels notifications
                await conn.remove_listener("todo_inserted", receive_pg_notification)
                raise e

    return EventSourceResponse(generate_stream())
The function is invoked with a GET request to /live-todos. Within this function, we define two auxiliary functions, one of which runs for as long as the stream is open, and a helper queue that allows them to communicate.
The function then starts an infinite stream by handing generate_stream() to EventSourceResponse. The generator acquires a database connection and starts listening for todo_inserted notifications: whenever the database issues a notification on this channel, this client will be notified.
Each notification is received by the receive_pg_notification(conn, pid, channel, payload) function. All this function does is write the message (the payload) to the queue.
The generate_stream() function then proceeds into an infinite loop where it waits for messages to appear on the queue and, when they do, simply forwards them to the client.
Finally, when the client disconnects, the function cancels the subscription for todo_inserted notifications on the given connection. We have to do this because the connection goes back into the pool to be reused.
Running with Docker and docker-compose
Since this is no longer a trivial application, and it consists of multiple services, it makes sense to run it with docker-compose. But first, let’s package the web application into a Docker image; save the following lines into a file named Dockerfile.
FROM python:3.11-slim

WORKDIR /app

RUN pip install --upgrade \
    pip \
    fastapi==0.95.0 \
    asyncio==3.4.3 \
    asyncpg==0.27.0 \
    uvicorn==0.21.1 \
    sse-starlette==1.3.3 \
    starlette==0.26.1 \
    tenacity==8.2.2

COPY . .
The image simply installs the required Python dependencies and copies in the source files. Next, to run the PostgreSQL database and connect it to the web application, use the following docker-compose.yml file.
version: '3'

services:
  db:
    image: postgres:15.2-alpine3.17
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: todos
    volumes:
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    ports:
      - 5432:5432
  web:
    build: .
    command: uvicorn main:app --host 0.0.0.0 --port 80
    volumes:
      - ./:/app
    ports:
      - 127.0.0.1:9090:80
    depends_on:
      - db
This will create two services, db and web. The db service will run the PostgreSQL database and initialize it with the commands from init.sql. The web service will build and run our web application and expose it at http://localhost:9090.
If you’ve followed along so far, you should have the following files:
- main.py
- init.sql
- Dockerfile
- docker-compose.yml

(There should also be hwsse.py from the previous example, but it is not relevant here.) To run the application, simply issue docker-compose up. This should build the web application image, download the PostgreSQL image, and run them both.
Open your browser and go to http://localhost:9090. Next, open a terminal window and add a new TODO with cURL as follows.
curl -X POST http://localhost:9090/todos \
-H 'Content-Type: application/json' \
-d '{"description": "Buy coffee", "completed": false}'
As soon as you press Enter, the browser should display the newly created TODO item.
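You can also watch the raw event stream directly: run curl -N http://localhost:9090/live-todos in a second terminal and then repeat the POST above. You should see something along these lines (sse-starlette may additionally interleave periodic keep-alive comment lines):

event: todo
data: { "id": 2, "description": "Buy coffee", "completed": false}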
Conclusion
Congratulations, you’ve made it to the end! The real-time web is a fascinating, useful, and fun topic, but also a demanding and complex one. Even though our application is very basic, there is nothing trivial about what happens under the hood. Yet it can deliver events to clients in real time, and do so efficiently, saving both server resources and network bandwidth.