This Python script achieves the following functionality: ### Purpose: The code aims...

March 30, 2025 at 04:28 PM

import serial # For communicating with Arduino (pip install pyserial) import csv # For CSV file operations import threading # To handle concurrent tasks for data acquisition and user input import datetime # To timestamp sensor readings import matplotlib.pyplot as plt import matplotlib.animation as animation import matplotlib.dates as mdates # --- Configuration --- serial_port = "/dev/ttyACM0" # Change this to your port (e.g., "COM3" on Windows) baud_rate = 9600 # Must match the Arduino's baud rate filename = "INA219-output.csv" # File where sensor data is recorded # --- Open the Serial Port --- ser = serial.Serial(serial_port, baud_rate) print("Connected to Arduino port: " + serial_port) # --- Global Data for Real-Time Plotting --- # We'll store tuples of (datetime, current_value) in this list. data_list = [] data_lock = threading.Lock() # Ensure thread-safe access to data_list # --- Thread Exit Flag --- stop_event = threading.Event() # --- Data Acquisition Function --- def read_data(): """Continuously read from the serial port, save to CSV, and update the global data_list.""" global data_list with open(filename, 'w', encoding='UTF8', newline='') as f: writer = csv.writer(f) writer.writerow(["Date", "Time", "Current"]) while not stop_event.is_set(): try: line = ser.readline().decode('utf-8').strip() except Exception as e: print("Error reading from serial:", e) continue if not line: continue # Skip if no data was received # Get the current timestamp now = datetime.datetime.now() date_str = now.strftime('%Y-%m-%d') time_str = now.strftime('%H:%M:%S') # Write the data to the CSV file sensor_data = [date_str, time_str, line] writer.writerow(sensor_data) print("Data written to CSV:", sensor_data) # Attempt to convert the sensor data to a float for plotting try: current_value = float(line) except ValueError: # Skip the data if it cannot be converted continue # Append the reading to the global data list in a thread-safe manner with data_lock: data_list.append((now, current_value)) # --- User Input Thread --- def wait_for_enter(): """Wait for user press Enter to signal stop of data collection.""" input("Press Enter to stop...\n") stop_event.set() # --- Set Up Threads for Data Acquisition and User Input --- data_thread = threading.Thread(target=read_data) input_thread = threading.Thread(target=wait_for_enter) data_thread.start() input_thread.start() # --- Real-Time Visualization Setup --- fig, ax = plt.subplots() line_plot, = ax.plot([], [], 'b-', label='Current (A)') ax.set_xlabel('Time') ax.set_ylabel('Current (A)') ax.set_title('Real-Time Sensor Data') ax.legend() ax.grid(True) # Format the x-axis to display time nicely xfmt = mdates.DateFormatter('%H:%M:%S') ax.xaxis.set_major_formatter(xfmt) def init(): """Initialize the plot axes limits and the line data.""" now = datetime.datetime.now() ax.set_xlim(now, now + datetime.timedelta(seconds=10)) ax.set_ylim(0, 10) # Adjust as needed for your sensor's expected range line_plot.set_data([], []) return line_plot, def update(frame): """Called periodically by FuncAnimation to update the plot with new data.""" with data_lock: if not data_list: return line_plot, # Copy the data safely so we don't hold the lock while updating the plot times, currents = zip(*data_list) # Update x-axis limits based on the data range ax.set_xlim(min(times), max(times)) # Optionally adjust the y-axis limits dynamically y_min = min(currents) y_max = max(currents) ax.set_ylim(y_min - 0.1, y_max + 0.1) # Update the line data line_plot.set_data(times, currents) return line_plot, # Create the animation object. Interval is in milliseconds. ani = animation.FuncAnimation(fig, update, init_func=init, interval=1000, blit=True) # Display the plot window (this call blocks until the plot window is closed) plt.show() # --- Wait for Threads to Finish --- data_thread.join() input_thread.join() ser.close() print("Script terminated.")

This Python script achieves the following functionality:

Purpose:

The code aims to read current sensor data from an Arduino device via a serial port, store it in a CSV file, and display it in real-time on a live plot. It uses threading to handle multiple tasks concurrently, including data acquisition, user input for stopping the script, and real-time plotting.


Step-by-step Functionality:

  1. Imports Necessary Libraries:

    • serial: To communicate with the Arduino over a serial connection.
    • csv: For writing sensor data to a CSV file.
    • threading: To run data acquisition and user input tasks concurrently.
    • datetime: For timestamping sensor readings with the current date and time.
    • matplotlib: To create real-time line plots.
  2. Configuration:

    • Defines the serial_port (e.g., "/dev/ttyACM0", Arduino connection) and baud_rate (communication speed, 9600 bps) to connect to the Arduino.
    • Specifies filename ("INA219-output.csv") as the CSV output file to log sensor data.
  3. Serial Port Initialization:

    • Opens a serial connection to the specified port and baud rate to communicate with the Arduino.
  4. Global Variables:

    • Initializes data_list to store sensor readings as tuples (timestamp, current_value).
    • Uses data_lock to synchronize access to data_list across multiple threads safely.
    • Creates a stop_event flag to signal when to exit the data acquisition loop.
  5. Data Acquisition:

    • A thread (read_data) continuously reads data from the serial port:
      • It decodes the incoming data.
      • Timestamps the data.
      • Writes it to the output CSV file (INA219-output.csv).
      • Parses the data as a floating-point number (current value in amperes) and appends it to data_list.
  6. User Input to Stop:

    • Another thread (wait_for_enter) waits for the user to press "Enter" to signal the script to stop.
    • When triggered, it sets the stop_event flag, which stops the data acquisition loop.
  7. Real-Time Plotting:

    • Uses matplotlib to display sensor data in real time:
      • Initializes a plot with time (x-axis) and current (y-axis).
      • Handles live updates through matplotlib.animation.FuncAnimation. The update function fetches the latest data from data_list and redraws the plot every second.
      • Formats the x-axis as time using mdates.DateFormatter.
  8. Threads and Plot Execution:

    • Starts the two threads: one for data acquisition and one for user input handling.
    • Displays the real-time plot (blocking until the plot window is closed).
    • Waits for the threads to finish execution cleanly after the user presses "Enter" or the program finishes.
  9. Cleanup:

    • Closes the serial connection and prints a termination message before exiting.

Key Functional Highlights:

  1. Data Acquisition:

    • Reads actual sensor data from the Arduino via serial communication.
    • Stores time-stamped data into both a CSV file and a Python list for live plotting.
  2. Real-Time Display:

    • Dynamically updates a plot with changing current readings over time.
  3. User Interaction:

    • Allows the user to stop the script by pressing "Enter," ensuring a clean exit.
  4. Thread-Safe Data Sharing:

    • Uses threading with locks (data_lock) to safely share data (data_list) between threads handling data acquisition, real-time plotting, and user input.
  5. Dynamic Plot Updates:

    • Automatically adjusts plot bounds (x-axis and y-axis) to fit the incoming data.

Typical Use Case:

  • This script would be used for logging and visualizing sensor data in real time, where the sensor is connected to an Arduino sending measurements over a serial connection (e.g., current readings from an INA219 sensor).
Generate your own explanations
Download our vscode extension
Read other generated explanations

Built by @thebuilderjr
Sponsored by beam analytics
Read our terms and privacy policy
Forked from openai-quickstart-node