From MQTT to InfluxDB
Now that you have learned the basics of InfluxDB, it is time to forward data received by Mosquitto to InfluxDB. There are several ways to do this but we will do it by writing a small application that acts as a forwarder. The application will connect to Mosquitto and listen on one or more topics that receive messages. When a message arrives that matches a specific message type, we will store the data in InfluxDB.
In the section Writing an MQTT client, we sent fictitious air quality measurements to an MQTT topic called mytopic. In this section, we will send air quality data to a topic that includes the location as well. The topic takes the form airq/city/building/room.
For a device in Antwerp, in the Ahlers Building in room Asterix, the MQTT topic would be:
Our forwarder can then subscribe to topic airq/# where # acts as a wildcard. It can check for correct formatting and write air quality data in addition to location data to InfluxDB.
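To make the wildcard behaviour concrete, here is a minimal sketch of how an MQTT topic filter such as airq/# is matched against a topic. In practice the broker does this matching, not the forwarder; the function name topicMatches and the lower-case example topic are assumptions made for illustration only.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatches reports whether topic matches an MQTT subscription filter.
// "#" matches all remaining levels (including none); "+" matches exactly
// one level. Special handling of topics starting with "$" is omitted.
func topicMatches(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			return true
		}
		if i >= len(t) {
			return false
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

func main() {
	topics := []string{
		"airq/antwerp/ahlers/asterix", // hypothetical example topic
		"airq/antwerp",
		"weather/antwerp",
	}
	for _, topic := range topics {
		fmt.Printf("%-30s matches airq/#: %v\n", topic, topicMatches("airq/#", topic))
	}
}
```

Note that airq/# also matches topics with fewer or more than three location levels, which is why the forwarder still has to check the format of each topic itself.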
At a high level, the application consists of several parts:
Setup: package imports and package level variables
Initialization: connect to InfluxDB and create the telemetry database if it does not exist (the name of the database is set via an environment variable)
main() function: connect to Mosquitto securely and subscribe to the configured topic (the name of the topic is set via an environment variable); when new messages arrive on the topic, execute the onMessageReceived handler
Message handler: when a message is received, verify the correctness of the MQTT topic; when the topic is correctly formatted, write the payload to InfluxDB
This forwarder is not built to scale, but it can easily handle hundreds of devices as long as they send data infrequently (once a minute or less often). In a production setting, it is recommended to use a multi-consumer queue between the MQTT broker and its consumers. Even in our scenario, there are already two consumers: the MQTT to InfluxDB forwarder and the MQTT to Redis forwarder. Examples of a multi-consumer queue are Kafka or, in a cloud setting, Azure Event Hubs.
Setup
Several package imports are required. We need to connect to Mosquitto and InfluxDB, and we do that with the Eclipse Paho MQTT client and the InfluxDB package from InfluxData. We also use the envconfig package to easily pick up our environment variables.
Let's take a look at package level variables and types:
A variable of type InfluxDB.Client is required to connect to InfluxDB. It is used later in the init() function.
We use two structs to capture the MQTT payload (Message) and location (Location):
Message: the JSON received on the topic is unmarshalled into a struct of this type
Location: the topic is expected to be formatted as airq/city/building/room; the city, building and room strings are verified and extracted each time a message is received
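As a rough illustration of the two structs described above, the sketch below decodes a JSON payload into a Message. The payload keys and the Message fields other than Room are hypothetical, as is the helper name decode; substitute whatever your devices actually send.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Location holds the three levels extracted from airq/city/building/room.
type Location struct {
	City     string
	Building string
	Room     string
}

// Message holds the decoded MQTT payload. Device, Temperature and Humidity
// are hypothetical fields chosen for this sketch.
type Message struct {
	Device      string   `json:"device"`
	Temperature float64  `json:"temperature"`
	Humidity    float64  `json:"humidity"`
	Room        Location `json:"-"` // taken from the topic, not from the payload
}

// decode unmarshals a JSON payload into a Message.
func decode(payload []byte) (*Message, error) {
	var m Message
	if err := json.Unmarshal(payload, &m); err != nil {
		return nil, err
	}
	return &m, nil
}

func main() {
	payload := []byte(`{"device":"dev1","temperature":21.5,"humidity":40}`)
	m, err := decode(payload)
	if err != nil {
		fmt.Println("invalid payload:", err)
		return
	}
	// The location would normally come from the verified topic.
	m.Room = Location{City: "antwerp", Building: "ahlers", Room: "asterix"}
	fmt.Printf("%+v\n", *m)
}
```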
The Specification struct is used with the envconfig package. That package reads the environment variables set in the container. The names of the variables need to match what is specified after envconfig in the struct tags. The line below defines that an environment variable INFLUX_USER will be mapped to InfluxUser in the struct:
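The following sketch shows how such a tag ties a field to a variable name. It does not use envconfig itself; it only reads the tags with the standard reflect package to show the mapping that envconfig relies on. Only InfluxUser and INFLUX_USER come from the text above; the tag values for InfluxHost and InfluxMeasurement, and the helper envNames, are assumptions.

```go
package main

import (
	"fmt"
	"reflect"
)

// Specification lists the settings read from the environment.
type Specification struct {
	InfluxUser        string `envconfig:"INFLUX_USER"`
	InfluxHost        string `envconfig:"DB_INFLUXDB_SERVICE_HOST"` // assumed variable name
	InfluxMeasurement string `envconfig:"INFLUX_MEASUREMENT"`       // hypothetical variable name
}

// envNames returns, for each field of a struct value, the environment
// variable named in its envconfig tag. spec must be a struct, not a pointer.
func envNames(spec interface{}) map[string]string {
	names := make(map[string]string)
	t := reflect.TypeOf(spec)
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if tag, ok := f.Tag.Lookup("envconfig"); ok {
			names[f.Name] = tag
		}
	}
	return names
}

func main() {
	for field, env := range envNames(Specification{}) {
		fmt.Printf("%s is read from %s\n", field, env)
	}
}
```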
We will later set the environment variables in a YAML file that we use with kubectl apply.
Initialization
The init() function is executed first:
We first get the environment variables with a simple call to envconfig.Process. If that fails, we exit the program. Next, we connect to InfluxDB and use the values from the Specification struct in the package level variable s to retrieve the InfluxDB hostname, port and more.
You might be wondering why the environment variable for s.InfluxHost is set the way it is. When you define a Kubernetes service such as db-influxdb, Kubernetes automatically injects environment variables for that service into each container that starts after the service exists. One of them, DB_INFLUXDB_SERVICE_HOST, holds the ClusterIP of the service, and similar variables are set for the ports. So, instead of using the Kubernetes DNS service, we use this alternative here. It's important to note that you need to use mqtt or db in the --name parameter when you deploy Mosquitto and InfluxDB with Helm. If you do not, the environment variables will not be correctly named for this application.
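The naming rule for these injected variables is simple: the service name is upper-cased, dashes become underscores, and a suffix such as SERVICE_HOST or SERVICE_PORT is appended. The sketch below derives the names for the db-influxdb service and reads them; the helper serviceEnvName is an illustrative name, not part of the forwarder.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// serviceEnvName builds the name of a variable that Kubernetes injects for
// a service: upper-case the name, replace "-" with "_", append the suffix.
func serviceEnvName(service, suffix string) string {
	name := strings.ToUpper(strings.ReplaceAll(service, "-", "_"))
	return name + "_" + suffix
}

func main() {
	hostVar := serviceEnvName("db-influxdb", "SERVICE_HOST") // DB_INFLUXDB_SERVICE_HOST
	portVar := serviceEnvName("db-influxdb", "SERVICE_PORT") // DB_INFLUXDB_SERVICE_PORT

	// Outside a cluster these variables are normally unset and print as empty.
	fmt.Printf("%s=%q\n", hostVar, os.Getenv(hostVar))
	fmt.Printf("%s=%q\n", portVar, os.Getenv(portVar))
}
```

This also shows why the Helm release name matters: a different release name produces a different service name and therefore different variable names.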
Also note that we create a database in InfluxDB if it does not exist. The name of the database can be configured with an environment variable.
main()
After init() finishes, the main() function will run:
If you are new to Go, the first and last lines might be unclear. On the first line, a channel c of type os.Signal is created. The call to signal.Notify registers that channel, so that the Go runtime sends a value to it when the process receives a SIGTERM. The last line simply blocks until that value is received.
The program does its work until a SIGTERM is detected, so basically until the container is stopped if there are no other errors. The actual work is done by a message handler called onMessageReceived. To configure the handler, we first need to connect to Mosquitto.
The connection to Mosquitto is similar to what we have seen in Writing an MQTT client. We connect without verifying the TLS certificate and we use the username and password set via environment variables. The topic we subscribe to is also set via an environment variable. We will use the airq/# topic in order to catch all topics under airq.
Message Handler
The message handler takes a variable of type MQTT.Message as the second parameter. We can extract the topic and the payload from that message:
Because we do not want to process payloads coming from irrelevant topics, we first verify the topic with the verifyTopic function. When that function does not return an error, we write the payload to InfluxDB via the writeToInfluxDB function.
verifyTopic is pretty straightforward:
When invalid formatting is encountered, an error is returned. When there are no errors, the function returns a pointer to a variable of type Location. The function uses the typical Go construct of returning two values: the actual value and an error. When there is an error, we return nil and the error. When there is no error, we return a result (the location) and nil for the error.
You can then easily check the result of the call to this function with:
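The original listings are not reproduced here, so the following is only a sketch of what a function with this contract could look like, together with the usual check at the call site. The validation rules (exactly four levels, airq as the first level, no empty levels) and the error messages are assumptions based on the topic format described earlier.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Location holds the three levels extracted from airq/city/building/room.
type Location struct {
	City     string
	Building string
	Room     string
}

// verifyTopic checks that topic looks like airq/city/building/room and
// returns the extracted location, or nil and an error.
func verifyTopic(topic string) (*Location, error) {
	parts := strings.Split(topic, "/")
	if len(parts) != 4 || parts[0] != "airq" {
		return nil, errors.New("topic must have the form airq/city/building/room")
	}
	for _, p := range parts[1:] {
		if p == "" {
			return nil, errors.New("topic contains an empty level")
		}
	}
	return &Location{City: parts[1], Building: parts[2], Room: parts[3]}, nil
}

func main() {
	// Hypothetical example topic.
	location, err := verifyTopic("airq/antwerp/ahlers/asterix")
	if err != nil {
		fmt.Println("ignoring message:", err)
		return
	}
	fmt.Printf("%+v\n", *location)
}
```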
To write to InfluxDB, we use the following function:
Because we expect to receive a JSON message in the MQTT payload, we first unmarshal that payload into a Message struct. When that succeeds, we set the field Room in the Message struct to the location that was passed to this function. We then write a new point to the measurement set in s.InfluxMeasurement which is configured via an environment variable. Refer to InfluxDB Primer, where we discussed measurements, points, tags, fields and how to write data points to a measurement with Go. Although not strictly necessary, the above code can write multiple points in one operation, which is similar to writing multiple points with the line protocol over HTTP. With the HTTP API, the points need to be separated by newlines as discussed in InfluxDB Primer.
The NewBatchPoints function takes a BatchPointsConfig that sets the precision to seconds (s). If you prefer, you can set the precision to nanoseconds (ns) to use timestamps with nanosecond precision. This is necessary if you write more than one point per second from the same device, because a point with the same measurement, tag set and timestamp as an existing point does not create a new point; its fields are merged into the existing one. Internally, InfluxDB works with nanosecond precision.
Building the executable
On a Linux system, just type:
On a Windows system, you can type:
If the folder in which you run the above command is called mqtt-client, you can omit the -o flag. Next, let's build the container and put it in a repository.
Building the container image
To build the container image yourself, you need to install Docker on your machine. Head over to https://www.docker.com to find out more. A container image is built from a Dockerfile, which contains build instructions. You can define the starting image, the files you want to add, the process to be started in the container and much more.
We can get away with a minimal Dockerfile:
Before we build the image and push it to a repository, you should get an account at Docker Hub. You will need your account name when you push the image. Let's build the image. From the folder that contains the Dockerfile and the compiled mqtt-client Linux executable, run the following command:
Note the . at the end, which tells Docker to use the current directory to pick up the Dockerfile. The -t flag specifies the tag for this image on your local machine. That same tag is also used to determine where to push your image. You can now push the image to Docker Hub using the following command:
You will need to provide credentials to be able to push the image. You can also set your credentials beforehand with docker login. In my case, I tagged my image gbaeke/mqtt-client. From that tag, the docker push command determines that it needs to push to Docker Hub, to a repository of the account gbaeke.
Normally, your image should be publicly available if you follow the above steps. That is ok in this case. You can also create private repositories on Docker Hub or you can use private registries like Azure Container Registry. Note that if you use a private repository or registry, you will need to instruct Kubernetes how to connect to such a system.
Now we will write a Kubernetes manifest in YAML that creates a deployment that uses our image in its pod template.
Kubernetes manifest
The YAML file, in its simplest form, will look like below:
Ouch, we put passwords in a manifest instead of using secrets. We will leave that for now, and focus on deployment. Make sure you update the image name to your image or use my image since it is publicly available.
Note that above, we configure a Kubernetes Deployment which is a construct on top of a ReplicaSet. The ReplicaSet takes care of running the requested number of pods on your cluster. The deployment controls how you deploy and especially update your pod(s).
Create the deployment with:
When you run kubectl get deployment, you should see output similar to the following:
Use kubectl get rs to see the ReplicaSet and of course, kubectl get pods to see the mqtt-client pod:
Use kubectl logs <your-mqtt-client-pod> to see the log output. We used the log package to log errors to stdout so you should see that output. With a client connected, you see the following output:
Like with the Linux tail command, you can use -f with kubectl logs to follow the logs.
As we demonstrated before, you can port forward the InfluxDB port to your local machine and use the influx client to see the measurement iotdata. After authenticating, use the two commands below:
Fantastic! If you got this far, you deployed an MQTT server (Mosquitto) and a time-series database (InfluxDB) using Helm. Next, you wrote a small application that acts as an MQTT-to-InfluxDB forwarder. You deployed that application using standard Kubernetes techniques, namely with a YAML manifest.
Next, it is time to actually do something with the data. We will write a REST API that allows you to query the InfluxDB data from other applications such as a website or a mobile app. Head over to the next chapter to see how to do that.