#!/usr/bin/env ruby # FIXME $LOAD_PATH << "/home/pks/.local/lib/ruby/gems/json-2.3.1/lib/" $LOAD_PATH << "/home/pks/.local/lib/ruby/gems/mqtt-0.5.0/lib/" $LOAD_PATH << "/home/pks/.local/lib/ruby/gems/optimist-3.0.1/lib/" $LOAD_PATH << "/home/pks/.local/lib/ruby/gems/sqlite3-1.4.2/lib/" $LOAD_PATH << "/home/pks/.local/lib/ruby/gems/zipf-1.2.6/lib/" require 'json' require 'mqtt' require 'optimist' require 'sqlite3' require 'time' require 'zipf' def shutdown STDERR.write "Shutting down ...\n" $db.close $logfile.close STDERR.write "Done!\n" exit end Signal.trap("INT") { shutdown } Signal.trap("TERM") { shutdown } def db_open_or_new filename if File.exists? filename $db = SQLite3::Database.open filename else $db = SQLite3::Database.new filename $db.execute <<-SQL create table power_log( id INTEGER PRIMARY KEY, timestamp DATETIME, device_name TEXT, device_location TEXT, total FLOAT, total_start_time DATETIME ); SQL $db.execute <<-SQL create table power_meter( id INTEGER PRIMARY KEY, timestamp DATETIME, total FLOAT ); SQL end end def db_execute sql, data 3.times { begin $db.execute sql, data break rescue SQLite3::BusyException STDERR.write "DB busy, skipping data point '#{d.to_s}'\n" sleep 1 end } end def parse_topic topic parts = topic.split "/" else _, device_name, _, device_location_1, device_location_2, _ = parts end device_location = [device_location_1, device_location_2] return device_name, device_location end def parse_message message return JSON.parse message end def insert_power device_name, device_location, data if data.has_key? "ENERGY" timestamp = Time.parse(data["Time"]).utc.to_i total_start_time = Time.parse(data["ENERGY"]["TotalStartTime"]).utc.to_i db_execute \ "INSERT INTO power(timestamp, device_name, device_location_primary, device_location_secondary, total, total_start_time) VALUES(?,?,?,?,?,?)", \ [timestamp, device_name, device_location[0], device_location[1], data["ENERGY"]["Total"], total_start_time] end end def main options = Optimist::options do opt :host, "MQTT hostname", :type => :string, :default => 'localhost', :short => '-h' opt :port, "MQTT port", :type => :int, :default => 1883, :short => '-p' opt :db, "SQLite3 database file", :type => :string, :required => true, :short => '-d' opt :log, "Logfile", :type => :string, :required => true, :short => '-l' end db_open_or_new options[:db] $logfile = WriteFile.new options[:log] $mqtt_client = MQTT::Client.connect( :host => options[:host], :port => options[:port], :ssl => false) $mqtt_client.subscribe '#' $mqtt_client.get do |topic,message| begin puts "#{topic}\t#{message} #device_name, device_location = parse_topic topic #data = parse_message message #insert_power device_name, device_location, data #$logfile.write "#{topic}\t#{message}\n" rescue => exception $logfile.write "[Ignoring]\t#{topic}\t#{message}\n" end end shutdown end main