From MQTT to InfluxDB

Now that you have learned the basics of InfluxDB, it is time to forward the data received by Mosquitto to InfluxDB. There are several ways to do this; we will write a small application that acts as a forwarder. The application connects to Mosquitto and subscribes to one or more topics. When a message arrives on a topic with the expected format, the application stores the data in InfluxDB.

In the section Writing an MQTT client, we sent fictitious air quality measurements to an MQTT topic called mytopic. In this section, we send air quality data to a topic that also includes the location, for instance:

airq/city_name/building_name/room_name

For a device in Antwerp, in the Ahlers Building in room Asterix, the MQTT topic would be:

airq/antwerp/ahlersbuilding/asterix

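Our forwarder can then subscribe to the topic airq/#, where # is a multi-level wildcard that matches airq and every topic below it. The forwarder checks that each topic is correctly formatted and writes the air quality data, together with location data taken from the topic, to InfluxDB.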
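To make the wildcard behaviour concrete, here is a minimal, self-contained sketch of MQTT topic-filter matching. It is a simplified illustration of the rules (+ matches exactly one level, # matches the parent level and everything below it), not the code that Mosquitto or the Paho client uses. topicMatches is a hypothetical helper; it skips filter validation and the special handling of topics that start with $.

```go
package main

import (
    "fmt"
    "strings"
)

// topicMatches reports whether an MQTT topic matches a subscription filter.
// '+' matches exactly one level (possibly empty); '#' matches the parent
// level and any number of levels below it. Simplified: no filter
// validation and no special handling of topics that start with '$'.
func topicMatches(filter, topic string) bool {
    f := strings.Split(filter, "/")
    t := strings.Split(topic, "/")
    for i, level := range f {
        if level == "#" {
            // '#' is only valid as the last level of a filter
            return i == len(f)-1
        }
        if i >= len(t) {
            return false // the topic has fewer levels than the filter
        }
        if level != "+" && level != t[i] {
            return false
        }
    }
    // without '#', filter and topic need the same number of levels
    return len(f) == len(t)
}

func main() {
    for _, topic := range []string{
        "airq/antwerp/ahlersbuilding/asterix",
        "airq",
        "mytopic",
    } {
        fmt.Printf("%-38s airq/#: %-5v airq/+/+/+: %v\n",
            topic, topicMatches("airq/#", topic), topicMatches("airq/+/+/+", topic))
    }
}
```

Note that airq/# also matches airq itself and topics with more or fewer than four levels, and + accepts an empty level. That is why the forwarder still verifies every topic it receives.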

At a high level, the application consists of the following parts:

  • Setup: package imports and package level variables

  • Initialization: connect to InfluxDB and create the telemetry database if it does not exist (the name of the database is set via an environment variable)

  • main() function: connect to Mosquitto over TLS and subscribe to the configured topic (the name of the topic is set via an environment variable); when new messages arrive on the topic, execute the onMessageReceived handler

  • Message handler: when a message is received, verify the correctness of the MQTT topic; when the topic is correctly formatted, write the payload to InfluxDB

This forwarder is not built to scale, but it can easily handle hundreds of devices as long as they send data infrequently (for example, once a minute or less). In a production setting, it is recommended to put a multi-consumer queue between the MQTT broker and its consumers. Even in our scenario, there are already two consumers: the MQTT-to-InfluxDB forwarder and the MQTT-to-Redis forwarder. Examples of multi-consumer queues are Apache Kafka or, in a cloud setting, Azure Event Hubs.

Setup

package main

import (
    "crypto/tls"
    "encoding/json"
    "errors"
    "fmt"
    "log"
    "os"
    "os/signal"
    "strconv"
    "strings"
    "syscall"
    "time"

    MQTT "github.com/eclipse/paho.mqtt.golang"
    InfluxDB "github.com/influxdata/influxdb/client/v2"
    "github.com/kelseyhightower/envconfig"
)

Several package imports are required. We need to connect to Mosquitto and InfluxDB, and we do that with the Eclipse Paho MQTT client and the InfluxDB package from InfluxData. We also use the envconfig package to easily pick up our environment variables.

Let's take a look at package level variables and types:

// InfluxDB client initialised in init() and used in writeToInfluxDB
var c InfluxDB.Client

// Message expected to be received via MQTT
type Message struct {
    Device      string
    Type        string
    Temperature float64
    Humidity    float64
    Room        string
}

// Location derived from MQTT topic
type Location struct {
    City     string
    Building string
    Room     string
}

// Specification for environment variables
type Specification struct {
    MQTTHost          string `envconfig:"MQTT_MOSQUITTO_SERVICE_HOST" required:"true"` // matches Kubernetes environment variable for service mqtt-mosquitto
    MQTTPort          int    `envconfig:"MQTT_MOSQUITTO_SERVICE_PORT" default:"8883"`
    MQTTUser          string `envconfig:"MQTT_USER" default:""`
    MQTTPass          string `envconfig:"MQTT_PASS" default:""`
    MQTTTopic         string `envconfig:"MQTT_TOPIC" default:"#"`
    InfluxHost        string `envconfig:"DB_INFLUXDB_SERVICE_HOST" required:"true"` // matches Kubernetes environment variable for service db-influxdb
    InfluxPort        string `envconfig:"DB_INFLUXDB_SERVICE_PORT" required:"true"`
    InfluxDatabase    string `envconfig:"INFLUX_DATABASE" default:"telemetry"` // database will be created by this service if it does not exist
    InfluxUser        string `envconfig:"INFLUX_USER" default:""`
    InfluxPassword    string `envconfig:"INFLUX_PASSWORD" default:""`
    InfluxMeasurement string `envconfig:"INFLUX_MEASUREMENT" default:"iotdata"`
}

// defined at package level; initialised in init(); used in main()
var s Specification

The package-level variable c of type InfluxDB.Client holds the InfluxDB client. It is initialized in the init() function and used by writeToInfluxDB.

We use two structs to capture the MQTT payload (Message) and location (Location):

  • Message: the JSON received on the topic is unmarshalled (decoded) into a struct of this type

  • Location: the topic is expected to be formatted as airq/city/building/room; the city, building and room strings are verified and extracted each time a message is received
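To see how a payload ends up in a Message, here is a small sketch using encoding/json. decodeMessage is a hypothetical helper and the payload is a sample; the struct is the Message type from above. Because the struct has no json tags, json.Unmarshal matches keys such as "device" to the Device field case-insensitively, and it ignores keys without a matching field, such as co2.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// Message is the struct from the forwarder. It has no json tags:
// encoding/json matches keys to exported field names case-insensitively.
type Message struct {
    Device      string
    Type        string
    Temperature float64
    Humidity    float64
    Room        string
}

// decodeMessage unmarshals an MQTT payload into a Message; JSON keys
// without a matching field (such as "co2") are silently ignored.
func decodeMessage(payload []byte) (Message, error) {
    var m Message
    err := json.Unmarshal(payload, &m)
    return m, err
}

func main() {
    payload := []byte(`{"device": "simulator", "type": "airq", "temperature": 36.88, "humidity": 46.53, "co2": 200}`)
    m, err := decodeMessage(payload)
    if err != nil {
        fmt.Println("invalid payload:", err)
        return
    }
    // Room stays empty here; the forwarder fills it in from the topic.
    fmt.Printf("%+v\n", m)
}
```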

The Specification struct is used with the envconfig package, which reads the environment variables set in the container. The variable names must match the value of the envconfig struct tag; the required and default tags make a variable mandatory or give it a default value. For example, the line below maps the environment variable INFLUX_USER to the InfluxUser field of the struct:

InfluxUser        string `envconfig:"INFLUX_USER" default:""`

We will later set the environment variables in a YAML file that we use with kubectl apply.

Initialization

Go runs the init() function automatically before main():

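func init() {
    // get environment variables via Specification
    err := envconfig.Process("", &s)
    if err != nil {
        log.Fatal(err.Error())
    }

    // Create a new HTTPClient to connect to InfluxDB
    c, err = InfluxDB.NewHTTPClient(InfluxDB.HTTPConfig{
        Addr:     "http://" + s.InfluxHost + ":" + s.InfluxPort,
        Username: s.InfluxUser,
        Password: s.InfluxPassword,
    })
    if err != nil {
        log.Fatal(err)
    }

    // Create database if it does not exist
    _, err = queryDB(c, fmt.Sprintf("CREATE DATABASE %s", s.InfluxDatabase))
    if err != nil {
        log.Fatal(err)
    }
}

// queryDB runs an InfluxQL command and returns its results; it is a small
// helper used by init(), not part of the InfluxDB client package
func queryDB(clnt InfluxDB.Client, cmd string) ([]InfluxDB.Result, error) {
    q := InfluxDB.Query{
        Command:  cmd,
        Database: s.InfluxDatabase,
    }
    response, err := clnt.Query(q)
    if err != nil {
        return nil, err
    }
    if response.Error() != nil {
        return nil, response.Error()
    }
    return response.Results, nil
}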

We first read the environment variables with a single call to envconfig.Process; if that fails, the program exits. Next, we create an InfluxDB HTTP client, taking the hostname, port and credentials from the package-level variable s of type Specification. NewHTTPClient does not contact the server yet; the first request is the CREATE DATABASE query, so that is where connection problems surface.

You might be wondering why the environment variable for s.InfluxHost is named the way it is. When a Kubernetes service such as db-influxdb exists, Kubernetes injects environment variables into each container that hold the ClusterIP and port of the service (here DB_INFLUXDB_SERVICE_HOST and DB_INFLUXDB_SERVICE_PORT). It only does this for pods in the same namespace that start after the service was created. We use these variables here instead of the Kubernetes DNS service. Note that you need to use mqtt and db as the --name parameter when you deploy Mosquitto and InfluxDB with Helm; otherwise, the services, and therefore the environment variables, will not have the names this application expects.

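Also note that we create the database in InfluxDB if it does not exist. CREATE DATABASE does nothing and returns no error when the database already exists, so it is safe to run it every time the forwarder starts. The name of the database can be configured with the INFLUX_DATABASE environment variable.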
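Because the database name comes from configuration and is pasted into the InfluxQL statement with fmt.Sprintf, a name such as air-quality would likely be rejected as invalid InfluxQL. One way to guard against that is to always double-quote the identifier. The sketch below assumes InfluxQL's quoted-identifier rules (double quotes, with backslash escapes for quotes and backslashes); quoteIdent and createDatabaseStmt are hypothetical helpers, not part of the client package.

```go
package main

import (
    "fmt"
    "strings"
)

// identEscaper escapes backslashes and double quotes inside a quoted
// InfluxQL identifier (assumed escaping rules).
var identEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)

// quoteIdent wraps an identifier in double quotes so that names with
// dashes, spaces or reserved words are accepted.
func quoteIdent(name string) string {
    return `"` + identEscaper.Replace(name) + `"`
}

// createDatabaseStmt builds a quoted version of the CREATE DATABASE statement.
func createDatabaseStmt(name string) string {
    return "CREATE DATABASE " + quoteIdent(name)
}

func main() {
    fmt.Println(createDatabaseStmt("telemetry"))
    fmt.Println(createDatabaseStmt("air-quality"))
}
```

In init(), the call would then become queryDB(c, createDatabaseStmt(s.InfluxDatabase)).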

main()

After init() finishes, the main() function will run:

func main() {
    // channel for OS signals; named sigs so that it does not shadow the
    // package-level InfluxDB client c
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)

    // client ID will be the hostname
    hostname, _ := os.Hostname()

    server := "tcps://" + s.MQTTHost + ":" + strconv.Itoa(s.MQTTPort)

    connOpts := MQTT.NewClientOptions()
    connOpts.AddBroker(server)
    connOpts.SetClientID(hostname)
    connOpts.SetCleanSession(true)
    connOpts.SetUsername(s.MQTTUser)
    connOpts.SetPassword(s.MQTTPass)
    // InsecureSkipVerify disables certificate checks: acceptable for testing only
    tlsConfig := &tls.Config{InsecureSkipVerify: true}
    connOpts.SetTLSConfig(tlsConfig)

    // (re)subscribe every time the connection is established
    connOpts.OnConnect = func(cl MQTT.Client) {
        if token := cl.Subscribe(s.MQTTTopic, 0, onMessageReceived); token.Wait() && token.Error() != nil {
            panic(token.Error())
        }
    }

    client := MQTT.NewClient(connOpts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    log.Println("Connected to", server)

    // wait for SIGTERM or an interrupt
    <-sigs
}

If you are new to Go, the first and last statements of main() might be unclear. The first statement creates a buffered channel, sigs, that carries values of type os.Signal. signal.Notify then tells the Go runtime to send a value to that channel when the process receives an interrupt (SIGINT) or SIGTERM. The last statement blocks until such a value arrives.

The program keeps running until it receives SIGTERM or an interrupt, which in practice means until the container is stopped, unless an error ends it earlier. The actual work is done by the message handler onMessageReceived, which we register when we subscribe to the topic. To subscribe, we first need to connect to Mosquitto.

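The connection to Mosquitto is similar to what we saw in Writing an MQTT client. We connect over TLS without verifying the broker's certificate, which is acceptable for testing but leaves the connection open to man-in-the-middle attacks. We authenticate with the username and password set via environment variables. The topic we subscribe to is also set via an environment variable; we will use airq/# to catch all topics under airq. Because the subscription happens in the OnConnect handler, the client subscribes again automatically whenever it reconnects.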
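If the Mosquitto certificate is issued by a CA you control, you can verify it instead of skipping verification. Here is a minimal sketch, assuming the CA certificate is available as a PEM file; the path /etc/mqtt/ca.crt (for example, mounted from a Kubernetes secret) and the server name mqtt-mosquitto are hypothetical. Because the forwarder connects to the ClusterIP from MQTT_MOSQUITTO_SERVICE_HOST, ServerName is set explicitly to the name the certificate was issued for. The resulting config would replace tlsConfig in main() and be passed to connOpts.SetTLSConfig.

```go
package main

import (
    "crypto/tls"
    "crypto/x509"
    "errors"
    "fmt"
    "os"
)

// newTLSConfig returns a TLS configuration that verifies the broker's
// certificate against caPEM (a PEM-encoded CA certificate) instead of
// setting InsecureSkipVerify. serverName must match a name in the
// broker certificate.
func newTLSConfig(caPEM []byte, serverName string) (*tls.Config, error) {
    pool := x509.NewCertPool()
    if !pool.AppendCertsFromPEM(caPEM) {
        return nil, errors.New("no valid CA certificate found in PEM data")
    }
    return &tls.Config{
        RootCAs:    pool,
        ServerName: serverName,
        MinVersion: tls.VersionTLS12,
    }, nil
}

func main() {
    // Hypothetical location of the CA certificate.
    caPEM, err := os.ReadFile("/etc/mqtt/ca.crt")
    if err != nil {
        fmt.Println("cannot read CA certificate:", err)
        return
    }
    tlsConfig, err := newTLSConfig(caPEM, "mqtt-mosquitto")
    if err != nil {
        fmt.Println(err)
        return
    }
    // In the forwarder: connOpts.SetTLSConfig(tlsConfig)
    fmt.Println("verifying broker certificates for", tlsConfig.ServerName)
}
```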

Message Handler

The message handler takes a variable of type MQTT.Message as the second parameter. We can extract the topic and the payload from that message:

// onMessageReceived is triggered by subscription on MQTTTopic (default #)
func onMessageReceived(client MQTT.Client, message MQTT.Message) {
    log.Printf("Received message on topic: %s\nMessage: %s\n", message.Topic(), message.Payload())

    //verify the MQTT topic
    location, err := verifyTopic(message.Topic())
    if err != nil {
        log.Println(err)
        return
    }

    // write data to InfluxDB
    err = writeToInfluxDB(string(message.Payload()), location)
    if err != nil {
        // just log the error but keep going
        log.Println("Error while writing to InfluxDB: ", err)
    }
}

Because we do not want to process payloads from topics with an unexpected format, we first check the topic with verifyTopic. When verifyTopic does not return an error, we write the payload to InfluxDB with writeToInfluxDB. Errors are logged rather than returned, so one bad message does not stop the forwarder.

verifyTopic is pretty straightforward:

// verifyTopic checks the MQTT topic conforms to airq/city/building/room
func verifyTopic(topic string) (*Location, error) {
    location := &Location{}
    items := strings.Split(topic, "/")
    if len(items) != 4 {
        return nil, errors.New("MQTT topic requires 4 sections: airq, city, building, room")
    }

    location.City = items[1]
    location.Building = items[2]
    location.Room = items[3]

    if items[0] != "airq" {
        return nil, errors.New("MQTT topic needs to start with airq")
    }

    if location.City == "" || location.Building == "" || location.Room == "" {
        return nil, errors.New("MQTT topic needs to have a city, building and room")
    }

    return location, nil
}

When the topic is not formatted correctly, verifyTopic returns an error. Otherwise, it returns a pointer to a variable of type Location. The function follows the idiomatic Go pattern of returning two values: a result and an error. When there is an error, we return nil and the error; when there is none, we return the result (the location) and nil for the error.

You can then easily check the result of the call to this function with:

//verify the MQTT topic
location, err := verifyTopic(message.Topic())
if err != nil {
  log.Println(err)
  return
}
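Because verifyTopic is a pure function, it is easy to check in isolation. The sketch below copies Location and an equivalent, slightly reordered verifyTopic so that it runs on its own, and checks a few topics in a table-driven loop, a common Go testing style. In a real project, you would put the table in a _test.go file and use the testing package.

```go
package main

import (
    "errors"
    "fmt"
    "strings"
)

// Location and verifyTopic are copied from above so this sketch runs on its own.
type Location struct {
    City     string
    Building string
    Room     string
}

func verifyTopic(topic string) (*Location, error) {
    items := strings.Split(topic, "/")
    if len(items) != 4 {
        return nil, errors.New("MQTT topic requires 4 sections: airq, city, building, room")
    }
    if items[0] != "airq" {
        return nil, errors.New("MQTT topic needs to start with airq")
    }
    location := &Location{City: items[1], Building: items[2], Room: items[3]}
    if location.City == "" || location.Building == "" || location.Room == "" {
        return nil, errors.New("MQTT topic needs to have a city, building and room")
    }
    return location, nil
}

func main() {
    cases := []struct {
        topic   string
        wantErr bool
    }{
        {"airq/antwerp/ahlersbuilding/asterix", false},
        {"airq/antwerp/ahlersbuilding", true},            // too few levels
        {"airq/antwerp//asterix", true},                  // empty building
        {"mytopic/antwerp/ahlersbuilding/asterix", true}, // wrong prefix
    }
    for _, tc := range cases {
        loc, err := verifyTopic(tc.topic)
        if (err != nil) != tc.wantErr {
            fmt.Printf("FAIL %s: unexpected error value %v\n", tc.topic, err)
            continue
        }
        fmt.Printf("ok   %s -> %+v\n", tc.topic, loc)
    }
}
```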

To write to InfluxDB, we use the following function:

func writeToInfluxDB(message string, location *Location) error {
    // decode message (json) in m which is Message struct
    // json in form of: {"device":"deviceX", "temperature": 23.3,"humidity": 50.2,"co2": 200}
    var m Message
    err := json.Unmarshal([]byte(message), &m)
    if err != nil {
        return err
    }

    m.Room = location.Room

    log.Printf("Writing message %s from room %s to database %s\n", message, location.Room, s.InfluxDatabase)

    // Create a new point batch
    bp, err := InfluxDB.NewBatchPoints(InfluxDB.BatchPointsConfig{
        Database:  s.InfluxDatabase,
        Precision: "s",
    })
    if err != nil {
        return err
    }

    // Create a point and add to batch
    tags := map[string]string{"device": m.Device, "type": m.Type, "room": m.Room}
    fields := map[string]interface{}{
        "temperature": m.Temperature,
        "humidity":    m.Humidity,
    }

    pt, err := InfluxDB.NewPoint(s.InfluxMeasurement, tags, fields, time.Now())
    if err != nil {
        return err
    }
    bp.AddPoint(pt)

    // Write the batch
    if err := c.Write(bp); err != nil {
        return err
    }

    return nil
}

Because we expect a JSON message in the MQTT payload, we first unmarshal the payload into a Message struct. When that succeeds, we set the Room field of the Message struct to the room from the location passed to this function. Only the room is stored as a tag; the city and building from the topic are not written. We then write a new point to the measurement named in s.InfluxMeasurement, which is configured via an environment variable. Refer to InfluxDB Primer, where we discussed measurements, points, tags and fields and how to write data points to a measurement with Go. Although we write only one point here, a batch can hold multiple points that are written in one operation. This is similar to writing multiple points with the line protocol over the HTTP API, where the points are separated by newlines, as discussed in InfluxDB Primer.
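To see what a point looks like in the line protocol, here is a simplified formatter. It is a sketch for illustration, not the client library's code: toLineProtocol is hypothetical, supports only float fields, sorts tags and fields by key, leaves out empty tag values, escapes commas, equals signs and spaces, and takes a timestamp in seconds to match the s precision used above. The timestamps in main are arbitrary.

```go
package main

import (
    "fmt"
    "sort"
    "strconv"
    "strings"
)

// Escapers for the line protocol: measurement names escape commas and
// spaces; tag keys, tag values and field keys also escape equals signs.
var (
    measurementEscaper = strings.NewReplacer(",", `\,`, " ", `\ `)
    keyEscaper         = strings.NewReplacer(",", `\,`, "=", `\=`, " ", `\ `)
)

// toLineProtocol formats one point as: measurement,tags fields timestamp.
// At least one field is required. Tags and fields are sorted by key and
// floats are written in their shortest form. ts is in seconds.
func toLineProtocol(measurement string, tags map[string]string, fields map[string]float64, ts int64) string {
    var b strings.Builder
    b.WriteString(measurementEscaper.Replace(measurement))

    tagKeys := make([]string, 0, len(tags))
    for k, v := range tags {
        if v != "" { // the line protocol has no empty tag values
            tagKeys = append(tagKeys, k)
        }
    }
    sort.Strings(tagKeys)
    for _, k := range tagKeys {
        b.WriteString("," + keyEscaper.Replace(k) + "=" + keyEscaper.Replace(tags[k]))
    }

    fieldKeys := make([]string, 0, len(fields))
    for k := range fields {
        fieldKeys = append(fieldKeys, k)
    }
    sort.Strings(fieldKeys)
    for i, k := range fieldKeys {
        sep := ","
        if i == 0 {
            sep = " " // a space separates the tags from the fields
        }
        b.WriteString(sep + keyEscaper.Replace(k) + "=" + strconv.FormatFloat(fields[k], 'f', -1, 64))
    }

    b.WriteString(" " + strconv.FormatInt(ts, 10))
    return b.String()
}

func main() {
    tags := map[string]string{"device": "simulator", "type": "airq", "room": "asterix"}
    points := []string{
        toLineProtocol("iotdata", tags, map[string]float64{"temperature": 36.88, "humidity": 46.53}, 1517774535),
        toLineProtocol("iotdata", tags, map[string]float64{"temperature": 37.22, "humidity": 42.87}, 1517774536),
    }
    // A batch over HTTP is simply the points separated by newlines.
    fmt.Println(strings.Join(points, "\n"))
}
```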

The call to NewBatchPoints passes a BatchPointsConfig that sets the precision to seconds (s). If you prefer, you can set the precision to nanoseconds (ns) to use timestamps with nanosecond precision. A finer precision is necessary if the same device can write more than one point per second: InfluxDB identifies a point by its measurement, tag set and timestamp, so a second point with the same tags in the same second overwrites the first. Internally, InfluxDB stores timestamps with nanosecond precision.

Building the executable

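On a Linux system, type:

CGO_ENABLED=0 go build -o mqtt-client .

Setting CGO_ENABLED=0 produces a statically linked executable, so it does not depend on the C library of the container image (Alpine-based images use musl instead of glibc).

On a Windows system, also set GOOS to cross-compile for Linux. In PowerShell, type:

$env:GOOS="linux"; $env:CGO_ENABLED="0"; go build -o mqtt-client .

In a Bash-style shell such as Git Bash, type:

CGO_ENABLED=0 GOOS=linux go build -o mqtt-client .

If the folder in which you run the command is called mqtt-client, you can omit the -o flag. Next, let's build the container image and push it to a repository.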

Building the container image

To build the container image yourself, you need to install Docker on your machine. Head over to https://www.docker.com to find out more. A container image is built from a Dockerfile, which contains build instructions. You can define the starting image, the files you want to add, the process to be started in the container and much more.

We can get away with a minimal Dockerfile. The compiled executable does not need the Go toolchain, so a plain alpine base image is enough:

FROM alpine

COPY mqtt-client /

CMD ["/mqtt-client"]

Before we build the image and push it to a repository, create an account on Docker Hub. You will need your account name when you push the image. Let's build the image. From the folder that contains the Dockerfile and the compiled mqtt-client Linux executable, run the following command:

docker build -t <your-Docker-ID>/mqtt-client .

Note the . at the end, which tells Docker to use the current directory as the build context, where it also looks for the Dockerfile. The -t flag sets the tag for the image on your local machine. Docker also uses that tag to determine where to push the image. You can now push the image to Docker Hub with the following command:

docker push <your-Docker-ID>/mqtt-client

You need to provide credentials to push the image; you can log in beforehand with docker login. In my case, I tagged my image gbaeke/mqtt-client. Because the tag does not include a registry hostname, docker push pushes the image to Docker Hub, to a repository of the account gbaeke.

If you follow the steps above, your image is normally public, which is fine in this case. You can also create private repositories on Docker Hub or use a private registry such as Azure Container Registry. If you use a private repository or registry, you need to give Kubernetes the credentials to pull from it, typically with an image pull secret.

Next, we create a Kubernetes manifest in YAML that defines a Deployment whose pod template uses our image.

Kubernetes manifest

In its simplest form, the YAML file looks like this:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mqtt-client
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mqtt-client
  template:
    metadata:
      labels:
        app: mqtt-client
    spec:
      containers:
      - name: mqtt-client
        image: gbaeke/mqtt-client
        env:
        - name: MQTT_USER
          value: "admin"
        - name: MQTT_PASS
          value: "test"
        - name: INFLUX_USER
          value: "admin"
        - name: INFLUX_PASSWORD
          value: "test"
        - name: INFLUX_DATABASE
          value: "telemetry"
        - name: MQTT_TOPIC
          value: "airq/#"

Ouch, we put passwords in a manifest instead of using secrets. We will leave that for now and focus on the deployment. Make sure you change the image name to your own image, or use mine since it is publicly available.

Note that the manifest above defines a Kubernetes Deployment, which is built on top of a ReplicaSet. The ReplicaSet keeps the requested number of pods running on your cluster, while the Deployment controls how you roll out and, especially, update your pods.

Create the deployment with:

kubectl apply -f <your-YAML-file>

When you run kubectl get deployment, you should see output similar to the following:

NAME             DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE
db-influxdb      1         1         1            1           20h
mqtt-client      1         1         1            1           4h
mqtt-mosquitto   1         1         1            1           1d

Use kubectl get rs to see the ReplicaSet and, of course, kubectl get pods to see the mqtt-client pod:

NAME                             READY     STATUS    RESTARTS   AGE
db-influxdb-154626948-85qzk      1/1       Running   1          20h
mqtt-client-2383379092-tjc38     1/1       Running   0          39m
mqtt-mosquitto-298995702-h14k8   1/1       Running   0          1d

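Use kubectl logs <your-mqtt-client-pod> to see the log output. The log package writes to standard error by default, and Kubernetes captures both standard output and standard error, so kubectl logs shows everything the forwarder logs. When a client is publishing data, you should see output like this: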

2018/02/04 20:02:15 Received message on topic: airq/antwerp/ahlers/asterix
Message: {"device": "simulator", "type": "airq","temperature": 36.88 , "humidity": 46.53}
2018/02/04 20:02:15 Writing message {"device": "simulator", "type": "airq","temperature": 36.88 , "humidity": 46.53} from room asterix to database telemetry
2018/02/04 20:02:16 Received message on topic: airq/antwerp/ahlers/asterix
Message: {"device": "simulator", "type": "airq","temperature": 37.22 , "humidity": 42.87}
2018/02/04 20:02:16 Writing message {"device": "simulator", "type": "airq","temperature": 37.22 , "humidity": 42.87} from room asterix to database telemetry

As with the Linux tail command, you can add -f to kubectl logs to follow the logs.

As we demonstrated before, you can port-forward the InfluxDB port to your local machine and use the influx client to query the iotdata measurement. After authenticating, run the two commands below:

use telemetry
select * from iotdata

Fantastic! If you got this far, you deployed an MQTT server (Mosquitto) and a time-series database (InfluxDB) using Helm. Next, you wrote a small application that acts as an MQTT-to-InfluxDB forwarder. You deployed that application using standard Kubernetes techniques, namely with a YAML manifest.

Next, it is time to actually do something with the data. We will write a REST API that lets other applications, such as a website or a mobile app, query the InfluxDB data. Head over to the next chapter to see how to do that.
