A Practical Guide for Students
(Project Practice 2 2024/2025 - [Juan Manuel Martínez Moreno])
Ever wondered how your smartwatch knows whether you're sleeping, walking, or jogging, all without draining your battery or sending your data to the cloud? That's Embedded Machine Learning (ML): smart decisions made by tiny, efficient computers right inside your devices. Embedded ML is everywhere and growing fast. According to Transforma Insights, there will be around 31.4 billion connected devices by 2030, all smarter and more self-contained than ever. This is why you should care:
Running sophisticated ML models on these tiny devices isn't easy. The constraints are harsh:
Edge Impulse is a leading MLOps platform designed specifically for edge devices. It is a powerful web-based toolkit that simplifies the entire process of creating embedded ML solutions.
Why use Edge Impulse?
The typical workflow in Edge Impulse looks like this:
This guide aims to walk you through the practical steps of using Edge Impulse to build your own simple embedded ML projects. You will learn:
We will cover three main hands-on projects:
Train a model to recognize the keyword "Good Morning" using simulated deployment on a Cortex-M4F microcontroller. This simulation approach allows you to understand performance characteristics on constrained targets without requiring the actual hardware.
goodmorning - Samples of you saying "Good Morning"
noise - Background noise samples
unknown - Other spoken words/phrases
noise and unknown labels to supplement your recordings
Parameter | Value | Purpose |
---|---|---|
Frame Length | 0.02s | Captures fine acoustic details |
Stride | 0.01s | Overlapping windows for smoother analysis |
FFT Length | 256 | Frequency resolution |
Number of Filters | 40 | Number of frequency bands |
Noise Floor | -60dB | Baseline noise level |
Pre-emphasis | 0.98 | Enhances higher frequencies |
Normalization Window | Per inference window | Normalizes each sample individually |
Number of Coefficients | 13 | Dimensions of audio features |
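To get a feel for what these settings produce, the sketch below estimates how many MFCC frames and features come out of one audio window. It uses the textbook framing formula with the frame length, stride, and coefficient count from the table; the 1000 ms window length and the function name `mfcc_shape` are assumptions for illustration, and Edge Impulse may pad or count frames slightly differently.

```python
def mfcc_shape(window_ms, frame_ms=20, stride_ms=10, n_coefficients=13):
    """Return (frames, total_features) for one audio window.

    Uses integer milliseconds to avoid floating-point rounding surprises.
    """
    if window_ms < frame_ms:
        return 0, 0  # window too short to hold a single frame
    frames = (window_ms - frame_ms) // stride_ms + 1
    return frames, frames * n_coefficients


# Assumed 1-second window with the table's 0.02 s frame and 0.01 s stride
frames, features = mfcc_shape(1000)
print(frames, features)  # 99 1287
```

A shorter stride gives more frames per window, which means more features and more work for the microcontroller on every inference.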
Train a model to recognize different hand motions (circle, eight, idle, shake, tap) using an MPU6050 accelerometer connected to a Raspberry Pi, with inference running directly on the Pi.
Make the following connections:
Raspberry Pi Pin | MPU6050 Pin |
---|---|
3.3V | VCC |
GND | GND |
SDA1 (GPIO 2) | SDA |
SCL1 (GPIO 3) | SCL |
Enable I2C on the Raspberry Pi:
sudo raspi-config
# Navigate to: Interfacing Options → I2C → Enable
# Reboot your Pi
Verify the connection:
i2cdetect -y 1 # Use 0 instead of 1 for older Pi models
You should see the MPU6050 address (typically 0x68) in the output grid.
Install required dependencies:
sudo apt update && sudo apt install python3-pip
pip3 install flask smbus2 requests numpy edge_impulse_linux
Get Edge Impulse API Keys:
Create the server script (server.py):
Create a file named server.py on your Raspberry Pi.
Replace HMAC_KEY and API_KEY near the top of the script with the actual keys you copied from Edge Impulse.
# --- START OF FILE server.py ---
import json
import time
import hmac
import hashlib
import requests
import re
import os
import numpy as np
import uuid
from edge_impulse_linux.runner import ImpulseRunner
from smbus2 import SMBus
from flask import Flask, jsonify, request

app = Flask(__name__)

current_dir = os.path.dirname(os.path.abspath(__file__))
model_path = os.path.join(current_dir, 'model.eim')

# ===============================================================
# == IMPORTANT: REPLACE WITH YOUR EDGE IMPULSE KEYS ==
# ===============================================================
HMAC_KEY = "YOUR_HMAC_KEY_HERE"  # Replace with your HMAC Key
API_KEY = "YOUR_API_KEY_HERE"  # Replace with your API Key
# ===============================================================

runner = None
model_info = {}
try:
    runner = ImpulseRunner(model_path)
    # init() returns the model metadata (labels, input size, ...)
    model_info = runner.init()
    print(f"Model '{model_path}' initialized successfully.")
except Exception as e:
    runner = None
    print(f"ERROR: Could not initialize model '{model_path}': {e}")
    print("Ensure the model file exists in the same directory as the server script.")


class MPU6050:
    def __init__(self, bus_num=1, address=0x68):
        try:
            self.bus = SMBus(bus_num)
            self.address = address
            # Wake up the MPU6050
            self.bus.write_byte_data(self.address, 0x6B, 0)
            print("MPU6050 initialized successfully.")
        except Exception as e:
            print(f"ERROR: Could not initialize MPU6050: {e}")
            print("Check I2C connection and configuration (sudo raspi-config).")
            # Re-raise so the caller can decide how to continue
            raise

    def read_raw_data(self, addr):
        # Read two bytes (high and low) from the specified address
        high = self.bus.read_byte_data(self.address, addr)
        low = self.bus.read_byte_data(self.address, addr + 1)
        # Combine high and low bytes into a 16-bit value
        value = (high << 8) | low
        # Convert to a signed value (two's complement); 32768 itself is negative
        if value >= 32768:
            value = value - 65536
        return value

    def get_acceleration(self):
        # Read raw accelerometer data
        acc_x_raw = self.read_raw_data(0x3B)
        acc_y_raw = self.read_raw_data(0x3D)
        acc_z_raw = self.read_raw_data(0x3F)
        # Convert raw data to m/s^2 (using default sensitivity AFS_SEL=0 -> 16384 LSB/g)
        # 1 g = 9.81 m/s^2
        acc_x = (acc_x_raw / 16384.0) * 9.81
        acc_y = (acc_y_raw / 16384.0) * 9.81
        acc_z = (acc_z_raw / 16384.0) * 9.81
        return [acc_x, acc_y, acc_z]


try:
    mpu = MPU6050()
except Exception as e:
    # If MPU fails to init, set it to None so endpoints can handle it
    mpu = None
    print("Continuing without MPU6050 sensor...")


@app.route('/readings', methods=['GET'])
def get_readings():
    """Get current accelerometer readings"""
    if mpu is None:
        return jsonify({'success': False, 'error': 'MPU6050 not initialized'}), 500
    try:
        readings = mpu.get_acceleration()
        return jsonify({
            'success': True,
            'data': {
                'x': readings[0],
                'y': readings[1],
                'z': readings[2]
            }
        })
    except Exception as e:
        print(f"Error reading MPU6050: {e}")
        return jsonify({'success': False, 'error': f'Error reading sensor: {str(e)}'}), 500


@app.route('/predict', methods=['GET'])
def predict():
    """Get current accelerometer reading and run inference"""
    if mpu is None:
        return jsonify({'success': False, 'error': 'MPU6050 not initialized'}), 500
    if runner is None:
        return jsonify({'success': False, 'error': 'Edge Impulse model runner not initialized'}), 500
    try:
        samples = []
        print("Starting to collect samples for prediction...")
        # The model reports its raw input size as samples x axes, so divide by
        # the 3 accelerometer axes to get the number of samples to collect.
        # Example: 2 second window at 62.5Hz -> 125 samples -> 375 features
        # Adjust the fallback to match your Edge Impulse window size and frequency.
        feature_count = model_info.get('model_parameters', {}).get('input_features_count', 375)
        num_samples_expected = feature_count // 3
        sampling_interval_seconds = 0.016  # Approx 62.5 Hz
        for _ in range(num_samples_expected):
            readings = mpu.get_acceleration()
            # Ensure readings are floats
            readings = [float(x) for x in readings]
            samples.append(readings)
            time.sleep(sampling_interval_seconds)  # Match approx sampling frequency used in training
        print(f"Collected {len(samples)} samples.")

        # Flatten the list of lists into a single list of features
        # Expected format is typically [ax1, ay1, az1, ax2, ay2, az2, ...]
        features = [item for sublist in samples for item in sublist]
        print(f"Feature vector length: {len(features)}")

        # Run inference
        print("Running inference...")
        try:
            result = runner.classify(features)
            print(f"Raw inference result: {result}")
            # Process the result
            if result and 'result' in result and 'classification' in result['result']:
                classification = result['result']['classification']
                # Find the prediction with the highest score
                best_prediction = max(classification.items(), key=lambda item: item[1])
                motion_type, confidence = best_prediction
                return jsonify({
                    'success': True,
                    'prediction': str(motion_type),
                    'confidence': float(confidence * 100),  # Convert to percentage
                    'raw_scores': {str(k): float(v) for k, v in classification.items()},  # Send all scores
                    # 'raw_data': samples  # Optional: send the collected data back
                })
            else:
                print("Inference result structure not as expected.")
                return jsonify({'success': False, 'error': 'Invalid inference result format'}), 500
        except Exception as e:
            print(f"Inference error: {str(e)}")
            return jsonify({
                'success': False,
                'error': f'Inference error: {str(e)}'
            }), 500
    except Exception as e:
        print(f"Prediction endpoint error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Error during prediction process: {str(e)}'
        }), 500


@app.route('/collect', methods=['POST'])
def collect_data():
    """Collect and send data to Edge Impulse"""
    if mpu is None:
        return jsonify({'success': False, 'error': 'MPU6050 not initialized'}), 500
    if not HMAC_KEY or not API_KEY or "YOUR_" in HMAC_KEY or "YOUR_" in API_KEY:
        return jsonify({'success': False, 'error': 'API_KEY or HMAC_KEY not configured in server.py'}), 400
    try:
        data = request.get_json()
        if not data:
            return jsonify({'success': False, 'error': 'No JSON data received'}), 400
        label = data.get('label', 'collected_data')  # Get label from request, default if not provided
        sample_length_seconds = data.get('duration', 2)
        interval_ms = data.get('interval', 16)  # Default to ~62.5 Hz
        if interval_ms <= 0:
            return jsonify({'success': False, 'error': 'Interval must be positive'}), 400
        num_samples = int((sample_length_seconds * 1000) / interval_ms)
        print(f"Collecting {num_samples} samples over {sample_length_seconds}s with label '{label}'...")

        values_list = []
        start_time = time.time()
        for i in range(num_samples):
            loop_start = time.time()
            try:
                values = mpu.get_acceleration()
                values_list.append([float(v) for v in values])  # Ensure floats
            except Exception as e:
                print(f"Warning: Failed to read sensor on sample {i+1}: {e}")
                # Decide how to handle: skip, fill with zeros, etc. Here we skip.
                # If skipping, adjust sleep time calculation or it might finish too early.
                # For simplicity, we'll still sleep, but the sample count will be lower.
            # Precise sleep to maintain interval
            elapsed = time.time() - loop_start
            sleep_time = (interval_ms / 1000.0) - elapsed
            if sleep_time > 0:
                time.sleep(sleep_time)
        end_time = time.time()
        print(f"Collection finished in {end_time - start_time:.2f} seconds. Collected {len(values_list)} actual samples.")

        # Get a unique device ID (MAC address)
        try:
            device_mac = ':'.join(re.findall('..', '%012x' % uuid.getnode()))
        except Exception:
            device_mac = "unknown_pi_" + str(uuid.uuid4())[:8]  # Fallback

        # Prepare payload for Edge Impulse Ingestion API
        payload = {
            "protected": {
                "ver": "v1",
                "alg": "HS256",
                "iat": int(time.time())  # Use integer timestamp
            },
            "signature": "",  # Placeholder, calculated later
            "payload": {
                "device_name": device_mac,
                "device_type": "RASPBERRY_PI_MPU6050",  # Be descriptive
                "interval_ms": interval_ms,
                "sensors": [
                    {"name": "accX", "units": "m/s2"},
                    {"name": "accY", "units": "m/s2"},
                    {"name": "accZ", "units": "m/s2"}
                ],
                "values": values_list
            }
        }

        # Calculate HMAC signature
        # The signature is calculated over the JSON string of the payload
        # (with the signature field empty or not present initially)
        encoded_payload = json.dumps(payload).encode('utf-8')
        signature = hmac.new(
            bytes(HMAC_KEY, 'utf-8'),
            msg=encoded_payload,
            digestmod=hashlib.sha256
        ).hexdigest()
        # Update the signature in the payload
        payload['signature'] = signature

        # Send data to Edge Impulse
        print("Uploading data to Edge Impulse...")
        try:
            res = requests.post(
                url='https://ingestion.edgeimpulse.com/api/training/data',
                data=json.dumps(payload),  # Send the final payload with signature
                headers={
                    'Content-Type': 'application/json',
                    'x-file-name': f"{label}.{int(time.time())}",  # Include label in filename
                    'x-api-key': API_KEY
                },
                timeout=30  # Add a timeout
            )
            print(f"Upload response status code: {res.status_code}")
            print(f"Upload response text: {res.text}")
            if res.status_code == 200:
                return jsonify({
                    'success': True,
                    'message': f'Data for label "{label}" collected and uploaded successfully.'
                })
            else:
                return jsonify({
                    'success': False,
                    'error': f'Upload failed. Status code: {res.status_code}. Response: {res.text}'
                }), res.status_code  # Return server error code
        except requests.exceptions.RequestException as req_e:
            print(f"Error uploading data: {req_e}")
            return jsonify({'success': False, 'error': f'Failed to connect to Edge Impulse ingestion API: {str(req_e)}'}), 500
    except Exception as e:
        print(f"Error in /collect endpoint: {e}")
        # Include traceback for debugging if possible/desired
        # import traceback
        # print(traceback.format_exc())
        return jsonify({'success': False, 'error': f'An unexpected error occurred: {str(e)}'}), 500


if __name__ == '__main__':
    print("Starting Flask server...")
    # Make sure the host is accessible from your client machine
    app.run(host='0.0.0.0', port=5000)
# --- END OF FILE server.py ---
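The sensor conversion in the server script is easy to check without any hardware. The sketch below isolates the two steps: combining the high and low register bytes into a signed 16-bit value, then scaling to m/s² with the default sensitivity of 16384 LSB/g. The helper names `to_signed16` and `raw_to_ms2` are illustrative and not part of any library. Note that the sign check has to include 32768 itself, since 0x8000 is the most negative 16-bit value.

```python
G = 9.81             # m/s^2 per g, same constant as the server script
LSB_PER_G = 16384.0  # MPU6050 default range (AFS_SEL=0, +/-2 g)


def to_signed16(high, low):
    """Combine two register bytes into a signed 16-bit integer."""
    value = (high << 8) | low
    # Two's complement: values from 0x8000 upwards are negative
    return value - 65536 if value >= 32768 else value


def raw_to_ms2(raw):
    """Scale a raw accelerometer count to m/s^2."""
    return raw / LSB_PER_G * G


print(to_signed16(0x40, 0x00))      # 16384  (exactly +1 g)
print(round(raw_to_ms2(16384), 2))  # 9.81
print(to_signed16(0x80, 0x00))      # -32768 (most negative reading)
print(to_signed16(0xFF, 0xFF))      # -1
```

A board lying flat and still should report roughly 9.81 m/s² on one axis and close to zero on the other two, which is a quick sanity check for the wiring and the conversion.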
Download the Edge Impulse model:
Choose the quantized (int8) or unquantized (float32) model. Quantized is usually faster and smaller.
Download the .eim file.
Place the model.eim file in the same directory as server.py on your Raspberry Pi. The server script expects it there.
Install dependencies on your computer (where you will run the GUI):
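pip install requests
# tkinter is part of the Python standard library and is not installed with pip.
# On Debian/Ubuntu, install it with: sudo apt install python3-tk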
Create the client script (client.py):
Create a file named client.py.
# --- START OF FILE client.py ---
import tkinter as tk
from tkinter import ttk, messagebox
import requests
import threading
import time
import json


class AccelerometerClientGUI:
    def __init__(self, root):
        self.root = root
        self.root.title("Remote Accelerometer Control")

        # Use a themed style
        style = ttk.Style()
        # Try different themes if 'clam' isn't ideal on your OS
        # Available themes: style.theme_names() -> ('winnative', 'clam', 'alt', 'default', 'classic', 'vista', 'xpnative')
        try:
            style.theme_use('clam')  # 'clam', 'alt', 'default', etc. might look better depending on OS
        except tk.TclError:
            print("Clam theme not available, using default.")
            style.theme_use(style.theme_names()[0])  # Fallback to the first available theme

        self.server_running = False
        self.api_base_url = None
        self.live_update_active = False  # Flag to control the live update thread

        # Main frame for padding
        main_frame = ttk.Frame(root, padding="20 20 20 20")
        main_frame.pack(expand=True, fill='both')

        # Connection Frame
        conn_frame = ttk.LabelFrame(main_frame, text="Server Connection", padding="15 10 15 10")
        conn_frame.pack(fill='x', padx=5, pady=(0, 10))
        conn_frame.grid_columnconfigure(1, weight=1)  # Make entry expand

        ttk.Label(conn_frame, text="IP/Hostname:").grid(row=0, column=0, sticky=tk.W, pady=5, padx=(0, 5))
        self.hostname_var = tk.StringVar(value="192.168.1.100")  # Default IP, change if needed
        hostname_entry = ttk.Entry(conn_frame, textvariable=self.hostname_var, width=25)
        hostname_entry.grid(row=0, column=1, sticky='ew', padx=5, pady=5)

        ttk.Label(conn_frame, text="Port:").grid(row=1, column=0, sticky=tk.W, pady=5, padx=(0, 5))
        self.port_var = tk.StringVar(value="5000")
        port_entry = ttk.Entry(conn_frame, textvariable=self.port_var, width=10)
        port_entry.grid(row=1, column=1, sticky='w', padx=5, pady=5)  # Align left

        self.connect_button = ttk.Button(conn_frame, text="Connect", command=self.connect_to_server)
        self.connect_button.grid(row=2, column=0, columnspan=2, pady=(10, 5))

        # Settings Frame for Collection
        settings_frame = ttk.LabelFrame(main_frame, text="Data Collection Settings", padding="15 10 15 10")
        settings_frame.pack(fill='x', padx=5, pady=10)
        settings_frame.grid_columnconfigure(1, weight=1)  # Make entries expand

        ttk.Label(settings_frame, text="Label:").grid(row=0, column=0, sticky=tk.W, pady=5, padx=(0, 5))
        self.label_var = tk.StringVar(value="idle")  # Default label
        label_entry = ttk.Entry(settings_frame, textvariable=self.label_var, width=15)
        label_entry.grid(row=0, column=1, sticky='w', padx=5, pady=5)

        ttk.Label(settings_frame, text="Duration (s):").grid(row=1, column=0, sticky=tk.W, pady=5, padx=(0, 5))
        self.duration_var = tk.StringVar(value="2")
        duration_entry = ttk.Entry(settings_frame, textvariable=self.duration_var, width=10)
        duration_entry.grid(row=1, column=1, sticky='w', padx=5, pady=5)

        ttk.Label(settings_frame, text="Interval (ms):").grid(row=2, column=0, sticky=tk.W, pady=5, padx=(0, 5))
        self.interval_var = tk.StringVar(value="16")  # Approx 62.5 Hz
        interval_entry = ttk.Entry(settings_frame, textvariable=self.interval_var, width=10)
        interval_entry.grid(row=2, column=1, sticky='w', padx=5, pady=5)

        self.collect_button = ttk.Button(settings_frame, text="Start Collection", command=self.start_collection)
        self.collect_button.grid(row=3, column=0, columnspan=2, pady=(10, 5))
        self.collect_button.state(['disabled'])  # Initially disabled

        # Live Data Frame
        live_frame = ttk.LabelFrame(main_frame, text="Live Data", padding="15 10 15 10")
        live_frame.pack(fill='x', padx=5, pady=10)

        self.acc_x_var = tk.StringVar(value="X: --- m/s²")
        self.acc_y_var = tk.StringVar(value="Y: --- m/s²")
        self.acc_z_var = tk.StringVar(value="Z: --- m/s²")

        style.configure('DataLabel.TLabel', font=('Helvetica', 11))  # Slightly smaller font
        ttk.Label(live_frame, textvariable=self.acc_x_var, style='DataLabel.TLabel').pack(pady=2, anchor='w')
        ttk.Label(live_frame, textvariable=self.acc_y_var, style='DataLabel.TLabel').pack(pady=2, anchor='w')
        ttk.Label(live_frame, textvariable=self.acc_z_var, style='DataLabel.TLabel').pack(pady=2, anchor='w')

        # Live Prediction Frame
        prediction_frame = ttk.LabelFrame(main_frame, text="Live Prediction", padding="15 10 15 10")
        prediction_frame.pack(fill='x', padx=5, pady=10)

        self.prediction_var = tk.StringVar(value="Prediction: ---")
        self.confidence_var = tk.StringVar(value="Confidence: ---%")

        style.configure('Prediction.TLabel', font=('Helvetica', 12, 'bold'))
        ttk.Label(prediction_frame, textvariable=self.prediction_var, style='Prediction.TLabel').pack(pady=3, anchor='w')
        ttk.Label(prediction_frame, textvariable=self.confidence_var, style='DataLabel.TLabel').pack(pady=3, anchor='w')

        self.test_button = ttk.Button(prediction_frame, text="Test Prediction", command=self.test_prediction)
        self.test_button.pack(pady=(10, 5))
        self.test_button.state(['disabled'])  # Initially disabled

        # Status Bar Frame
        status_frame = ttk.Frame(root, relief=tk.SUNKEN, borderwidth=1)
        status_frame.pack(side=tk.BOTTOM, fill='x')
        self.status_var = tk.StringVar(value="Status: Not connected")
        self.status_label = ttk.Label(status_frame, textvariable=self.status_var, anchor=tk.W, padding="5 2 5 2")
        self.status_label.pack(fill='x')

        self.update_thread = None

    def set_status(self, message, is_error=False):
        """Updates the status bar"""
        prefix = "Status: "
        if is_error:
            prefix = "Error: "
            # Optionally change label color for errors
            # self.status_label.config(foreground='red')
        # else:
        #     self.status_label.config(foreground='black')  # Reset color
        self.status_var.set(prefix + message)
        print(prefix + message)  # Also print to console

    def connect_to_server(self):
        """Attempts to connect to the Flask server."""
        hostname = self.hostname_var.get().strip()
        port = self.port_var.get().strip()
        if not hostname or not port:
            self.set_status("Hostname and Port cannot be empty.", is_error=True)
            return
        try:
            port_num = int(port)
            self.api_base_url = f"http://{hostname}:{port_num}"
            self.set_status(f"Connecting to {self.api_base_url}...")
            self.connect_button.state(['disabled'])  # Disable button during attempt
            # Run connection test in a separate thread to keep UI responsive
            thread = threading.Thread(target=self._test_connection)
            thread.daemon = True
            thread.start()
        except ValueError:
            self.set_status("Invalid Port number.", is_error=True)
            self.connect_button.state(['!disabled'])
        except Exception as e:
            self.set_status(f"Connection setup error: {str(e)}", is_error=True)
            self.connect_button.state(['!disabled'])

    def _test_connection(self):
        """Background task to test server connection."""
        try:
            # Use the /readings endpoint as a simple health check
            response = requests.get(f"{self.api_base_url}/readings", timeout=5)
            response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
            if response.status_code == 200:
                data = response.json()
                if data.get('success'):
                    self.server_running = True
                    self.set_status("Connected to server successfully.")
                    self.collect_button.state(['!disabled'])
                    self.test_button.state(['!disabled'])
                    # Start live data updates if not already running
                    if not self.live_update_active:
                        self.live_update_active = True
                        self.update_thread = threading.Thread(target=self.update_live_data)
                        self.update_thread.daemon = True
                        self.update_thread.start()
                else:
                    # Server responded but with success=False
                    errmsg = data.get('error', 'Unknown server error')
                    self.set_status(f"Server connection issue: {errmsg}", is_error=True)
                    self._handle_disconnection()
            else:
                # Should be caught by raise_for_status, but handle just in case
                self.set_status(f"Server responded with status {response.status_code}", is_error=True)
                self._handle_disconnection()
        except requests.exceptions.ConnectionError:
            self.set_status("Connection refused. Is the server running?", is_error=True)
            self._handle_disconnection()
        except requests.exceptions.Timeout:
            self.set_status("Connection timed out.", is_error=True)
            self._handle_disconnection()
        except requests.exceptions.RequestException as e:
            self.set_status(f"Connection error: {str(e)}", is_error=True)
            self._handle_disconnection()
        except json.JSONDecodeError:
            self.set_status("Invalid response from server (not JSON).", is_error=True)
            self._handle_disconnection()
        finally:
            # Re-enable connect button only if connection failed
            if not self.server_running:
                self.connect_button.state(['!disabled'])

    def _handle_disconnection(self):
        """Resets state upon disconnection or connection failure."""
        self.server_running = False
        self.live_update_active = False  # Signal the update thread to stop
        self.collect_button.state(['disabled'])
        self.test_button.state(['disabled'])
        self.acc_x_var.set("X: --- m/s²")
        self.acc_y_var.set("Y: --- m/s²")
        self.acc_z_var.set("Z: --- m/s²")
        self.prediction_var.set("Prediction: ---")
        self.confidence_var.set("Confidence: ---%")
        self.api_base_url = None

    def update_live_data(self):
        """Periodically fetches live data from the server."""
        while self.live_update_active:  # Check the flag
            if self.server_running and self.api_base_url:
                try:
                    response = requests.get(f"{self.api_base_url}/readings", timeout=1)
                    if response.status_code == 200:
                        data = response.json()
                        if data.get('success'):
                            readings = data['data']
                            self.acc_x_var.set(f"X: {readings.get('x', 0.0):.2f} m/s²")
                            self.acc_y_var.set(f"Y: {readings.get('y', 0.0):.2f} m/s²")
                            self.acc_z_var.set(f"Z: {readings.get('z', 0.0):.2f} m/s²")
                        else:
                            # Server reported an issue getting readings
                            self.set_status(f"Server error getting readings: {data.get('error', 'Unknown')}", is_error=True)
                            # Consider stopping live updates if this persists
                    else:
                        self.set_status(f"Live data request failed (Status: {response.status_code})", is_error=True)
                        # Consider stopping live updates
                except requests.exceptions.RequestException as e:
                    # Don't flood status, just log it maybe, or set a general 'connection lost' status
                    # print(f"Live data update failed: {e}")  # Log to console instead of status bar
                    # If connection is lost, the main connect logic should handle resetting state
                    pass
                except json.JSONDecodeError:
                    print("Error decoding live data JSON")  # Log to console
            else:
                # If server is not marked as running, stop trying
                break
            time.sleep(0.1)  # Update roughly 10 times per second
        print("Live update thread stopped.")

    def start_collection(self):
        """Sends a request to the server to start data collection."""
        if not self.server_running or not self.api_base_url:
            self.set_status("Not connected to server.", is_error=True)
            return
        try:
            label = self.label_var.get().strip()
            duration_str = self.duration_var.get().strip()
            interval_str = self.interval_var.get().strip()
            if not label:
                messagebox.showerror("Input Error", "Label cannot be empty.")
                return
            if not duration_str or not interval_str:
                messagebox.showerror("Input Error", "Duration and Interval cannot be empty.")
                return
            duration = float(duration_str)
            interval = int(interval_str)
            if duration <= 0 or interval <= 0:
                messagebox.showerror("Input Error", "Duration and Interval must be positive.")
                return
            self.collect_button.state(['disabled'])  # Disable button during collection
            self.test_button.state(['disabled'])  # Disable prediction during collection
            self.set_status(f"Collecting data for '{label}' ({duration}s)...")
            # Run collection in a separate thread
            thread = threading.Thread(target=self._collect_data_task, args=(label, duration, interval))
            thread.daemon = True
            thread.start()
        except ValueError:
            messagebox.showerror("Input Error", "Duration must be a number and Interval must be an integer.")
            self.collect_button.state(['!disabled'])  # Re-enable if input was bad
            self.test_button.state(['!disabled'])
        except Exception as e:
            self.set_status(f"Error starting collection: {str(e)}", is_error=True)
            self.collect_button.state(['!disabled'])
            self.test_button.state(['!disabled'])

    def _collect_data_task(self, label, duration, interval):
        """Background task for data collection."""
        try:
            # Timeout slightly longer than collection duration + some buffer
            request_timeout = duration + 10
            response = requests.post(
                f"{self.api_base_url}/collect",
                json={
                    'label': label,
                    'duration': duration,
                    'interval': interval
                },
                timeout=request_timeout
            )
            response.raise_for_status()  # Check for HTTP errors
            if response.status_code == 200:
                data = response.json()
                if data.get('success'):
                    self.set_status(data.get('message', "Data collection successful."))
                else:
                    self.set_status(f"Collection failed: {data.get('error', 'Unknown server error')}", is_error=True)
            # raise_for_status handles non-200 codes here
        except requests.exceptions.Timeout:
            self.set_status(f"Collection request timed out after {request_timeout}s.", is_error=True)
        except requests.exceptions.RequestException as e:
            self.set_status(f"Collection request error: {str(e)}", is_error=True)
        except json.JSONDecodeError:
            self.set_status("Invalid response from server during collection.", is_error=True)
        except Exception as e:
            self.set_status(f"Unexpected error during collection: {str(e)}", is_error=True)
        finally:
            # Re-enable buttons after collection attempt finishes or fails
            self.collect_button.state(['!disabled'])
            self.test_button.state(['!disabled'])

    def test_prediction(self):
        """Requests a prediction from the server."""
        if not self.server_running or not self.api_base_url:
            self.set_status("Not connected to server.", is_error=True)
            return
        self.test_button.state(['disabled'])  # Disable button during prediction
        self.collect_button.state(['disabled'])  # Disable collection during prediction
        self.set_status("Running prediction...")
        self.prediction_var.set("Prediction: Running...")
        self.confidence_var.set("Confidence: ---%")
        # Run prediction in a separate thread
        thread = threading.Thread(target=self._predict_task)
        thread.daemon = True
        thread.start()

    def _predict_task(self):
        """Background task for running prediction."""
        try:
            # Timeout for prediction (e.g., 10 seconds)
            response = requests.get(f"{self.api_base_url}/predict", timeout=10)
            response.raise_for_status()  # Check for HTTP errors
            if response.status_code == 200:
                data = response.json()
                if data.get('success'):
                    self.prediction_var.set(f"Prediction: {data.get('prediction', 'N/A')}")
                    self.confidence_var.set(f"Confidence: {data.get('confidence', 0.0):.1f}%")
                    self.set_status("Prediction successful.")
                else:
                    error_msg = data.get('error', 'Unknown server error')
                    self.set_status(f"Prediction failed: {error_msg}", is_error=True)
                    self.prediction_var.set("Prediction: Failed")
                    self.confidence_var.set("Confidence: ---%")
            # raise_for_status handles non-200 codes
        except requests.exceptions.Timeout:
            self.set_status("Prediction request timed out.", is_error=True)
            self.prediction_var.set("Prediction: Timeout")
            self.confidence_var.set("Confidence: ---%")
        except requests.exceptions.RequestException as e:
            self.set_status(f"Prediction request error: {str(e)}", is_error=True)
            self.prediction_var.set("Prediction: Error")
            self.confidence_var.set("Confidence: ---%")
        except json.JSONDecodeError:
            self.set_status("Invalid response from server during prediction.", is_error=True)
            self.prediction_var.set("Prediction: Bad Response")
            self.confidence_var.set("Confidence: ---%")
        except Exception as e:
            self.set_status(f"Unexpected error during prediction: {str(e)}", is_error=True)
            self.prediction_var.set("Prediction: Exception")
            self.confidence_var.set("Confidence: ---%")
        finally:
            # Re-enable buttons after prediction attempt finishes or fails
            # Only re-enable if still connected theoretically
            if self.server_running:
                self.test_button.state(['!disabled'])
                self.collect_button.state(['!disabled'])

    def cleanup(self):
        """Properly closes the application and stops threads."""
        print("Cleaning up...")
        self.live_update_active = False  # Signal thread to stop
        if self.update_thread and self.update_thread.is_alive():
            self.update_thread.join(timeout=0.5)  # Wait briefly for thread to exit
        self.root.destroy()


if __name__ == "__main__":
    root = tk.Tk()
    app = AccelerometerClientGUI(root)
    # Handle window close event gracefully
    root.protocol("WM_DELETE_WINDOW", app.cleanup)
    root.mainloop()
# --- END OF FILE client.py ---
On the Raspberry Pi, navigate to your project directory:
cd /path/to/your/project
Start the server:
python3 server.py
Note your Raspberry Pi's IP address:
hostname -I
On your computer, run the client:
python client.py
Enter the Raspberry Pi's IP address and click "Connect"
For each motion label (circle, figure_eight, idle, shake, tap):
Parameter | Value | Purpose |
---|---|---|
Filter | Low-pass 3Hz | Removes high-frequency noise |
FFT length | 256 | Resolution of frequency analysis |
Overlap | 0.45 | Smooths transitions between windows |
Noise floor | -60dB | Sets baseline for noise filtering |
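The window settings determine how many raw values the model receives per inference. The sketch below works through that arithmetic for the 2000 ms window and 16 ms interval mentioned in the server script's comments; the function name `window_shape` is illustrative, and your own project's window size and frequency may differ.

```python
def window_shape(window_ms, interval_ms, axes=3):
    """Return (samples_per_window, raw_feature_count) for one window."""
    samples = window_ms // interval_ms
    return samples, samples * axes


samples, raw_features = window_shape(2000, 16)
print(samples, raw_features)  # 125 375
print(1000 / 16)              # 62.5 (sampling frequency in Hz)
```

If the number of values sent to the model does not match the raw feature count it was trained with, inference is likely to fail or give meaningless results, so these numbers are worth checking whenever the window size or sampling interval changes.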
Make sure the server (server.py) is running on the Pi with the model.eim file.
5. Project 3: Simple Image Recognition (Using COCO Dataset for Training)
Goal
Train a model using images from the COCO dataset to distinguish between a "Person" and "No Person". Then, deploy and test this model for live inference using a Raspberry Pi and Camera Module. This approach leverages a large, diverse dataset for training while still providing hands-on deployment experience.
Hardware Requirements (for Deployment & Inference)
Software Requirements
pycocotools can help find/download images.
5.1. Hardware Setup (Prepare the RPi for Inference)
Enable the camera interface:
sudo raspi-config
# Navigate to: Interface Options -> Camera -> Enable
# Reboot when prompted
Optional Verification: On your Raspberry Pi terminal, run:
libcamera-still -o test.jpg
# Check that test.jpg was created and shows camera view
5.2. Install Edge Impulse Linux CLI (Prepare the RPi for Inference)
# Install dependencies and the CLI tool
wget -q -O - https://cdn.edgeimpulse.com/firmware/linux/setup.sh | sudo bash
5.3. Create Project in Edge Impulse
5.4. Acquire and Upload Training Data (Using COCO)
Obtain Images (coco.py):
import json, os, shutil
from pycocotools.coco import COCO
# paths (val instead of train)
ANNOTATIONS = 'annotations/instances_val2017.json'
IMAGES_DIR = 'val2017'
OUT_PERSON = 'val_person'
OUT_NO_PERSON = 'val_no_person'
os.makedirs(OUT_PERSON, exist_ok=True)
os.makedirs(OUT_NO_PERSON, exist_ok=True)
coco = COCO(ANNOTATIONS)
person_cat_id = coco.getCatIds(catNms=['person'])[0]
person_img_ids = coco.getImgIds(catIds=[person_cat_id])
all_img_ids = set(coco.getImgIds())
no_person_img_ids = all_img_ids - set(person_img_ids)
# how many to grab
N = 200
person_imgs = coco.loadImgs(person_img_ids[:N])
no_person_imgs = coco.loadImgs(list(no_person_img_ids)[:N])
def copy_images(imgs, out_dir):
    # Copy each listed image into out_dir, skipping files missing on disk
    for img in imgs:
        src = os.path.join(IMAGES_DIR, img['file_name'])
        dst = os.path.join(out_dir, img['file_name'])
        if os.path.exists(src):
            shutil.copy(src, dst)
copy_images(person_imgs, OUT_PERSON)
copy_images(no_person_imgs, OUT_NO_PERSON)
print("done.")
person class: Download a diverse set of images clearly containing people (different scales, lighting, backgrounds, partial views). Aim for at least 100-200 images if possible.
no_person class: This requires careful selection. Download images from COCO categories that do not contain people (e.g., landscapes, inanimate objects like 'traffic light', 'bench', 'car', indoor scenes without people). Crucially, ensure these images are visually diverse and reasonably representative of the backgrounds your RPi deployment will see when no person is present. Aim for a similar number of images as the person class.
Keep the two classes in separate folders (e.g., coco_person and coco_no_person).
Upload to Edge Impulse:
Option A (web uploader): Select your person images and enter person as the label. Repeat for the no_person images, entering no_person as the label.
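Option B (CLI Uploader - faster for many files): Upload using commands like:
npm install -g edge-impulse-cli
# The uploader asks for your Edge Impulse login and target project the first time it runs
edge-impulse-uploader --category training --label person coco_person/*.jpg
edge-impulse-uploader --category training --label no_person coco_no_person/*.jpg
# Adjust paths and wildcards as needed
# Use --category testing for a held-out test set, or --category split to let the uploader divide the files
Refer to Edge Impulse Uploader Docs for details.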
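If you prefer to decide the training/testing split yourself before uploading, one possible approach is to assign each file by a hash of its name, so the same image always lands in the same set. The sketch below is only an illustration: the function names, the 20% test fraction, and the example file names are assumptions, not part of the Edge Impulse tooling.

```python
import hashlib


def assign_split(filename: str, test_fraction: float = 0.2) -> str:
    """Return 'training' or 'testing' for a file, based on a hash of its name."""
    digest = hashlib.md5(filename.encode("utf-8")).hexdigest()
    # Map the first 8 hex digits to a number in [0, 1)
    bucket = int(digest[:8], 16) / 0x100000000
    return "testing" if bucket < test_fraction else "training"


def split_files(filenames, test_fraction: float = 0.2):
    """Group file names into training and testing lists."""
    splits = {"training": [], "testing": []}
    for name in sorted(filenames):
        splits[assign_split(name, test_fraction)].append(name)
    return splits


if __name__ == "__main__":
    # Hypothetical COCO-style file names, used only for illustration
    names = [f"{i:012d}.jpg" for i in range(200)]
    result = split_files(names)
    print(len(result["training"]), "training files,", len(result["testing"]), "testing files")
```

Each resulting list can then be uploaded with the matching `--category` value. Because the assignment depends only on the file name, adding more images later does not move existing images between sets.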
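Verify Data: Check the Data Acquisition dashboard to ensure you have a roughly balanced number of images for both person and no_person classes in your training and testing sets.
5.5. Design Your Impulse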
96 x 96 pixels
Grayscale (recommended) or RGB
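To see why grayscale is the lighter choice, it helps to count the raw values in one input image. The following is a rough back-of-the-envelope sketch; the helper names are made up for illustration, and it only covers the input buffer, not the model's weights or intermediate activations.

```python
def input_values(width: int, height: int, channels: int) -> int:
    """Number of raw values in one input image."""
    return width * height * channels


def input_bytes(width: int, height: int, channels: int, bytes_per_value: int) -> int:
    """Size of the input buffer for a given value width (4 = float32, 1 = int8)."""
    return input_values(width, height, channels) * bytes_per_value


if __name__ == "__main__":
    for name, channels in (("Grayscale", 1), ("RGB", 3)):
        values = input_values(96, 96, channels)
        as_float32 = input_bytes(96, 96, channels, 4)
        as_int8 = input_bytes(96, 96, channels, 1)
        print(f"{name}: {values} values, {as_float32} bytes as float32, {as_int8} bytes as int8")
```

At 96 x 96, an RGB input carries three times as many values as a grayscale one, which is one reason grayscale is a sensible starting point for a simple two-class task.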
5.6. Configure Image Block (Feature Generation)
5.7. Train the Model
5.8. Test Your Model (Using COCO Test Data)
5.9. Deploy and Run Live Inference on Raspberry Pi
camera_infer.py:
import cv2
import numpy as np
from edge_impulse_linux.runner import ImpulseRunner

MODEL_PATH = './model.eim'  # path to the downloaded .eim file
INPUT_SIZE = (96, 96)       # must match the image size set in your impulse

runner = ImpulseRunner(MODEL_PATH)
runner.init()
cam = cv2.VideoCapture(0)  # default camera
try:
    while True:
        ret, frame = cam.read()
        if not ret:
            print('no frame')
            break
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        resized = cv2.resize(rgb, INPUT_SIZE).astype(np.uint32)
        # The runner expects one packed 0xRRGGBB integer per pixel (as the SDK's own image helper produces),
        # passed as a plain list. For a grayscale impulse, convert to gray first and repeat the value in all three bytes.
        packed = (resized[..., 0] << 16) | (resized[..., 1] << 8) | resized[..., 2]
        features = packed.flatten().tolist()
        res = runner.classify(features)
        out = res['result']['classification']
        print('prediction:', out)
        # Overlay the highest-scoring label on the camera frame
        label = max(out, key=out.get)
        cv2.putText(frame, label, (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.imshow('camera', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    cam.release()
    cv2.destroyAllWindows()
    runner.stop()
Download the .eim file.
Transfer the .eim file to your Raspberry Pi (which you prepared in steps 5.1 & 5.2).
Run live classification on the Raspberry Pi:
# Ensure no other Edge Impulse process is running
edge-impulse-linux-runner --model-file your-downloaded-model-name.eim
Advanced Applications
For more complex applications, explore the Edge Impulse Python SDK for Linux.
Data is King: The quality and quantity of your training data directly impact your model's performance. Always aim for balanced datasets across all your classes. Data augmentation techniques significantly improve robustness - our audio project showed how noise injection and time masking helped the model recognize keywords in varied environments.
Performance Trade-offs: When deploying on embedded devices, you'll constantly balance three competing factors: accuracy vs. latency vs. resource usage (RAM/Flash). Our experiments demonstrated that quantized (Int8) models provided up to 3.6x speedups with only 1-2% accuracy reduction compared to full-precision (Float32) models.
Model Selection: Start with simpler architectures before attempting complex ones. Edge Impulse's EON Tuner can automatically search for optimal models that balance performance and resource constraints for your specific device target.
Platform Matters: Our benchmarks showed dramatic performance differences between the simulated Cortex-M4F microcontroller and the Raspberry Pi 4. The RPi offered 8-10x faster inference times for the same model. Always validate on your actual target hardware when possible.
Iterative Process: Embedded ML development is fundamentally iterative. Build your initial model, test thoroughly, analyze weak points (like our issues with the "shake" gesture), refine your approach, and test again.
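To make the Float32 versus Int8 trade-off more concrete, here is a minimal sketch of affine quantization, the general idea behind mapping floating-point values onto 8-bit integers. It is not the exact scheme Edge Impulse or TensorFlow Lite applies to a model; the function names and the example range of 0.0 to 6.0 are assumptions chosen for illustration.

```python
def quantize_params(min_val: float, max_val: float):
    """Scale and zero point that map [min_val, max_val] onto the int8 range [-128, 127]."""
    scale = (max_val - min_val) / 255.0
    zero_point = round(-128 - min_val / scale)
    return scale, zero_point


def quantize(x: float, scale: float, zero_point: int) -> int:
    """Convert a float to int8, clamping values that fall outside the range."""
    q = round(x / scale) + zero_point
    return max(-128, min(127, q))


def dequantize(q: int, scale: float, zero_point: int) -> float:
    """Recover an approximate float from its int8 representation."""
    return (q - zero_point) * scale


if __name__ == "__main__":
    scale, zero_point = quantize_params(0.0, 6.0)  # hypothetical activation range
    for value in (0.0, 1.2345, 3.0, 6.0):
        q = quantize(value, scale, zero_point)
        restored = dequantize(q, scale, zero_point)
        print(f"{value} -> {q} -> {restored:.4f} (error {abs(value - restored):.4f})")
```

Each value is stored in one byte instead of four, which is where the roughly 4x memory saving comes from, while the rounding error stays within about one quantization step. That small per-value error is the source of the modest accuracy drop.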
RPi I2C Issues: If i2cdetect -y 1 doesn't show your MPU6050 (typically at address 0x68), first check your physical wiring connections. Ensure I2C is properly enabled in raspi-config and that the sensor is receiving power (most MPU6050 breakout boards have a small LED that lights up when powered).
EI Data Forwarder/Daemon Issues: If your data isn't uploading to Edge Impulse, verify your API keys and HMAC keys are correctly entered in your code. Check your network connection stability and firewall settings that might block the connections.
Low Accuracy: Consider whether you need more training data or more diverse data. For motion recognition, try collecting samples at different speeds and intensities. For audio, collect samples in different acoustic environments. Experiment with different model architectures - sometimes a deeper model helps, but not always.
Model Doesn't Fit: If your model exceeds resource constraints, try quantization first (reduces memory by ~4x). If still too large, simplify the model architecture by reducing layer count or neuron count. For image projects, reducing input resolution (e.g., from 96x96 to 64x64) dramatically reduces memory needs. Edge Impulse's EON Tuner can automatically find optimized models.
Gesture Recognition Confusion: If specific motions are being confused (like our "shake" and "circle" overlap), try refining your feature extraction parameters. For spectral analysis, adjusting the FFT length and filter settings can better separate similar motions.
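When adjusting the FFT length, a useful rule of thumb is that each FFT bin spans the sample rate divided by the FFT length. The sketch below works through that arithmetic; the 100 Hz accelerometer rate and the helper names are assumptions for illustration, so substitute the sampling frequency of your own project.

```python
def frequency_resolution(sample_rate_hz: float, fft_length: int) -> float:
    """Width of one FFT bin in Hz."""
    return sample_rate_hz / fft_length


def window_seconds(fft_length: int, sample_rate_hz: float) -> float:
    """Duration of signal covered by one FFT of the given length."""
    return fft_length / sample_rate_hz


if __name__ == "__main__":
    sample_rate = 100.0  # hypothetical accelerometer sampling rate in Hz
    for fft_length in (16, 64, 128):
        width = frequency_resolution(sample_rate, fft_length)
        duration = window_seconds(fft_length, sample_rate)
        print(f"FFT length {fft_length}: {width} Hz per bin, {duration} s of signal")
```

A longer FFT gives narrower bins, which can help separate motions whose dominant frequencies are close together, but it also needs more samples per window, so very short gestures may be smeared.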
This guide has walked you through the practical process of creating three working embedded machine learning solutions using Edge Impulse. We've covered audio keyword detection, motion gesture recognition, and image classification - three of the most common applications in the embedded ML space.
Through these hands-on projects, you've experienced firsthand how modern tools like Edge Impulse are democratizing what was once a highly specialized field. Just a few years ago, running neural networks on microcontrollers required deep expertise in both ML and embedded systems. Today, with the right tools and approach, students and hobbyists can create sophisticated intelligent devices.
The embedded ML landscape is expanding rapidly, with applications ranging from predictive maintenance in industrial settings to smart agriculture, healthcare monitoring, and countless IoT innovations. The techniques and workflow you've learned in this guide provide a solid foundation for exploring these possibilities.
As resource constraints continue to loosen with more powerful microcontrollers and more efficient models, the line between what's possible on tiny devices versus in the cloud will continue to blur. By understanding the fundamental trade-offs and techniques demonstrated in this guide, you're well-equipped to participate in this exciting field.
Thanks for reading this guide.