Real-Time Live Updates in Django with WebSockets and Django Channels

Introduction

In this article, I'll explore a key feature of my recently finished pet project, SensorFusion. This small project is a platform for sensor integration, authentication, and sensor data storage, and it presents that data on a reports web page that updates live over WebSockets. Put simply, WebSocket is a protocol that keeps a single connection open so that the client and the server can send each other messages in real time. Chat applications such as WhatsApp are a familiar example of this kind of two-way communication. Now that that is out of the way, let us dive into the main piece of the cake.

Django Channels

Every good story needs a compelling beginning. Before we jump into the story of Django Channels, it's important to understand some key information about it. Django Channels is an extension that lets Django applications handle WebSockets and other long-lived or asynchronous protocols (chat, IoT, and similar real-time use cases) in addition to regular HTTP. Django has traditionally been served through the Web Server Gateway Interface (WSGI), which handles each request synchronously. Channels lets Django handle asynchronous connections by building on Django's support for the Asynchronous Server Gateway Interface (ASGI).

The Channels ecosystem consists of four main packages, in no particular order of importance:

  • channels - This is the core package. It provides the Django integration: consumers, routing, and the channel layer interface.

  • daphne - This is the HTTP/WebSocket termination server. It negotiates between the two protocols automatically, which eliminates the need for URL prefixing to distinguish HTTP endpoints from WebSocket endpoints.

  • asgiref - This package contains the ASGI base libraries: sync-to-async and async-to-sync function wrappers, server base classes, and a WSGI-to-ASGI adapter.

  • channels_redis - This provides a Redis-based backing store for the channel layer. It is optional, but I recommend using it for maximum functionality.

You may wonder if a channel is similar to a tunnel. The answer is yes and no. A channel is essentially an ordered queue with a message expiration feature. In essence, it's like a pipe that lets data flow from one end to the other and back. However, for this connection to work, there must be an intermediary, similar to a radio tower that receives and transmits signals. It's not a literal tower but a few lines of code. This is where consumers come into play.
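
To make the "ordered queue with message expiration" idea concrete, here is a toy sketch using only the standard library. It is not how Channels implements channels internally; ExpiringQueue and its parameters are hypothetical names made up for this illustration.

```python
import time
from collections import deque


class ExpiringQueue:
    """Toy FIFO queue whose messages are dropped once they are too old."""

    def __init__(self, expiry_seconds, clock=time.monotonic):
        self.expiry_seconds = expiry_seconds
        self.clock = clock  # injectable so the behaviour is easy to test
        self._items = deque()  # (timestamp, message) pairs, oldest first

    def send(self, message):
        # Remember when each message was queued.
        self._items.append((self.clock(), message))

    def receive(self):
        """Return the oldest unexpired message, or None if there is none."""
        now = self.clock()
        while self._items:
            sent_at, message = self._items.popleft()
            if now - sent_at <= self.expiry_seconds:
                return message
            # Otherwise the message has expired and is silently discarded.
        return None
```

Messages come out in the order they went in, and anything that has waited longer than expiry_seconds is never delivered.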

Consumers

To clarify, when we say "consumer" in the context of Django Channels, we are not referring to someone who buys goods or services. A consumer is a class whose methods are called when an event occurs, such as a client connecting or a message arriving. This is similar to a Django view. Channels provides several base classes you can inherit from, such as WebsocketConsumer, AsyncWebsocketConsumer, JsonWebsocketConsumer, and AsyncJsonWebsocketConsumer, depending on whether your project needs synchronous or asynchronous handling and JSON encoding.

Now that we have covered the essential information, we can proceed with creating a channel using Django channels.

Creating a channel

Prerequisites

To follow along you need Python, Django with Django REST Framework, Django Channels, a PostgreSQL database, and a way to run Redis, such as Docker or WSL on Windows.

Setting up the environment

This article focuses only on the WebSocket feature. The backend runs in Docker containers, including one for Redis, but the environment setup itself is outside the scope of this article.

Installations

To create a channel, we first need to install the channels package together with its daphne extra. Run one of the following in your terminal, using pipenv or pip (the quotes keep shells such as zsh from interpreting the square brackets):

pipenv install "channels[daphne]"

or

pip install "channels[daphne]"

Daphne terminates both HTTP and WebSocket connections and negotiates between the two protocols, so we don't need URL prefixes to tell them apart.

After installing, we need to register two apps in our project's settings.py file: daphne and channels. Add them to the top of the INSTALLED_APPS list, with daphne first so that it can take over the runserver command:

INSTALLED_APPS = [
    "daphne",
    "channels",
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",
    "api.apps.ApiConfig",
    "rest_framework",
    "rest_framework.authtoken",
    "django_filters",
    "drf_yasg",
    "corsheaders",
]

Restart your server and there we go! We are ready to take the next step.

Setting up the ASGI File for Routing

When you create a new Django project, two files are generated automatically in the project directory next to settings.py: wsgi.py and asgi.py. They are the entry points for synchronous (WSGI) and asynchronous (ASGI) servers, respectively. For our application to handle WebSocket connections, we need the ASGI file, since it is the entry point that an ASGI server such as Daphne uses.

In the ASGI file, we create an application object that maps each ASGI protocol type, such as "http" and "websocket", to the code that handles it. The "websocket" entry holds the WebSocket routing configuration, similar to Django's URL configuration.

Update the asgi.py file to include the routing application:

import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from django.core.asgi import get_asgi_application
from django.urls import path

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")

# Initialize Django before importing code that touches the ORM,
# such as the consumers module.
django_asgi_app = get_asgi_application()

from api.consumers import SensorConsumer  # noqa: E402


application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": AllowedHostsOriginValidator(
        AuthMiddlewareStack(
            URLRouter(
                [
                    # Set up URLs here
                ]
            )
        ),
    ),
})

The ProtocolTypeRouter allows you to route connections based on the protocol used (HTTP or WebSocket).
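
The URLRouter list in the listing above is left empty. As a sketch of what could go there, the fragment below registers the consumer under a WebSocket path. The path "ws/sensors/" and the name websocket_urlpatterns are assumptions chosen for illustration, not taken from the project; the fragment belongs in asgi.py after get_asgi_application() has been called.

```python
# Fragment of asgi.py; it relies on Django already being initialized above it.
from django.urls import path

from api.consumers import SensorConsumer

# "ws/sensors/" is a hypothetical path; pick whatever suits your project.
websocket_urlpatterns = [
    # as_asgi() creates a fresh consumer instance for every connection.
    path("ws/sensors/", SensorConsumer.as_asgi()),
]
```

The list would then be passed to the router as URLRouter(websocket_urlpatterns), and a client would connect to ws://<host>/ws/sensors/.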

WebSockets start out as HTTP requests that include all the cookies and headers. As a result, standard Django authentication can be used to identify the user from the session, which is what AuthMiddlewareStack does. However, this also opens the door to a CSRF-style attack: a WebSocket can be initiated from any site on the internet to your domain and will still carry the user's cookies and session from your site. If you serve private data down the socket, it's crucial to restrict the sites that are allowed to open sockets to you. That is why channels.security.websocket.AllowedHostsOriginValidator is used. It checks the Origin header of the handshake against the ALLOWED_HOSTS setting and rejects connections from other origins. You can learn more about security measures in the Channels documentation.

One last thing: to make sure the server starts with the ASGI configuration, we need to point to it in the settings.py file:

ASGI_APPLICATION = "config.asgi.application"

And voila! Now that the routing is in order, the next step awaits.

Creating a Consumer

I have discussed the importance of consumers and how there are different classes of consumers to choose from depending on the project requirements. For this specific project, the requirement is for a consumer that allows for asynchronous processing of data in JSON format. This is why the AsyncJsonWebsocketConsumer class was selected.

Create a consumers.py file in the app directory and define a class in it; you can name it whatever you like. That class will inherit from channels.generic.websocket.AsyncJsonWebsocketConsumer.

import asyncio
import logging

from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer

from api.models import SensorData
from api.serializers import SensorDataDetailsSerializer

logger = logging.getLogger(__name__)


class SensorConsumer(AsyncJsonWebsocketConsumer):
    async def connect(self):
        # Accept every incoming WebSocket connection.
        await self.accept()

    async def disconnect(self, close_code):
        logger.info("WebSocket disconnected with code: %s", close_code)

    async def receive_json(self, content, **kwargs):
        try:
            action = content.get("action")
            if action == "request.data":
                await self.send_sensor_data()
        except Exception:
            # Log the full traceback instead of printing to stdout.
            logger.exception("Exception in receive_json")

    def connection_open(self):
        # Always True, so the loop below has no exit condition of its own.
        return True

    @database_sync_to_async
    def sensor_data(self):
        # Runs in a worker thread because the Django ORM is synchronous.
        queryset = SensorData.objects.all()
        serializer = SensorDataDetailsSerializer(queryset, many=True)
        return serializer.data

    async def send_sensor_data(self, **kwargs):
        # First version: re-sends the full data set every 3 seconds.
        while self.connection_open():
            sensor_data = await self.sensor_data()
            await self.send_json(sensor_data)
            await asyncio.sleep(3)

A lot is happening here, but don't panic. Let me explain. To create a consumer, you define a class that inherits from a consumer base class such as AsyncJsonWebsocketConsumer. As with any consumer class, you typically define a connect method and a disconnect method. The connect method is called when a client connects to the WebSocket; we simply call accept to accept the connection. The disconnect method is called when the WebSocket closes. Nothing has to be done in that method, but it is a good place to log the close code in case the connection did not end cleanly, so I suggest setting up a logger.

Remember that we are inheriting from the AsyncJsonWebsocketConsumer class, so the handler methods are coroutines: using async and await lets us take full advantage of the asynchronous capabilities of Channels.

The receive_json method is called with each message the client sends, already decoded from JSON. If the message's action is 'request.data', it calls the send_sensor_data method.
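
To illustrate the message shape, here is a small standard-library sketch of what a client would send and how the action check behaves. The helper wants_sensor_data is a hypothetical name used only for this example; in the consumer itself, Channels does the JSON decoding before receive_json is called.

```python
import json


def wants_sensor_data(raw_message: str) -> bool:
    """Mirror the consumer's check: decode the JSON, then compare the action."""
    try:
        content = json.loads(raw_message)
    except json.JSONDecodeError:
        return False
    # Only a JSON object can carry an "action" key.
    return isinstance(content, dict) and content.get("action") == "request.data"


# The text frame a client would send over the socket to start the stream.
request = json.dumps({"action": "request.data"})
```

Any other action, or a payload that is not a JSON object, is simply ignored by this check.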

The sensor_data method retrieves the sensor data stored in the database, serializes it, and returns the serialized result. It is wrapped in database_sync_to_async because the Django ORM is synchronous and must not be called directly from async code.

The send_sensor_data method is the heart of the live update. A WebSocket stays open after the first message, so the server can keep pushing data without the client asking again. The method takes advantage of this by running a loop: on every pass it reads the latest data, sends it to the client, and then waits before repeating. This is truly fantastic! When I figured it out, I shouted "Eureka!" To create the illusion of real-time updates, a simple loop is enough!

The connection_open method returns True, so the loop keeps running for as long as the consumer is alive. This is what we want while the socket is connected, but note that, because the method always returns True, the loop has no exit condition of its own. There is also a more visible problem: the loop sends the old data again on every pass, which leads to repeated data on the client side. The fix is to transmit data only when it has changed.

I solved that by creating an attribute called last_sent_data in the connect method and setting it to None:

class SensorConsumer(AsyncJsonWebsocketConsumer):
    async def connect(self):
        await self.accept()
        self.last_sent_data = None

So each time a connection is made, that attribute starts out as None. Now we just apply a little logic: a condition that checks whether the data returned by sensor_data differs from whatever is stored in last_sent_data. If it differs, the consumer sends the data and updates last_sent_data; if it is the same, nothing is sent. Either way, asyncio.sleep(3) then runs (we will get to that in a moment).

    async def send_sensor_data(self, **kwargs):
        while self.connection_open():
            sensor_data = await self.sensor_data()
            # Send only when the data differs from what was sent last.
            if sensor_data != self.last_sent_data:
                await self.send_json(sensor_data)
                self.last_sent_data = sensor_data
            await asyncio.sleep(3)

This resolved the problem.
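
One thing the loop still lacks is a way to stop, since connection_open always returns True. The sketch below shows one possible way to combine the change check with a loop that can be cancelled, using only the standard library. ChangeStreamer, fetch, and send are hypothetical names for this illustration; they are not part of Channels or of the project's code.

```python
import asyncio


class ChangeStreamer:
    """Poll fetch() and call send() only when the result has changed."""

    def __init__(self, fetch, send, interval=3.0):
        self.fetch = fetch        # async callable returning the latest data
        self.send = send          # async callable delivering data to the client
        self.interval = interval  # seconds to wait between polls
        self.last_sent = None
        self._task = None

    def start(self):
        # Run the loop as a background task so the caller is not blocked.
        if self._task is None:
            self._task = asyncio.create_task(self._run())

    async def stop(self):
        # Cancel the loop and wait until it has actually finished.
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None

    async def _run(self):
        while True:
            data = await self.fetch()
            if data != self.last_sent:
                await self.send(data)
                self.last_sent = data
            await asyncio.sleep(self.interval)


async def demo():
    readings = iter([[1], [1], [1, 2], [1, 2]])  # fake database snapshots
    sent = []

    async def fetch():
        return next(readings, [1, 2])

    async def send(data):
        sent.append(data)

    streamer = ChangeStreamer(fetch, send, interval=0.01)
    streamer.start()
    await asyncio.sleep(0.2)
    await streamer.stop()
    return sent
```

In a consumer, the idea would be to call start() from receive_json, with sensor_data and send_json as the two callables, and to await stop() from disconnect. Treat this as a starting point under those assumptions rather than a drop-in replacement.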

The asyncio library is Python's standard library for writing asynchronous code with coroutines. It offers much more, but in this project it is used only for asyncio.sleep. You might be wondering why I did not just use the usual time.sleep. The first reason is that time.sleep blocks: it would stop the whole event loop, and with it everything else that loop is handling, until the specified time had elapsed. That is not what we want, since the connection needs to stay responsive. The second reason is that asyncio.sleep does not block; it simply waits while allowing the event loop to run other things during that time frame. That is the whole deal with asynchronous programming.
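
The difference is easy to see in a small standalone example. In this sketch (the worker and main names are made up for the illustration), two coroutines share one event loop, and each asyncio.sleep hands control back to the loop so the other can make progress.

```python
import asyncio


async def worker(name, delay, log):
    log.append(f"{name} start")
    await asyncio.sleep(delay)  # yields control to the event loop while waiting
    log.append(f"{name} done")


async def main():
    log = []
    # Both workers run concurrently on the same event loop.
    await asyncio.gather(
        worker("slow", 0.05, log),
        worker("fast", 0.01, log),
    )
    return log
```

Running asyncio.run(main()) shows "fast" finishing before "slow" even though "slow" started first. With time.sleep in place of asyncio.sleep, the first worker would hold the event loop for its whole delay and the second could not even start until it was done.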

Our consumer file is ready. We just need a channel layer to proceed.

Channel Layers

We have everything set up including the consumer configurations, but it's important to use a channel layer. What is that? Well, that's a great question.

The channel layer is a feature of Django Channels that enables different parts of an application to exchange messages. It acts as a messaging system that lets consumer instances communicate with each other and with other parts of Django, such as views or background tasks. It is important to note that the channel layer's interface is strictly asynchronous. If you want to call it from synchronous code, you need to wrap the call in asgiref.sync.async_to_sync. The channel layer is also optional: if you do not need it, you can leave CHANNEL_LAYERS unset or set it to an empty dictionary.

The channel layer is a useful tool when you want to avoid sending all messages or events through the database. I suggest using the Redis channel layer as a backing store, as it supports both single-server and sharded configurations as well as groups. It is also the only official Django-maintained channel layer supported for production use.

So to create a channel layer, you must first install the channels_redis package using pipenv or pip. It is not a Django app, so it does not need to be added to INSTALLED_APPS. Now we have to add a CHANNEL_LAYERS setting to the settings.py file:

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("redis", 6379)],
        },
    },
}

For this project, my Docker image for Redis is already built, and "redis" in the setting above is the hostname of that container. If you are on Windows, you may want to set up WSL and run Redis there. Once it is running, open the command prompt or PowerShell and type wsl hostname -I. This command prints the IP address of the WSL host. Update the hosts setting with this address and the Redis port number.
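
If Redis is not available while you are developing locally, Channels also ships an in-memory channel layer. The fragment below is a sketch of that alternative setting. It is meant for local development and testing only, since each process gets its own separate layer and messages are not shared between processes.

```python
# settings.py (local development only)
CHANNEL_LAYERS = {
    "default": {
        # Keeps messages in process memory; no Redis server is required.
        "BACKEND": "channels.layers.InMemoryChannelLayer",
    },
}
```

Switch back to the Redis configuration before deploying.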

Alright, I think we're done here. It's time to wrap things up.

Wrapping up...

Now that everything is set up, please make sure to check for any minor issues that you might have missed and then run the server. Your WebSocket is now ready and connected.

I hope you enjoyed learning about how I implemented web sockets for this task. If you have any questions, comments, or suggestions, or if you noticed anything wrong with what I wrote or think I may be misinformed about anything, please feel free to let me know in the comments section below.

Thank you for taking the time to read this!

Check out the GitHub repository at lady-thee/RemoteSensor-Data-System: This is a data saving API system that saves data from remote sensors and uses data virtualization to represent the data using Django channels (github.com)
