A self-hosted MQTT environment for Internet of Things – Part 3

In this final post of the series, I will add persistence of sensor data to a database. I will also use an additional subscriber as a proxy that sends the data to Adafruit IO. All data will then be available locally, and when the Adafruit IO proxy is running, the data will also be available in the cloud.

This post builds on the two previous posts:

So far, I have sensor data measured by small ESP8266 boards placed around our home. About every 10 minutes a new measurement is taken and published to a Mosquitto MQTT broker running on a Raspberry Pi. A Python MQTT client subscribes to the published topics and is notified whenever new data arrives. In the previous post, this data was only printed to the terminal window.

Part 3 – Storing measurements locally and in the cloud

To persist the gathered data for future visualization and analysis, I want to store it in a database. As I don't know how the data structures will evolve, it is hard to set up a database schema beforehand, so I opt for a document database.

I will also implement an optional integration with a cloud storage service (Adafruit IO).

Installing MongoDB and pymongo

MongoDB is a document database that is very easy to get started with and it can be hosted on a Raspberry Pi. To install it:

sudo apt-get install mongodb

and to work with MongoDB from Python 3, we can use the PyMongo library:

sudo python3 -m easy_install pymongo
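
If easy_install is not available, PyMongo can also be installed with pip (assuming pip for Python 3, e.g. the python3-pip package, is present on the system):

sudo pip3 install pymongo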

MongoDB holds one or several databases, where any number of collections keep the data. I will use a database called “SensorData” with a collection “home_data” for my measurements.

Note that on a 32-bit OS, MongoDB can only hold 2GB of data, so for storing large amounts of measurements it is preferable to use a 64-bit OS for the MongoDB server.
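
To verify the installation, a minimal test can be run from Python. This is just a sketch of my own (the test topic is made up) that assumes MongoDB runs locally on its default port; it inserts a test document into the SensorData database and reads it back:

#!/usr/bin/env python3
# Quick sanity check for MongoDB + PyMongo (assumes MongoDB on localhost:27017)
import datetime
from pymongo import MongoClient

mongoClient = MongoClient()
collection = mongoClient.SensorData.home_data

# Insert a test document shaped like the real measurements
collection.insert_one({"time": datetime.datetime.now(),
                       "topic": "Home/Test/Temperature",
                       "value": 21.5})

# Read it back
for doc in collection.find({"topic": "Home/Test/Temperature"}):
    print(doc)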

Creating a new MQTT subscriber that persists data

As in the previous post, I use paho-mqtt to subscribe to messages published to the Mosquitto broker. In the on_message handler, the data is formed into a document that is inserted into the database.
The sensor values from the ESP8266 boards are floats or strings. MQTT message payloads are byte arrays, so the floats are converted to ASCII strings before being published. On the subscriber side, I want numeric values to end up in the database as floats rather than strings. I do this by first decoding the byte payload to a text string with msg.payload.decode("utf-8") and then casting it to a float. As the payload can contain values other than floats, the cast is wrapped in a try/except; if the cast fails, the data is stored as a string instead.


#!/usr/bin/env python3
import paho.mqtt.client as mqtt
import datetime
from pymongo import MongoClient


def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    client.subscribe("Home/#")


def on_message(client, userdata, msg):
    receiveTime = datetime.datetime.now()
    message = msg.payload.decode("utf-8")
    isfloatValue = False
    try:
        # Convert the string to a float so that it is stored as a number
        # and not a string in the database
        val = float(message)
        isfloatValue = True
    except ValueError:
        isfloatValue = False

    if isfloatValue:
        print(str(receiveTime) + ": " + msg.topic + " " + str(val))
        post = {"time": receiveTime, "topic": msg.topic, "value": val}
    else:
        print(str(receiveTime) + ": " + msg.topic + " " + message)
        post = {"time": receiveTime, "topic": msg.topic, "value": message}

    collection.insert_one(post)


# Set up client for MongoDB
mongoClient = MongoClient()
db = mongoClient.SensorData
collection = db.home_data

# Initialize the client that should connect to the Mosquitto broker
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("192.168.1.16", 1883, 60)

# Blocking loop to the Mosquitto broker
client.loop_forever()

With this subscriber up and running, data accumulates in the MongoDB database, and we can query it from the MongoDB shell, e.g.:


mongo
use SensorData
db.home_data.find({topic:"Home/Outdoor/Humidity"}).count()
db.home_data.find({topic:"Home/Outdoor/Humidity",value:{"$gte":80}})

Using an additional subscriber as a proxy for Adafruit IO

Previously, all my ESP8266 boards were set up to publish data directly to Adafruit IO. I have described that setup in this post:

https://larsbergqvist.wordpress.com/2016/04/11/exploring-adafruit-io/

Now all data is stored locally in my own MQTT environment. To get the benefits of Adafruit IO as well, I can add an additional subscriber that routes all incoming messages to Adafruit IO. Adafruit IO uses feed names instead of a topic-level hierarchy, so I convert each topic to a feed name before publishing to Adafruit IO.
I use the Adafruit_IO library as an additional MQTT client for publishing. This library has a handy loop_background() function that runs the connection loop in a separate thread. I can thus issue that call and then use a blocking loop_forever() call for my subscriber client.


#!/usr/bin/env python3
import paho.mqtt.client as mqtt
import datetime
from Adafruit_IO import MQTTClient

# Set to your Adafruit IO key and username
ADAFRUIT_IO_KEY = 'XXX'
ADAFRUIT_IO_USERNAME = 'XXX'


def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    client.subscribe("Home/#")


def on_message(client, userdata, msg):
    print(str(datetime.datetime.now()) + ": " + msg.topic + " " + str(msg.payload))
    # Send the data to Adafruit IO. Replace the topic with a feed name
    feedname = msg.topic.replace("/", "_")
    print("Publish to Adafruit feedname: " + feedname)
    adafruitClient.publish(feedname, msg.payload)


# Initialize the client that should connect to the Mosquitto broker
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("192.168.1.16", 1883, 60)

# Initialize the client that should connect to io.adafruit.com
adafruitClient = MQTTClient(ADAFRUIT_IO_USERNAME, ADAFRUIT_IO_KEY)
adafruitClient.connect()
# Run the Adafruit IO connection loop in a separate thread
adafruitClient.loop_background()

# Blocking loop to the Mosquitto broker
client.loop_forever()

With this subscriber running, data will be stored in my Adafruit IO feeds, and I can use dashboards like this one to view current and historical data:

(Screenshot: Adafruit IO dashboard with current and historical sensor data)

Summary

In this series I have shown how I set up a local MQTT environment where sensor data from ESP8266 boards is stored in a MongoDB database on a Raspberry Pi. I have also used an additional subscriber as a proxy to upload the data to the Adafruit IO cloud.

The subscribers can be set up as services according to the steps in: https://larsbergqvist.wordpress.com/2016/06/18/daemonize-that-python-script/

Then, by choosing which of these services to start, I can decide whether to store data locally, in the cloud, or in both places.

All code for my setup is available from GitHub:

https://github.com/LarsBergqvist/MQTT_IoT

Addendum, March 2017

I’ve started storing my sensor data in an InfluxDB time series database. With this data source, I can use Grafana for viewing the data. See this post:

InfluxDB and Grafana for sensor time series

The MQTT_IoT repository has been updated with a subscriber that stores the data in an InfluxDB database.
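
As a rough illustration of what such a subscriber can look like, here is a sketch of my own (not the exact code in the repository); it assumes the influxdb Python package and an existing InfluxDB database called home_db:

#!/usr/bin/env python3
# Sketch only: an MQTT subscriber that writes measurements to InfluxDB.
# Assumes the influxdb package (influxdb-python) and a database named 'home_db'.
import datetime
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient

influxClient = InfluxDBClient(host="localhost", port=8086, database="home_db")


def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    client.subscribe("Home/#")


def on_message(client, userdata, msg):
    message = msg.payload.decode("utf-8")
    try:
        value = float(message)
    except ValueError:
        value = message
    # Use the MQTT topic as the measurement name and store one point per message
    point = [{
        "measurement": msg.topic,
        "time": datetime.datetime.utcnow().isoformat(),
        "fields": {"value": value}
    }]
    influxClient.write_points(point)


client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("192.168.1.16", 1883, 60)
client.loop_forever()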