![]() |
lo improbable hace unos años, hecho realidad hoy |
Hace muchos años adquirí este dispositivo IoT ensamblado en Spain de una empresa pionera en el tema de tener una lectura del diferencial de potencial eléctrico en un segmento de red determinado:
![]() |
el cacharro en si consta de una entrada ethernet y una pinza para clipear en la fase. |
![]() | |
una portada con las opciones de configuración, montar un punto de acceso wifi y seguridad, ip de ethernet. |
![]() |
y lo mas interesante, el archivo .xml con las lecturas instantáneas. Cada vez que refrescas se renueva la lectura. |
Un aparato rudimentario y efectivo. Pensado para desarrollar aplicaciones en la nube, ya cuando lo compré esa opción había dejado de existir. Ahí quedo, interesante potencial pero complejo desarrollo para la captura y análisis de los datos recabados. Hasta hace poco.
Junto a la IA de google y mucha paciencia, ensayo y error, este código fue desarrollado:
codename::live_plot.py::
import pandas as pd
import matplotlib.pyplot as plt
import os
import glob
import xmltodict
import time
from datetime import datetime, timedelta
from matplotlib.animation import FuncAnimation
from matplotlib.widgets import Slider, TextBox, Button
from xml.etree import ElementTree as ET
from scipy.interpolate import make_interp_spline
import numpy as np
# A global variable to store the time window, initially set to 10 minutes
time_window_minutes = 10
MAX_CONSUMPTION = 10000 # Set a maximum consumption threshold to filter out errors
fig, ax = plt.subplots(figsize=(12, 6))
line, = ax.plot([], [], label='Current Readings (W)', color='red', linewidth=1.5)
# Create a place for the slider below the plot
plt.subplots_adjust(bottom=0.25)
ax_slider = plt.axes([0.1, 0.1, 0.8, 0.03])
slider = Slider(
ax=ax_slider,
label='Time Window (min)',
# Set a wider range for the slider, up to 90 days in minutes
valmin=5,
valmax=90 * 24 * 60,
valinit=time_window_minutes,
valstep=1
)
# Create a place for the text box for direct input
ax_textbox = plt.axes([0.1, 0.05, 0.1, 0.04])
textbox = TextBox(ax_textbox, 'Minutes')
textbox.set_val(str(time_window_minutes)) # Set initial value
# Create a place for the lower and upper limit text boxes
ax_lower_limit = plt.axes([0.3, 0.05, 0.1, 0.04])
lower_limit_textbox = TextBox(ax_lower_limit, 'Lower Limit (min)')
lower_limit_textbox.set_val(str(5)) # Set initial value
ax_upper_limit = plt.axes([0.5, 0.05, 0.1, 0.04])
upper_limit_textbox = TextBox(ax_upper_limit, 'Upper Limit (min)')
upper_limit_textbox.set_val(str(slider.valmax)) # Set initial value
# Create an axes for the new consumption calculation button
ax_button = plt.axes([0.7, 0.05, 0.2, 0.04])
calculate_button = Button(ax_button, 'Calculate Consumption (Wh)')
# Add a text annotation to the plot for the current consumption value
# This is our "digital analog meter"
current_consumption_text = ax.text(
0.5, 0.95, '',
horizontalalignment='center',
verticalalignment='top',
transform=ax.transAxes,
fontsize=24,
fontweight='bold',
color='blue'
)
# Add a new text annotation for the accumulated consumption
accumulated_consumption_text = ax.text(
0.5, 0.85, '',
horizontalalignment='center',
verticalalignment='top',
transform=ax.transAxes,
fontsize=16,
color='green'
)
def get_file_timestamp(filename):
"""
Returns the file's modification timestamp.
"""
return os.path.getmtime(filename)
def get_data(data_folder):
"""
Reads XML data from a folder, processes it, and returns a pandas DataFrame.
It handles errors gracefully for individual files.
"""
all_files = glob.glob(os.path.join(data_folder, "*.xml*"))
print(f"Found {len(all_files)} files.")
if not all_files:
print("No XML files found.")
return None
all_files.sort(key=get_file_timestamp)
all_data = []
for filename in all_files:
try:
with open(filename, 'r') as file:
xml_data = file.read()
file_mod_timestamp = get_file_timestamp(filename)
data_dict = xmltodict.parse(xml_data)
device_data = data_dict.get('device', {})
instant_data = device_data.get('instant', {})
# Change to look for 'current_L1' as requested
current_l1_str = instant_data.get('current_L1')
if current_l1_str is not None:
try:
current_l1 = float(current_l1_str)
all_data.append({
'timestamp': file_mod_timestamp,
'current_L1': current_l1
})
except ValueError:
print(f"Could not convert current_l1 value '{current_l1_str}' to float in file {filename}")
except Exception as e:
print(f"Error processing {filename}: {e}")
if not all_data:
print("No relevant data extracted from XML files.")
return None
df = pd.DataFrame(all_data)
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')
df.set_index('timestamp', inplace=True)
# Filter out values that are higher than our set maximum consumption
df = df[df['current_L1'] <= MAX_CONSUMPTION]
print(f"Extracted {len(df)} data points from files.")
print(f"Data range: {df.index.min()} to {df.index.max()}")
return df
def draw_plot(df, time_window_minutes):
"""
Draws the plot based on the provided dataframe and time window.
This function is now called by both the animation and the slider.
Returns the filtered dataframe.
"""
# Adjust for the one-hour time difference
end_time = datetime.now() - timedelta(hours=1)
start_time = end_time - timedelta(minutes=time_window_minutes)
# Filter the DataFrame to the current time window
df_filtered = df.loc[start_time:end_time].copy() # Use .copy() to avoid SettingWithCopyWarning
# Determine the title based on the time window
if time_window_minutes < 60:
title_text = f'Live Energy Readings (Last {time_window_minutes} Minutes)'
elif time_window_minutes < 24 * 60:
hours = time_window_minutes // 60
minutes = time_window_minutes % 60
title_text = f'Live Energy Readings (Last {hours} Hours'
if minutes > 0:
title_text += f' and {minutes} Minutes)'
else:
title_text += ')'
else:
days = time_window_minutes // (24 * 60)
remaining_minutes = time_window_minutes % (24 * 60)
hours = remaining_minutes // 60
title_text = f'Live Energy Readings (Last {days} Days'
if hours > 0:
title_text += f' and {hours} Hours)'
else:
title_text += ')'
if df_filtered.empty:
print(f"No data found for the last {time_window_minutes} minutes. Displaying all available data.")
df_filtered = df.copy() # Use .copy() here as well
ax.set_title('Live Energy Readings (Showing All Data)')
else:
ax.set_title(title_text)
# Use spline interpolation to smooth the line
if len(df_filtered) > 1:
x_data = df_filtered.index.astype(int) / 10**9 # Convert to UNIX timestamp for interpolation
y_data = df_filtered['current_L1'].values
x_new = np.linspace(x_data.min(), x_data.max(), 300)
# Create a spline representation and evaluate it on the new x values
spl = make_interp_spline(x_data, y_data, k=3) # k=3 for a cubic spline
y_new = spl(x_new)
# Convert interpolated x values back to datetime
x_new_datetime = pd.to_datetime(x_new, unit='s')
# Use set_data to efficiently update the plot line
line.set_data(x_new_datetime, y_new)
# Update the digital consumption meter
latest_value = df_filtered['current_L1'].iloc[-1]
current_consumption_text.set_text(f'{latest_value:.2f} W')
elif len(df_filtered) == 1:
# If less than 2 points, just plot the single point to avoid interpolation errors
line.set_data(df_filtered.index, df_filtered['current_L1'])
latest_value = df_filtered['current_L1'].iloc[-1]
current_consumption_text.set_text(f'{latest_value:.2f} W')
else:
# No data to plot
line.set_data([], [])
current_consumption_text.set_text('No Data')
# Correctly set X and Y axis limits based on the data that is actually plotted
if not df_filtered.empty:
ax.set_xlim(df_filtered.index.min(), df_filtered.index.max())
# Dynamically set Y-axis limits based on the data in the current window
min_y = df_filtered['current_L1'].min()
max_y = df_filtered['current_L1'].max()
padding_y = (max_y - min_y) * 0.1 if (max_y - min_y) > 0 else 100 # Add a buffer
ax.set_ylim(min_y - padding_y, max_y + padding_y)
else:
# Set a default Y-limit if no data is found to prevent errors
ax.set_xlim(start_time, end_time)
ax.set_ylim(0, 1000) # Revert to a safe, default range
ax.set_xlabel('Date and Time')
ax.set_ylabel('Current Readings (W)')
ax.grid(True)
ax.legend()
ax.tick_params(axis='x', rotation=45)
# Redraw the canvas immediately to ensure the slider effect is visible
fig.canvas.draw_idle()
return df_filtered
def update(frame):
"""
This function is called by FuncAnimation to update the plot.
It reads new data and calls the drawing function.
"""
data_folder_path = './data'
df = get_data(data_folder_path)
if df is not None and not df.empty:
draw_plot(df, slider.val)
def submit_textbox(text):
"""
This function is called when the user submits text from the input box.
"""
try:
val = int(text)
if slider.valmin <= val <= slider.valmax:
slider.set_val(val)
else:
print(f"Input value out of range. Please enter a number between {slider.valmin} and {slider.valmax}.")
except ValueError:
print("Invalid input. Please enter a valid number.")
def submit_lower_limit(text):
"""
This function is called when the user submits text from the lower limit input box.
"""
try:
val = int(text)
if val < slider.valmax:
slider.valmin = val
slider.set_val(max(val, slider.val)) # Set the slider to the new lower bound if it's currently below it
else:
print(f"Lower limit must be less than the upper limit ({slider.valmax}).")
except ValueError:
print("Invalid input. Please enter a valid number.")
def submit_upper_limit(text):
"""
This function is called when the user submits text from the upper limit input box.
"""
try:
val = int(text)
if val > slider.valmin:
slider.valmax = val
slider.set_val(min(val, slider.val)) # Set the slider to the new upper bound if it's currently above it
else:
print(f"Upper limit must be greater than the lower limit ({slider.valmin}).")
except ValueError:
print("Invalid input. Please enter a valid number.")
def update_time_window(val):
"""
This function is called when the slider's value changes.
It triggers an immediate redraw of the plot and updates the text box.
"""
textbox.set_val(str(int(val)))
data_folder_path = './data'
df = get_data(data_folder_path)
if df is not None and not df.empty:
draw_plot(df, val)
def calculate_consumption(event):
"""
Calculates the accumulated energy consumption (Wh) for the current time window.
"""
data_folder_path = './data'
df = get_data(data_folder_path)
if df is None or df.empty:
print("No data available to calculate consumption.")
accumulated_consumption_text.set_text("No Data for Calculation")
return
df_filtered = draw_plot(df, slider.val)
if df_filtered.empty:
print("No data in the current time window to calculate consumption.")
accumulated_consumption_text.set_text("No Data in Window")
return
# Convert timestamps to seconds and find the time differences
times_in_seconds = df_filtered.index.astype(int) / 10**9
time_deltas = np.diff(times_in_seconds)
# Use the trapezoidal rule to numerically integrate the power curve
# The result is in Watt-seconds, so we convert to Watt-hours by dividing by 3600
accumulated_wh = np.trapz(df_filtered['current_L1'].values, x=times_in_seconds) / 3600
accumulated_consumption_text.set_text(f'Accumulated Consumption: {accumulated_wh:.2f} Wh')
fig.canvas.draw_idle()
slider.on_changed(update_time_window)
textbox.on_submit(submit_textbox)
lower_limit_textbox.on_submit(submit_lower_limit)
upper_limit_textbox.on_submit(submit_upper_limit)
calculate_button.on_clicked(calculate_consumption)
if __name__ == "__main__":
print("Starting live plot. Use the slider to adjust the time window. Press Ctrl+C to stop.")
try:
# Use FuncAnimation to call the update function every 6 seconds (6000 ms)
ani = FuncAnimation(fig, update, interval=6000, cache_frame_data=False)
plt.show()
except KeyboardInterrupt:
print("\nPlotting stopped by user.")
plt.ioff()
plt.show()
Para esto hay que crear un comando que descarga el archivo .xml en una carpeta determinada. En esta entrada se describe la técnica. La carpeta, en este caso, la publicamos directamente en un servidor web local apache.
sudo watch -n 6 wget -P /var/www/html/energy/data/ 192.168.1.4/data.xml
sudo /var/www/html/energy/
sudo python3 data_plotter.py
Y este es el resultado, con el frontend y las dos consolas de backend. Es posible cambiar los intervalos de datos y calcular -mediante aproximación geométrica- el consumo acumulado en kWh del periodo deseado:
No hay comentarios:
Publicar un comentario