30 sept 2025

Lector de diferencial de potencial para modulo "ODEnergy meter" - medición de potencia consumida instantanea y estimación de acumulada.

 

lo improbable hace unos años, hecho realidad hoy

Hace muchos años adquirí este dispositivo IoT ensamblado en Spain de una empresa pionera en el tema de tener una lectura del diferencial de potencial eléctrico en un segmento de red determinado:

el cacharro en si consta de una entrada ethernet y una pinza para clipear en la fase. 


una portada con las opciones de configuración, montar un punto de acceso wifi y seguridad, ip de ethernet.  


y lo mas interesante, el archivo .xml con las lecturas instantáneas. Cada vez que refrescas se renueva la lectura.

 

Un aparato rudimentario y efectivo. Pensado para desarrollar aplicaciones en la nube, ya cuando lo compré esa opción había dejado de existir. Ahí quedo, interesante potencial pero complejo desarrollo para la captura y análisis de los datos recabados. Hasta hace poco.

 

Junto a la IA de google y mucha paciencia, ensayo y error, este código fue desarrollado:  

codename::live_plot.py::

 

import pandas as pd
import matplotlib.pyplot as plt
import os
import glob
import xmltodict
import time
from datetime import datetime, timedelta
from matplotlib.animation import FuncAnimation
from matplotlib.widgets import Slider, TextBox, Button
from xml.etree import ElementTree as ET
from scipy.interpolate import make_interp_spline
import numpy as np

# A global variable to store the time window, initially set to 10 minutes
time_window_minutes = 10
MAX_CONSUMPTION = 10000 # Set a maximum consumption threshold to filter out errors
fig, ax = plt.subplots(figsize=(12, 6))
line, = ax.plot([], [], label='Current Readings (W)', color='red', linewidth=1.5)

# Create a place for the slider below the plot
plt.subplots_adjust(bottom=0.25)
ax_slider = plt.axes([0.1, 0.1, 0.8, 0.03])
slider = Slider(
    ax=ax_slider,
    label='Time Window (min)',
    # Set a wider range for the slider, up to 90 days in minutes
    valmin=5,
    valmax=90 * 24 * 60,
    valinit=time_window_minutes,
    valstep=1
)

# Create a place for the text box for direct input
ax_textbox = plt.axes([0.1, 0.05, 0.1, 0.04])
textbox = TextBox(ax_textbox, 'Minutes')
textbox.set_val(str(time_window_minutes)) # Set initial value

# Create a place for the lower and upper limit text boxes
ax_lower_limit = plt.axes([0.3, 0.05, 0.1, 0.04])
lower_limit_textbox = TextBox(ax_lower_limit, 'Lower Limit (min)')
lower_limit_textbox.set_val(str(5)) # Set initial value

ax_upper_limit = plt.axes([0.5, 0.05, 0.1, 0.04])
upper_limit_textbox = TextBox(ax_upper_limit, 'Upper Limit (min)')
upper_limit_textbox.set_val(str(slider.valmax)) # Set initial value

# Create an axes for the new consumption calculation button
ax_button = plt.axes([0.7, 0.05, 0.2, 0.04])
calculate_button = Button(ax_button, 'Calculate Consumption (Wh)')

# Add a text annotation to the plot for the current consumption value
# This is our "digital analog meter"
current_consumption_text = ax.text(
    0.5, 0.95, '',
    horizontalalignment='center',
    verticalalignment='top',
    transform=ax.transAxes,
    fontsize=24,
    fontweight='bold',
    color='blue'
)

# Add a new text annotation for the accumulated consumption
accumulated_consumption_text = ax.text(
    0.5, 0.85, '',
    horizontalalignment='center',
    verticalalignment='top',
    transform=ax.transAxes,
    fontsize=16,
    color='green'
)

def get_file_timestamp(filename):
    """
    Returns the file's modification timestamp.
    """
    return os.path.getmtime(filename)

