Skip to content
Snippets Groups Projects

Add driver code

Merged suazo2 requested to merge driver-code into main
1 file
+ 130
0
Compare changes
  • Side-by-side
  • Inline
+ 130
0
import argparse
import logging
import sys
from energy_monitor import EnergyMonitor
def driver(energy_monitor):
""" Starts the main power monitor loop. """
logger.info("... Starting Raspberry Pi Power Monitor")
logger.info("Press Ctrl-c to quit...")
# Store values for each polling cycle
production_values = dict(power=[], pf=[], current=[])
home_consumption_values = dict(power=[], pf=[], current=[])
net_values = dict(power=[], current=[])
ct_dict = {channel: {'power': [], 'pf': [], 'current': []} for channel in energy_monitor.enabled_channels}
rms_voltages = []
SMA_Values = {channel: {'power': [], 'pf': [], 'current': []} for channel in energy_monitor.enabled_channels}
num_samples = 1000
# Counter for averaging function
i = 0
# Get the expected sample count for the current configuration.
samples = energy_monitor.collect_data(num_samples)
sample_count = sum([len(samples[x]) for x in samples.keys() if type(samples[x]) == list])
# Main loop
while True:
try:
board_voltage = energy_monitor.get_board_voltage()
samples = energy_monitor.collect_data(num_samples)
poll_time = samples['time']
duration = samples['duration']
sample_rate = round((sample_count / duration) / num_samples, 2)
results = energy_monitor.calculate_power(samples, board_voltage)
production_power = 0
production_current = 0
production_pf = 0 # Average power factor from all production sources
# Set the RMS voltage using one of the calculated voltages.
voltage = results[energy_monitor.enabled_channels[0]]['voltage']
# Set the current for any production sources negative if the power is negative, and find the total power and current from all production sources.
for chan_num in energy_monitor.production_channels:
production_power += results[chan_num]['power']
production_current += results[chan_num]['current']
# Home consumption power is the total power that the home is using. This is typically the mains plust the production sources.
# However, if you haven't setup any mains channels in config.toml, this will be the sum of all 'consumption' channels + 'production' channels
home_consumption_power = 0
home_consumption_current = 0
for chan_num in energy_monitor.consumption_channels:
home_consumption_power += results[chan_num]['power']
home_consumption_current += results[chan_num]['current']
net_power = home_consumption_power - production_power
net_current = home_consumption_current - production_current
if net_power < 0:
current_status = "Producing"
else:
current_status = "Consuming"
# Average 10 readings before sending to db
if i < 10:
production_values['power'].append(production_power)
production_values['current'].append(production_current)
production_values['pf'].append(production_pf)
home_consumption_values['power'].append(home_consumption_power)
home_consumption_values['current'].append(home_consumption_current)
net_values['power'].append(net_power)
net_values['current'].append(net_current)
for chan_num in energy_monitor.enabled_channels:
ct_dict[chan_num]['power'].append(results[chan_num]['power'])
ct_dict[chan_num]['current'].append(results[chan_num]['current'])
ct_dict[chan_num]['pf'].append(results[chan_num]['pf'])
rms_voltages.append(voltage)
i += 1
else:
# Calculate the average, buffer the result, and write to Influx
# if the batch size has met the threshold.
production_values = dict(power=[], pf=[], current=[])
home_consumption_values = dict(power=[], pf=[], current=[])
net_values = dict(power=[], current=[])
ct_dict = {channel: {'power': [], 'pf': [], 'current': []} for channel in energy_monitor.enabled_channels}
rms_voltages = []
i = 0
if energy_monitor.terminal_mode:
energy_monitor.print_results(results, sample_rate)
except KeyboardInterrupt:
energy_monitor.cleanup(0)
if __name__ == '__main__':
# Logging Config
logger = logging.getLogger('energy-monitor')
logger.setLevel(logging.INFO)
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s : %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
ch_formatter = logging.Formatter('%(levelname)s : %(message)s')
ch.setFormatter(ch_formatter)
logger.addHandler(ch)
parser = argparse.ArgumentParser(description='Power Monitor CLI Interface')
parser.add_argument('--mode', type=str, help="Operating Mode. Defaults to 'terminal' if not specified.", default='terminal', required=False, choices=['db', 'terminal'])
parser.add_argument('-v', '--verbose', help='Increases verbosity of program output.', action='store_true')
args = parser.parse_args()
if args.verbose:
ch.setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)
logger.debug("Verbose logs output enabled.")
em = EnergyMonitor()
if args.mode == 'terminal':
em.terminal_mode = True
logger.debug("Enabled terminal mode.")
driver(em)
Loading