#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (C) 2019 Hauke Petersen
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""Poll energy readings from smart plugs over HTTP and store them in InfluxDB.

Settings are taken from the process environment, or from an env file when a
path is given on the command line.
"""

import os
import re
import sys
import json
import logging
import argparse
import urllib.request

import yaml
from influxdb import InfluxDBClient
from apscheduler.schedulers.blocking import BlockingScheduler

# NOTE(review): not referenced anywhere in this file; kept for compatibility.
CONFIGFILE = "/opt/steckie/config.yml"
# Query string appended to a device URL to request its sensor status.
CMD_SENSOR = "cm?cmnd=Status%208"
# Seconds to wait for a device before giving up, so that one unresponsive
# plug cannot block the polling job indefinitely.
HTTP_TIMEOUT = 10

# Optional settings and the values used when they are not set.
CFG_DEFAULTS = {
    "STECKIE_ITVL": 5,
    "INFLUXDB_PORT": 8086,
}

# Mandatory settings (the values here are only illustrative examples).
CFG_NEEDED = {
    "INFLUXDB_HOST": "hostname",
    "INFLUXDB_DB": "database_name",
    "INFLUXDB_USER": "username",
    "INFLUXDB_USER_PASSWORD": "SuperSecure",
}

# Per-device settings: device field -> pattern of the variable carrying it.
CFG_VAR = {
    "name": r'STECKIE_DEV_\d+_NAME',
    "url": r'STECKIE_DEV_\d+_URL',
}


class Steckie:
    """Periodically read energy data from the configured devices.

    After construction ``self.cfg`` has the layout used by :meth:`run` and
    :meth:`update`::

        {
            "update_itvl": <seconds>,
            "db": {"host", "port", "name", "user", "pass"},
            "devs": [{"name": ..., "url": ...}, ...],
        }
    """

    def __init__(self, env_file, cfgfile=None):
        """Load the configuration and create the scheduler and DB client.

        Args:
            env_file: Path to a ``KEY=VALUE`` env file. When falsy, the
                process environment is used instead.
            cfgfile: Optional path to a YAML file whose top-level keys are
                merged over the environment-derived configuration.

        Exits the process with status 1 when a mandatory setting is missing
        or a given file cannot be read.
        """
        env = os.environ
        print("Env file: {}".format(env_file))

        # read configuration
        if env_file:
            env = self.read_env(env_file)

        flat = {key: env.get(key, default)
                for key, default in CFG_DEFAULTS.items()}
        missing = [key for key in CFG_NEEDED if key not in env]
        if missing:
            for key in missing:
                logging.error("unable to read {} fron env".format(key))
            sys.exit(1)
        for key in CFG_NEEDED:
            flat[key] = env[key]

        # Translate the flat variables into the nested layout that run()
        # and update() read from.
        self.cfg = {
            "update_itvl": flat["STECKIE_ITVL"],
            "db": {
                "host": flat["INFLUXDB_HOST"],
                "port": flat["INFLUXDB_PORT"],
                "name": flat["INFLUXDB_DB"],
                "user": flat["INFLUXDB_USER"],
                "pass": flat["INFLUXDB_USER_PASSWORD"],
            },
            "devs": self._collect_devs(env),
        }

        if cfgfile:
            self._merge_cfgfile(cfgfile)

        if not self.cfg["devs"]:
            logging.warning("no devices configured, nothing will be polled")

        self.scheduler = BlockingScheduler()
        self.db = InfluxDBClient(host=self.cfg['db']['host'],
                                 port=int(self.cfg['db']['port']),
                                 database=self.cfg['db']['name'],
                                 username=self.cfg['db']['user'],
                                 password=self.cfg['db']['pass'])

    @staticmethod
    def read_env(env_file):
        """Parse an env file into a dict of strings.

        Blank lines and lines starting with ``#`` are skipped, a leading
        ``export `` is dropped and one pair of matching surrounding quotes
        is removed from the value. Lines without ``=`` are ignored with a
        warning.

        NOTE(review): the expected file format is not shown in this file;
        the docker/dotenv style ``KEY=VALUE`` layout is an assumption.

        Exits the process with status 1 when the file cannot be read.
        """
        env = {}
        try:
            with open(env_file, 'r', encoding="utf-8") as f:
                for raw in f:
                    line = raw.strip()
                    if not line or line.startswith("#"):
                        continue
                    if line.startswith("export "):
                        line = line[len("export "):].lstrip()
                    key, sep, value = line.partition("=")
                    if not sep:
                        logging.warning("ignoring malformed env line '%s'",
                                        line)
                        continue
                    value = value.strip()
                    if (len(value) >= 2 and value[0] == value[-1]
                            and value[0] in "\"'"):
                        value = value[1:-1]
                    env[key.strip()] = value
        except (OSError, UnicodeDecodeError):
            logging.error("unable to read env file '%s'", env_file)
            sys.exit(1)
        return env

    @staticmethod
    def _collect_devs(env):
        """Build the device list from the variables matching CFG_VAR.

        Variables are grouped by the number embedded in their name; a
        device is kept only when all fields named in CFG_VAR are present.
        Devices are returned ordered by that number.
        """
        found = {}
        for field, pattern in CFG_VAR.items():
            for key, value in env.items():
                if re.fullmatch(pattern, key):
                    # The patterns contain a single run of digits.
                    idx = int(re.search(r'\d+', key).group())
                    found.setdefault(idx, {})[field] = value

        devs = []
        for idx in sorted(found):
            dev = found[idx]
            if all(field in dev for field in CFG_VAR):
                devs.append(dev)
            else:
                logging.warning("device %d is incomplete, skipping", idx)
        return devs

    def _merge_cfgfile(self, cfgfile):
        """Merge the top-level keys of a YAML file into ``self.cfg``.

        BaseLoader is used, so all scalars are loaded as plain strings.
        Exits the process with status 1 when the file cannot be read or
        does not contain a mapping.
        """
        try:
            with open(cfgfile, 'r', encoding="utf-8") as f:
                loaded = yaml.load(f, Loader=yaml.BaseLoader)
        except (OSError, UnicodeDecodeError, yaml.YAMLError):
            loaded = None
        if not isinstance(loaded, dict):
            logging.error("unable to read config file '{}'".format(cfgfile))
            sys.exit(1)
        self.cfg.update(loaded)

    def run(self):
        """Schedule :meth:`update` at the configured interval and block."""
        self.scheduler.add_job(self.update, 'interval',
                               seconds=int(self.cfg['update_itvl']))
        self.scheduler.start()

    @staticmethod
    def _build_point(dev, data):
        """Convert a decoded device response into a one-element point list.

        Raises KeyError, TypeError or ValueError when the response does not
        have the expected structure.
        """
        sns = data['StatusSNS']
        energy = sns['ENERGY']
        return [{
            "measurement": "stromie",
            "tags": {
                "name": dev['name'],
                "url": dev['url'],
                "type": "tasmota_awp07l",
            },
            "time": sns['Time'],
            "fields": {
                "voltage": float(energy['Voltage']),
                "current": float(energy['Current']),
                # NOTE(review): "pwr" is filled from ApparentPower, not from
                # an active-power reading - confirm this is intended.
                "pwr": float(energy['ApparentPower']),
                "pwr_r": float(energy['ReactivePower']),
                "factor": float(energy['Factor']),
                "e_daily": float(energy['Today']),
            }
        }]

    def update(self):
        """Poll every configured device once and write its reading to the DB.

        A failure while reading, decoding or storing one device is logged
        as a warning and does not affect the remaining devices.
        """
        for dev in self.cfg['devs']:
            # Avoid a double slash when the configured URL ends with one.
            url = "{}/{}".format(dev['url'].rstrip('/'), CMD_SENSOR)
            try:
                with urllib.request.urlopen(url, timeout=HTTP_TIMEOUT) as resp:
                    data = json.loads(resp.read().decode("utf-8"))
                print(data)
            except Exception:
                # Best-effort polling: network, HTTP and decoding errors are
                # all treated alike, but Ctrl-C / SystemExit still propagate.
                logging.warning(
                    "unable to read data from '{}'".format(dev['name']))
                continue

            try:
                point = self._build_point(dev, data)
            except (KeyError, TypeError, ValueError):
                logging.warning(
                    "{}: unexpected response format".format(dev['name']))
                continue

            try:
                self.db.write_points(point)
            except Exception:
                logging.warning(
                    "{}: unable to write to DB".format(dev['name']))


def main(args):
    """Configure logging, then build and run the poller.

    Args:
        args: Parsed command line arguments; ``args.env_file`` is the
            optional env file path.
    """
    logging.basicConfig(format='%(asctime)s %(message)s',
                        level=logging.WARNING)
    steckie = Steckie(args.env_file)
    steckie.run()


if __name__ == "__main__":
    p = argparse.ArgumentParser()
    p.add_argument("env_file", default=None, nargs="?",
                   help="env file with KEY=VALUE settings "
                        "(default: use the process environment)")
    args = p.parse_args()
    main(args)