def get_data(data_folder):
    """
    Reads XML data from a folder, processes it, and returns a pandas DataFrame.
    It handles errors gracefully for individual files.
    """
    all_files = glob.glob(os.path.join(data_folder, "*.xml*"))
    print(f"Found {len(all_files)} files.")
    
    if not all_files:
        print("No XML files found.")
        return None
    
    all_files.sort(key=get_file_timestamp)

    all_data = []
    
    for filename in all_files:
        try:
            with open(filename, 'r') as file:
                xml_data = file.read()
            
            file_mod_timestamp = get_file_timestamp(filename)
            
            data_dict = xmltodict.parse(xml_data)
            
            device_data = data_dict.get('device', {})
            instant_data = device_data.get('instant', {})
            
            # Change to look for 'current_L1' as requested
            current_l1_str = instant_data.get('current_L1')
            
            if current_l1_str is not None:
                try:
                    current_l1 = float(current_l1_str)
                    all_data.append({
                        'timestamp': file_mod_timestamp,
                        'current_L1': current_l1
                    })
                except ValueError:
                    print(f"Could not convert current_l1 value '{current_l1_str}' to float in file {filename}")
                
        except Exception as e:
            print(f"Error processing {filename}: {e}")
    
    if not all_data:
        print("No relevant data extracted from XML files.")
        return None
        
    df = pd.DataFrame(all_data)
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')
    df.set_index('timestamp', inplace=True)
    
    # Filter out values that are higher than our set maximum consumption
    df = df[df['current_L1'] <= MAX_CONSUMPTION]
    
    print(f"Extracted {len(df)} data points from files.")
    print(f"Data range: {df.index.min()} to {df.index.max()}")
    
    return df

def draw_plot(df, time_window_minutes):
    """
    Draws the plot based on the provided dataframe and time window.
    This function is now called by both the animation and the slider.
    Returns the filtered dataframe.
    """
    # Adjust for the one-hour time difference
    end_time = datetime.now() - timedelta(hours=1)
    start_time = end_time - timedelta(minutes=time_window_minutes)
    
    # Filter the DataFrame to the current time window
    df_filtered = df.loc[start_time:end_time].copy() # Use .copy() to avoid SettingWithCopyWarning
    
    # Determine the title based on the time window
    if time_window_minutes < 60:
        title_text = f'Live Energy Readings (Last {time_window_minutes} Minutes)'
    elif time_window_minutes < 24 * 60:
        hours = time_window_minutes // 60
        minutes = time_window_minutes % 60
        title_text = f'Live Energy Readings (Last {hours} Hours'
        if minutes > 0:
            title_text += f' and {minutes} Minutes)'
        else:
            title_text += ')'
    else:
        days = time_window_minutes // (24 * 60)
        remaining_minutes = time_window_minutes % (24 * 60)
        hours = remaining_minutes // 60
        title_text = f'Live Energy Readings (Last {days} Days'
        if hours > 0:
            title_text += f' and {hours} Hours)'
        else:
            title_text += ')'

    if df_filtered.empty:
        print(f"No data found for the last {time_window_minutes} minutes. Displaying all available data.")
        df_filtered = df.copy() # Use .copy() here as well
        ax.set_title('Live Energy Readings (Showing All Data)')
    else:
        ax.set_title(title_text)

    # Use spline interpolation to smooth the line
    if len(df_filtered) > 1:
        x_data = df_filtered.index.astype(int) / 10**9 # Convert to UNIX timestamp for interpolation
        y_data = df_filtered['current_L1'].values
        x_new = np.linspace(x_data.min(), x_data.max(), 300)
        
        # Create a spline representation and evaluate it on the new x values
        spl = make_interp_spline(x_data, y_data, k=3) # k=3 for a cubic spline
        y_new = spl(x_new)
        
        # Convert interpolated x values back to datetime
        x_new_datetime = pd.to_datetime(x_new, unit='s')
        
        # Use set_data to efficiently update the plot line
        line.set_data(x_new_datetime, y_new)
        
        # Update the digital consumption meter
        latest_value = df_filtered['current_L1'].iloc[-1]
        current_consumption_text.set_text(f'{latest_value:.2f} W')
    elif len(df_filtered) == 1:
        # If less than 2 points, just plot the single point to avoid interpolation errors
        line.set_data(df_filtered.index, df_filtered['current_L1'])
        latest_value = df_filtered['current_L1'].iloc[-1]
        current_consumption_text.set_text(f'{latest_value:.2f} W')
    else:
        # No data to plot
        line.set_data([], [])
        current_consumption_text.set_text('No Data')
        
    # Correctly set X and Y axis limits based on the data that is actually plotted
    if not df_filtered.empty:
        ax.set_xlim(df_filtered.index.min(), df_filtered.index.max())
        
        # Dynamically set Y-axis limits based on the data in the current window
        min_y = df_filtered['current_L1'].min()
        max_y = df_filtered['current_L1'].max()
        padding_y = (max_y - min_y) * 0.1 if (max_y - min_y) > 0 else 100 # Add a buffer
        ax.set_ylim(min_y - padding_y, max_y + padding_y)

    else:
        # Set a default Y-limit if no data is found to prevent errors
        ax.set_xlim(start_time, end_time)
        ax.set_ylim(0, 1000) # Revert to a safe, default range

    ax.set_xlabel('Date and Time')
    ax.set_ylabel('Current Readings (W)')
    ax.grid(True)
    ax.legend()
    ax.tick_params(axis='x', rotation=45)
    
    # Redraw the canvas immediately to ensure the slider effect is visible
    fig.canvas.draw_idle()
    
    return df_filtered

