Build log: Detecting if the main water pipe breaks

I have just finished the first version of my application while waiting for the smart meter to arrive.
My initial attempt was to use the balena features to configure the slave registers as Device Variables, so that they could be easily changed from the balena dashboard.

[Image: serverparams]

So the code would be pretty simple, since I'd only have to load the variables…

    import os

    # Environment variables are always strings, so convert the numeric ones.
    modbus_host = os.environ['MODBUS_HOST_IP']
    modbus_port = int(os.environ['MODBUS_HOST_PORT'])
    address = int(os.environ['INST_FLOW_ADDRESS'])
    size = int(os.environ['INST_FLOW_SIZE'])
    pollingtime = int(os.environ['INST_FLOW_POLLINGTIME'])

And then configure the server connection and poll the meter in a loop.

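    # modbusc is assumed to be a pymodbus client created from modbus_host and modbus_port
    modbusc.connect()
    error = False
    while not error:
        rr = modbusc.read_holding_registers(address, size, unit=1)
        if rr.isError():
            # Stop polling if the meter answers with a Modbus error
            error = True
            continue
        decoder = BinaryPayloadDecoder.fromRegisters(rr.registers,
                                                     Endian.Little,
                                                     Endian.Little)
        value = decoder.decode_32bit_float()
        sleep(pollingtime)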

But this became a problem right from the start. Device Variables are very useful for testing, but they are not flexible once there are many parameters, each with its own configuration (format, polling time, etc.). I saw immediately that this approach would not scale.

Following @rahul-thakoor's hint (thanks!!!), I looked at the modbus2mqtt project, which uses a data model description based on modbus-herdsman-converters. The idea is simple: define a JSON file with the data model, and drive the whole reading loop from that model.

I noticed that a few things were missing from the description, so it wouldn't suit me completely:

  • each parameter is set to 1 register (this smart meter has many that use 2 registers)
  • every parameter shares the same polling time (not useful, for the reasons explained below)
  • all the data formats are decoded in the same way (I have at least IEEE754 and float)

I preferred to continue my own way using this idea. Maybe in the future I will merge the projects if I can, but for the moment let's see where this takes me.

    "instant_flow": {
        "address": 1,
        "length": 2,
        "polling_secs": 25,
        "format": "ieee754"
    }

So now the smart meter's data model lives in a datamodel.json file, where I define the parameters I want to read from my Modbus slave. The idea is that anyone can use this to add more parameters for my smart meter, or to describe any other device that speaks Modbus.
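As a rough illustration of how such a file could be consumed, here is a minimal sketch that loads the model, checks that each parameter has the four fields shown above, and decodes a list of registers according to its "format". The `FORMATS` table, the `uint32` entry, and both function names are assumptions made for this example, not part of my actual code, and the byte and word order are a guess that has to be checked against the meter's register map.

```python
import json
import struct

# Hypothetical mapping from the model's "format" field to a struct code.
# Only "ieee754" appears in the data model above; "uint32" is illustrative.
FORMATS = {"ieee754": ">f", "uint32": ">I"}


def load_datamodel(text):
    """Parse the JSON text and check that each parameter is complete."""
    model = json.loads(text)
    params = model["fromModbus"]["input"]
    required = {"address", "length", "polling_secs", "format"}
    for name, spec in params.items():
        missing = required - spec.keys()
        if missing:
            raise ValueError(f"{name}: missing {sorted(missing)}")
        if spec["format"] not in FORMATS:
            raise ValueError(f"{name}: unknown format {spec['format']!r}")
    return params


def decode_registers(registers, fmt):
    """Turn a list of 16-bit register values into one number.

    Assumes big-endian bytes with the high word first; a real meter
    may use a different word order.
    """
    raw = b"".join(struct.pack(">H", r) for r in registers)
    return struct.unpack(FORMATS[fmt], raw)[0]
```

With this, adding a parameter that uses a new format would only mean adding one entry to the table and one block to the JSON file.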

There’s another design decision I still have to validate, which is the polling time. Different parameters need different polling times. For instance, I would like to read the instant_flow every hour to check if there’s a continuous flow for some time. But the daily accumulated flow and the monthly accumulated flow parameters would be better served by polling once a day and once a month.
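The "continuous flow for some time" check could look something like the sketch below. It is only an idea at this stage: the function name, the `min_flow` threshold, and the number of consecutive readings are hypothetical values that I would have to tune once the real meter is connected.

```python
def continuous_flow(readings, min_flow=0.0, min_consecutive=6):
    """Return True if the latest readings all show water flowing.

    readings:        instant_flow values, oldest first
    min_flow:        hypothetical threshold above which water counts as flowing
    min_consecutive: how many readings in a row must exceed the threshold
    """
    if len(readings) < min_consecutive:
        return False  # not enough history yet to decide
    return all(r > min_flow for r in readings[-min_consecutive:])
```

With hourly polling, `min_consecutive=6` would mean six hours of uninterrupted flow, which in a normal household would be suspicious.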

That’s the reason why I’ve included the polling time in the data model. And what I’ve ended up doing is spawning a thread for each parameter:

    for key, value in datamodel['fromModbus']['input'].items():
        try:
            log.info("Launching thread to read %s from address %s, length %s, polling time: %s, format: %s",
                     key, value['address'], value['length'], value['polling_secs'], value['format'])
            _thread.start_new_thread(
                loop_read_register, (key,
                                     value['address'],
                                     value['length'],
                                     value['polling_secs'],
                                     value['format']))
        except Exception:
            # log.exception already records the traceback of the active exception
            log.exception("Could not start thread to read %s", key)

where each thread has its own polling period, or sleep, according to what its parameter needs.

    def loop_read_register(name, address, size, polling_secs, fmt):
        error = False
        while not error:
            try:
                value = readInputRegisters(address, size, fmt)
                log.info("Read: %s:%s", name, value)
                msg = mqttc.publish("sensors", json.dumps({name: value}))
                if not msg.is_published():
                    raise Exception("Message not published in mqtt broker")
            except Exception as e:
                log.critical(e)
                # TO-REVIEW: do we want to stop the loop only because one reading fails? Could it happen?
                error = True
            time.sleep(polling_secs)  # Wait for the next poll
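One possible answer to the TO-REVIEW question is to tolerate a few failed readings and only give up after several in a row. The sketch below shows that idea in isolation; `poll_with_retries`, `max_failures`, `max_polls`, and the injectable `sleep` are all names I made up for this example (the last two exist only to make it easy to test), and it is not what the code above currently does.

```python
import time


def poll_with_retries(read_once, on_value, polling_secs,
                      max_failures=3, max_polls=None, sleep=time.sleep):
    """Poll read_once() forever, giving up after repeated failures.

    read_once: callable that returns one reading or raises on error
    on_value:  callable that receives each successful reading
    Returns True if the loop stopped because of too many failures.
    """
    failures = 0
    polls = 0
    while max_polls is None or polls < max_polls:
        polls += 1
        try:
            on_value(read_once())
            failures = 0  # a good reading resets the counter
        except Exception:
            failures += 1
            if failures >= max_failures:
                return True
        sleep(polling_secs)
    return False
```

A single timeout on the bus would then cost one missed sample instead of silently ending the thread for that parameter.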

And that’s where I am now. Next I will clean up the code, add some comments, and publish the GitHub repo here. Since there's still time until the meter arrives, I might spend some of it improving this generic way of dealing with Modbus.
