A Practical Guide for Students
(Project Practice 2, 2024/2025 - Juan Manuel Martínez Moreno)

Ever wondered how your smartwatch knows whether you're sleeping, walking, or jogging, all without draining your battery or sending your data to the cloud? That's Embedded Machine Learning (ML): smart decisions made by tiny, efficient computers right inside your devices. Embedded ML is everywhere and expanding fast. According to Transforma Insights, there will be around 31.4 billion connected devices by 2030, all smarter and more self-contained than ever. Here's why you should care:
Running sophisticated ML models on these tiny devices isn't easy. The constraints are harsh: kilobytes rather than gigabytes of RAM and flash, modest clock speeds, tight energy budgets, and real-time latency requirements.
Edge Impulse is a leading MLOps platform designed specifically for edge devices. It is a powerful web-based toolkit that simplifies the entire process of creating embedded ML solutions.
Why use Edge Impulse?
The typical workflow in Edge Impulse looks like this: collect and label data, design an impulse (a signal-processing block plus a learning block), train the model, test it against held-out data, and deploy it to your target device.
This guide takes you through the practical steps of using Edge Impulse to build your own simple embedded ML projects. You will learn:
We will cover three main hands-on projects:
Train a model to recognize the keyword "Good Morning" using simulated deployment on a Cortex-M4F microcontroller. This simulation approach allows you to understand performance characteristics on constrained targets without requiring the actual hardware.
Record samples for three classes:
- goodmorning - Samples of you saying "Good Morning"
- noise - Background noise samples
- unknown - Other spoken words/phrases

You can use pre-made samples for the noise and unknown labels to supplement your own recordings.
| Parameter | Value | Purpose | 
|---|---|---|
| Frame Length | 0.02s | Captures fine acoustic details | 
| Stride | 0.01s | Overlapping windows for smoother analysis | 
| FFT Length | 256 | Frequency resolution | 
| Number of Filters | 40 | Number of frequency bands | 
| Noise Floor | -60dB | Baseline noise level | 
| Pre-emphasis | 0.98 | Enhances higher frequencies | 
| Normalization Window | Per inference window | Normalizes each sample individually | 
| Number of Coefficients | 13 | Dimensions of audio features | 
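To get a feel for how these parameters shape the model input, here is a rough back-of-the-envelope calculation in plain Python (the 1-second window size is an assumption; use whatever window length you set in your own impulse):

# Rough MFCC feature-shape estimate from the parameters above.
window_s = 1.0          # impulse window size (assumed; match your own setting)
frame_length_s = 0.02   # Frame Length
stride_s = 0.01         # Stride
num_coefficients = 13   # Number of Coefficients

# Overlapping frames that fit into one window.
num_frames = int((window_s - frame_length_s) / stride_s) + 1   # -> 99
features_per_window = num_frames * num_coefficients            # -> 1287
print(f"{num_frames} frames x {num_coefficients} coefficients = {features_per_window} features")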




Train a model to recognize different hand motions (circle, eight, idle, shake, tap) using an MPU6050 accelerometer connected to a Raspberry Pi, with inference running directly on the Pi.
Make the following connections:
| Raspberry Pi Pin | MPU6050 Pin | 
|---|---|
| 3.3V | VCC | 
| GND | GND | 
| SDA1 (GPIO 2) | SDA | 
| SCL1 (GPIO 3) | SCL | 
Enable I2C on the Raspberry Pi:
sudo raspi-config
# Navigate to: Interfacing Options → I2C → Enable
# Reboot your Pi
Verify the connection:
i2cdetect -y 1 # Use 0 instead of 1 for older Pi models
You should see the MPU6050 address (typically 0x68) in the output grid.
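If the address shows up but you want to confirm the device really is an MPU6050, a minimal sketch (assuming the smbus2 package installed in the next step) reads its WHO_AM_I register, which should return 0x68:

# Optional sanity check: read the MPU6050 WHO_AM_I register (0x75).
from smbus2 import SMBus

with SMBus(1) as bus:                       # use bus 0 on very old Pi models
    who_am_i = bus.read_byte_data(0x68, 0x75)
    print(f"WHO_AM_I = 0x{who_am_i:02X}")   # expected: 0x68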
Install required dependencies:
sudo apt update && sudo apt install python3-pip
pip3 install flask smbus2 requests numpy edge_impulse_linux
Get Edge Impulse API Keys:
Create the server script (server.py):
Save the following code as server.py on your Raspberry Pi. Replace the HMAC_KEY and API_KEY placeholders near the top of the script with the actual keys you copied from Edge Impulse.
# --- START OF FILE server.py ---
import json
import time
import hmac
import hashlib
import requests
import re
import os
import numpy as np
import uuid
from edge_impulse_linux.runner import ImpulseRunner
from smbus2 import SMBus
from flask import Flask, jsonify, request
app = Flask(__name__)
current_dir = os.path.dirname(os.path.abspath(__file__))
model_path = os.path.join(current_dir, 'model.eim')
# ===============================================================
# == IMPORTANT: REPLACE WITH YOUR EDGE IMPULSE KEYS ==
# ===============================================================
HMAC_KEY = "YOUR_HMAC_KEY_HERE" # Replace with your HMAC Key
API_KEY = "YOUR_API_KEY_HERE"    # Replace with your API Key
# ===============================================================
runner = None
try:
    runner = ImpulseRunner(model_path)
    runner.init()
    print(f"Model '{model_path}' initialized successfully.")
except Exception as e:
    print(f"ERROR: Could not initialize model '{model_path}': {e}")
    print("Ensure the model file exists in the same directory as the server script.")
class MPU6050:
    def __init__(self, bus_num=1, address=0x68):
        try:
            self.bus = SMBus(bus_num)
            self.address = address
            # Wake up the MPU6050
            self.bus.write_byte_data(self.address, 0x6B, 0)
            print("MPU6050 initialized successfully.")
        except Exception as e:
            print(f"ERROR: Could not initialize MPU6050: {e}")
            print("Check I2C connection and configuration (sudo raspi-config).")
            # Optionally re-raise or handle appropriately
            raise
    def read_raw_data(self, addr):
        # Read two bytes (high and low) from the specified address
        high = self.bus.read_byte_data(self.address, addr)
        low = self.bus.read_byte_data(self.address, addr + 1)
        # Combine high and low bytes into a 16-bit signed value
        value = (high << 8) | low
        # Convert to signed value if necessary
        if value > 32767:
            value = value - 65536
        return value
    def get_acceleration(self):
        # Read raw accelerometer data
        acc_x_raw = self.read_raw_data(0x3B)
        acc_y_raw = self.read_raw_data(0x3D)
        acc_z_raw = self.read_raw_data(0x3F)
        # Convert raw data to m/s^2 (using default sensitivity AFS_SEL=0 -> 16384 LSB/g)
        # 1 g = 9.81 m/s^2
        acc_x = (acc_x_raw / 16384.0) * 9.81
        acc_y = (acc_y_raw / 16384.0) * 9.81
        acc_z = (acc_z_raw / 16384.0) * 9.81
        return [acc_x, acc_y, acc_z]
try:
    mpu = MPU6050()
except Exception as e:
    # If MPU fails to init, set it to None so endpoints can handle it
    mpu = None
    print("Continuing without MPU6050 sensor...")
@app.route('/readings', methods=['GET'])
def get_readings():
    """Get current accelerometer readings"""
    if mpu is None:
         return jsonify({'success': False, 'error': 'MPU6050 not initialized'}), 500
    try:
        readings = mpu.get_acceleration()
        return jsonify({
            'success': True,
            'data': {
                'x': readings[0],
                'y': readings[1],
                'z': readings[2]
            }
        })
    except Exception as e:
        print(f"Error reading MPU6050: {e}")
        return jsonify({'success': False, 'error': f'Error reading sensor: {str(e)}'}), 500
@app.route('/predict', methods=['GET'])
def predict():
    """Get current accelerometer reading and run inference"""
    if mpu is None:
        return jsonify({'success': False, 'error': 'MPU6050 not initialized'}), 500
    if runner is None:
        return jsonify({'success': False, 'error': 'Edge Impulse model runner not initialized'}), 500
    try:
        samples = []
        print("Starting to collect samples for prediction...")
        # Assuming the model expects 100 samples based on previous context.
        # Adjust this based on your Edge Impulse window size and frequency.
        # Example: 2 second window at 62.5Hz -> 125 samples
        # Example: If your window is 2000ms and interval is 16ms (62.5Hz):
        num_samples_expected = runner.model_info.get('model_parameters', {}).get('input_features_count', 100) # Example, adjust if needed
        sampling_interval_seconds = 0.016 # Approx 62.5 Hz
        for _ in range(num_samples_expected):
            readings = mpu.get_acceleration()
            # Ensure readings are floats
            readings = [float(x) for x in readings]
            samples.append(readings)
            time.sleep(sampling_interval_seconds) # Match approx sampling frequency used in training
        print(f"Collected {len(samples)} samples.")
        # Flatten the list of lists into a single list of features
        # Expected format is typically [ax1, ay1, az1, ax2, ay2, az2, ...]
        features = [item for sublist in samples for item in sublist]
        print(f"Feature vector length: {len(features)}")
        # Run inference
        print("Running inference...")
        try:
            result = runner.classify(features)
            print(f"Raw inference result: {result}")
            # Process the result
            if result and 'result' in result and 'classification' in result['result']:
                classification = result['result']['classification']
                # Find the prediction with the highest score
                best_prediction = max(classification.items(), key=lambda item: item[1])
                motion_type, confidence = best_prediction
                return jsonify({
                    'success': True,
                    'prediction': str(motion_type),
                    'confidence': float(confidence * 100), # Convert to percentage
                    'raw_scores': {str(k): float(v) for k, v in classification.items()}, # Send all scores
                    #'raw_data': samples # Optional: send the collected data back
                })
            else:
                 print("Inference result structure not as expected.")
                 return jsonify({'success': False, 'error': 'Invalid inference result format'}), 500
        except Exception as e:
            print(f"Inference error: {str(e)}")
            return jsonify({
                'success': False,
                'error': f'Inference error: {str(e)}'
            }), 500
    except Exception as e:
        print(f"Prediction endpoint error: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Error during prediction process: {str(e)}'
        }), 500
@app.route('/collect', methods=['POST'])
def collect_data():
    """Collect and send data to Edge Impulse"""
    if mpu is None:
         return jsonify({'success': False, 'error': 'MPU6050 not initialized'}), 500
    if not HMAC_KEY or not API_KEY or "YOUR_" in HMAC_KEY or "YOUR_" in API_KEY:
         return jsonify({'success': False, 'error': 'API_KEY or HMAC_KEY not configured in server.py'}), 400
    try:
        data = request.get_json()
        if not data:
            return jsonify({'success': False, 'error': 'No JSON data received'}), 400
        label = data.get('label', 'collected_data') # Get label from request, default if not provided
        sample_length_seconds = data.get('duration', 2)
        interval_ms = data.get('interval', 16) # Default to ~62.5 Hz
        if interval_ms <= 0:
            return jsonify({'success': False, 'error': 'Interval must be positive'}), 400
        num_samples = int((sample_length_seconds * 1000) / interval_ms)
        print(f"Collecting {num_samples} samples over {sample_length_seconds}s with label '{label}'...")
        values_list = []
        start_time = time.time()
        for i in range(num_samples):
            loop_start = time.time()
            try:
                values = mpu.get_acceleration()
                values_list.append([float(v) for v in values]) # Ensure floats
            except Exception as e:
                print(f"Warning: Failed to read sensor on sample {i+1}: {e}")
                # Decide how to handle: skip, fill with zeros, etc. Here we skip.
                # If skipping, adjust sleep time calculation or it might finish too early.
                # For simplicity, we'll still sleep, but the sample count will be lower.
            # Precise sleep to maintain interval
            elapsed = time.time() - loop_start
            sleep_time = (interval_ms / 1000.0) - elapsed
            if sleep_time > 0:
                time.sleep(sleep_time)
        end_time = time.time()
        print(f"Collection finished in {end_time - start_time:.2f} seconds. Collected {len(values_list)} actual samples.")
        # Get a unique device ID (MAC address)
        try:
             device_mac = ':'.join(re.findall('..', '%012x' % uuid.getnode()))
        except:
             device_mac = "unknown_pi_" + str(uuid.uuid4())[:8] # Fallback
        # Prepare payload for Edge Impulse Ingestion API
        payload = {
            "protected": {
                "ver": "v1",
                "alg": "HS256",
                "iat": int(time.time()) # Use integer timestamp
            },
            "signature": "", # Placeholder, calculated later
            "payload": {
                "device_name": device_mac,
                "device_type": "RASPBERRY_PI_MPU6050", # Be descriptive
                "interval_ms": interval_ms,
                "sensors": [
                    {"name": "accX", "units": "m/s2"},
                    {"name": "accY", "units": "m/s2"},
                    {"name": "accZ", "units": "m/s2"}
                ],
                "values": values_list
            }
        }
        # Calculate HMAC signature
        # The signature is calculated over the JSON string of the payload
        # (with the signature field empty or not present initially)
        encoded_payload = json.dumps(payload).encode('utf-8')
        signature = hmac.new(
            bytes(HMAC_KEY, 'utf-8'),
            msg=encoded_payload,
            digestmod=hashlib.sha256
        ).hexdigest()
        # Update the signature in the payload
        payload['signature'] = signature
        # Send data to Edge Impulse
        print("Uploading data to Edge Impulse...")
        try:
            res = requests.post(
                url='https://ingestion.edgeimpulse.com/api/training/data',
                data=json.dumps(payload), # Send the final payload with signature
                headers={
                    'Content-Type': 'application/json',
                    'x-file-name': f"{label}.{int(time.time())}", # Include label in filename
                    'x-api-key': API_KEY
                },
                timeout=30 # Add a timeout
            )
            print(f"Upload response status code: {res.status_code}")
            print(f"Upload response text: {res.text}")
            if res.status_code == 200:
                return jsonify({
                    'success': True,
                    'message': f'Data for label "{label}" collected and uploaded successfully.'
                })
            else:
                return jsonify({
                    'success': False,
                    'error': f'Upload failed. Status code: {res.status_code}. Response: {res.text}'
                }), res.status_code # Return server error code
        except requests.exceptions.RequestException as req_e:
            print(f"Error uploading data: {req_e}")
            return jsonify({'success': False, 'error': f'Failed to connect to Edge Impulse ingestion API: {str(req_e)}'}), 500
    except Exception as e:
        print(f"Error in /collect endpoint: {e}")
        # Include traceback for debugging if possible/desired
        # import traceback
        # print(traceback.format_exc())
        return jsonify({'success': False, 'error': f'An unexpected error occurred: {str(e)}'}), 500
if __name__ == '__main__':
    print("Starting Flask server...")
    # Make sure the host is accessible from your client machine
    app.run(host='0.0.0.0', port=5000)
# --- END OF FILE server.py ---
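Before building the GUI client, you can smoke-test the server from any machine on the same network. A minimal sketch using requests (the IP address is a placeholder; /predict and /collect will only succeed once the model file and API keys from the following steps are in place):

# Quick smoke test of the Flask endpoints defined in server.py.
import requests

BASE = "http://192.168.1.100:5000"   # replace with your Pi's IP address

print(requests.get(f"{BASE}/readings", timeout=5).json())    # live accelerometer values
print(requests.get(f"{BASE}/predict", timeout=15).json())    # collect a window and classify it
print(requests.post(f"{BASE}/collect",
                    json={"label": "idle", "duration": 2, "interval": 16},
                    timeout=30).json())                       # record and upload a labelled sample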
Download the Edge Impulse model:
- Choose the quantized (int8) or unquantized (float32) model. Quantized is usually faster and smaller.
- Download the .eim file.
- Place the model.eim file in the same directory as server.py on your Raspberry Pi. The server script expects it there.

Install dependencies on your computer (where you will run the GUI):
pip install requests
# tkinter is part of the Python standard library; on Debian/Ubuntu systems install it with: sudo apt install python3-tk
Create the client script (client.py):
Save the following code as client.py on your computer.
# --- START OF FILE client.py ---
import tkinter as tk
from tkinter import ttk, messagebox
import requests
import threading
import time
import json
class AccelerometerClientGUI:
    def __init__(self, root):
        self.root = root
        self.root.title("Remote Accelerometer Control")
        # Use a themed style
        style = ttk.Style()
        # Try different themes if 'clam' isn't ideal on your OS
        # Available themes: style.theme_names() -> ('winnative', 'clam', 'alt', 'default', 'classic', 'vista', 'xpnative')
        try:
            style.theme_use('clam') # 'clam', 'alt', 'default', etc. might look better depending on OS
        except tk.TclError:
            print("Clam theme not available, using default.")
            style.theme_use(style.theme_names()[0]) # Fallback to the first available theme
        self.server_running = False
        self.api_base_url = None
        self.live_update_active = False # Flag to control the live update thread
        # Main frame for padding
        main_frame = ttk.Frame(root, padding="20 20 20 20")
        main_frame.pack(expand=True, fill='both')
        # Connection Frame
        conn_frame = ttk.LabelFrame(main_frame, text="Server Connection", padding="15 10 15 10")
        conn_frame.pack(fill='x', padx=5, pady=(0, 10))
        conn_frame.grid_columnconfigure(1, weight=1) # Make entry expand
        ttk.Label(conn_frame, text="IP/Hostname:").grid(row=0, column=0, sticky=tk.W, pady=5, padx=(0, 5))
        self.hostname_var = tk.StringVar(value="192.168.1.100") # Default IP, change if needed
        hostname_entry = ttk.Entry(conn_frame, textvariable=self.hostname_var, width=25)
        hostname_entry.grid(row=0, column=1, sticky='ew', padx=5, pady=5)
        ttk.Label(conn_frame, text="Port:").grid(row=1, column=0, sticky=tk.W, pady=5, padx=(0, 5))
        self.port_var = tk.StringVar(value="5000")
        port_entry = ttk.Entry(conn_frame, textvariable=self.port_var, width=10)
        port_entry.grid(row=1, column=1, sticky='w', padx=5, pady=5) # Align left
        self.connect_button = ttk.Button(conn_frame, text="Connect", command=self.connect_to_server)
        self.connect_button.grid(row=2, column=0, columnspan=2, pady=(10, 5))
        # Settings Frame for Collection
        settings_frame = ttk.LabelFrame(main_frame, text="Data Collection Settings", padding="15 10 15 10")
        settings_frame.pack(fill='x', padx=5, pady=10)
        settings_frame.grid_columnconfigure(1, weight=1) # Make entries expand
        ttk.Label(settings_frame, text="Label:").grid(row=0, column=0, sticky=tk.W, pady=5, padx=(0, 5))
        self.label_var = tk.StringVar(value="idle") # Default label
        label_entry = ttk.Entry(settings_frame, textvariable=self.label_var, width=15)
        label_entry.grid(row=0, column=1, sticky='w', padx=5, pady=5)
        ttk.Label(settings_frame, text="Duration (s):").grid(row=1, column=0, sticky=tk.W, pady=5, padx=(0, 5))
        self.duration_var = tk.StringVar(value="2")
        duration_entry = ttk.Entry(settings_frame, textvariable=self.duration_var, width=10)
        duration_entry.grid(row=1, column=1, sticky='w', padx=5, pady=5)
        ttk.Label(settings_frame, text="Interval (ms):").grid(row=2, column=0, sticky=tk.W, pady=5, padx=(0, 5))
        self.interval_var = tk.StringVar(value="16") # Approx 62.5 Hz
        interval_entry = ttk.Entry(settings_frame, textvariable=self.interval_var, width=10)
        interval_entry.grid(row=2, column=1, sticky='w', padx=5, pady=5)
        self.collect_button = ttk.Button(settings_frame, text="Start Collection", command=self.start_collection)
        self.collect_button.grid(row=3, column=0, columnspan=2, pady=(10, 5))
        self.collect_button.state(['disabled']) # Initially disabled
        # Live Data Frame
        live_frame = ttk.LabelFrame(main_frame, text="Live Data", padding="15 10 15 10")
        live_frame.pack(fill='x', padx=5, pady=10)
        self.acc_x_var = tk.StringVar(value="X: --- m/s²")
        self.acc_y_var = tk.StringVar(value="Y: --- m/s²")
        self.acc_z_var = tk.StringVar(value="Z: --- m/s²")
        style.configure('DataLabel.TLabel', font=('Helvetica', 11)) # Slightly smaller font
        ttk.Label(live_frame, textvariable=self.acc_x_var, style='DataLabel.TLabel').pack(pady=2, anchor='w')
        ttk.Label(live_frame, textvariable=self.acc_y_var, style='DataLabel.TLabel').pack(pady=2, anchor='w')
        ttk.Label(live_frame, textvariable=self.acc_z_var, style='DataLabel.TLabel').pack(pady=2, anchor='w')
        # Live Prediction Frame
        prediction_frame = ttk.LabelFrame(main_frame, text="Live Prediction", padding="15 10 15 10")
        prediction_frame.pack(fill='x', padx=5, pady=10)
        self.prediction_var = tk.StringVar(value="Prediction: ---")
        self.confidence_var = tk.StringVar(value="Confidence: ---%")
        style.configure('Prediction.TLabel', font=('Helvetica', 12, 'bold'))
        ttk.Label(prediction_frame, textvariable=self.prediction_var, style='Prediction.TLabel').pack(pady=3, anchor='w')
        ttk.Label(prediction_frame, textvariable=self.confidence_var, style='DataLabel.TLabel').pack(pady=3, anchor='w')
        self.test_button = ttk.Button(prediction_frame, text="Test Prediction", command=self.test_prediction)
        self.test_button.pack(pady=(10, 5))
        self.test_button.state(['disabled']) # Initially disabled
        # Status Bar Frame
        status_frame = ttk.Frame(root, relief=tk.SUNKEN, borderwidth=1)
        status_frame.pack(side=tk.BOTTOM, fill='x')
        self.status_var = tk.StringVar(value="Status: Not connected")
        self.status_label = ttk.Label(status_frame, textvariable=self.status_var, anchor=tk.W, padding="5 2 5 2")
        self.status_label.pack(fill='x')
        self.update_thread = None
    def set_status(self, message, is_error=False):
        """Updates the status bar"""
        prefix = "Status: "
        if is_error:
            prefix = "Error: "
            # Optionally change label color for errors
            # self.status_label.config(foreground='red')
        # else:
            # self.status_label.config(foreground='black') # Reset color
        self.status_var.set(prefix + message)
        print(prefix + message) # Also print to console
    def connect_to_server(self):
        """Attempts to connect to the Flask server."""
        hostname = self.hostname_var.get().strip()
        port = self.port_var.get().strip()
        if not hostname or not port:
            self.set_status("Hostname and Port cannot be empty.", is_error=True)
            return
        try:
            port_num = int(port)
            self.api_base_url = f"http://{hostname}:{port_num}"
            self.set_status(f"Connecting to {self.api_base_url}...")
            self.connect_button.state(['disabled']) # Disable button during attempt
            # Run connection test in a separate thread to keep UI responsive
            thread = threading.Thread(target=self._test_connection)
            thread.daemon = True
            thread.start()
        except ValueError:
            self.set_status("Invalid Port number.", is_error=True)
            self.connect_button.state(['!disabled'])
        except Exception as e:
            self.set_status(f"Connection setup error: {str(e)}", is_error=True)
            self.connect_button.state(['!disabled'])
    def _test_connection(self):
        """Background task to test server connection."""
        try:
            # Use the /readings endpoint as a simple health check
            response = requests.get(f"{self.api_base_url}/readings", timeout=5)
            response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
            if response.status_code == 200:
                data = response.json()
                if data.get('success'):
                    self.server_running = True
                    self.set_status("Connected to server successfully.")
                    self.collect_button.state(['!disabled'])
                    self.test_button.state(['!disabled'])
                    # Start live data updates if not already running
                    if not self.live_update_active:
                        self.live_update_active = True
                        self.update_thread = threading.Thread(target=self.update_live_data)
                        self.update_thread.daemon = True
                        self.update_thread.start()
                else:
                    # Server responded but with success=False
                    errmsg = data.get('error', 'Unknown server error')
                    self.set_status(f"Server connection issue: {errmsg}", is_error=True)
                    self._handle_disconnection()
            else:
                # Should be caught by raise_for_status, but handle just in case
                self.set_status(f"Server responded with status {response.status_code}", is_error=True)
                self._handle_disconnection()
        except requests.exceptions.ConnectionError:
            self.set_status("Connection refused. Is the server running?", is_error=True)
            self._handle_disconnection()
        except requests.exceptions.Timeout:
            self.set_status("Connection timed out.", is_error=True)
            self._handle_disconnection()
        except requests.exceptions.RequestException as e:
            self.set_status(f"Connection error: {str(e)}", is_error=True)
            self._handle_disconnection()
        except json.JSONDecodeError:
            self.set_status("Invalid response from server (not JSON).", is_error=True)
            self._handle_disconnection()
        finally:
            # Re-enable connect button only if connection failed
            if not self.server_running:
                self.connect_button.state(['!disabled'])
    def _handle_disconnection(self):
        """Resets state upon disconnection or connection failure."""
        self.server_running = False
        self.live_update_active = False # Signal the update thread to stop
        self.collect_button.state(['disabled'])
        self.test_button.state(['disabled'])
        self.acc_x_var.set("X: --- m/s²")
        self.acc_y_var.set("Y: --- m/s²")
        self.acc_z_var.set("Z: --- m/s²")
        self.prediction_var.set("Prediction: ---")
        self.confidence_var.set("Confidence: ---%")
        self.api_base_url = None
    def update_live_data(self):
        """Periodically fetches live data from the server."""
        while self.live_update_active: # Check the flag
            if self.server_running and self.api_base_url:
                try:
                    response = requests.get(f"{self.api_base_url}/readings", timeout=1)
                    if response.status_code == 200:
                        data = response.json()
                        if data.get('success'):
                            readings = data['data']
                            self.acc_x_var.set(f"X: {readings.get('x', 0.0):.2f} m/s²")
                            self.acc_y_var.set(f"Y: {readings.get('y', 0.0):.2f} m/s²")
                            self.acc_z_var.set(f"Z: {readings.get('z', 0.0):.2f} m/s²")
                        else:
                            # Server reported an issue getting readings
                            self.set_status(f"Server error getting readings: {data.get('error', 'Unknown')}", is_error=True)
                            # Consider stopping live updates if this persists
                    else:
                        self.set_status(f"Live data request failed (Status: {response.status_code})", is_error=True)
                        # Consider stopping live updates
                except requests.exceptions.RequestException as e:
                    # Don't flood status, just log it maybe, or set a general 'connection lost' status
                    # print(f"Live data update failed: {e}") # Log to console instead of status bar
                    # If connection is lost, the main connect logic should handle resetting state
                    pass
                except json.JSONDecodeError:
                    print("Error decoding live data JSON") # Log to console
            else:
                # If server is not marked as running, stop trying
                break
            time.sleep(0.1) # Update roughly 10 times per second
        print("Live update thread stopped.")
    def start_collection(self):
        """Sends a request to the server to start data collection."""
        if not self.server_running or not self.api_base_url:
            self.set_status("Not connected to server.", is_error=True)
            return
        try:
            label = self.label_var.get().strip()
            duration_str = self.duration_var.get().strip()
            interval_str = self.interval_var.get().strip()
            if not label:
                messagebox.showerror("Input Error", "Label cannot be empty.")
                return
            if not duration_str or not interval_str:
                 messagebox.showerror("Input Error", "Duration and Interval cannot be empty.")
                 return
            duration = float(duration_str)
            interval = int(interval_str)
            if duration <= 0 or interval <= 0:
                 messagebox.showerror("Input Error", "Duration and Interval must be positive.")
                 return
            self.collect_button.state(['disabled']) # Disable button during collection
            self.test_button.state(['disabled']) # Disable prediction during collection
            self.set_status(f"Collecting data for '{label}' ({duration}s)...")
            # Run collection in a separate thread
            thread = threading.Thread(target=self._collect_data_task, args=(label, duration, interval))
            thread.daemon = True
            thread.start()
        except ValueError:
             messagebox.showerror("Input Error", "Duration must be a number and Interval must be an integer.")
             self.collect_button.state(['!disabled']) # Re-enable if input was bad
             self.test_button.state(['!disabled'])
        except Exception as e:
            self.set_status(f"Error starting collection: {str(e)}", is_error=True)
            self.collect_button.state(['!disabled'])
            self.test_button.state(['!disabled'])
    def _collect_data_task(self, label, duration, interval):
        """Background task for data collection."""
        try:
            # Timeout slightly longer than collection duration + some buffer
            request_timeout = duration + 10
            response = requests.post(
                f"{self.api_base_url}/collect",
                json={
                    'label': label,
                    'duration': duration,
                    'interval': interval
                },
                timeout=request_timeout
            )
            response.raise_for_status() # Check for HTTP errors
            if response.status_code == 200:
                data = response.json()
                if data.get('success'):
                    self.set_status(data.get('message', "Data collection successful."))
                else:
                    self.set_status(f"Collection failed: {data.get('error', 'Unknown server error')}", is_error=True)
            # raise_for_status handles non-200 codes here
        except requests.exceptions.Timeout:
            self.set_status(f"Collection request timed out after {request_timeout}s.", is_error=True)
        except requests.exceptions.RequestException as e:
            self.set_status(f"Collection request error: {str(e)}", is_error=True)
        except json.JSONDecodeError:
             self.set_status("Invalid response from server during collection.", is_error=True)
        except Exception as e:
            self.set_status(f"Unexpected error during collection: {str(e)}", is_error=True)
        finally:
            # Re-enable buttons after collection attempt finishes or fails
            self.collect_button.state(['!disabled'])
            self.test_button.state(['!disabled'])
    def test_prediction(self):
        """Requests a prediction from the server."""
        if not self.server_running or not self.api_base_url:
            self.set_status("Not connected to server.", is_error=True)
            return
        self.test_button.state(['disabled']) # Disable button during prediction
        self.collect_button.state(['disabled']) # Disable collection during prediction
        self.set_status("Running prediction...")
        self.prediction_var.set("Prediction: Running...")
        self.confidence_var.set("Confidence: ---%")
        # Run prediction in a separate thread
        thread = threading.Thread(target=self._predict_task)
        thread.daemon = True
        thread.start()
    def _predict_task(self):
        """Background task for running prediction."""
        try:
            # Timeout for prediction (e.g., 10 seconds)
            response = requests.get(f"{self.api_base_url}/predict", timeout=10)
            response.raise_for_status() # Check for HTTP errors
            if response.status_code == 200:
                data = response.json()
                if data.get('success'):
                    self.prediction_var.set(f"Prediction: {data.get('prediction', 'N/A')}")
                    self.confidence_var.set(f"Confidence: {data.get('confidence', 0.0):.1f}%")
                    self.set_status("Prediction successful.")
                else:
                    error_msg = data.get('error', 'Unknown server error')
                    self.set_status(f"Prediction failed: {error_msg}", is_error=True)
                    self.prediction_var.set("Prediction: Failed")
                    self.confidence_var.set("Confidence: ---%")
            # raise_for_status handles non-200 codes
        except requests.exceptions.Timeout:
            self.set_status("Prediction request timed out.", is_error=True)
            self.prediction_var.set("Prediction: Timeout")
            self.confidence_var.set("Confidence: ---%")
        except requests.exceptions.RequestException as e:
            self.set_status(f"Prediction request error: {str(e)}", is_error=True)
            self.prediction_var.set("Prediction: Error")
            self.confidence_var.set("Confidence: ---%")
        except json.JSONDecodeError:
             self.set_status("Invalid response from server during prediction.", is_error=True)
             self.prediction_var.set("Prediction: Bad Response")
             self.confidence_var.set("Confidence: ---%")
        except Exception as e:
            self.set_status(f"Unexpected error during prediction: {str(e)}", is_error=True)
            self.prediction_var.set("Prediction: Exception")
            self.confidence_var.set("Confidence: ---%")
        finally:
            # Re-enable buttons after prediction attempt finishes or fails
            # Only re-enable if still connected theoretically
            if self.server_running:
                self.test_button.state(['!disabled'])
                self.collect_button.state(['!disabled'])
    def cleanup(self):
        """Properly closes the application and stops threads."""
        print("Cleaning up...")
        self.live_update_active = False # Signal thread to stop
        if self.update_thread and self.update_thread.is_alive():
            self.update_thread.join(timeout=0.5) # Wait briefly for thread to exit
        self.root.destroy()
if __name__ == "__main__":
    root = tk.Tk()
    app = AccelerometerClientGUI(root)
    # Handle window close event gracefully
    root.protocol("WM_DELETE_WINDOW", app.cleanup)
    root.mainloop()
# --- END OF FILE client.py ---

On the Raspberry Pi, navigate to your project directory:
cd /path/to/your/project
Start the server:
python3 server.py
Note your Raspberry Pi's IP address:
hostname -I
On your computer, run the client:
python client.py
Enter the Raspberry Pi's IP address and click "Connect"
Use the client's "Start Collection" button to record multiple samples for each gesture label (circle, figure_eight, idle, shake, tap). Then configure the Spectral Analysis block with the following parameters:
| Parameter | Value | Purpose | 
|---|---|---|
| Filter | Low-pass 3Hz | Removes high-frequency noise | 
| FFT length | 256 | Resolution of frequency analysis | 
| Overlap | 0.45 | Smooths transitions between windows | 
| Noise floor | -60dB | Sets baseline for noise filtering | 
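Edge Impulse applies this filtering for you inside the Spectral Analysis block, so there is nothing to implement yourself; the standalone sketch below (assuming scipy and the ~62.5 Hz sampling rate used in this project) is only meant to build intuition for what a 3 Hz low-pass filter does to a gesture-plus-vibration signal:

# Illustration only: a 3 Hz Butterworth low-pass on synthetic accelerometer data.
import numpy as np
from scipy.signal import butter, filtfilt

fs = 62.5                                      # sampling rate (Hz)
t = np.arange(0, 2, 1 / fs)                    # 2-second window
gesture = np.sin(2 * np.pi * 1.0 * t)          # slow, gesture-like 1 Hz component
vibration = 0.3 * np.sin(2 * np.pi * 15 * t)   # fast 15 Hz "noise"
signal = gesture + vibration

b, a = butter(N=2, Wn=3, btype="low", fs=fs)   # 2nd-order low-pass, 3 Hz cutoff
filtered = filtfilt(b, a, signal)              # zero-phase filtering

print(f"raw std: {signal.std():.2f}, filtered std: {filtered.std():.2f}")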



Make sure the server (server.py) is running on the Pi with the model.eim file in place before requesting live predictions.
5. Project 3: Simple Image Recognition (Using COCO Dataset for Training)
Goal
Train a model using images from the COCO dataset to distinguish between a "Person" and "No Person". Then, deploy and test this model for live inference using a Raspberry Pi and Camera Module. This approach leverages a large, diverse dataset for training while still providing hands-on deployment experience.
Hardware Requirements (for Deployment & Inference)
Software Requirements
The pycocotools Python package can help you find and download suitable images.
5.1. Hardware Setup (Prepare the RPi for Inference)
Enable the camera interface:
sudo raspi-config
# Navigate to: Interface Options -> Camera -> Enable
# Reboot when prompted
Optional Verification: On your Raspberry Pi terminal, run:
libcamera-still -o test.jpg
# Check that test.jpg was created and shows camera view
5.2. Install Edge Impulse Linux CLI (Prepare the RPi for Inference)
# Install dependencies and the CLI tool
wget -q -O - https://cdn.edgeimpulse.com/firmware/linux/setup.sh | sudo bash
5.3. Create Project in Edge Impulse
5.4. Acquire and Upload Training Data (Using COCO)
Obtain Images: the helper script below (coco.py) uses pycocotools to sort COCO validation images into person and no_person folders:
import json, os, shutil
from pycocotools.coco import COCO
# paths (val instead of train)
ANNOTATIONS = 'annotations/instances_val2017.json'
IMAGES_DIR = 'val2017'
OUT_PERSON = 'val_person'
OUT_NO_PERSON = 'val_no_person'
os.makedirs(OUT_PERSON, exist_ok=True)
os.makedirs(OUT_NO_PERSON, exist_ok=True)
coco = COCO(ANNOTATIONS)
person_cat_id = coco.getCatIds(catNms=['person'])[0]
person_img_ids = coco.getImgIds(catIds=[person_cat_id])
all_img_ids = set(coco.getImgIds())
no_person_img_ids = all_img_ids - set(person_img_ids)
# how many to grab
N = 200
person_imgs = coco.loadImgs(person_img_ids[:N])
no_person_imgs = coco.loadImgs(list(no_person_img_ids)[:N])
def copy_images(imgs, out_dir):
    for img in imgs:
        src = os.path.join(IMAGES_DIR, img['file_name'])
        dst = os.path.join(out_dir, img['file_name'])
        if os.path.exists(src):
            shutil.copy(src, dst)
copy_images(person_imgs, OUT_PERSON)
copy_images(no_person_imgs, OUT_NO_PERSON)
print("done.")
- person class: Download a diverse set of images clearly containing people (different scales, lighting, backgrounds, partial views). Aim for at least 100-200 images if possible.
- no_person class: This requires careful selection. Download images from COCO categories that do not contain people (e.g., landscapes, inanimate objects like 'traffic light', 'bench', 'car', indoor scenes without people). Crucially, ensure these images are visually diverse and somewhat representative of backgrounds where you might expect not to see a person in your RPi deployment scenario. Aim for a similar number of images as the person class.
- Organize the downloaded images into two local folders (e.g., coco_person and coco_no_person).
Upload to Edge Impulse: Option A (web uploader): in the Data Acquisition page, upload the person images, entering person as the label, then upload the no_person images, entering no_person as the label.
Option B (CLI Uploader - faster for many files): Upload using commands like:
npm install -g edge-impulse-cli
# The uploader prompts you to log in to your Edge Impulse account the first time you run it.
edge-impulse-uploader --category training --label person coco_person/*.jpg
edge-impulse-uploader --category training --label no_person coco_no_person/*.jpg
# Adjust paths and wildcards as needed
# You might need separate commands for training/testing splits
Refer to the Edge Impulse Uploader docs for details.
Verify Data: Check the Data Acquisition dashboard to ensure you have a roughly balanced number of images for both person and no_person classes in your training and testing sets.
5.5. Design Your Impulse
Set the image width and height to 96 x 96 pixels.
Set the color depth to Grayscale (recommended) or RGB.
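The resolution and color-depth choices directly set the size of the model input, which in turn drives RAM and flash usage. A quick back-of-the-envelope comparison:

# Input feature counts for the image block at different resolutions and color depths.
for width, height in [(96, 96), (64, 64)]:
    grayscale = width * height      # one value per pixel
    rgb = width * height * 3        # three channels per pixel
    print(f"{width}x{height}: grayscale={grayscale} features, RGB={rgb} features")
# 96x96 grayscale -> 9216, 96x96 RGB -> 27648, 64x64 grayscale -> 4096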
5.6. Configure Image Block (Feature Generation)
5.7. Train the Model
5.8. Test Your Model (Using COCO Test Data)
5.9. Deploy and Run Live Inference on Raspberry Pi
The following script (camera_infer.py) grabs frames from the camera, resizes them to the impulse's 96 x 96 input, runs the .eim model, and overlays the top prediction on the preview window:
import cv2
import numpy as np
from edge_impulse_linux.runner import ImpulseRunner
MODEL_PATH = './model.eim'  # .eim file
runner = ImpulseRunner(MODEL_PATH)
runner.init()
cam = cv2.VideoCapture(0)  # default camera
try:
    while True:
        ret, frame = cam.read()
        if not ret:
            print('no frame')
            break
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        resized = cv2.resize(rgb, (96, 96))  # resize
        features = np.array(resized, dtype=np.float32).flatten().tolist()  # plain list of floats for the runner
        res = runner.classify(features)
        out = res['result']['classification']
        print('prediction:', out)
        # display window with result
        label = max(out, key=out.get)
        cv2.putText(frame, label, (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.imshow('camera', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    cam.release()
    cv2.destroyAllWindows()
    runner.stop()
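camera_infer.py above feeds the model a flattened RGB frame. If you trained the impulse with the recommended Grayscale color depth, the model expects one value per pixel instead, so the per-frame preprocessing needs a small change. A hedged adaptation (verify the expected feature count against your own model before relying on it):

# Grayscale variant of the per-frame preprocessing used in camera_infer.py.
import cv2
import numpy as np

def frame_to_grayscale_features(frame, size=(96, 96)):
    """Convert a BGR OpenCV frame into the flat feature list a grayscale impulse expects."""
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)      # one channel per pixel
    resized = cv2.resize(gray, size)                    # 96x96 -> 9216 features
    return np.array(resized, dtype=np.float32).flatten().tolist()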
Download the trained model as a .eim file from the Deployment page. Transfer the .eim file to your Raspberry Pi (which you prepared in steps 5.1 & 5.2), then run live classification on the Raspberry Pi:
# Ensure no other Edge Impulse process is running
edge-impulse-linux-runner --model-file your-downloaded-model-name.eim
Advanced Applications
For more complex applications, explore the Edge Impulse Python SDK for Linux.
Data is King: The quality and quantity of your training data directly impact your model's performance. Always aim for balanced datasets across all your classes. Data augmentation techniques significantly improve robustness - our audio project showed how noise injection and time masking helped the model recognize keywords in varied environments.
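Edge Impulse applies augmentation for you during training, so the snippet below is purely illustrative; it shows, with plain numpy on a stand-in audio array, what the two techniques mentioned above (noise injection and time masking) do, not how Edge Impulse implements them:

# Illustration of two audio augmentations: additive noise and time masking.
import numpy as np

rng = np.random.default_rng(0)
audio = rng.standard_normal(16000).astype(np.float32)      # stand-in for 1 s of 16 kHz audio

# Noise injection: add low-amplitude Gaussian noise.
noisy = audio + 0.05 * rng.standard_normal(audio.shape).astype(np.float32)

# Time masking: zero out a short random span of the signal.
mask_len = 1600                                            # ~0.1 s
start = rng.integers(0, len(audio) - mask_len)
masked = audio.copy()
masked[start:start + mask_len] = 0.0

print(noisy.shape, masked.shape)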
Performance Trade-offs: When deploying on embedded devices, you'll constantly balance three competing factors: accuracy vs. latency vs. resource usage (RAM/Flash). Our experiments demonstrated that quantized (Int8) models provided up to 3.6x speedups with only 1-2% accuracy reduction compared to full-precision (Float32) models.
Model Selection: Start with simpler architectures before attempting complex ones. Edge Impulse's EON Tuner can automatically search for optimal models that balance performance and resource constraints for your specific device target.
Platform Matters: Our benchmarks showed dramatic performance differences between the simulated Cortex-M4F microcontroller and the Raspberry Pi 4. The RPi offered 8-10x faster inference times for the same model. Always validate on your actual target hardware when possible.
Iterative Process: Embedded ML development is fundamentally iterative. Build your initial model, test thoroughly, analyze weak points (like our issues with the "shake" gesture), refine your approach, and test again.
RPi I2C Issues: If i2cdetect -y 1 doesn't show your MPU6050 (typically at address 0x68), first check your physical wiring connections. Ensure I2C is properly enabled in raspi-config and that the sensor is receiving power (the MPU6050 should have a small LED lit when powered).
EI Data Forwarder/Daemon Issues: If your data isn't uploading to Edge Impulse, verify your API keys and HMAC keys are correctly entered in your code. Check your network connection stability and firewall settings that might block the connections.
Low Accuracy: Consider whether you need more training data or more diverse data. For motion recognition, try collecting samples at different speeds and intensities. For audio, collect samples in different acoustic environments. Experiment with different model architectures - sometimes a deeper model helps, but not always.
Model Doesn't Fit: If your model exceeds resource constraints, try quantization first (reduces memory by ~4x). If still too large, simplify the model architecture by reducing layer count or neuron count. For image projects, reducing input resolution (e.g., from 96x96 to 64x64) dramatically reduces memory needs. Edge Impulse's EON Tuner can automatically find optimized models.
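To see why quantization is the first lever to pull, here is a rough size estimate with an illustrative (hypothetical) weight count:

# Rough model-size estimate: weights stored as float32 (4 bytes) vs int8 (1 byte).
num_weights = 50_000                        # hypothetical small model; check your own in Edge Impulse
float32_kib = num_weights * 4 / 1024
int8_kib = num_weights * 1 / 1024
print(f"float32: {float32_kib:.0f} KiB, int8: {int8_kib:.0f} KiB (~4x smaller)")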
Gesture Recognition Confusion: If specific motions are being confused (like our "shake" and "circle" overlap), try refining your feature extraction parameters. For spectral analysis, adjusting the FFT length and filter settings can better separate similar motions.
This guide walks you through the practical process of creating three working embedded machine learning solutions using Edge Impulse. We've covered audio keyword detection, motion gesture recognition, and image classification - three of the most common applications in the embedded ML space.
Through these hands-on projects, you've experienced firsthand how modern tools like Edge Impulse are democratizing what was once a highly specialized field. Just a few years ago, running neural networks on microcontrollers required deep expertise in both ML and embedded systems. Today, with the right tools and approach, students and hobbyists can create sophisticated intelligent devices.
The embedded ML landscape is expanding rapidly, with applications ranging from predictive maintenance in industrial settings to smart agriculture, healthcare monitoring, and countless IoT innovations. The techniques and workflow you've learned in this guide provide a solid foundation for exploring these possibilities.
As resource constraints continue to loosen with more powerful microcontrollers and more efficient models, the line between what's possible on tiny devices versus in the cloud will continue to blur. By understanding the fundamental trade-offs and techniques demonstrated in this guide, you're well-equipped to participate in this exciting field.
Thanks for reading this guide.