Reading device logs from openbalena-api

I set up a test Raspberry Pi with an application which prints the date like this every 5 seconds

2023-04-07 07:36:00.965886

Then I tried to read the device logs using the python balena-sdk:

from os import getenv

from balena import Balena

# from pprint import pprint


# Login credentials read from the environment; values are None when the
# corresponding variable is unset (checked before use in __main__).
_CREDENTIAL_ENV_VARS = {
    "username": "BALENA_LOGIN_USER",
    "password": "BALENA_LOGIN_PASSWORD",
}
credentials = {field: getenv(var) for field, var in _CREDENTIAL_ENV_VARS.items()}


class BalenaManager:
    """Thin wrapper around the balena SDK that streams device logs to stdout."""

    def __init__(self, credentials: dict):
        # Point the SDK at the custom open-balena deployment before logging in.
        self.balena = Balena()
        self.balena.settings.set("pine_endpoint", "https://api.balena.apicai.de/v6/")
        self.balena.settings.set("api_endpoint", "https://api.balena.apicai.de/")
        self.balena.auth.login(**credentials)
        # pprint(self.balena.models.device.get_all())

    def send_log_to_server(self, uuid: str, msg: "dict | str", error: bool):
        """Print one log line prefixed with the device uuid.

        On the success path ``msg`` is a dict carrying a "message" key
        (as delivered by the SDK's log subscription); on the error path
        it is printed as-is.
        """
        if not error:
            msg_content = msg["message"]
            print(f"{uuid}|{msg_content}")
        else:
            print(f"{uuid}|{msg}")

    def setup_subscribe(self, uuid: str):
        """Attach message/error callbacks to the device's log stream."""
        def msg_callback(msg):
            self.send_log_to_server(uuid, msg, error=False)

        def error_callback(msg):
            self.send_log_to_server(uuid, msg, error=True)

        # NOTE(review): the trailing positional arg is presumably `count`
        # (number of historic log lines to replay) — confirm against the
        # balena-sdk logs.subscribe documentation.
        self.balena.logs.subscribe(uuid, msg_callback, error_callback, 1)


if __name__ == "__main__":
    # Both env vars are required; getenv() returned None for any that are unset.
    if credentials["username"] is None or credentials["password"] is None:
        print("you need to specify the balena credentials via env variables")
        # raise SystemExit instead of exit(): exit() is injected by the `site`
        # module and may be missing (python -S), and a negative code like -1
        # wraps to 255 on POSIX — use the conventional failure code 1.
        raise SystemExit(1)
    b = BalenaManager(credentials)
    b.setup_subscribe("device_uuid_here")

This works fine for about 1 minute, but then stops printing the logs:

device_uuid_here|2023-04-07 07:44:41.005696
device_uuid_here|2023-04-07 07:44:46.006184
device_uuid_here|2023-04-07 07:44:51.006586
device_uuid_here|2023-04-07 07:44:56.006985
device_uuid_here|2023-04-07 07:45:01.007352
device_uuid_here|2023-04-07 07:45:06.007717
device_uuid_here|2023-04-07 07:45:11.008169
device_uuid_here|2023-04-07 07:45:16.008526
device_uuid_here|2023-04-07 07:45:21.008990
device_uuid_here|2023-04-07 07:45:26.009453
device_uuid_here|2023-04-07 07:45:31.009821
device_uuid_here|2023-04-07 07:45:36.010198
device_uuid_here|2023-04-07 07:45:41.010658

I could not find out why, so I tried reading the logs directly out of the redis log backend:
https://github.com/balena-io/open-balena-api/blob/master/src/features/device-logs/lib/backends/redis.ts

I implemented increasing the "{device:1}:subscribers" key so that you can read the logs from the channel "{device:1}:logs". Then I used your avro schema:

{
	name: 'log',
	type: 'record',
	fields: [
		{ name: 'version', type: 'int', default: 1 },
		{ name: 'createdAt', type: 'long' },
		{ name: 'timestamp', type: 'long' },
		{ name: 'isSystem', type: 'boolean', default: false },
		{ name: 'isStdErr', type: 'boolean', default: false },
		{ name: 'serviceId', type: ['null', 'int'], default: null },
		{ name: 'message', type: 'string' },
	],
}

to decode the logs from the channel:

device 1: {'version': 1, 'createdAt': 7037864708678381089, 'timestamp': -7037864708666847010, 'isSystem': False, 'isStdErr': False, 'serviceId': 3, 'message': '2023-04-07 07:54:06.053250\r'}

This is now working, except that the timestamps are wrong and look like they overflowed. Do you have any idea why?
I also think, every long value decoded from avro is wrong. When trying to decode the logs from another device, I also couldn’t read the log message, which in avro uses a long value to encode the string length.
I am not good at js, could it be some custom long type, in the avro library you are using: [Advanced usage · mtth/avsc Wiki · GitHub] ?

Thank you for your help

Hello,

thanks for your request and the details you’ve shared.

  • Can you please try the log streaming with the balena CLI to identify whether the streaming by the open-balena-api is the problem?
    balena logs <device_uuid> --tail balena CLI Documentation - Balena Documentation
    For the CLI to connect to your custom open-balena URL you have to specify: BALENARC_API_URL

  • Can you also share the implementation of the Redis subscription? It looks like the decoding doesn't properly handle the Redis binary format. Please check the approach from open-balena-api here: open-balena-api/redis.ts at 2f75fada517150826f9df39feaa3775699c7c121 · balena-io/open-balena-api · GitHub

  • The 1 minute failure reads to me like a pattern of a dropped stream or a dropped subscription to a stream.

  • Can you please share if you can get all logs from the non stream endpoint /device/v2/:uuid/logs