def update(frame):
    """
    This function is called by FuncAnimation to update the plot.
    It reads new data and calls the drawing function.
    """
    data_folder_path = './data'
    df = get_data(data_folder_path)
    if df is not None and not df.empty:
        draw_plot(df, slider.val)

def submit_textbox(text):
    """
    This function is called when the user submits text from the input box.
    """
    try:
        val = int(text)
        if slider.valmin <= val <= slider.valmax:
            slider.set_val(val)
        else:
            print(f"Input value out of range. Please enter a number between {slider.valmin} and {slider.valmax}.")
    except ValueError:
        print("Invalid input. Please enter a valid number.")

def submit_lower_limit(text):
    """
    This function is called when the user submits text from the lower limit input box.
    """
    try:
        val = int(text)
        if val < slider.valmax:
            slider.valmin = val
            slider.set_val(max(val, slider.val)) # Set the slider to the new lower bound if it's currently below it
        else:
            print(f"Lower limit must be less than the upper limit ({slider.valmax}).")
    except ValueError:
        print("Invalid input. Please enter a valid number.")

def submit_upper_limit(text):
    """
    This function is called when the user submits text from the upper limit input box.
    """
    try:
        val = int(text)
        if val > slider.valmin:
            slider.valmax = val
            slider.set_val(min(val, slider.val)) # Set the slider to the new upper bound if it's currently above it
        else:
            print(f"Upper limit must be greater than the lower limit ({slider.valmin}).")
    except ValueError:
        print("Invalid input. Please enter a valid number.")

def update_time_window(val):
    """
    This function is called when the slider's value changes.
    It triggers an immediate redraw of the plot and updates the text box.
    """
    textbox.set_val(str(int(val)))
    data_folder_path = './data'
    df = get_data(data_folder_path)
    if df is not None and not df.empty:
        draw_plot(df, val)

def calculate_consumption(event):
    """
    Calculates the accumulated energy consumption (Wh) for the current time window.
    """
    data_folder_path = './data'
    df = get_data(data_folder_path)
    if df is None or df.empty:
        print("No data available to calculate consumption.")
        accumulated_consumption_text.set_text("No Data for Calculation")
        return
        
    df_filtered = draw_plot(df, slider.val)
    
    if df_filtered.empty:
        print("No data in the current time window to calculate consumption.")
        accumulated_consumption_text.set_text("No Data in Window")
        return

    # Convert timestamps to seconds and find the time differences
    times_in_seconds = df_filtered.index.astype(int) / 10**9
    time_deltas = np.diff(times_in_seconds)

    # Use the trapezoidal rule to numerically integrate the power curve
    # The result is in Watt-seconds, so we convert to Watt-hours by dividing by 3600
    accumulated_wh = np.trapz(df_filtered['current_L1'].values, x=times_in_seconds) / 3600
    
    accumulated_consumption_text.set_text(f'Accumulated Consumption: {accumulated_wh:.2f} Wh')
    fig.canvas.draw_idle()

slider.on_changed(update_time_window)
textbox.on_submit(submit_textbox)
lower_limit_textbox.on_submit(submit_lower_limit)
upper_limit_textbox.on_submit(submit_upper_limit)
calculate_button.on_clicked(calculate_consumption)

if __name__ == "__main__":
    print("Starting live plot. Use the slider to adjust the time window. Press Ctrl+C to stop.")
    
    try:
        # Use FuncAnimation to call the update function every 6 seconds (6000 ms)
        ani = FuncAnimation(fig, update, interval=6000, cache_frame_data=False)
        plt.show()
    except KeyboardInterrupt:
        print("\nPlotting stopped by user.")
        plt.ioff()
        plt.show()


Para esto hay que crear un comando que descarga el archivo .xml en una carpeta determinada. En esta entrada se describe la técnica. La carpeta, en este caso, la publicamos directamente en un servidor web local apache.



sudo watch -n 6 wget -P /var/www/html/energy/data/ 192.168.1.4/data.xml


sudo /var/www/html/energy/
sudo python3 data_plotter.py  

 

Y este es el resultado, con el frontend y las dos consolas de backend. Es posible cambiar los intervalos de datos y calcular -mediante aproximación geométrica- el consumo acumulado en kWh del periodo deseado:


 

No hay comentarios:

Publicar un comentario