Uniendo MQTT e Influx DB con Python

Objetivos

 

  • Veremos como suscribir topicos MQTT desde Python 3.
  • Usaremos Python para pasar esos datos a Influx DB
  • Veremos el resultado con Grafana.
     

    Material requerido.

     

     Vista mayor  Una servidor MQTT

     

    Reemplazando Telegraf por Python

     

    En el modelo de nuestro servidor IOT, que hemos ido desarrollando en las últimas sesiones usábamos Telegraf para suscribir los tópicos de nuestro interés e insertarlos en Influx DB sobre la marcha. Telegraf es una utilidad magnifica y sin duda tendería a usarla cuando el número de medidas o puntos a insertar fuera elevado, pero tiene un pequeño inconveniente; No podemos ver si nuestros sensores están en marcha o se han parado en algún momento.

    Vemos, Telegraf está muy bien, pero cuando encendemos nuestra Raspberry para que todo se inicie según lo previsto, no tenemos forma de comprobar que todo fluye correctamente, más que usando Grafana y viendo que las medidas entran. Y cuando fallan… ¿Como saber lo que falla? ¿Será un problema del MQTT? ¿O quizás de Telegraf… o de Influx?

    No cabe duda que todo esto es un tanto insatisfactorio (Cuando hay problemas) por lo que si yo quiero comprobar que el proceso va fino voy a necesitar algo que me de la tranquilidad de ver como son las cosas (Uno es perro viejo ya) y para eso, nada como usar un script Python que me deje fisgar entre los procesos y ver qué pasa.

    Por eso en esta ocasión vamos a escribir ese script que como veremos enseguida es bastante fácil con mi lenguaje favorito: Python.

     

    Suscribiendo tópicos en Python 3

     

    Como era de esperar alguien se ha molestado en escribir una librería Python para conectar con el servidor MQTT y es de lo más fácil de usar. Se llama paho-mqtt y lo puedes instalar en tu Raspberry con el comando:

    pip install paho-mqtt

    libreria python para MQTT

    Con la librería instalada podemos iniciar nuestro programa Python importándola:

    import paho.mqtt.client as mqtt

    Antes de empezar con el código principal del programa que nos suscriba al Boker MQTT, tenemos que definir un par de funciones que nos informen de que ha conectado correctamente, y otra que recoja el mensaje que nos envía el Mosquitto (O similar) cuando nos envié un tópico al que nos hayamos suscrito.

    La función que nos informe de la conexión con el Broker seria:

      def on_connect(client, userdata, flags, rc):
      print("Connected with result code "+str(rc))
      client.subscribe("Prosensor/+")

    Como estoy usando el sensor SCD41 para publicar valores en el MQTT, hay tres tópicos que me interesan: CO2, Temperatura y Humedad, y en lugar de suscribirlos uno por uno, he optado por usar el comodín “+”, para indicar que me interesan todos los tópicos que se publique baja el paraguas de “Prosensor”, y de paso, si en el futuro añado más sensores como Volatile Organic Compounds o VOCs, no tendré que tocar el programa Python para verlos.

    La segunda función Callback es la que recibe directamente los mensajes que nos envían el Broker MQTT:

      def on_message(client, userdata, msg):
      print(msg.payload.decode())

    Que no puede ser más sencilla y lo único que hace (Por ahora) es imprimir los mensajes entrantes. Y esto nos permite empezar directamente con el programa principal, y en particular definir la conexión al broker MQTT:

    client = mqtt.Client()
    client.username_pw_set("charly", "contrase")    #set username and password
    client.on_message= on_message
    client.connect("localhost",1883,60)

    La primera línea crea una instancia del cliente MQTT, y en la segunda, definimos el usuario a usar y en la tercera línea conectamos la función Callback que hemos definido arriba con el evento de entrada de mensajes (Es una forma complicada de decir que cuando hay un evento de mensaje de entrada se llame a la función on_message que hemos definido mas arriba)

    Ahora ya podemos conectar con el servidor MQTT mediante la instrucción:

    client.connect("localhost",1883,60)

    Solo nos queda conectar la segunda función Callback al evento de conexión:

    client.on_connect = on_connect

    y llamar al bucle de leer el MQTT (Imprescindible. ¡No olvidar!):

    client.loop_forever()

    Y eso es todo. El programa completo es simplemente: Mqtt

    import paho.mqtt.client as mqtt
    
    def on_connect(client, userdata, flags, rc):
      print("Connected with result code "+str(rc))
      client.subscribe("Prosensor/+")
    
    def on_message(client, userdata, msg):
      print(msg.payload.decode())
    
    client = mqtt.Client()
    client.username_pw_set("charly", "contrase")    #set username and password
    client.on_message= on_message
    client.connect("localhost",1883,60)
    client.on_connect = on_connect
    client.on_message = on_message
    
    #while True:
    client.loop_forever()

    Si ejecutamos el script desde una ventana de comandos, veremos algo como esto:

    Presentacion de lecturas en consola

    Vale, ha sido hasta demasiado fácil. Y esto nos lleva a la siguiente pregunta: ¿Podríamos hacer las veces del Telegraf para insertar los valores que leamos desde Python? Naturalmente, la respuesta es que si (Faltaría más) y no es mucho más complicado.

     

    Accediendo a Influx DB desde Python

     

    Para acceder a Influx DB desde Python 3, tenemos que instalar la librería correspondiente, llamada Influxdb con el sorprendente comando de consola:

    pip install Influxdb

    Ahora ya podemos empezar con el script, empezando por hacer import de la librería:

    from influxdb import InfluxDBClient

    Tendremos que conectar a Influx creando una instancia de cliente:

    Iclient = InfluxDBClient('192.168.1.52','8086','charly','contrase','test')

    Donde puedes ver la dirección IP del servidor Influx, el puerto de escucha, 8086, y mi usuario y contraseña para acceder a la BBDD test (En último lugar).

    Los lectores menos avezados podrían pensar que la parte difícil será conseguir que la función on_message, que recibe el mensaje del broker MQTT, lo inserte en Influx, pero no. En realidad, es más bien poca cosa. Vamos a reescribir esta función así:

    def on_message(client, userdata, msg):
       payload = msg.payload.decode()
       print(payload)
       Iclient.write_points(payload, protocol='line')

    Como los mensajes que publicamos en los tópicos corresponden a instrucciones validas para Influx DB, el único cambio que tenemos que hacer es mandar el payload a un write del cliente Influx, informándole de que el protocolo es la sintaxis habitual que ya vimos de Influx que llama ‘line’, en contraposición a otras posibilidades que podrían ser JSON, por ejemplo.

    Así que el programa final, es decepcionantemente sencillo:

    import paho.mqtt.client as mqtt
    from influxdb import InfluxDBClient

    def on_connect(client, userdata, flags, rc):
    print(«Connected with result code «+str(rc))
    client.subscribe(«Prosensor/+»)

    def on_message(client, userdata, msg):
    payload = msg.payload.decode()
    print(payload)

    Iclient.write_points(payload, protocol=’line’)

    client = mqtt.Client()
    client.username_pw_set(«charly», «contrase»)    #set username and password
    client.on_message= on_message
    client.connect(«localhost»,1883,60)

    client.on_connect = on_connect
    client.on_message = on_message
    Iclient = InfluxDBClient(‘192.168.1.52′,’8086′,’charly’,’contrase’,’test’)

    #while True:
    client.loop_forever()

    Para probar nuestro pequeño programa Python, tienes que parar el Telegraf. De este modo ya solo Python podrá insertar puntos en Influx BD y podremos comprobar que funciona:

    Systemctl stop Influx

    Y ahora ya puedes ejecutar el script que os dejo aquí debajo:

    MQTT + Influx

    ¡Accede al contenido!

    Dashboard Grafana

    Como las gráficas de Grafana salían un poco sosas le he dado una buena soplada al sensor para menear un poco el cotarro y el resultado está a la vista.

  • Es fácil observar que faltan datos de temperatura y es que, jugando con los comandos, me cepillé, por error, toda una serie de lecturas.
  •  

    IMAGEN DE MARCA