Thanks and best regards
Harald

Hello,

  • log streaming using the balena cli works just fine, it does not stop after ~1min
  • The redis subscribe implementation:

avro.py

import io
import json

import avro.io
import avro.schema

# Avro record layout of a single device log entry, matching what
# open-balena-api writes to the redis backend
# (src/features/device-logs/lib/backends/redis.ts).
balena_schema_dict = {
    "name": "log",
    "type": "record",
    "fields": [
        {"name": "version", "type": "int", "default": 1},
        {"name": "createdAt", "type": "long"},
        {"name": "timestamp", "type": "long"},
        {"name": "isSystem", "type": "boolean", "default": False},
        {"name": "isStdErr", "type": "boolean", "default": False},
        {"name": "serviceId", "type": ["null", "int"], "default": None},
        {"name": "message", "type": "string"},
    ],
}
# Parse the schema once at import time; the DatumReader is stateless per
# read and is reused for every record by avro_decode below.
balena_schema = avro.schema.parse(json.dumps(balena_schema_dict))
avro_reader = avro.io.DatumReader(balena_schema)


def avro_decode(record: bytes) -> dict:
    """Deserialize one binary-encoded Avro log record into a plain dict."""
    decoder = avro.io.BinaryDecoder(io.BytesIO(record))
    return avro_reader.read(decoder)

redis.py

import asyncio

from redis.asyncio.client import PubSub


async def redis_subscribe(redis_pubsub: "PubSub", channel: str, callback):
    """Subscribe to a Redis pub/sub channel pattern and dispatch messages.

    Polls the pub/sub connection and awaits ``callback(message)`` for every
    message received.  Runs until the surrounding task is cancelled; on exit
    the pattern is unsubscribed, and cancellation is allowed to propagate so
    the task ends in the proper CANCELLED state (the previous version
    swallowed CancelledError, which made the task appear to finish normally
    and skipped unsubscribing).

    :param redis_pubsub: an active ``redis.asyncio`` PubSub object
    :param channel: glob-style channel pattern passed to ``psubscribe``
    :param callback: async callable invoked with each raw message dict
    """
    await redis_pubsub.psubscribe(channel)
    try:
        while True:
            # Short timeout keeps the loop responsive to cancellation
            # without blocking forever on an idle channel.
            message = await redis_pubsub.get_message(
                ignore_subscribe_messages=True, timeout=0.1
            )
            if message is not None:
                await callback(message)
    finally:
        # Runs on cancellation and on error alike; CancelledError then
        # propagates to the caller as asyncio expects.
        await redis_pubsub.punsubscribe(channel)

main.py

import asyncio

import redis.asyncio

from balena_logs_to_grafana.avro import avro_decode
from balena_logs_to_grafana.redis import redis_subscribe


class LogReceiver:
    """Reads balena device logs directly from the Redis log backend.

    Mirrors the open-balena-api redis backend contract: incrementing the
    "{device:<id>}:subscribers" key makes the API publish that device's
    logs on the "{device:<id>}:logs" pub/sub channel.
    """

    def __init__(self):
        self.conn = redis.asyncio.Redis()
        self.pubsub = self.conn.pubsub()

    async def __aenter__(self):
        await self.test_connection()
        await self.enable_device_logs_channel(1)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Shield so cleanup still completes if the surrounding task is cancelled.
        await asyncio.shield(self.cleanup())

    async def cleanup(self):
        await self.disable_device_logs_channel(1)
        await self.disconnect()

    async def test_connection(self):
        # NOTE(review): assert is stripped under `python -O`; raising an
        # explicit ConnectionError would be more robust.
        assert await self.conn.ping()

    async def disconnect(self):
        await self.pubsub.close()

    async def subscriber_callback(self, redis_msg):
        """Decode one raw pub/sub message and print the contained log line."""
        device_id = LogReceiver.parse_device_id_from_channel(
            redis_msg["channel"].decode()
        )
        log_entry = avro_decode(redis_msg["data"])["message"]
        print(f"device {device_id}: {log_entry}")

    @staticmethod
    def parse_device_id_from_channel(channel: str) -> int:
        """Extract the numeric device id from a "{device:<id>}:logs" channel.

        Bug fix: the previous version took only the first character after
        the colon, silently truncating multi-digit ids (e.g. 12 -> 1).
        """
        return int(channel.split(":")[1].rstrip("}"))

    @staticmethod
    def get_redis_key(device_id: int, postfix: str) -> str:
        """Build the hash-tagged redis key, e.g. "{device:1}:subscribers"."""
        return f"{{device:{device_id}}}:{postfix}"

    async def enable_device_logs_channel(self, device_id: int):
        await self.conn.incr(LogReceiver.get_redis_key(device_id, "subscribers"))

    async def disable_device_logs_channel(self, device_id: int):
        key = LogReceiver.get_redis_key(device_id, "subscribers")
        subscriber_count = await self.conn.decr(key)
        if subscriber_count == 0:
            # Last reader gone: remove the key so the API stops publishing.
            await self.conn.delete(key)

    async def test(self):
        await redis_subscribe(self.pubsub, "{device:*}:logs", self.subscriber_callback)


async def main():
    """Entry point: attach to redis and stream device logs until cancelled."""
    async with LogReceiver() as receiver:
        await receiver.test()


if __name__ == "__main__":
    # debug=True enables asyncio's slow-callback and unawaited-coroutine checks.
    asyncio.run(main(), debug=True)

  • So could the 1 minute failure be a bug in the python-sdk, as the balena cli works with tail?

  • Yes I get all the previous logs from the non stream endpoint

Thank you for your answer
Mo