1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
#!/usr/bin/env ruby
# FIXME
$LOAD_PATH << '/usr/local/lib/ruby/gems/mqtt-0.5.0/lib/'
$LOAD_PATH << '/usr/local/lib/ruby/gems/optimist-3.0.1/lib/'
$stdout.sync = true
$stderr.sync = true
require 'json'
require 'mqtt'
require 'optimist'
require 'sqlite3'
require 'time'
$handles = {
'baker' => 'fridge',
'batum' => 'microwave',
'bembry' => 'modem+printer',
'crowder' => 'desk-2',
'davis' => 'hifi',
'doncic' => 'desk-1',
'fournier' => 'air_washer',
'gibson' => 'dishwasher',
'gooden' => 'couch',
'grant' => 'washing_machine',
'kukoc' => 'rack',
'nogueira' => 'kitchen_tech',
'world-peace' => 'kettle+toaster'
}
def shutdown
STDERR.write 'Shutting down ...'
$db.close
STDERR.write " done!\n"
exit
end
Signal.trap('INT') { shutdown }
Signal.trap('TERM') { shutdown }
def db_open_or_create filename
if File.exists? filename
$db = SQLite3::Database.open filename
else
$db = SQLite3::Database.new filename
$db.execute <<-SQL
create table power(
id INTEGER PRIMARY KEY,
timestamp DATETIME,
device TEXT,
handle TEXT,
total FLOAT,
total_start_time DATETIME
);
SQL
end
end
def db_execute sql, data
3.times {
begin
$db.execute sql, data
break
rescue SQLite3::BusyException
STDERR.write "DB busy, skipping data point '#{d.to_s}'\n"
sleep rand
end
}
end
def db_insert device, handle, data
timestamp = Time.parse(data['Time']).utc.to_i
total_start_time = Time.parse(data['ENERGY']['TotalStartTime']).utc.to_i
db_execute \
"INSERT INTO power(timestamp, device, handle, total, total_start_time) VALUES(?,?,?,?,?)", \
[timestamp, device, handle, data['ENERGY']['Total'], total_start_time]
end
def parse_topic topic
parts = topic.split '/'
_, device, _ = parts
handle = $handles[device]
return device, handle
end
def parse_message message
return JSON.parse message
end
def check_and_maybe_create_pidfile filename
if File.exists? filename
File.open filename, 'r' do |f|
STDERR.write "Possibly already running as PID #{f.read}, exiting"
end
else
File.open filename, 'w' do |f|
puts "Running as PID #{Process.pid}"
f.write "#{Process.pid}\n"
end
end
end
def main
options = Optimist::options do
opt :host, 'MQTT hostname', :type => :string, :default => 'localhost', :short => '-h'
opt :port, 'MQTT port', :type => :int, :default => 1883, :short => '-p'
opt :db, 'SQLite3 database file', :type => :string, :default => '/usr/local/share/power_logger/power.db', :short => '-d'
opt :pidfile, 'PID file', :type => :string, :default => '/run/power_logger/logger.pid', :short => '-P'
end
check_and_maybe_create_pidfile options[:pidfile]
db_open_or_create options[:db]
$mqtt_client = MQTT::Client.connect(
:host => options[:host],
:port => options[:port],
:ssl => false)
$mqtt_client.subscribe '#'
$mqtt_client.get do |topic,message|
begin
logged = false
if topic.end_with? '/SENSOR'
data = parse_message message
if data.keys.include? 'ENERGY'
puts "[logged] topic=#{topic} message=#{message}"
device, handle = parse_topic topic
db_insert device, handle, data
logged = true
end
end
if not logged then STDERR.write "[ignored] topic=#{topic} message=#{message}\n" end
rescue => exception
STDERR.write "Exception: #{exception}\n"
end
end
shutdown
end
main
|