LAB13: Distributed Data

Edge-to-Cloud Data Pipelines

PDF Textbook Reference

For detailed theoretical foundations, mathematical proofs, and algorithm derivations, see Chapter 13: Edge-to-Cloud Data Ingestion and Storage in the PDF textbook.

The PDF chapter includes:

  • Detailed database schema design theory for time-series data
  • Complete REST API architecture and HTTP protocol analysis
  • In-depth coverage of SQL optimization for IoT workloads
  • Mathematical analysis of edge-cloud trade-offs and latency models
  • Comprehensive coverage of distributed-system consistency and the CAP theorem

Learning Objectives

By the end of this lab you should be able to:

  • Design a simple relational schema for IoT time-series data (devices + readings)
  • Implement basic and aggregate SQL queries for recent and historical sensor data
  • Build a small REST API for ingesting sensor readings from edge devices
  • Decide between single-row vs batch inserts for different workloads
  • Connect an Arduino/ESP32 to your API and reason about edge vs cloud storage trade-offs

Theory Summary

Edge-to-Cloud Data Pipelines

Edge devices generate continuous sensor streams, but RAM is limited and data needs to persist for historical analysis, compliance, and cross-device queries. This lab introduces distributed data architectures where edge devices send data to databases via REST APIs.

The fundamental architecture involves three layers:

  1. Edge Devices (Arduino, ESP32) collect sensor data locally
  2. Edge Gateway (Raspberry Pi) runs a Flask API that receives HTTP requests
  3. Database (SQLite, MySQL, PostgreSQL) stores time-series data persistently

Unlike purely cloud-based systems, this edge-first approach keeps data close to its source, reducing latency, improving reliability during network outages, lowering bandwidth costs, and enhancing privacy. Data can optionally sync to cloud services for long-term storage and analytics.

Database Schema Design

Well-designed schemas separate devices (metadata: name, type, location) from sensor_readings (time-series data: temperature, humidity, timestamps). A foreign key relationship ensures referential integrity—you can’t insert readings for non-existent devices.

Indexes on (device_id, timestamp) are critical for performance. Without indexes, queries scan every row linearly (O(n)). With proper indexes, queries complete in milliseconds even with millions of rows. However, indexes slow down INSERT operations, so design them carefully for high-frequency IoT data.
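
The schema described above can be sketched with Python's built-in sqlite3 module. This is a minimal sketch, not the lab's reference schema: the exact column list and the index name idx_device_time are assumptions chosen for illustration. Note that SQLite only enforces foreign keys once PRAGMA foreign_keys = ON has been issued on the connection.

```python
import sqlite3

def create_schema(conn: sqlite3.Connection) -> None:
    """Create devices and sensor_readings tables plus a composite index."""
    # SQLite ignores FOREIGN KEY constraints unless this pragma is enabled
    conn.execute("PRAGMA foreign_keys = ON")
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS devices (
            id       INTEGER PRIMARY KEY,
            name     TEXT NOT NULL,
            type     TEXT,
            location TEXT
        );
        CREATE TABLE IF NOT EXISTS sensor_readings (
            id          INTEGER PRIMARY KEY AUTOINCREMENT,
            device_id   INTEGER NOT NULL REFERENCES devices(id),
            temperature REAL,
            humidity    REAL,
            timestamp   TEXT DEFAULT CURRENT_TIMESTAMP
        );
        -- Composite index (assumed name) for per-device time-range queries
        CREATE INDEX IF NOT EXISTS idx_device_time
            ON sensor_readings(device_id, timestamp);
    """)

conn = sqlite3.connect(":memory:")
create_schema(conn)
conn.execute("INSERT INTO devices (id, name, type, location) "
             "VALUES (1, 'esp32-lab', 'ESP32', 'Room 1')")
conn.execute("INSERT INTO sensor_readings (device_id, temperature, humidity) "
             "VALUES (1, 22.5, 65.0)")

# A reading for a device that was never registered violates the foreign key
try:
    conn.execute("INSERT INTO sensor_readings (device_id, temperature, humidity) "
                 "VALUES (99, 20.0, 50.0)")
    orphan_rejected = False
except sqlite3.IntegrityError:
    orphan_rejected = True
print("Orphan reading rejected:", orphan_rejected)
```

The last insert is refused because device 99 does not exist, which is the referential-integrity guarantee the text describes.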

Batch vs Single Insert Performance

Single inserts commit each row individually—simple but slow (hundreds of inserts per second). Batch inserts use executemany() to commit hundreds of rows in one transaction—10-100× faster. For edge gateways buffering data from multiple sensors or replaying data after network recovery, batch inserts are essential for acceptable performance.
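
One way a gateway might apply this is to buffer readings in memory and flush them in a single transaction. The sketch below is illustrative only: the ReadingBuffer class, its batch_size default, and the three-column table are assumptions, not part of the lab code.

```python
import sqlite3

class ReadingBuffer:
    """Collects readings in memory and writes them in one transaction."""

    def __init__(self, conn, batch_size=50):
        self.conn = conn
        self.batch_size = batch_size  # hypothetical default; tune per workload
        self.pending = []

    def add(self, device_id, temperature, humidity):
        """Queue one reading; flush automatically when the batch is full."""
        self.pending.append((device_id, temperature, humidity))
        if len(self.pending) >= self.batch_size:
            self.flush()

    def flush(self):
        """Write all pending rows with a single commit; return rows written."""
        if not self.pending:
            return 0
        with self.conn:  # commits once on success, rolls back on error
            self.conn.executemany(
                "INSERT INTO sensor_readings (device_id, temperature, humidity) "
                "VALUES (?, ?, ?)",
                self.pending)
        written = len(self.pending)
        self.pending.clear()
        return written

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sensor_readings "
             "(device_id INTEGER, temperature REAL, humidity REAL)")
buf = ReadingBuffer(conn, batch_size=50)
for i in range(120):
    buf.add(1, 20.0 + i * 0.01, 50.0)
leftover = buf.flush()  # write the final partial batch
total = conn.execute("SELECT COUNT(*) FROM sensor_readings").fetchone()[0]
print(f"leftover={leftover}, total={total}")
```

Readings still in the buffer are lost if the process crashes, so the batch size trades throughput against how much data is at risk.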

Key Concepts at a Glance

Core Concepts
  • Edge-First Storage: Keep data local for low latency, reliability, and privacy; sync to cloud optionally
  • Foreign Keys: Enforce referential integrity between devices and readings tables
  • Composite Indexes: CREATE INDEX idx_device_time ON sensor_readings(device_id, timestamp) speeds up time-range queries
  • Batch Inserts: Use executemany() for 10-100× performance improvement over single execute()
  • REST API: Flask endpoints accept sensor data via HTTP POST and return JSON responses
  • Query Optimization: Use LIMIT, WHERE with indexed columns, and aggregate functions (AVG, MIN, MAX) with GROUP BY

Common Pitfalls

Mistakes to Avoid
Forgetting Database Indexes
Your schema works fine with 1,000 rows but grinds to a halt with 1,000,000. Always add indexes on columns used in WHERE, JOIN, and ORDER BY clauses. For IoT data, CREATE INDEX idx_device_time ON sensor_readings(device_id, timestamp) is essential.
Ignoring Batch Insert Performance
Sending one HTTP request per sensor reading wastes bandwidth and time. Buffer 10-100 readings and send as a batch. Use executemany() instead of multiple execute() calls.
Reusing the Default MAC on Arduino Ethernet Shields
Multiple Arduino boards that share the default MAC address cause network conflicts. Give every board its own MAC address, for example byte mac[] = { 0xDE, 0xAD, 0xBE, 0xEF, 0xFE, 0xED }; with the last bytes changed per board.
Missing API Error Handling
Your API should return proper HTTP status codes: 200 (OK), 400 (Bad Request for missing fields), 500 (Server Error). Validate all inputs before inserting into the database.
Not Using F() Macro on Arduino
String literals consume precious RAM on Arduino. Use F("string") to store them in flash memory instead.
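
For the API error-handling pitfall above, input validation can be kept in a small framework-independent function that maps bad input to a 400 status. This is a sketch under assumptions: the function name validate_reading, the required field list, and the humidity range check are illustrative choices, not the lab's actual API contract.

```python
def validate_reading(params):
    """Return (status_code, body) for a dict of request parameters."""
    required = ("device_id", "temperature", "humidity")
    missing = [name for name in required if name not in params]
    if missing:
        return 400, {"error": f"missing fields: {', '.join(missing)}"}
    try:
        reading = {
            "device_id": int(params["device_id"]),
            "temperature": float(params["temperature"]),
            "humidity": float(params["humidity"]),
        }
    except (TypeError, ValueError):
        return 400, {"error": "fields must be numeric"}
    # Assumed plausibility check: relative humidity is a percentage
    if not 0.0 <= reading["humidity"] <= 100.0:
        return 400, {"error": "humidity must be between 0 and 100"}
    return 200, reading

ok_status, ok_body = validate_reading(
    {"device_id": "1", "temperature": "22.5", "humidity": "65"})
bad_status, bad_body = validate_reading({"device_id": "1"})
print(ok_status, ok_body)
print(bad_status, bad_body)
```

A Flask view could call this with request.args and return the body as JSON with the given status; database errors raised after validation would then map to 500.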

Quick Reference

Key SQL Patterns

-- Latest reading for one device
SELECT * FROM sensor_readings
WHERE device_id = 1
ORDER BY timestamp DESC
LIMIT 1;

-- Hourly averages
SELECT
  strftime('%Y-%m-%d %H', timestamp) AS hour,
  AVG(temperature) AS avg_temp,
  COUNT(*) AS num_readings
FROM sensor_readings
WHERE device_id = 1
  AND timestamp >= datetime('now', '-24 hours')
GROUP BY hour;

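-- Anomaly detection (values > 2σ above the mean)
-- SQLite has no built-in STDEV(), so compare the squared deviation with 4 × variance
WITH stats AS (
  SELECT AVG(temperature) AS mean,
         AVG(temperature * temperature) - AVG(temperature) * AVG(temperature) AS variance
  FROM sensor_readings
)
SELECT r.* FROM sensor_readings AS r, stats
WHERE r.temperature > stats.mean
  AND (r.temperature - stats.mean) * (r.temperature - stats.mean) > 4 * stats.variance;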
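
Core SQLite builds do not include a standard-deviation aggregate, so on an SQLite-based gateway the 2σ threshold can also be computed in Python. The sketch below assumes a sensor_readings table with id and temperature columns and uses the population standard deviation; find_anomalies is a hypothetical helper name.

```python
import sqlite3
import statistics

def find_anomalies(conn, k=2.0):
    """Return (id, temperature) rows more than k standard deviations above the mean."""
    rows = conn.execute(
        "SELECT id, temperature FROM sensor_readings "
        "WHERE temperature IS NOT NULL ORDER BY id").fetchall()
    temps = [t for _, t in rows]
    if len(temps) < 2:
        return []  # not enough data to estimate spread
    mean = statistics.mean(temps)
    sigma = statistics.pstdev(temps)  # population standard deviation
    return [(rid, t) for rid, t in rows if t > mean + k * sigma]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sensor_readings (id INTEGER PRIMARY KEY, temperature REAL)")
# Nine normal readings and one spike
conn.executemany("INSERT INTO sensor_readings (temperature) VALUES (?)",
                 [(20.0,)] * 9 + [(40.0,)])
anomalies = find_anomalies(conn)
print(anomalies)
```

Here the mean is 22 and the standard deviation is 6, so only the 40-degree reading exceeds the threshold of 34. Loading every row into memory is acceptable for small edge datasets but would need a streaming approach for large tables.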

Flask API Endpoints

Method | Endpoint           | Purpose               | Example
POST   | /api/reading       | Insert single reading | ?device_id=1&temperature=22.5&humidity=65
GET    | /api/readings/<id> | Get recent readings   | /api/readings/1?limit=100
GET    | /api/stats/<id>    | Get aggregated stats  | /api/stats/1?period=hourly
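
The two GET endpoints above boil down to parameterized queries. The following sketch shows the database side only, without a web framework; the function names, the 1000-row cap on limit, and the returned field names are assumptions for illustration.

```python
import sqlite3

def recent_readings(conn, device_id, limit=100):
    """Rows for GET /api/readings/<id>?limit=N, newest first."""
    limit = max(1, min(int(limit), 1000))  # clamp to an assumed upper bound
    cur = conn.execute(
        "SELECT timestamp, temperature, humidity FROM sensor_readings "
        "WHERE device_id = ? ORDER BY timestamp DESC LIMIT ?",
        (device_id, limit))
    cols = [c[0] for c in cur.description]
    return [dict(zip(cols, row)) for row in cur.fetchall()]

def device_stats(conn, device_id):
    """Aggregates for GET /api/stats/<id>."""
    row = conn.execute(
        "SELECT COUNT(*), AVG(temperature), MIN(temperature), MAX(temperature) "
        "FROM sensor_readings WHERE device_id = ?", (device_id,)).fetchone()
    return {"count": row[0], "avg_temp": row[1],
            "min_temp": row[2], "max_temp": row[3]}

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sensor_readings (device_id INTEGER, timestamp TEXT, "
             "temperature REAL, humidity REAL)")
conn.executemany("INSERT INTO sensor_readings VALUES (?, ?, ?, ?)", [
    (1, "2024-01-01 10:00:00", 20.0, 50.0),
    (1, "2024-01-01 11:00:00", 22.0, 55.0),
    (1, "2024-01-01 12:00:00", 24.0, 60.0),
    (2, "2024-01-01 12:00:00", 30.0, 40.0),
])
latest_two = recent_readings(conn, 1, limit=2)
stats = device_stats(conn, 1)
print(latest_two)
print(stats)
```

Passing values as ? parameters rather than formatting them into the SQL string avoids SQL injection from request input.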

Arduino HTTP Request Format

client.print(F("POST /api/reading?device_id="));  // The API expects POST for inserts
client.print(DEVICE_ID);
client.print(F("&temperature="));
client.print(temperature, 1);  // 1 decimal place
client.print(F("&humidity="));
client.print(humidity, 1);
client.println(F(" HTTP/1.1"));
client.print(F("Host: "));
client.println(server);
client.println(F("Content-Length: 0"));  // Values travel in the query string, so the body is empty
client.println(F("Connection: close"));
client.println();  // Blank line ends headers

Storage Trade-offs

Approach                     | Latency        | Offline Support | Capacity     | Query Power | Cost     | Privacy
Edge-Only (SQLite on Pi)     | Low (ms)       | Excellent       | Limited (GB) | Moderate    | Low      | Excellent
Cloud-Only (MySQL on server) | High (100+ ms) | None            | Large (TB)   | High        | Ongoing  | Poor
Hybrid (Edge + cloud sync)   | Low            | Good            | Large        | High        | Moderate | Good

When to Use Edge-First Storage:

  • Real-time control systems (latency critical)
  • Privacy-sensitive applications (medical, personal data)
  • Unreliable network connectivity (remote locations)
  • Bandwidth-constrained deployments

Link to Related Simulation: Explore database query patterns and performance interactively with our SQL Query Visualizer.


Related Concepts in PDF Chapter 13
  • Section 13.2: Database schema design with ERD diagrams
  • Section 13.3: Single vs batch insert performance benchmarks
  • Section 13.4: SQL aggregation patterns for time-series data
  • Section 13.5: Flask API implementation details
  • Section 13.6: Arduino Ethernet Shield configuration

Self-Assessment Checkpoints

Test your understanding before proceeding to the exercises.

Answer:

SELECT device_id,
       AVG(temperature) AS avg_temp,
       COUNT(*) AS num_readings
FROM sensor_readings
WHERE timestamp >= datetime('now', '-24 hours')
GROUP BY device_id;

This uses datetime('now', '-24 hours') for the time window, GROUP BY device_id to aggregate per device, and AVG(temperature) to compute the mean. Adding COUNT(*) shows how many readings contributed to each average. For proper performance with millions of rows, ensure you have an index: CREATE INDEX idx_device_time ON sensor_readings(device_id, timestamp);

Answer: Without an index, the database performs a full table scan checking every row sequentially (O(n) complexity). For 1 million rows, this means 1 million comparisons. With an index on (device_id, timestamp), the database uses a B-tree structure to jump directly to relevant rows (O(log n) complexity). For 1 million rows, this requires only ~20 comparisons (log₂(1,000,000) ≈ 20). The trade-off: indexes speed up SELECT queries but slow down INSERT operations because the index must be updated. For IoT data with high write rates, carefully choose which columns to index—typically device_id and timestamp.
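
The difference between a full scan and an index search can be observed with SQLite's EXPLAIN QUERY PLAN. This is a small sketch against an empty in-memory table; the table layout and index name are assumptions, and the exact wording of the plan text varies between SQLite versions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE sensor_readings (
    id INTEGER PRIMARY KEY, device_id INTEGER,
    timestamp TEXT, temperature REAL)""")

QUERY = ("SELECT temperature FROM sensor_readings "
         "WHERE device_id = 1 AND timestamp >= '2024-01-01 00:00:00'")

def query_plan(conn):
    """Return the planner's description of how QUERY would be executed."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + QUERY).fetchall()
    return " | ".join(row[-1] for row in rows)  # last column holds the detail text

plan_before = query_plan(conn)  # no index yet: the planner must scan the table
conn.execute("CREATE INDEX idx_device_time "
             "ON sensor_readings(device_id, timestamp)")
plan_after = query_plan(conn)   # the planner can now search the composite index

print("before:", plan_before)
print("after: ", plan_after)
```

Running the same check against your own schema is a quick way to confirm that a query actually uses the index you created.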

Answer: Batch inserts are 10-100× faster. Single inserts: Each INSERT commits a transaction with full ACID guarantees (disk sync, lock acquisition, index update). For 1000 inserts, this means 1000 disk syncs = seconds. Batch inserts using executemany(): All 1000 rows inserted in ONE transaction with ONE disk sync = milliseconds. Example code:

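# Slow (1000 transactions): commit after every row
# (conn is assumed to be the sqlite3 connection that created cursor)
for reading in readings:
    cursor.execute(
        "INSERT INTO sensor_readings (device_id, temperature, humidity) VALUES (?, ?, ?)",
        reading)
    conn.commit()

# Fast (1 transaction): insert all rows, then commit once
cursor.executemany(
    "INSERT INTO sensor_readings (device_id, temperature, humidity) VALUES (?, ?, ?)",
    readings)
conn.commit()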

For edge gateways buffering data from sensors, batch inserts are essential for acceptable performance.

Answer: Network failures, API errors, or missing error handling are likely causes. Debugging steps: (1) Check the Arduino serial output for HTTP response codes: are you seeing 200 OK or 500 errors? (2) Check the Flask logs for error messages or missing requests. (3) Add retry logic with exponential backoff in the Arduino code. (4) Check for timestamp mismatches: if your SQL query filters by timestamp but the Arduino sends Unix epoch values while the database expects ISO format, rows might be inserted but never matched by the query. (5) Test network reliability, since WiFi dropouts cause silent failures. Solution: Buffer readings on the Arduino (SD card or EEPROM) and replay them once the network recovers.
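
The timestamp mismatch in step (4) can be avoided by normalizing epoch values on the gateway before inserting. A minimal sketch, assuming the device sends Unix epoch seconds and the database stores UTC text in the same format that SQLite's datetime('now') produces; epoch_to_sqlite is a hypothetical helper name.

```python
from datetime import datetime, timezone

def epoch_to_sqlite(epoch_seconds):
    """Convert Unix epoch seconds to 'YYYY-MM-DD HH:MM:SS' in UTC."""
    dt = datetime.fromtimestamp(float(epoch_seconds), tz=timezone.utc)
    return dt.strftime("%Y-%m-%d %H:%M:%S")

print(epoch_to_sqlite(0))           # 1970-01-01 00:00:00
print(epoch_to_sqlite(1700000000))  # 2023-11-14 22:13:20
```

Because the result sorts lexicographically in time order, comparisons such as timestamp >= datetime('now', '-24 hours') then behave as expected.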

Answer: Use edge-first when: (1) Latency is critical (<10ms queries for real-time control), (2) Privacy/compliance requires data stay local (HIPAA, GDPR), (3) Network is unreliable (remote locations, mobile deployments), (4) Bandwidth costs are high or limited. Use cloud-only when: (1) Need centralized multi-device queries and analytics, (2) Data volume exceeds edge storage (TB+), (3) Guaranteed reliable internet, (4) Want managed backups and scaling. Hybrid approach (best for IoT): Store recent data (1-7 days) on edge for low-latency access, sync historical data to cloud for long-term analytics. This balances latency, reliability, and capacity.
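
The hybrid approach can be sketched as a periodic job that uploads rows older than the retention window and only then deletes them locally. Everything here is an assumption for illustration: the archive_old_readings name, the upload callback (which would wrap whatever cloud API is in use), and the 7-day default.

```python
import sqlite3

def archive_old_readings(conn, upload, keep_days=7):
    """Send readings older than keep_days to `upload`, then delete them locally."""
    cutoff = f"-{int(keep_days)} days"
    old_rows = conn.execute(
        "SELECT id, device_id, timestamp, temperature, humidity "
        "FROM sensor_readings WHERE timestamp < datetime('now', ?)",
        (cutoff,)).fetchall()
    if not old_rows:
        return 0
    upload(old_rows)  # if this raises, nothing is deleted and the job can retry
    with conn:        # delete the uploaded rows in one transaction
        conn.executemany("DELETE FROM sensor_readings WHERE id = ?",
                         [(row[0],) for row in old_rows])
    return len(old_rows)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sensor_readings (id INTEGER PRIMARY KEY, device_id INTEGER, "
             "timestamp TEXT, temperature REAL, humidity REAL)")
conn.execute("INSERT INTO sensor_readings (device_id, timestamp, temperature, humidity) "
             "VALUES (1, datetime('now', '-10 days'), 21.0, 55.0)")
conn.execute("INSERT INTO sensor_readings (device_id, timestamp, temperature, humidity) "
             "VALUES (1, datetime('now'), 22.0, 54.0)")

uploaded = []  # stand-in for a real cloud upload
archived = archive_old_readings(conn, uploaded.extend, keep_days=7)
remaining = conn.execute("SELECT COUNT(*) FROM sensor_readings").fetchone()[0]
print(f"archived={archived}, remaining={remaining}")
```

Uploading before deleting means a failed sync may resend rows later, so the cloud side should tolerate duplicates.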

Distributed Systems Patterns

The following sections demonstrate essential distributed systems patterns for edge analytics, building on the database and API foundations covered earlier.

MQTT Message Queue Example

MQTT (Message Queuing Telemetry Transport) provides lightweight publish/subscribe messaging ideal for distributed IoT systems. Unlike HTTP’s request/response model, MQTT enables asynchronous communication: edge devices publish sensor data to topics on a broker, and cloud services subscribe to those topics to receive updates.

Why MQTT for Edge Devices?
  • Low overhead: The fixed header can be as small as 2 bytes, versus HTTP headers that often run to hundreds of bytes
  • Persistent connections: Avoids repeated TCP handshakes
  • Quality of Service: QoS 0 (at-most-once, best effort), QoS 1 (at-least-once), QoS 2 (exactly-once)
  • Last Will Testament: Automatic notification if device disconnects unexpectedly
  • Battery-friendly: Reduces power consumption for wireless sensors

Python MQTT Publisher (Edge Device)

This example simulates a temperature sensor publishing readings to an MQTT broker:

import paho.mqtt.client as mqtt
import time
import random
import json

# MQTT Configuration
BROKER = "localhost"  # Use "test.mosquitto.org" for public broker
PORT = 1883
TOPIC = "sensors/temperature/room1"
CLIENT_ID = f"edge-sensor-{random.randint(1000, 9999)}"

# Callback when connection succeeds
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print(f"Connected to MQTT broker at {BROKER}")
    else:
        print(f"Connection failed with code {rc}")

# Callback when message is published
def on_publish(client, userdata, mid):
    print(f"Message {mid} published")

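# Initialize MQTT client (paho-mqtt 1.x style; on paho-mqtt 2.x pass
# mqtt.CallbackAPIVersion.VERSION1 as the first argument to keep these callback signatures)
client = mqtt.Client(client_id=CLIENT_ID)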
client.on_connect = on_connect
client.on_publish = on_publish

try:
    # Connect to broker
    client.connect(BROKER, PORT, keepalive=60)
    client.loop_start()  # Start background thread for network loop

    # Publish sensor readings every 5 seconds
    for i in range(20):
        # Simulate temperature and humidity readings with Gaussian noise
        temperature = 20.0 + random.gauss(0, 2.0)  # Mean 20°C, std 2°C
        humidity = 50.0 + random.gauss(0, 10.0)

        # Create JSON payload
        payload = {
            "device_id": "sensor_room1",
            "timestamp": time.time(),
            "temperature": round(temperature, 2),
            "humidity": round(humidity, 2)
        }

        # Publish with QoS 1 (at-least-once delivery)
        result = client.publish(TOPIC, json.dumps(payload), qos=1)

        if result.rc == mqtt.MQTT_ERR_SUCCESS:
            print(f"Published: {payload}")
        else:
            print(f"Publish failed: {result.rc}")

        time.sleep(5)

except KeyboardInterrupt:
    print("Interrupted by user")
finally:
    client.loop_stop()
    client.disconnect()
    print("Disconnected from MQTT broker")

Python MQTT Subscriber (Cloud Service)

This subscriber receives sensor data and stores it in a database:

import paho.mqtt.client as mqtt
import json
import sqlite3
from datetime import datetime

BROKER = "localhost"
PORT = 1883
TOPIC = "sensors/temperature/#"  # Wildcard: subscribe to all temperature topics
CLIENT_ID = "cloud-aggregator"

# Initialize database
conn = sqlite3.connect("mqtt_data.db")
cursor = conn.cursor()
cursor.execute("""
    CREATE TABLE IF NOT EXISTS sensor_readings (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        device_id TEXT,
        timestamp REAL,
        temperature REAL,
        humidity REAL,
        received_at TEXT
    )
""")
conn.commit()

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print(f"Subscriber connected to {BROKER}")
        client.subscribe(TOPIC)
        print(f"Subscribed to topic: {TOPIC}")
    else:
        print(f"Connection failed: {rc}")

def on_message(client, userdata, msg):
    """Handle incoming MQTT messages"""
    try:
        # Parse JSON payload
        payload = json.loads(msg.payload.decode())
        device_id = payload["device_id"]
        timestamp = payload["timestamp"]
        temperature = payload["temperature"]
        humidity = payload["humidity"]

        # Insert into database
        cursor.execute("""
            INSERT INTO sensor_readings
            (device_id, timestamp, temperature, humidity, received_at)
            VALUES (?, ?, ?, ?, ?)
        """, (device_id, timestamp, temperature, humidity,
              datetime.now().isoformat()))
        conn.commit()

        print(f"Stored: {device_id} | Temp: {temperature}°C | Humidity: {humidity}%")

        # Check for anomalies
        if temperature > 30 or temperature < 10:
            print(f"⚠️  ALERT: Abnormal temperature detected: {temperature}°C")

    except Exception as e:
        print(f"Error processing message: {e}")

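# Initialize subscriber (paho-mqtt 1.x style; on paho-mqtt 2.x pass
# mqtt.CallbackAPIVersion.VERSION1 as the first argument to keep these callback signatures)
client = mqtt.Client(client_id=CLIENT_ID)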
client.on_connect = on_connect
client.on_message = on_message

try:
    client.connect(BROKER, PORT, keepalive=60)
    print("MQTT Subscriber running. Press Ctrl+C to stop.")
    client.loop_forever()  # Blocking call that processes network traffic
except KeyboardInterrupt:
    print("\nShutting down subscriber...")
finally:
    conn.close()
    client.disconnect()

MQTT Best Practices for Edge
  1. Use unique client IDs: Generate from device MAC address or UUID to prevent conflicts
  2. Set appropriate QoS: Use QoS 0 for non-critical telemetry, QoS 1 for important data
  3. Implement reconnection logic: Handle broker failures with exponential backoff
  4. Use Last Will: Set client.will_set() to notify subscribers if device crashes
  5. Optimize topic structure: Use hierarchy like building/floor/room/sensor_type
  6. Secure with TLS: Use port 8883 with certificates for production deployments
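
The reconnection advice in item 3 can be sketched without any MQTT library. In this hedged example, backoff_delays and connect_with_retry are hypothetical helpers; connect stands for whatever function opens the broker connection, and the base, cap, and attempt counts are arbitrary defaults.

```python
import random

def backoff_delays(base=1.0, cap=60.0, attempts=6, jitter=0.0, rng=random):
    """Yield wait times base, 2*base, 4*base, ... capped at `cap` seconds."""
    for attempt in range(attempts):
        delay = min(cap, base * (2 ** attempt))
        # Optional jitter spreads reconnects from many devices over time
        yield delay + rng.uniform(0, jitter * delay)

def connect_with_retry(connect, sleep, **backoff_args):
    """Call connect() until it succeeds, sleeping between failed attempts."""
    for delay in backoff_delays(**backoff_args):
        try:
            return connect()
        except OSError:
            sleep(delay)
    raise ConnectionError("broker unreachable after all retries")

# Demo with a fake connection that fails twice before succeeding
attempts_made = []
def flaky_connect():
    attempts_made.append(1)
    if len(attempts_made) < 3:
        raise OSError("network down")
    return "connected"

sleeps = []  # record delays instead of actually sleeping
result = connect_with_retry(flaky_connect, sleeps.append, base=1, cap=10, attempts=5)
print(result, sleeps)
```

In real code sleep would be time.sleep, and a non-zero jitter helps avoid many devices reconnecting at the same instant after a broker restart.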

Distributed Inference Coordinator

In multi-device edge deployments, a coordinator pattern distributes inference tasks across multiple edge nodes. This example shows a simple round-robin coordinator that balances workload.

import time
import threading
import queue
from dataclasses import dataclass
from typing import List, Dict
from enum import Enum

class NodeStatus(Enum):
    IDLE = "idle"
    BUSY = "busy"
    OFFLINE = "offline"

@dataclass
class EdgeNode:
    """Represents an edge device capable of running inference"""
    node_id: str
    ip_address: str
    model_type: str  # e.g., "image_classification", "object_detection"
    status: NodeStatus = NodeStatus.IDLE
    last_heartbeat: float = 0.0
    tasks_completed: int = 0

    def is_alive(self, timeout=30):
        """Check if node has sent heartbeat within timeout"""
        return (time.time() - self.last_heartbeat) < timeout

class InferenceCoordinator:
    """Coordinates inference tasks across multiple edge nodes"""

    def __init__(self):
        self.nodes: Dict[str, EdgeNode] = {}
        self.task_queue = queue.Queue()
        self.results_queue = queue.Queue()
        self.next_node_idx = 0  # For round-robin scheduling
        self.lock = threading.Lock()

    def register_node(self, node: EdgeNode):
        """Register a new edge node with the coordinator"""
        with self.lock:
            self.nodes[node.node_id] = node
            node.last_heartbeat = time.time()
            print(f"✓ Registered node: {node.node_id} ({node.model_type})")

    def update_heartbeat(self, node_id: str):
        """Update node's last seen timestamp"""
        with self.lock:
            if node_id in self.nodes:
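                node = self.nodes[node_id]
                node.last_heartbeat = time.time()
                # Revive nodes marked offline, but never flip a BUSY node to IDLE
                if node.status == NodeStatus.OFFLINE:
                    node.status = NodeStatus.IDLE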

    def get_available_nodes(self, model_type: str = None) -> List[EdgeNode]:
        """Get list of available nodes, optionally filtered by model type"""
        available = []
        with self.lock:
            for node in self.nodes.values():
                if node.is_alive() and node.status == NodeStatus.IDLE:
                    if model_type is None or node.model_type == model_type:
                        available.append(node)
        return available

    def assign_task(self, task_data: dict) -> str:
        """
        Assign inference task to next available node (round-robin).
        Returns assigned node_id or None if no nodes available.
        """
        model_type = task_data.get("model_type")
        available = self.get_available_nodes(model_type)

        if not available:
            print(f"⚠️  No available nodes for model type: {model_type}")
            return None

        # Round-robin selection
        with self.lock:
            node = available[self.next_node_idx % len(available)]
            self.next_node_idx += 1
            node.status = NodeStatus.BUSY

            print(f"→ Assigned task to {node.node_id}: {task_data['task_id']}")
            # In real implementation, send task via HTTP/MQTT to node.ip_address
            return node.node_id

    def complete_task(self, node_id: str, task_id: str, result: dict):
        """Mark task as completed and store result"""
        with self.lock:
            if node_id in self.nodes:
                node = self.nodes[node_id]
                node.status = NodeStatus.IDLE
                node.tasks_completed += 1

                result["node_id"] = node_id
                result["task_id"] = task_id
                self.results_queue.put(result)

                print(f"✓ Task {task_id} completed by {node_id}")

    def get_cluster_stats(self) -> dict:
        """Get statistics about the edge cluster"""
        with self.lock:
            total = len(self.nodes)
            alive = sum(1 for n in self.nodes.values() if n.is_alive())
            idle = sum(1 for n in self.nodes.values()
                      if n.is_alive() and n.status == NodeStatus.IDLE)
            busy = sum(1 for n in self.nodes.values()
                      if n.is_alive() and n.status == NodeStatus.BUSY)
            total_tasks = sum(n.tasks_completed for n in self.nodes.values())

            return {
                "total_nodes": total,
                "alive_nodes": alive,
                "idle_nodes": idle,
                "busy_nodes": busy,
                "total_tasks_completed": total_tasks
            }

    def health_check_loop(self, interval=10):
        """Periodically check node health (run in background thread)"""
        while True:
            time.sleep(interval)
            with self.lock:
                for node in self.nodes.values():
                    if not node.is_alive():
                        if node.status != NodeStatus.OFFLINE:
                            print(f"⚠️  Node {node.node_id} went offline")
                            node.status = NodeStatus.OFFLINE

# Example usage
if __name__ == "__main__":
    coordinator = InferenceCoordinator()

    # Register edge nodes
    coordinator.register_node(EdgeNode("edge-pi-01", "192.168.1.101", "image_classification"))
    coordinator.register_node(EdgeNode("edge-pi-02", "192.168.1.102", "image_classification"))
    coordinator.register_node(EdgeNode("edge-jetson-01", "192.168.1.103", "object_detection"))

    # Start health check in background
    health_thread = threading.Thread(target=coordinator.health_check_loop, daemon=True)
    health_thread.start()

    # Simulate task assignment
    for i in range(5):
        task = {
            "task_id": f"task_{i:03d}",
            "model_type": "image_classification",
            "image_path": f"/data/image_{i}.jpg"
        }

        node_id = coordinator.assign_task(task)

        if node_id:
            # Simulate task completion after 2 seconds
            time.sleep(2)
            coordinator.complete_task(node_id, task["task_id"],
                                     {"prediction": "cat", "confidence": 0.95})

    # Print cluster statistics
    stats = coordinator.get_cluster_stats()
    print(f"\nCluster Stats: {stats}")

Coordinator Pitfalls
Single Point of Failure
If the coordinator crashes, the entire cluster becomes unresponsive. Solution: Implement leader election with multiple coordinator replicas (use Raft or Paxos consensus).
Task Queue Overflow
If tasks arrive faster than nodes can process, the queue grows unbounded. Solution: Set max queue size and reject new tasks when full, or use priority queues.
Stale Node State
Network delays can cause outdated status information. Solution: Use pessimistic locking (assume busy until confirmed) and timeout-based state transitions.
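
The task-queue overflow remedy can be sketched with the standard library's bounded queue. BoundedTaskQueue is a hypothetical wrapper and the max_size default is arbitrary; the point is that submissions are rejected rather than allowed to grow memory without limit.

```python
import queue

class BoundedTaskQueue:
    """Task queue that rejects new work when full instead of growing unbounded."""

    def __init__(self, max_size=100):
        self._queue = queue.Queue(maxsize=max_size)
        self.rejected = 0  # count of tasks turned away

    def submit(self, task):
        """Return True if the task was queued, False if the queue was full."""
        try:
            self._queue.put_nowait(task)
            return True
        except queue.Full:
            self.rejected += 1
            return False

    def next_task(self):
        """Return the oldest queued task, or None if the queue is empty."""
        try:
            return self._queue.get_nowait()
        except queue.Empty:
            return None

tasks = BoundedTaskQueue(max_size=3)
accepted = [tasks.submit({"task_id": f"task_{i:03d}"}) for i in range(5)]
first = tasks.next_task()
print(accepted, tasks.rejected, first)
```

Callers that receive False can retry later, drop the task, or report back-pressure to the device that submitted it.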

Edge Synchronization Example

Edge devices often need to synchronize state or share data with peers. This example demonstrates eventual consistency with conflict resolution.

import time
import hashlib
import json
from typing import Dict, Any, Tuple
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class VersionedValue:
    """Value with last-write-wins metadata: timestamp, origin node, and a version counter"""
    value: Any
    timestamp: float
    node_id: str
    version: int = 1

    def to_dict(self):
        return {
            "value": self.value,
            "timestamp": self.timestamp,
            "node_id": self.node_id,
            "version": self.version
        }

class EdgeDataStore:
    """
    Distributed key-value store with eventual consistency.
    Uses last-write-wins conflict resolution.
    """

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.data: Dict[str, VersionedValue] = {}
        self.sync_log = []  # Track synchronization events

    def put(self, key: str, value: Any) -> None:
        """Store a value locally with version metadata"""
        if key in self.data:
            # Increment version for updates
            old_version = self.data[key].version
            new_version = old_version + 1
        else:
            new_version = 1

        versioned = VersionedValue(
            value=value,
            timestamp=time.time(),
            node_id=self.node_id,
            version=new_version
        )

        self.data[key] = versioned
        print(f"[{self.node_id}] PUT {key} = {value} (v{new_version})")

    def get(self, key: str) -> Any:
        """Retrieve a value"""
        if key in self.data:
            return self.data[key].value
        return None

    def get_versioned(self, key: str) -> VersionedValue:
        """Get value with metadata"""
        return self.data.get(key)

    def sync_with_peer(self, peer_store: 'EdgeDataStore') -> Tuple[int, int, int]:
        """
        Synchronize data with another edge node.
        Returns (keys_sent, keys_received, conflicts_resolved)
        """
        keys_sent = 0
        keys_received = 0
        conflicts = 0

        # Get all keys from both stores
        all_keys = set(self.data.keys()) | set(peer_store.data.keys())

        for key in all_keys:
            local_val = self.get_versioned(key)
            peer_val = peer_store.get_versioned(key)

            # Case 1: Key only exists locally - send to peer
            if peer_val is None:
                peer_store.data[key] = local_val
                keys_sent += 1
                print(f"  → Sent {key} to {peer_store.node_id}")

            # Case 2: Key only exists on peer - receive from peer
            elif local_val is None:
                self.data[key] = peer_val
                keys_received += 1
                print(f"  ← Received {key} from {peer_store.node_id}")

            # Case 3: Key exists on both with different versions - resolve conflict
            elif local_val != peer_val:
                winner = self._resolve_conflict(key, local_val, peer_val)
                conflicts += 1  # Count the conflict regardless of which side wins
                print(f"  ⚡ Conflict on {key}: chose {winner.node_id}'s version")
                self.data[key] = winner
                peer_store.data[key] = winner

        self.sync_log.append({
            "timestamp": time.time(),
            "peer": peer_store.node_id,
            "sent": keys_sent,
            "received": keys_received,
            "conflicts": conflicts
        })

        return (keys_sent, keys_received, conflicts)

    def _resolve_conflict(self, key: str, val1: VersionedValue,
                         val2: VersionedValue) -> VersionedValue:
        """
        Resolve conflicts using Last-Write-Wins (LWW) strategy.
        Fallback to node_id comparison for ties.
        """
        # Compare timestamps (last write wins)
        if val1.timestamp != val2.timestamp:
            return val1 if val1.timestamp > val2.timestamp else val2

        # Timestamps equal - use node_id as tiebreaker
        return val1 if val1.node_id > val2.node_id else val2

    def get_checksum(self) -> str:
        """Compute checksum of entire datastore for verification"""
        content = json.dumps({k: v.to_dict() for k, v in sorted(self.data.items())})
        return hashlib.md5(content.encode()).hexdigest()

# Example usage: Multi-node edge synchronization
if __name__ == "__main__":
    # Create three edge nodes
    node_a = EdgeDataStore("edge-node-A")
    node_b = EdgeDataStore("edge-node-B")
    node_c = EdgeDataStore("edge-node-C")

    print("=== Initial State ===")
    # Each node collects different sensor data
    node_a.put("temperature", 22.5)
    node_a.put("humidity", 60)

    node_b.put("temperature", 23.0)  # Slightly different reading
    node_b.put("pressure", 1013)

    node_c.put("light_level", 450)

    time.sleep(1)  # Simulate time passing

    print("\n=== Synchronization Round 1: A ↔ B ===")
    sent, recv, conflicts = node_a.sync_with_peer(node_b)
    print(f"Summary: sent={sent}, received={recv}, conflicts={conflicts}")

    print("\n=== Synchronization Round 2: B ↔ C ===")
    sent, recv, conflicts = node_b.sync_with_peer(node_c)
    print(f"Summary: sent={sent}, received={recv}, conflicts={conflicts}")

    print("\n=== Synchronization Round 3: A ↔ C ===")
    sent, recv, conflicts = node_a.sync_with_peer(node_c)
    print(f"Summary: sent={sent}, received={recv}, conflicts={conflicts}")

    print("\n=== Final State ===")
    for node in [node_a, node_b, node_c]:
        print(f"{node.node_id}: {list(node.data.keys())} | Checksum: {node.get_checksum()[:8]}")

    # Verify eventual consistency
    if node_a.get_checksum() == node_b.get_checksum() == node_c.get_checksum():
        print("✓ All nodes converged to same state (eventual consistency achieved)")
    else:
        print("✗ Nodes have different states - consistency not achieved")

Synchronization Strategies

Last-Write-Wins (LWW): Simple but can lose data. Good for sensor readings where latest value matters.

Vector Clocks: Track causality across nodes. Detects concurrent writes but requires more metadata.
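
As a rough illustration of how vector clocks detect concurrent writes, the sketch below represents a clock as a dict mapping node IDs to counters. The function names compare_clocks and merge_clocks are illustrative, not taken from any library.

```python
def compare_clocks(a, b):
    """Return 'before', 'after', 'equal', or 'concurrent' for two vector clocks."""
    nodes = set(a) | set(b)
    a_le_b = all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
    b_le_a = all(b.get(n, 0) <= a.get(n, 0) for n in nodes)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"      # a happened before b
    if b_le_a:
        return "after"       # b happened before a
    return "concurrent"      # neither dominates: a true conflict

def merge_clocks(a, b):
    """Element-wise maximum, used after two replicas have synchronized."""
    return {n: max(a.get(n, 0), b.get(n, 0)) for n in set(a) | set(b)}

print(compare_clocks({"A": 1}, {"A": 2}))                  # before
print(compare_clocks({"A": 2, "B": 0}, {"A": 1, "B": 1}))  # concurrent
print(merge_clocks({"A": 2, "B": 0}, {"A": 1, "B": 1}))
```

A "concurrent" result is the case that last-write-wins silently discards; with vector clocks the application can keep both versions or apply a merge rule instead.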

CRDTs (Conflict-Free Replicated Data Types): Guarantee convergence without coordination. Use for counters, sets, or registers.
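
A grow-only counter (G-Counter) is among the simplest CRDTs and shows why convergence needs no coordination. This is a minimal sketch with a hypothetical GCounter class: each node increments only its own slot, and merging takes the per-node maximum.

```python
class GCounter:
    """Grow-only counter CRDT: one slot per node, merged by per-node maximum."""

    def __init__(self, node_id):
        self.node_id = node_id
        self.counts = {}

    def increment(self, amount=1):
        """Increase this node's own slot; negative amounts are not allowed."""
        if amount < 0:
            raise ValueError("G-Counter can only grow")
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + amount

    def value(self):
        """Total across all nodes seen so far."""
        return sum(self.counts.values())

    def merge(self, other):
        """Merge is commutative, associative, and idempotent."""
        for node, count in other.counts.items():
            self.counts[node] = max(self.counts.get(node, 0), count)

a = GCounter("edge-node-A")
b = GCounter("edge-node-B")
a.increment(3)   # e.g. three motion events seen by node A
b.increment(2)   # two events seen by node B
a.merge(b)
b.merge(a)
a.merge(b)       # merging again changes nothing
print(a.value(), b.value())
```

Because merging the same state twice has no effect, replicas can exchange state in any order and as often as the network allows.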

Application-Level Merge: Custom conflict resolution based on domain logic (e.g., take max temperature, merge alert lists).

CAP Theorem Demonstration

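The CAP theorem states that a distributed system cannot guarantee all three of Consistency, Availability, and Partition tolerance at once. Because network partitions cannot be ruled out in practice, the real choice is between consistency (CP) and availability (AP) while a partition lasts. This example demonstrates that trade-off.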

import time
import random
import threading
from enum import Enum
from typing import Optional, List

class ConsistencyModel(Enum):
    STRONG = "strong"        # CP: Consistent + Partition-tolerant
    EVENTUAL = "eventual"    # AP: Available + Partition-tolerant

class Node:
    """Represents a distributed database node"""

    def __init__(self, node_id: str, consistency: ConsistencyModel):
        self.node_id = node_id
        self.consistency = consistency
        self.data = {}
        self.is_partitioned = False  # Simulates network partition
        self.connected_nodes: List['Node'] = []
        self.lock = threading.Lock()
        self.write_count = 0
        self.read_count = 0

    def connect_to(self, other_node: 'Node'):
        """Establish bidirectional connection"""
        if other_node not in self.connected_nodes:
            self.connected_nodes.append(other_node)
            other_node.connected_nodes.append(self)

    def partition(self):
        """Simulate network partition"""
        self.is_partitioned = True
        print(f"⚠️  {self.node_id} is now partitioned from the network")

    def heal(self):
        """Restore network connectivity"""
        self.is_partitioned = False
        print(f"✓ {self.node_id} network partition healed")

    def write(self, key: str, value: any, timeout: float = 2.0) -> bool:
        """
        Write data with consistency guarantees.
        Returns True if write succeeds, False otherwise.
        """
        start_time = time.time()

        if self.consistency == ConsistencyModel.STRONG:
            return self._strong_consistency_write(key, value, timeout)
        else:
            return self._eventual_consistency_write(key, value, timeout)

    def _strong_consistency_write(self, key: str, value: any, timeout: float) -> bool:
        """
        CP System: Require majority quorum before write succeeds.
        Sacrifices availability during partitions.
        """
        # Count reachable nodes (including self); a partitioned node reaches only itself
        peers = [] if self.is_partitioned else self.connected_nodes
        reachable = [self] + [n for n in peers if not n.is_partitioned]
        total_nodes = len(self.connected_nodes) + 1
        quorum_size = (total_nodes // 2) + 1  # Majority

        if len(reachable) < quorum_size:
            print(f"✗ {self.node_id}: WRITE REJECTED (no quorum: {len(reachable)}/{quorum_size})")
            return False

        # Write to all reachable nodes
        with self.lock:
            self.data[key] = value
            self.write_count += 1

        for node in reachable:
            if node != self:
                with node.lock:
                    node.data[key] = value

        print(f"✓ {self.node_id}: WRITE COMMITTED {key}={value} (quorum: {len(reachable)}/{total_nodes})")
        return True

    def _eventual_consistency_write(self, key: str, value: any, timeout: float) -> bool:
        """
        AP System: Always accept writes locally.
        Sacrifices consistency - eventual convergence when partition heals.
        """
        with self.lock:
            self.data[key] = value
            self.write_count += 1

        print(f"✓ {self.node_id}: WRITE ACCEPTED {key}={value} (local only)")

        # Async replication to reachable nodes (fire-and-forget)
        def replicate():
            if self.is_partitioned:
                return  # A partitioned node cannot reach any peer
            for node in self.connected_nodes:
                if not node.is_partitioned:
                    with node.lock:
                        node.data[key] = value

        threading.Thread(target=replicate, daemon=True).start()
        return True

    def read(self, key: str) -> Optional[any]:
        """Read data from local node"""
        with self.lock:
            self.read_count += 1
            value = self.data.get(key)

        print(f"  {self.node_id}: READ {key}={value}")
        return value

    def get_stats(self) -> dict:
        """Return node statistics"""
        return {
            "node_id": self.node_id,
            "consistency": self.consistency.value,
            "partitioned": self.is_partitioned,
            "keys": len(self.data),
            "writes": self.write_count,
            "reads": self.read_count
        }

# Demonstration
def demonstrate_cap_theorem():
    print("=" * 60)
    print("CAP THEOREM DEMONSTRATION")
    print("=" * 60)

    # Scenario 1: CP System (Strong Consistency)
    print("\n### Scenario 1: CP System (Strong Consistency) ###\n")

    node1_cp = Node("Node1-CP", ConsistencyModel.STRONG)
    node2_cp = Node("Node2-CP", ConsistencyModel.STRONG)
    node3_cp = Node("Node3-CP", ConsistencyModel.STRONG)

    node1_cp.connect_to(node2_cp)
    node1_cp.connect_to(node3_cp)
    node2_cp.connect_to(node3_cp)

    # All nodes healthy - writes succeed
    print("1. All nodes healthy:")
    node1_cp.write("temperature", 22.5)

    # Partition node3 - still have quorum (2/3)
    print("\n2. Partition one node (quorum maintained):")
    node3_cp.partition()
    node1_cp.write("temperature", 23.0)

    # Partition node2 - lose quorum (1/3)
    print("\n3. Partition second node (quorum lost):")
    node2_cp.partition()
    success = node1_cp.write("temperature", 24.0)

    print(f"\n→ CP System: Chose CONSISTENCY over AVAILABILITY")
    print(f"   During partition, writes are rejected to prevent inconsistency")

    # Scenario 2: AP System (Eventual Consistency)
    print("\n" + "=" * 60)
    print("### Scenario 2: AP System (Eventual Consistency) ###\n")

    node1_ap = Node("Node1-AP", ConsistencyModel.EVENTUAL)
    node2_ap = Node("Node2-AP", ConsistencyModel.EVENTUAL)
    node3_ap = Node("Node3-AP", ConsistencyModel.EVENTUAL)

    node1_ap.connect_to(node2_ap)
    node1_ap.connect_to(node3_ap)
    node2_ap.connect_to(node3_ap)

    # All nodes healthy
    print("1. All nodes healthy:")
    node1_ap.write("temperature", 22.5)
    time.sleep(0.1)  # Allow replication

    # Partition nodes - writes still accepted
    print("\n2. Partition two nodes (create split-brain):")
    node2_ap.partition()
    node3_ap.partition()

    node1_ap.write("temperature", 23.0)  # Write to node1
    time.sleep(0.1)
    node2_ap.write("temperature", 25.5)  # Concurrent write to node2!

    print("\n3. Read from different nodes (inconsistent):")
    node1_ap.read("temperature")
    node2_ap.read("temperature")

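    # Heal partition - eventual consistency
    # Note: heal() only restores connectivity; this simplified Node does not replay
    # missed writes, so a real system would run a sync/merge step at this point.
    print("\n4. Heal partition (eventual convergence):")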
    node2_ap.heal()
    node3_ap.heal()
    time.sleep(0.2)  # Allow sync

    print(f"\n→ AP System: Chose AVAILABILITY over CONSISTENCY")
    print(f"   During partition, writes accepted but data temporarily inconsistent")
    print(f"   System eventually converges when partition heals")

    # Summary
    print("\n" + "=" * 60)
    print("SUMMARY: CAP THEOREM TRADE-OFFS")
    print("=" * 60)
    print("\nCP System (Strong Consistency):")
    print("  ✓ Always consistent reads")
    print("  ✓ No split-brain scenarios")
    print("  ✗ Unavailable during partitions")
    print("  Use case: Banking, inventory, critical IoT control")

    print("\nAP System (Eventual Consistency):")
    print("  ✓ Always available for reads/writes")
    print("  ✓ Resilient to network failures")
    print("  ✗ Temporary inconsistencies")
    print("  Use case: Sensor telemetry, social media, caching")
    print("\n" + "=" * 60)

if __name__ == "__main__":
    demonstrate_cap_theorem()
CAP Theorem for Edge Analytics

Consistency (C): All nodes see the same data at the same time
  • Example: All edge devices report identical model version
  • Requires: Coordination, quorum, locks

Availability (A): Every request to a reachable node receives a response, even if the data it returns is stale
  • Example: Sensor readings always accepted, even during network outage
  • Requires: Local processing, asynchronous replication

Partition Tolerance (P): System continues operating despite network failures
  • Example: Edge devices work when cloud connection drops
  • Requires: Distributed architecture (unavoidable in edge systems)

Edge systems typically choose AP: Edge devices must continue operating during network outages, accepting temporary inconsistencies. Synchronization happens when connectivity is restored.
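The synchronization step can be sketched with timestamp-based, last-write-wins merging. This is a minimal illustration, not the lab's implementation: the `Versioned` record and `merge_last_write_wins` function are hypothetical names, and the sketch assumes node clocks are loosely synchronized.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class Versioned:
    """A value plus the time it was written (hypothetical helper type)."""
    value: Any
    written_at: float  # writer's clock; assumed loosely synchronized across nodes


def merge_last_write_wins(local: Dict[str, Versioned],
                          remote: Dict[str, Versioned]) -> Dict[str, Versioned]:
    """Return a copy of local updated with every strictly newer remote entry."""
    merged = dict(local)
    for key, candidate in remote.items():
        current = merged.get(key)
        if current is None or candidate.written_at > current.written_at:
            merged[key] = candidate
    return merged


# Two nodes accepted different writes while partitioned
node1 = {"temperature": Versioned(23.0, written_at=100.0)}
node2 = {"temperature": Versioned(25.5, written_at=100.1),
         "humidity": Versioned(48.0, written_at=99.0)}

# After the partition heals, each node merges the other's state
node1_after = merge_last_write_wins(node1, node2)
node2_after = merge_last_write_wins(node2, node1)
print(node1_after == node2_after)
```

Both nodes end up with the same state because the newer write wins on each key. Equal timestamps would need a deterministic tie-breaker (for example the node ID), and clock skew can silently discard a write, which is one motivation for the vector-clock exercise later in this lab.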

Choosing Your CAP Priorities

Choose CP (Consistency + Partition Tolerance) when:
  • Data correctness is critical (financial transactions, medical records)
  • Stale data is dangerous (actuator commands, safety systems)
  • Acceptable to block operations during failures
  • Example: Distributed locking for actuator control

Choose AP (Availability + Partition Tolerance) when:
  • System must always respond (sensor data collection, monitoring)
  • Temporary staleness is acceptable (cached predictions, telemetry)
  • Read-heavy workloads with infrequent conflicts
  • Example: Multi-sensor aggregation, edge analytics dashboards

Hybrid Approach (common in edge systems):
  • Critical control path: CP (actuators, safety)
  • Telemetry and monitoring: AP (sensors, logs)
  • Use application-specific conflict resolution
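One way to picture the hybrid approach is a small write router that applies the majority-quorum rule only to the control path. This is a sketch under assumptions: `DataPath` and `handle_write` are hypothetical names, and the returned strings stand in for whatever a real gateway would do.

```python
from enum import Enum


class DataPath(Enum):
    CONTROL = "control"      # actuators, safety: treated as CP
    TELEMETRY = "telemetry"  # sensors, logs: treated as AP


def has_quorum(reachable_nodes: int, total_nodes: int) -> bool:
    """Majority quorum, the same rule as the strong-consistency write above."""
    return reachable_nodes >= (total_nodes // 2) + 1


def handle_write(path: DataPath, reachable_nodes: int, total_nodes: int) -> str:
    """Decide what happens to a write given current connectivity."""
    if path is DataPath.CONTROL:
        # CP: refuse the write rather than risk divergent actuator state
        return "committed" if has_quorum(reachable_nodes, total_nodes) else "rejected"
    # AP: always accept locally and replicate when peers are reachable
    return "accepted"


decisions = {
    "control, 2 of 3 reachable": handle_write(DataPath.CONTROL, 2, 3),
    "control, 1 of 3 reachable": handle_write(DataPath.CONTROL, 1, 3),
    "telemetry, 1 of 3 reachable": handle_write(DataPath.TELEMETRY, 1, 3),
}
for scenario, decision in decisions.items():
    print(f"{scenario}: {decision}")
```

The point of the split is that a lost telemetry sample is usually tolerable, while two nodes issuing conflicting actuator commands is not.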

Practical Exercises

Challenge: Modify the MQTT subscriber to detect when a sensor stops publishing (hasn’t sent data in 60 seconds) and send an alert.

Hints:
  • Track last seen timestamp per device_id
  • Use a background thread that checks timestamps every 30 seconds
  • Publish alerts to a separate topic: alerts/sensor_offline

Solution approach:

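import json
import threading
import time

device_last_seen = {}  # device_id -> time.time() of last message (set in the message callback)
TIMEOUT = 60

def check_timeouts():
    while True:
        time.sleep(30)
        current_time = time.time()
        # Snapshot the items: the message callback may update the dict from another thread
        for device_id, last_seen in list(device_last_seen.items()):
            if current_time - last_seen > TIMEOUT:
                alert = {"device_id": device_id, "status": "offline",
                        "last_seen": last_seen}
                # client is the connected MQTT client from the subscriber
                client.publish("alerts/sensor_offline", json.dumps(alert))

threading.Thread(target=check_timeouts, daemon=True).start()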
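The timeout check itself can be separated from MQTT so it is easy to test. The sketch below is one possible factoring, not part of the lab code: `find_offline` is a hypothetical helper, and the device names and timestamps are made up for illustration.

```python
import time
from typing import Dict, List, Optional


def find_offline(last_seen: Dict[str, float],
                 timeout: float = 60.0,
                 now: Optional[float] = None) -> List[str]:
    """Return the IDs of devices whose last message is older than timeout seconds."""
    now = time.time() if now is None else now
    return sorted(device_id for device_id, seen in last_seen.items()
                  if now - seen > timeout)


# Illustrative timestamps in seconds; "now" is passed in so the result is deterministic
last_seen = {"esp32-01": 1000.0, "esp32-02": 1050.0, "arduino-01": 930.0}
offline = find_offline(last_seen, timeout=60.0, now=1065.0)
print(offline)
```

The background thread then only needs to call this function and publish one alert per returned ID. Remembering which devices have already been reported would avoid repeating the same alert every 30 seconds.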

Challenge: Extend the InferenceCoordinator to support task priorities (HIGH, MEDIUM, LOW). High-priority tasks should be assigned before low-priority ones.

Hints:
  • Use queue.PriorityQueue instead of regular queue
  • Store tasks as tuples: (priority, timestamp, task_data)
  • Lower priority number = higher priority (HIGH=0, MEDIUM=1, LOW=2)

Solution approach:

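import time
from enum import Enum
from queue import PriorityQueue

class Priority(Enum):
    HIGH = 0
    MEDIUM = 1
    LOW = 2

task_queue = PriorityQueue()
# task_data is the task payload; if priority and timestamp tie, the payloads are compared
task_queue.put((Priority.HIGH.value, time.time(), task_data))
priority, timestamp, task = task_queue.get()  # Gets highest priority first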
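A runnable variant of the same idea is sketched below. It swaps the timestamp for a monotonically increasing sequence number, which is an assumption on my part rather than the lab's design: dict payloads cannot be compared, so two entries with equal priority and equal timestamp would raise a TypeError, and a counter rules that out while keeping first-in, first-out order within a priority. The `submit` and `drain` helpers and the task names are hypothetical.

```python
import itertools
from enum import IntEnum
from queue import PriorityQueue
from typing import Any, Dict, List


class Priority(IntEnum):
    HIGH = 0
    MEDIUM = 1
    LOW = 2


_sequence = itertools.count()  # hypothetical tie-breaker, always unique


def submit(q: PriorityQueue, priority: Priority, task_data: Dict[str, Any]) -> None:
    """Queue a task; the sequence number keeps ordering stable within a priority."""
    q.put((int(priority), next(_sequence), task_data))


def drain(q: PriorityQueue) -> List[str]:
    """Pop every task and return the names in the order they were served."""
    order = []
    while not q.empty():
        _, _, task = q.get()
        order.append(task["name"])
    return order


task_queue = PriorityQueue()
submit(task_queue, Priority.LOW, {"name": "log-upload"})
submit(task_queue, Priority.HIGH, {"name": "safety-check"})
submit(task_queue, Priority.MEDIUM, {"name": "inference-a"})
submit(task_queue, Priority.MEDIUM, {"name": "inference-b"})

drained = drain(task_queue)
print(drained)
```

The high-priority task is served first even though it was submitted second, and the two medium-priority tasks keep their submission order.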

Exercise 3: Add vector clocks to edge synchronization

Challenge: Replace timestamp-based conflict resolution with vector clocks to detect concurrent writes.

Hints:
  • A vector clock is a dict mapping node_id → version number
  • Increment own version on each write
  • Merge by taking max of each node’s version
  • Two writes are concurrent if neither vector dominates the other

Solution approach:

from dataclasses import dataclass, field
from typing import Dict

@dataclass
class VectorClock:
    clocks: Dict[str, int] = field(default_factory=dict)

    def increment(self, node_id: str):
        self.clocks[node_id] = self.clocks.get(node_id, 0) + 1

    def dominates(self, other: 'VectorClock') -> bool:
        """Returns True if self is strictly newer than other"""
        has_greater = False
        for node_id in set(self.clocks.keys()) | set(other.clocks.keys()):
            if self.clocks.get(node_id, 0) < other.clocks.get(node_id, 0):
                return False
            if self.clocks.get(node_id, 0) > other.clocks.get(node_id, 0):
                has_greater = True
        return has_greater
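The remaining two hints, merging and concurrency detection, can be sketched as follows. The class is repeated so the example runs on its own; `merge` and `is_concurrent` are names chosen for this illustration and are not part of the solution above.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class VectorClock:
    clocks: Dict[str, int] = field(default_factory=dict)

    def increment(self, node_id: str) -> None:
        self.clocks[node_id] = self.clocks.get(node_id, 0) + 1

    def dominates(self, other: "VectorClock") -> bool:
        """True if self is at least as new everywhere and strictly newer somewhere."""
        keys = set(self.clocks) | set(other.clocks)
        no_smaller = all(self.clocks.get(k, 0) >= other.clocks.get(k, 0) for k in keys)
        some_greater = any(self.clocks.get(k, 0) > other.clocks.get(k, 0) for k in keys)
        return no_smaller and some_greater

    def merge(self, other: "VectorClock") -> "VectorClock":
        """Element-wise maximum of both clocks, returned as a new clock."""
        keys = set(self.clocks) | set(other.clocks)
        return VectorClock({k: max(self.clocks.get(k, 0), other.clocks.get(k, 0))
                            for k in keys})


def is_concurrent(a: VectorClock, b: VectorClock) -> bool:
    """Concurrent means the clocks differ and neither dominates the other."""
    keys = set(a.clocks) | set(b.clocks)
    identical = all(a.clocks.get(k, 0) == b.clocks.get(k, 0) for k in keys)
    return not identical and not a.dominates(b) and not b.dominates(a)


# Two nodes write independently during a partition
gateway, sensor = VectorClock(), VectorClock()
gateway.increment("gateway")
sensor.increment("sensor")
conflict_detected = is_concurrent(gateway, sensor)

# After syncing, the gateway merges both histories and records its own new write
merged = gateway.merge(sensor)
merged.increment("gateway")
print(conflict_detected, merged.dominates(gateway), merged.dominates(sensor))
```

A detected conflict still has to be resolved by the application, for example by keeping both values or applying a domain-specific rule. The vector clock only tells you that a conflict exists, which a plain timestamp comparison cannot do.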

Try It Yourself: Executable Python Examples

The following code blocks demonstrate distributed data management patterns for edge analytics. All examples are self-contained and runnable.

1. SQLite Database Operations

Comprehensive SQLite operations for IoT sensor data:

import sqlite3
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import time

# Create in-memory database for demonstration
print("=== Setting Up SQLite Database ===\n")
conn = sqlite3.connect(':memory:')  # Use ':memory:' for demo, or 'sensors.db' for persistent
cursor = conn.cursor()

# Create devices table
cursor.execute("""
    CREATE TABLE devices (
        device_id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        type TEXT NOT NULL,
        location TEXT,
        installed_date TEXT
    )
""")

# Create sensor_readings table with foreign key
cursor.execute("""
    CREATE TABLE sensor_readings (
        reading_id INTEGER PRIMARY KEY AUTOINCREMENT,
        device_id INTEGER NOT NULL,
        timestamp TEXT NOT NULL,
        temperature REAL,
        humidity REAL,
        FOREIGN KEY (device_id) REFERENCES devices(device_id)
    )
""")

# Create index for fast queries
cursor.execute("""
    CREATE INDEX idx_device_timestamp
    ON sensor_readings(device_id, timestamp)
""")

conn.commit()
print("✓ Tables created: devices, sensor_readings")
print("✓ Index created on (device_id, timestamp)")

# Insert sample devices
devices_data = [
    (1, 'Sensor_Room1', 'DHT22', 'Living Room', '2024-01-01'),
    (2, 'Sensor_Room2', 'DHT22', 'Bedroom', '2024-01-01'),
    (3, 'Sensor_Outdoor', 'BME280', 'Garden', '2024-01-15'),
]

cursor.executemany("""
    INSERT INTO devices (device_id, name, type, location, installed_date)
    VALUES (?, ?, ?, ?, ?)
""", devices_data)
conn.commit()
print(f"✓ Inserted {len(devices_data)} devices")

# Generate synthetic sensor data
print("\n=== Generating Synthetic Sensor Data ===")
np.random.seed(42)

readings = []
base_time = datetime.now() - timedelta(days=7)  # Start 7 days ago

for device_id in [1, 2, 3]:
    base_temp = 20.0 if device_id in [1, 2] else 15.0  # Indoor vs outdoor
    base_humidity = 50.0

    for i in range(1000):  # 1000 readings per device
        timestamp = base_time + timedelta(minutes=i*10)  # Every 10 minutes
        temperature = base_temp + np.random.randn() * 2.0  # Gaussian noise
        humidity = base_humidity + np.random.randn() * 5.0

        readings.append((
            device_id,
            timestamp.isoformat(),
            round(temperature, 2),
            round(humidity, 2)
        ))

print(f"Generated {len(readings)} synthetic readings")

# Insert readings (single inserts for comparison later)
start_time = time.perf_counter()
for reading in readings[:100]:  # Insert first 100 one by one
    cursor.execute("""
        INSERT INTO sensor_readings (device_id, timestamp, temperature, humidity)
        VALUES (?, ?, ?, ?)
    """, reading)
conn.commit()
single_insert_time = time.perf_counter() - start_time
print(f"Single inserts (100 rows): {single_insert_time*1000:.1f} ms")

# Batch insert remaining data
start_time = time.perf_counter()
cursor.executemany("""
    INSERT INTO sensor_readings (device_id, timestamp, temperature, humidity)
    VALUES (?, ?, ?, ?)
""", readings[100:])
conn.commit()
batch_insert_time = time.perf_counter() - start_time
print(f"Batch insert ({len(readings)-100} rows): {batch_insert_time*1000:.1f} ms")

speedup = (single_insert_time * len(readings[100:])/100) / batch_insert_time
print(f"Batch insert speedup: {speedup:.1f}x faster\n")

# Query examples
print("=== SQL Query Examples ===\n")

# 1. Latest reading per device
print("1. Latest reading per device:")
query1 = """
    SELECT d.name, d.location, sr.temperature, sr.humidity, sr.timestamp
    FROM sensor_readings sr
    JOIN devices d ON sr.device_id = d.device_id
    WHERE sr.reading_id IN (
        SELECT MAX(reading_id)
        FROM sensor_readings
        GROUP BY device_id
    )
"""
df_latest = pd.read_sql_query(query1, conn)
print(df_latest.to_string(index=False))

# 2. Hourly averages for last 24 hours
print("\n2. Hourly averages (last 24 hours) for Device 1:")
query2 = """
    SELECT
        strftime('%Y-%m-%d %H:00', timestamp) AS hour,
        COUNT(*) AS num_readings,
        ROUND(AVG(temperature), 2) AS avg_temp,
        ROUND(AVG(humidity), 2) AS avg_humidity,
        ROUND(MIN(temperature), 2) AS min_temp,
        ROUND(MAX(temperature), 2) AS max_temp
    FROM sensor_readings
    WHERE device_id = 1
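      -- Build the cutoff in the stored format (local time, 'T' separator) so the text comparison is valid
      AND timestamp >= strftime('%Y-%m-%dT%H:%M:%S', 'now', 'localtime', '-24 hours')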
    GROUP BY hour
    ORDER BY hour DESC
    LIMIT 10
"""
df_hourly = pd.read_sql_query(query2, conn)
print(df_hourly.to_string(index=False))

# 3. Anomaly detection (values > 2 std from mean)
print("\n3. Anomaly detection (temperature outliers):")
query3 = """
    WITH stats AS (
        SELECT
            AVG(temperature) AS mean_temp,
            AVG(temperature * temperature) - AVG(temperature) * AVG(temperature) AS var_temp
        FROM sensor_readings
        WHERE device_id = 1
    )
    SELECT
        sr.timestamp,
        sr.temperature,
        ROUND(s.mean_temp, 2) AS mean,
        ROUND(SQRT(s.var_temp), 2) AS std_dev,
        ROUND((sr.temperature - s.mean_temp) / SQRT(s.var_temp), 2) AS z_score
    FROM sensor_readings sr, stats s
    WHERE sr.device_id = 1
      AND ABS(sr.temperature - s.mean_temp) > 2 * SQRT(s.var_temp)
    ORDER BY ABS(sr.temperature - s.mean_temp) DESC
    LIMIT 5
"""
df_anomalies = pd.read_sql_query(query3, conn)
if not df_anomalies.empty:
    print(df_anomalies.to_string(index=False))
else:
    print("No anomalies detected (data within 2σ)")

# Visualization
print("\n=== Visualizing Data ===")

# Query all data for Device 1
query_viz = """
    SELECT timestamp, temperature, humidity
    FROM sensor_readings
    WHERE device_id = 1
    ORDER BY timestamp
"""
df_viz = pd.read_sql_query(query_viz, conn)
df_viz['timestamp'] = pd.to_datetime(df_viz['timestamp'])

fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 8))

# Temperature over time
ax1.plot(df_viz['timestamp'], df_viz['temperature'], linewidth=1, color='#ff6b6b', alpha=0.7)
ax1.axhline(y=df_viz['temperature'].mean(), color='black', linestyle='--', linewidth=2, label=f'Mean: {df_viz["temperature"].mean():.1f}°C')
ax1.axhline(y=df_viz['temperature'].mean() + 2*df_viz['temperature'].std(), color='orange', linestyle=':', linewidth=1.5, label='±2σ')
ax1.axhline(y=df_viz['temperature'].mean() - 2*df_viz['temperature'].std(), color='orange', linestyle=':', linewidth=1.5)
ax1.set_ylabel('Temperature (°C)', fontsize=11, fontweight='bold')
ax1.set_title('Sensor Readings: Device 1 (Living Room)', fontsize=13, fontweight='bold')
ax1.legend()
ax1.grid(alpha=0.3)

# Humidity over time
ax2.plot(df_viz['timestamp'], df_viz['humidity'], linewidth=1, color='#45b7d1', alpha=0.7)
ax2.axhline(y=df_viz['humidity'].mean(), color='black', linestyle='--', linewidth=2, label=f'Mean: {df_viz["humidity"].mean():.1f}%')
ax2.set_xlabel('Time', fontsize=11, fontweight='bold')
ax2.set_ylabel('Humidity (%)', fontsize=11, fontweight='bold')
ax2.legend()
ax2.grid(alpha=0.3)

plt.tight_layout()
plt.show()

# Database statistics
cursor.execute("SELECT COUNT(*) FROM devices")
num_devices = cursor.fetchone()[0]

cursor.execute("SELECT COUNT(*) FROM sensor_readings")
num_readings = cursor.fetchone()[0]

cursor.execute("SELECT MIN(timestamp), MAX(timestamp) FROM sensor_readings")
time_range = cursor.fetchone()

print(f"\n=== Database Statistics ===")
print(f"Total devices: {num_devices}")
print(f"Total readings: {num_readings:,}")
print(f"Time range: {time_range[0]} to {time_range[1]}")
# Rough estimate only: assumes about 32 bytes per reading and ignores index and page overhead
print(f"Database size (in-memory): ~{num_readings * 32 / 1024:.1f} KB")

conn.close()

print("\nInsight: Proper indexing and batch inserts are crucial for IoT data performance.")
print("SQLite is excellent for edge storage (Raspberry Pi), supporting millions of rows efficiently.")
=== Setting Up SQLite Database ===

✓ Tables created: devices, sensor_readings
✓ Index created on (device_id, timestamp)
✓ Inserted 3 devices

=== Generating Synthetic Sensor Data ===
Generated 3000 synthetic readings
Single inserts (100 rows): 0.3 ms
Batch insert (2900 rows): 4.9 ms
Batch insert speedup: 1.7x faster

=== SQL Query Examples ===

1. Latest reading per device:
          name    location  temperature  humidity                  timestamp
  Sensor_Room1 Living Room        19.67     46.28 2025-12-14T23:44:39.330453
  Sensor_Room2     Bedroom        23.29     51.80 2025-12-14T23:44:39.330453
Sensor_Outdoor      Garden        15.94     44.15 2025-12-14T23:44:39.330453

2. Hourly averages (last 24 hours) for Device 1:
            hour  num_readings  avg_temp  avg_humidity  min_temp  max_temp
2025-12-14 23:00             5     20.86         50.91     19.67     22.38
2025-12-14 22:00             6     19.56         51.13     18.06     21.91
2025-12-14 21:00             6     20.66         49.00     19.61     22.25
2025-12-14 20:00             6     19.78         54.80     15.73     22.11
2025-12-14 19:00             6     19.90         49.84     16.45     22.25
2025-12-14 18:00             6     20.87         49.22     17.54     25.29
2025-12-14 17:00             6     19.14         51.06     14.94     24.15
2025-12-14 16:00             6     19.01         50.49     16.40     23.47
2025-12-14 15:00             6     20.66         51.18     19.48     22.35
2025-12-14 14:00             6     19.19         50.00     17.53     21.09

3. Anomaly detection (temperature outliers):
                 timestamp  temperature  mean  std_dev  z_score
2025-12-08T23:04:39.330453        13.52 20.07     1.92    -3.41
2025-12-09T17:04:39.330453        26.16 20.07     1.92     3.17
2025-12-14T03:04:39.330453        14.59 20.07     1.92    -2.85
2025-12-10T07:04:39.330453        14.61 20.07     1.92    -2.84
2025-12-10T08:54:39.330453        14.70 20.07     1.92    -2.79

=== Visualizing Data ===

=== Database Statistics ===
Total devices: 3
Total readings: 3,000
Time range: 2025-12-08T01:14:39.330453 to 2025-12-14T23:44:39.330453
Database size (in-memory): ~93.8 KB

Insight: Proper indexing and batch inserts are crucial for IoT data performance.
SQLite is excellent for edge storage (Raspberry Pi), supporting millions of rows efficiently.
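To check that a query actually benefits from the (device_id, timestamp) index, SQLite's EXPLAIN QUERY PLAN can be run before and after creating it. The sketch below uses a cut-down copy of the sensor_readings table; `plan_details` is a hypothetical helper, and the exact wording of the plan text varies between SQLite versions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE sensor_readings (
        reading_id INTEGER PRIMARY KEY AUTOINCREMENT,
        device_id INTEGER NOT NULL,
        timestamp TEXT NOT NULL,
        temperature REAL
    )
""")

query = """
    SELECT temperature FROM sensor_readings
    WHERE device_id = ? AND timestamp >= ?
"""
params = (1, "2024-01-01T00:00:00")


def plan_details(connection, sql, sql_params):
    """Return the human-readable detail column of each query-plan row."""
    rows = connection.execute("EXPLAIN QUERY PLAN " + sql, sql_params).fetchall()
    return [row[-1] for row in rows]  # the detail text is the last column


plan_before = plan_details(conn, query, params)

conn.execute("""
    CREATE INDEX idx_device_timestamp
    ON sensor_readings(device_id, timestamp)
""")
plan_after = plan_details(conn, query, params)

print("Without index:", plan_before)
print("With index:   ", plan_after)
conn.close()
```

Without the index the plan reports a scan of the whole table; with it, the plan names idx_device_timestamp, meaning SQLite can jump straight to the rows for one device and time range.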

2. Batch vs Single Insert Comparison

Benchmark insert performance for different strategies:

import sqlite3
import numpy as np
import time
import matplotlib.pyplot as plt
import pandas as pd
from datetime import datetime  # needed for datetime.now() when generating test rows

def benchmark_inserts(num_rows, batch_sizes):
    """
    Benchmark different insert strategies.

    Args:
        num_rows: Total number of rows to insert
        batch_sizes: List of batch sizes to test (1 = single inserts)
    """
    results = []

    for batch_size in batch_sizes:
        # Create fresh database
        conn = sqlite3.connect(':memory:')
        cursor = conn.cursor()

        cursor.execute("""
            CREATE TABLE sensor_readings (
                reading_id INTEGER PRIMARY KEY AUTOINCREMENT,
                device_id INTEGER,
                timestamp TEXT,
                temperature REAL,
                humidity REAL
            )
        """)

        # Generate data
        data = [
            (
                np.random.randint(1, 4),  # device_id
                datetime.now().isoformat(),
                round(20 + np.random.randn() * 2, 2),
                round(50 + np.random.randn() * 5, 2)
            )
            for _ in range(num_rows)
        ]

        # Benchmark inserts
        start_time = time.perf_counter()

        if batch_size == 1:
            # Single inserts
            for row in data:
                cursor.execute("""
                    INSERT INTO sensor_readings (device_id, timestamp, temperature, humidity)
                    VALUES (?, ?, ?, ?)
                """, row)
                conn.commit()  # Commit each insert
        else:
            # Batch inserts
            for i in range(0, len(data), batch_size):
                batch = data[i:i+batch_size]
                cursor.executemany("""
                    INSERT INTO sensor_readings (device_id, timestamp, temperature, humidity)
                    VALUES (?, ?, ?, ?)
                """, batch)
                conn.commit()  # Commit each batch

        elapsed_time = time.perf_counter() - start_time
        throughput = num_rows / elapsed_time

        results.append({
            'Batch Size': batch_size if batch_size > 1 else 'Single',
            'Time (s)': elapsed_time,
            'Throughput (rows/s)': throughput
        })

        conn.close()
        print(f"Batch size {batch_size if batch_size > 1 else 'Single':>6}: {elapsed_time:>6.3f}s  ({throughput:>8.0f} rows/s)")

    return results

print("=== Batch Insert Performance Benchmark ===\n")
print("Testing with 1000 rows...\n")

batch_sizes = [1, 10, 50, 100, 200, 500, 1000]
results = benchmark_inserts(num_rows=1000, batch_sizes=batch_sizes)

# Create results dataframe
df_results = pd.DataFrame(results)
print("\n" + df_results.to_string(index=False))

# Visualize results
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

# Time vs batch size
batch_labels = [str(r['Batch Size']) for r in results]
times = [r['Time (s)'] for r in results]
ax1.bar(batch_labels, times, color=['#ff6b6b' if b == 'Single' else '#45b7d1' for b in batch_labels],
        edgecolor='black', linewidth=1.5)
ax1.set_xlabel('Batch Size', fontsize=11, fontweight='bold')
ax1.set_ylabel('Time (seconds)', fontsize=11, fontweight='bold')
ax1.set_title('Insert Time vs Batch Size (1000 rows)', fontsize=13, fontweight='bold')
ax1.grid(axis='y', alpha=0.3)
ax1.tick_params(axis='x', rotation=45)

# Throughput vs batch size
throughputs = [r['Throughput (rows/s)'] for r in results]
ax2.plot(range(len(batch_sizes)), throughputs, marker='o', linewidth=2.5, markersize=10, color='#1dd1a1')
ax2.set_xticks(range(len(batch_sizes)))
ax2.set_xticklabels(batch_labels, rotation=45)
ax2.set_xlabel('Batch Size', fontsize=11, fontweight='bold')
ax2.set_ylabel('Throughput (rows/second)', fontsize=11, fontweight='bold')
ax2.set_title('Insert Throughput vs Batch Size', fontsize=13, fontweight='bold')
ax2.grid(alpha=0.3)

plt.tight_layout()
plt.show()

# Calculate speedup
single_time = results[0]['Time (s)']
best_time = min(r['Time (s)'] for r in results[1:])
max_speedup = single_time / best_time

print(f"\n=== Performance Analysis ===")
print(f"Single insert time: {single_time:.3f} s")
print(f"Best batch time: {best_time:.3f} s")
print(f"Maximum speedup: {max_speedup:.1f}x")
print(f"\nOptimal batch size for this workload: {results[-2]['Batch Size']} (diminishing returns beyond this)")

print("\nInsight: Batch inserts provide 10-100x speedup by reducing transaction overhead.")
print("For edge gateways, buffer 10-100 readings before inserting to maximize throughput.")
=== Batch Insert Performance Benchmark ===

Testing with 1000 rows...

Batch size Single:  0.004s  (  222939 rows/s)
Batch size     10:  0.002s  (  660822 rows/s)
Batch size     50:  0.001s  (  796537 rows/s)
Batch size    100:  0.001s  (  822450 rows/s)
Batch size    200:  0.001s  (  846909 rows/s)
Batch size    500:  0.001s  (  857267 rows/s)
Batch size   1000:  0.001s  (  857554 rows/s)

Batch Size  Time (s)  Throughput (rows/s)
    Single  0.004486        222939.443181
        10  0.001513        660822.353762
        50  0.001255        796537.293061
       100  0.001216        822449.583867
       200  0.001181        846908.571974
       500  0.001166        857266.793446
      1000  0.001166        857554.238191

=== Performance Analysis ===
Single insert time: 0.004 s
Best batch time: 0.001 s
Maximum speedup: 3.8x

Optimal batch size for this workload: 500 (diminishing returns beyond this)

Insight: Batch inserts provide 10-100x speedup by reducing transaction overhead.
For edge gateways, buffer 10-100 readings before inserting to maximize throughput.
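
The run above used an in-memory database, where a commit is cheap, and measured a speedup of about 3.8x. Larger gains are usually associated with on-disk databases, where each commit waits for the data to reach storage. A minimal sketch for checking this on your own hardware follows; the `time_inserts` helper, the table layout, and the temporary file names are assumptions for illustration, and no particular timing is implied.

```python
import os
import sqlite3
import tempfile
import time


def time_inserts(db_path, rows, commit_every):
    """Insert rows into a fresh on-disk database, committing every N rows."""
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE readings (device_id INTEGER, temperature REAL)")
    start = time.perf_counter()
    for i in range(0, len(rows), commit_every):
        conn.executemany("INSERT INTO readings VALUES (?, ?)",
                         rows[i:i + commit_every])
        conn.commit()  # each commit is one transaction written to disk
    elapsed = time.perf_counter() - start
    count = conn.execute("SELECT COUNT(*) FROM readings").fetchone()[0]
    conn.close()
    return elapsed, count


rows = [(1 + i % 3, 20.0 + (i % 10) * 0.1) for i in range(200)]
results = {}

with tempfile.TemporaryDirectory() as tmp:
    for label, commit_every in [("per-row commit", 1),
                                ("single transaction", len(rows))]:
        path = os.path.join(tmp, f"bench_{commit_every}.db")
        results[label] = time_inserts(path, rows, commit_every)

for label, (elapsed, count) in results.items():
    print(f"{label:>20}: {elapsed * 1000:8.1f} ms for {count} rows")
```

On an SD card or other slow storage, such as a Raspberry Pi gateway, the gap between the two lines is typically much wider than in memory, which is the case the buffering advice targets.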

3. REST API Simulation

Simulate a Flask-like REST API for sensor data ingestion:

import sqlite3
from datetime import datetime
import json
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

class SensorAPI:
    """Simulated REST API for sensor data ingestion"""

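    def __init__(self, db_path=':memory:'):
        self.conn = sqlite3.connect(db_path)
        # SQLite does not enforce FOREIGN KEY constraints unless enabled per connection
        self.conn.execute("PRAGMA foreign_keys = ON")
        self._init_database()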

    def _init_database(self):
        """Initialize database schema"""
        cursor = self.conn.cursor()

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS devices (
                device_id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                type TEXT NOT NULL,
                location TEXT
            )
        """)

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS sensor_readings (
                reading_id INTEGER PRIMARY KEY AUTOINCREMENT,
                device_id INTEGER NOT NULL,
                timestamp TEXT NOT NULL,
                temperature REAL,
                humidity REAL,
                FOREIGN KEY (device_id) REFERENCES devices(device_id)
            )
        """)

        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_device_timestamp
            ON sensor_readings(device_id, timestamp)
        """)

        # Insert sample devices
        cursor.executemany("""
            INSERT OR IGNORE INTO devices (device_id, name, type, location)
            VALUES (?, ?, ?, ?)
        """, [
            (1, 'ESP32_01', 'DHT22', 'Living Room'),
            (2, 'ESP32_02', 'BME280', 'Kitchen'),
            (3, 'Arduino_01', 'DHT11', 'Bedroom')
        ])

        self.conn.commit()

    def post_reading(self, device_id, temperature, humidity):
        """
        POST /api/reading - Insert single sensor reading

        Args:
            device_id: Device identifier
            temperature: Temperature in Celsius
            humidity: Humidity percentage

        Returns:
            (status_code, response_json)
        """
        # Validate input
        if not isinstance(device_id, int) or device_id < 1:
            return 400, {"error": "Invalid device_id"}

        if not isinstance(temperature, (int, float)):
            return 400, {"error": "Invalid temperature"}

        if not isinstance(humidity, (int, float)):
            return 400, {"error": "Invalid humidity"}

        # Check device exists
        cursor = self.conn.cursor()
        cursor.execute("SELECT 1 FROM devices WHERE device_id = ?", (device_id,))
        if not cursor.fetchone():
            return 404, {"error": f"Device {device_id} not found"}

        # Insert reading
        try:
            timestamp = datetime.now().isoformat()
            cursor.execute("""
                INSERT INTO sensor_readings (device_id, timestamp, temperature, humidity)
                VALUES (?, ?, ?, ?)
            """, (device_id, timestamp, temperature, humidity))
            self.conn.commit()

            reading_id = cursor.lastrowid

            return 200, {
                "status": "success",
                "reading_id": reading_id,
                "device_id": device_id,
                "timestamp": timestamp,
                "temperature": temperature,
                "humidity": humidity
            }
        except Exception as e:
            return 500, {"error": str(e)}

    def post_batch_readings(self, readings):
        """
        POST /api/readings/batch - Insert multiple readings

        Args:
            readings: List of dicts with keys: device_id, temperature, humidity

        Returns:
            (status_code, response_json)
        """
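        if not isinstance(readings, list) or len(readings) == 0:
            return 400, {"error": "readings must be non-empty list"}

        # Reject malformed items up front so a missing key is a 400, not a 500
        required = {"device_id", "temperature", "humidity"}
        if any(not isinstance(r, dict) or not required.issubset(r) for r in readings):
            return 400, {"error": "each reading needs device_id, temperature, humidity"}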

        try:
            timestamp = datetime.now().isoformat()
            data = [
                (r['device_id'], timestamp, r['temperature'], r['humidity'])
                for r in readings
            ]

            cursor = self.conn.cursor()
            cursor.executemany("""
                INSERT INTO sensor_readings (device_id, timestamp, temperature, humidity)
                VALUES (?, ?, ?, ?)
            """, data)
            self.conn.commit()

            return 200, {
                "status": "success",
                "inserted": len(readings),
                "timestamp": timestamp
            }
        except Exception as e:
            return 500, {"error": str(e)}

    def get_latest_readings(self, device_id, limit=10):
        """
        GET /api/readings/<device_id>?limit=N - Get recent readings

        Args:
            device_id: Device identifier
            limit: Number of readings to return

        Returns:
            (status_code, response_json)
        """
        cursor = self.conn.cursor()
        cursor.execute("""
            SELECT reading_id, timestamp, temperature, humidity
            FROM sensor_readings
            WHERE device_id = ?
            ORDER BY timestamp DESC, reading_id DESC
            LIMIT ?
        """, (device_id, limit))

        readings = cursor.fetchall()

        if not readings:
            return 404, {"error": f"No readings found for device {device_id}"}

        return 200, {
            "device_id": device_id,
            "count": len(readings),
            "readings": [
                {
                    "reading_id": r[0],
                    "timestamp": r[1],
                    "temperature": r[2],
                    "humidity": r[3]
                }
                for r in readings
            ]
        }

    def get_statistics(self, device_id):
        """
        GET /api/stats/<device_id> - Get aggregated statistics

        Args:
            device_id: Device identifier

        Returns:
            (status_code, response_json)
        """
        cursor = self.conn.cursor()
        cursor.execute("""
            SELECT
                COUNT(*) AS total_readings,
                MIN(timestamp) AS first_reading,
                MAX(timestamp) AS last_reading,
                ROUND(AVG(temperature), 2) AS avg_temp,
                ROUND(MIN(temperature), 2) AS min_temp,
                ROUND(MAX(temperature), 2) AS max_temp,
                ROUND(AVG(humidity), 2) AS avg_humidity
            FROM sensor_readings
            WHERE device_id = ?
        """, (device_id,))

        row = cursor.fetchone()

        if row[0] == 0:
            return 404, {"error": f"No data for device {device_id}"}

        return 200, {
            "device_id": device_id,
            "total_readings": row[0],
            "first_reading": row[1],
            "last_reading": row[2],
            "statistics": {
                "temperature": {
                    "avg": row[3],
                    "min": row[4],
                    "max": row[5]
                },
                "humidity": {
                    "avg": row[6]
                }
            }
        }

# Demonstrate API usage
print("=== Simulated REST API Demo ===\n")

api = SensorAPI()

# Test 1: POST single reading
print("1. POST /api/reading - Insert single reading")
status, response = api.post_reading(device_id=1, temperature=22.5, humidity=65.0)
print(f"Status: {status}")
print(f"Response: {json.dumps(response, indent=2)}\n")

# Test 2: POST batch readings
print("2. POST /api/readings/batch - Insert multiple readings")
batch_data = [
    {"device_id": 1, "temperature": 22.0 + np.random.randn(), "humidity": 65.0 + np.random.randn()*2}
    for _ in range(100)
]
status, response = api.post_batch_readings(batch_data)
print(f"Status: {status}")
print(f"Response: {json.dumps(response, indent=2)}\n")

# Test 3: GET latest readings
print("3. GET /api/readings/1?limit=5 - Get recent readings")
status, response = api.get_latest_readings(device_id=1, limit=5)
print(f"Status: {status}")
print(f"Response (first 2 readings): {json.dumps(response['readings'][:2], indent=2)}\n")

# Test 4: GET statistics
print("4. GET /api/stats/1 - Get aggregated statistics")
status, response = api.get_statistics(device_id=1)
print(f"Status: {status}")
print(f"Response: {json.dumps(response, indent=2)}\n")

# Test 5: Error handling - invalid device
print("5. Error handling - POST to non-existent device")
status, response = api.post_reading(device_id=999, temperature=20.0, humidity=50.0)
print(f"Status: {status}")
print(f"Response: {json.dumps(response, indent=2)}\n")

# Simulate realistic traffic
print("=== Simulating Realistic API Traffic ===")
num_requests = 500

for device_id in [1, 2, 3]:
    for _ in range(num_requests // 3):
        temp = 20 + np.random.randn() * 2 + (device_id - 1) * 2
        humidity = 50 + np.random.randn() * 5
        api.post_reading(device_id, temp, humidity)

print(f"Simulated {num_requests} API requests across 3 devices\n")

# Visualize API statistics
print("=== API Performance Dashboard ===")

fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Device 1: Temperature distribution
status, data1 = api.get_latest_readings(1, limit=1000)
temps1 = [r['temperature'] for r in data1['readings']]

axes[0, 0].hist(temps1, bins=30, color='#ff6b6b', edgecolor='black', alpha=0.7)
axes[0, 0].set_xlabel('Temperature (°C)', fontsize=10)
axes[0, 0].set_ylabel('Frequency', fontsize=10)
axes[0, 0].set_title('Device 1: Temperature Distribution', fontsize=11, fontweight='bold')
axes[0, 0].grid(alpha=0.3)

# All devices: Statistics comparison
stats_data = []
for device_id in [1, 2, 3]:
    status, stats = api.get_statistics(device_id)
    stats_data.append({
        'Device': f'Device {device_id}',
        'Avg Temp': stats['statistics']['temperature']['avg'],
        'Min Temp': stats['statistics']['temperature']['min'],
        'Max Temp': stats['statistics']['temperature']['max']
    })

df_stats = pd.DataFrame(stats_data)
x = np.arange(len(df_stats))
width = 0.25

axes[0, 1].bar(x - width, df_stats['Min Temp'], width, label='Min', color='#45b7d1')
axes[0, 1].bar(x, df_stats['Avg Temp'], width, label='Avg', color='#1dd1a1')
axes[0, 1].bar(x + width, df_stats['Max Temp'], width, label='Max', color='#ff6b6b')
axes[0, 1].set_xlabel('Device', fontsize=10)
axes[0, 1].set_ylabel('Temperature (°C)', fontsize=10)
axes[0, 1].set_title('Temperature Statistics by Device', fontsize=11, fontweight='bold')
axes[0, 1].set_xticks(x)
axes[0, 1].set_xticklabels(df_stats['Device'])
axes[0, 1].legend()
axes[0, 1].grid(axis='y', alpha=0.3)

# Reading count per device
reading_counts = []
for device_id in [1, 2, 3]:
    status, stats = api.get_statistics(device_id)
    reading_counts.append(stats['total_readings'])

axes[1, 0].bar([f'Device {i}' for i in [1, 2, 3]], reading_counts,
              color=['#ff6b6b', '#45b7d1', '#1dd1a1'], edgecolor='black', linewidth=1.5)
axes[1, 0].set_ylabel('Total Readings', fontsize=10)
axes[1, 0].set_title('Total Readings per Device', fontsize=11, fontweight='bold')
axes[1, 0].grid(axis='y', alpha=0.3)

# Time series for Device 1
status, data1_full = api.get_latest_readings(1, limit=200)
temps_series = [r['temperature'] for r in reversed(data1_full['readings'])]

axes[1, 1].plot(range(len(temps_series)), temps_series, linewidth=1.5, color='#ff6b6b', alpha=0.7)
axes[1, 1].axhline(y=np.mean(temps_series), color='black', linestyle='--', linewidth=2, label=f'Mean: {np.mean(temps_series):.1f}°C')
axes[1, 1].set_xlabel('Reading Index', fontsize=10)
axes[1, 1].set_ylabel('Temperature (°C)', fontsize=10)
axes[1, 1].set_title('Device 1: Temperature Time Series', fontsize=11, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)

plt.tight_layout()
plt.show()

print("\nInsight: REST APIs provide simple HTTP-based data ingestion for edge devices.")
print("Key endpoints: POST /api/reading (single), POST /api/readings/batch (bulk), GET /api/readings/<id>")
=== Simulated REST API Demo ===

1. POST /api/reading - Insert single reading
Status: 200
Response: {
  "status": "success",
  "reading_id": 1,
  "device_id": 1,
  "timestamp": "2025-12-15T01:14:40.000080",
  "temperature": 22.5,
  "humidity": 65.0
}

2. POST /api/readings/batch - Insert multiple readings
Status: 200
Response: {
  "status": "success",
  "inserted": 100,
  "timestamp": "2025-12-15T01:14:40.000525"
}

3. GET /api/readings/1?limit=5 - Get recent readings
Status: 200
Response (first 2 readings): [
  {
    "reading_id": 101,
    "timestamp": "2025-12-15T01:14:40.000525",
    "temperature": 20.63739972025054,
    "humidity": 63.12478444070538
  },
  {
    "reading_id": 100,
    "timestamp": "2025-12-15T01:14:40.000525",
    "temperature": 20.967185590491802,
    "humidity": 64.60908068522714
  }
]

4. GET /api/stats/1 - Get aggregated statistics
Status: 200
Response: {
  "device_id": 1,
  "total_readings": 101,
  "first_reading": "2025-12-15T01:14:40.000080",
  "last_reading": "2025-12-15T01:14:40.000525",
  "statistics": {
    "temperature": {
      "avg": 21.87,
      "min": 19.78,
      "max": 24.27
    },
    "humidity": {
      "avg": 64.96
    }
  }
}

5. Error handling - POST to non-existent device
Status: 404
Response: {
  "error": "Device 999 not found"
}

=== Simulating Realistic API Traffic ===
Simulated 500 API requests across 3 devices

=== API Performance Dashboard ===

Insight: REST APIs provide simple HTTP-based data ingestion for edge devices.
Key endpoints: POST /api/reading (single), POST /api/readings/batch (bulk), GET /api/readings/<id>

4. Data Aggregation Queries

Advanced SQL queries for time-series analytics:

Code
import sqlite3
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime, timedelta

# Create database with realistic time-series data
print("=== Creating Time-Series Database ===\n")
conn = sqlite3.connect(':memory:')
cursor = conn.cursor()

# Schema
cursor.execute("""
    CREATE TABLE sensor_readings (
        reading_id INTEGER PRIMARY KEY AUTOINCREMENT,
        device_id INTEGER,
        timestamp TEXT,
        temperature REAL,
        humidity REAL
    )
""")

cursor.execute("""
    CREATE INDEX idx_device_timestamp ON sensor_readings(device_id, timestamp)
""")

# Generate realistic time-series data (30 days, every 5 minutes)
np.random.seed(42)
readings = []
start_time = datetime.now() - timedelta(days=30)

for device_id in [1, 2]:
    base_temp = 20.0
    temp_trend = 0.0  # Temperature drift over time

    for minute in range(0, 30*24*60, 5):  # Every 5 minutes for 30 days
        timestamp = start_time + timedelta(minutes=minute)

        # Add daily cycle (warmer during day, cooler at night)
        hour = timestamp.hour
        daily_cycle = 3 * np.sin((hour - 6) * np.pi / 12)

        # Add linear drift across the 30-day window (zero here, since temp_trend = 0.0)
        weekly_trend = temp_trend * minute / (30*24*60)

        # Add noise
        noise = np.random.randn() * 0.5

        temperature = base_temp + daily_cycle + weekly_trend + noise
        humidity = 50 + np.random.randn() * 5

        readings.append((device_id, timestamp.isoformat(), round(temperature, 2), round(humidity, 2)))

cursor.executemany("""
    INSERT INTO sensor_readings (device_id, timestamp, temperature, humidity)
    VALUES (?, ?, ?, ?)
""", readings)
conn.commit()

print(f"Generated {len(readings):,} readings over 30 days")
print(f"Sampling interval: 5 minutes")
print(f"Devices: 2\n")

# Query 1: Hourly aggregates
print("=== Query 1: Hourly Aggregates (Last 48 Hours) ===\n")
query1 = """
    SELECT
        strftime('%Y-%m-%d %H:00', timestamp) AS hour,
        device_id,
        COUNT(*) AS num_readings,
        ROUND(AVG(temperature), 2) AS avg_temp,
        ROUND(MIN(temperature), 2) AS min_temp,
        ROUND(MAX(temperature), 2) AS max_temp,
        ROUND(AVG(humidity), 2) AS avg_humidity
    FROM sensor_readings
    WHERE timestamp >= datetime('now', '-48 hours')
    GROUP BY hour, device_id
    ORDER BY hour DESC, device_id
    LIMIT 10
"""
df_hourly = pd.read_sql_query(query1, conn)
print(df_hourly.to_string(index=False))

# Query 2: Daily aggregates with day-over-day change (LAG window function)
print("\n=== Query 2: Daily Aggregates (Last 7 Days) ===\n")
query2 = """
    SELECT
        DATE(timestamp) AS date,
        device_id,
        COUNT(*) AS readings,
        ROUND(AVG(temperature), 2) AS avg_temp,
        ROUND(MAX(temperature), 2) AS max_temp,
        ROUND(MIN(temperature), 2) AS min_temp,
        ROUND(AVG(temperature) - LAG(AVG(temperature), 1, AVG(temperature))
              OVER (PARTITION BY device_id ORDER BY DATE(timestamp)), 2) AS temp_change
    FROM sensor_readings
    WHERE timestamp >= datetime('now', '-7 days')
    GROUP BY date, device_id
    ORDER BY date DESC, device_id
"""
df_daily = pd.read_sql_query(query2, conn)
print(df_daily.to_string(index=False))

# Query 3: Peak hours analysis
print("\n=== Query 3: Temperature by Hour of Day (Average over 30 days) ===\n")
query3 = """
    SELECT
        CAST(strftime('%H', timestamp) AS INTEGER) AS hour,
        device_id,
        COUNT(*) AS samples,
        ROUND(AVG(temperature), 2) AS avg_temp,
        ROUND(MIN(temperature), 2) AS min_temp,
        ROUND(MAX(temperature), 2) AS max_temp
    FROM sensor_readings
    GROUP BY hour, device_id
    ORDER BY hour, device_id
"""
df_hourly_pattern = pd.read_sql_query(query3, conn)
print(df_hourly_pattern.head(12).to_string(index=False))

# Query 4: Anomaly detection with z-score
print("\n=== Query 4: Temperature Anomalies (|z-score| > 2.5) ===\n")
query4 = """
    WITH stats AS (
        SELECT
            device_id,
            AVG(temperature) AS mean_temp,
            SQRT(AVG(temperature * temperature) - AVG(temperature) * AVG(temperature)) AS std_temp
        FROM sensor_readings
        GROUP BY device_id
    )
    SELECT
        sr.device_id,
        sr.timestamp,
        sr.temperature,
        ROUND(s.mean_temp, 2) AS mean,
        ROUND(s.std_temp, 2) AS std_dev,
        ROUND((sr.temperature - s.mean_temp) / s.std_temp, 2) AS z_score
    FROM sensor_readings sr
    JOIN stats s ON sr.device_id = s.device_id
    WHERE ABS((sr.temperature - s.mean_temp) / s.std_temp) > 2.5
    ORDER BY ABS(z_score) DESC
    LIMIT 10
"""
df_anomalies = pd.read_sql_query(query4, conn)
if not df_anomalies.empty:
    print(df_anomalies.to_string(index=False))
else:
    print("No significant anomalies detected")

# Query 5: Data quality metrics
print("\n=== Query 5: Data Quality Metrics ===\n")
query5 = """
    SELECT
        device_id,
        COUNT(*) AS total_readings,
        DATE(MIN(timestamp)) AS first_reading,
        DATE(MAX(timestamp)) AS last_reading,
        ROUND(JULIANDAY(MAX(timestamp)) - JULIANDAY(MIN(timestamp)), 1) AS days_active,
        ROUND(COUNT(*) * 1.0 / (JULIANDAY(MAX(timestamp)) - JULIANDAY(MIN(timestamp)) + 1) / 288, 2) AS completeness
    FROM sensor_readings
    GROUP BY device_id
"""
df_quality = pd.read_sql_query(query5, conn)
print(df_quality.to_string(index=False))
print("\n(Completeness: 1.0 = 100% of expected readings at 5-minute intervals)")

# Visualizations
print("\n=== Visualization: Time-Series Analysis ===")

fig, axes = plt.subplots(2, 2, figsize=(16, 10))

# Hourly pattern
df_device1_hourly = df_hourly_pattern[df_hourly_pattern['device_id'] == 1]
axes[0, 0].plot(df_device1_hourly['hour'], df_device1_hourly['avg_temp'],
               marker='o', linewidth=2, markersize=6, color='#ff6b6b', label='Device 1')
axes[0, 0].fill_between(df_device1_hourly['hour'],
                        df_device1_hourly['min_temp'],
                        df_device1_hourly['max_temp'],
                        alpha=0.3, color='#ff6b6b')
axes[0, 0].set_xlabel('Hour of Day', fontsize=11, fontweight='bold')
axes[0, 0].set_ylabel('Temperature (°C)', fontsize=11, fontweight='bold')
axes[0, 0].set_title('Average Temperature by Hour (30-day average)', fontsize=12, fontweight='bold')
axes[0, 0].set_xticks(range(0, 24, 3))
axes[0, 0].legend()
axes[0, 0].grid(alpha=0.3)

# Daily trend
query_daily_full = """
    SELECT DATE(timestamp) AS date, AVG(temperature) AS avg_temp
    FROM sensor_readings WHERE device_id = 1
    GROUP BY date ORDER BY date
"""
df_daily_trend = pd.read_sql_query(query_daily_full, conn)
df_daily_trend['date'] = pd.to_datetime(df_daily_trend['date'])

axes[0, 1].plot(df_daily_trend['date'], df_daily_trend['avg_temp'],
               linewidth=2, color='#45b7d1')
axes[0, 1].set_xlabel('Date', fontsize=11, fontweight='bold')
axes[0, 1].set_ylabel('Avg Temperature (°C)', fontsize=11, fontweight='bold')
axes[0, 1].set_title('Daily Average Temperature Trend (30 days)', fontsize=12, fontweight='bold')
axes[0, 1].grid(alpha=0.3)
axes[0, 1].tick_params(axis='x', rotation=45)

# Temperature distribution
query_dist = "SELECT temperature FROM sensor_readings WHERE device_id = 1"
df_dist = pd.read_sql_query(query_dist, conn)

axes[1, 0].hist(df_dist['temperature'], bins=50, color='#1dd1a1', edgecolor='black', alpha=0.7)
axes[1, 0].axvline(df_dist['temperature'].mean(), color='red', linestyle='--', linewidth=2,
                  label=f'Mean: {df_dist["temperature"].mean():.1f}°C')
axes[1, 0].axvline(df_dist['temperature'].median(), color='orange', linestyle='--', linewidth=2,
                  label=f'Median: {df_dist["temperature"].median():.1f}°C')
axes[1, 0].set_xlabel('Temperature (°C)', fontsize=11, fontweight='bold')
axes[1, 0].set_ylabel('Frequency', fontsize=11, fontweight='bold')
axes[1, 0].set_title('Temperature Distribution (Device 1, 30 days)', fontsize=12, fontweight='bold')
axes[1, 0].legend()
axes[1, 0].grid(alpha=0.3)

# Comparison: Device 1 vs Device 2
query_compare = """
    SELECT DATE(timestamp) AS date, device_id, AVG(temperature) AS avg_temp
    FROM sensor_readings
    GROUP BY date, device_id
    ORDER BY date, device_id
"""
df_compare = pd.read_sql_query(query_compare, conn)
df_compare['date'] = pd.to_datetime(df_compare['date'])

for device_id, color in [(1, '#ff6b6b'), (2, '#45b7d1')]:
    df_dev = df_compare[df_compare['device_id'] == device_id]
    axes[1, 1].plot(df_dev['date'], df_dev['avg_temp'],
                   linewidth=2, label=f'Device {device_id}', color=color)

axes[1, 1].set_xlabel('Date', fontsize=11, fontweight='bold')
axes[1, 1].set_ylabel('Avg Temperature (°C)', fontsize=11, fontweight='bold')
axes[1, 1].set_title('Multi-Device Temperature Comparison', fontsize=12, fontweight='bold')
axes[1, 1].legend()
axes[1, 1].grid(alpha=0.3)
axes[1, 1].tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

conn.close()

print("\nInsight: Advanced SQL aggregations enable powerful time-series analytics on edge databases.")
print("Key patterns: hourly/daily rollups, moving averages, anomaly detection, data quality metrics.")
=== Creating Time-Series Database ===

Generated 17,280 readings over 30 days
Sampling interval: 5 minutes
Devices: 2

=== Query 1: Hourly Aggregates (Last 48 Hours) ===

            hour  device_id  num_readings  avg_temp  min_temp  max_temp  avg_humidity
2025-12-15 01:00          1             2     17.63     17.43     17.83         46.14
2025-12-15 01:00          2             2     16.84     16.52     17.16         53.04
2025-12-15 00:00          1            12     17.06     16.40     17.76         52.10
2025-12-15 00:00          2            12     17.16     16.42     18.34         51.36
2025-12-14 23:00          1            12     16.91     16.20     17.66         52.31
2025-12-14 23:00          2            12     17.23     16.57     18.25         51.41
2025-12-14 22:00          1            12     17.52     16.67     18.64         51.80
2025-12-14 22:00          2            12     17.36     16.45     17.98         49.44
2025-12-14 21:00          1            12     17.88     16.95     18.78         50.30
2025-12-14 21:00          2            12     17.70     16.93     18.54         47.45

=== Query 2: Daily Aggregates (Last 7 Days) ===

      date  device_id  readings  avg_temp  max_temp  min_temp  temp_change
2025-12-15          1        14     17.14     17.83     16.40        -2.84
2025-12-15          2        14     17.12     18.34     16.42        -2.82
2025-12-14          1       288     19.99     23.88     16.20        -0.03
2025-12-14          2       288     19.94     23.63     16.22        -0.07
2025-12-13          1       288     20.02     23.95     16.26         0.05
2025-12-13          2       288     20.01     24.07     16.41        -0.03
2025-12-12          1       288     19.97     23.98     15.45        -0.01
2025-12-12          2       288     20.04     24.10     16.24        -0.00
2025-12-11          1       288     19.98     24.13     15.96        -0.06
2025-12-11          2       288     20.04     23.85     15.80         0.09
2025-12-10          1       288     20.03     24.11     16.23        -0.02
2025-12-10          2       288     19.95     24.04     16.12        -0.04
2025-12-09          1       288     20.06     24.03     15.99         0.04
2025-12-09          2       288     20.00     23.83     16.24        -0.02
2025-12-08          1       288     20.01     24.35     15.92         0.00
2025-12-08          2       288     20.01     23.86     15.81         0.00

=== Query 3: Temperature by Hour of Day (Average over 30 days) ===

 hour  device_id  samples  avg_temp  min_temp  max_temp
    0          1      360     17.03     15.04     18.30
    0          2      360     16.99     15.27     18.80
    1          1      360     17.08     15.45     18.49
    1          2      360     17.10     15.63     18.50
    2          1      360     17.40     16.05     19.04
    2          2      360     17.38     15.57     18.59
    3          1      360     17.90     16.66     19.23
    3          2      360     17.86     16.33     19.21
    4          1      360     18.48     17.15     20.08
    4          2      360     18.51     16.57     19.73
    5          1      360     19.23     17.77     20.48
    5          2      360     19.22     17.71     20.76

=== Query 4: Temperature Anomalies (|z-score| > 2.5) ===

No significant anomalies detected

=== Query 5: Data Quality Metrics ===

 device_id  total_readings first_reading last_reading  days_active  completeness
         1            8640    2025-11-15   2025-12-15         30.0          0.97
         2            8640    2025-11-15   2025-12-15         30.0          0.97

(Completeness: 1.0 = 100% of expected readings at 5-minute intervals)

=== Visualization: Time-Series Analysis ===

Insight: Advanced SQL aggregations enable powerful time-series analytics on edge databases.
Key patterns: hourly/daily rollups, moving averages, anomaly detection, data quality metrics.

Interactive Notebook

The notebook below contains runnable code for all Level 1 activities.

LAB 13: Distributed Query Processing on Edge

| Property | Value |
|---|---|
| Book Chapter | Chapter 13 |
| Execution Levels | Level 1 (SQLite Simulation), Level 2 (Pi + Database), Level 3 (Full Stack) |
| Estimated Time | 90 minutes |
| Prerequisites | LAB 12 (Streaming), Basic SQL knowledge |

Learning Objectives

  1. Understand distributed systems fundamentals (CAP theorem, consistency models)
  2. Design time-series database schemas for IoT data
  3. Implement edge-cloud synchronization strategies
  4. Optimize queries for time-series analytics
  5. Apply data retention and downsampling policies

Theoretical Foundation: Distributed Systems for IoT

1.1 The CAP Theorem

The CAP theorem states that a distributed system can guarantee at most two of the following three properties at the same time:

                    Consistency
                        /\
                       /  \
                      /    \
                     / CA   \
                    /________\
                   /\        /\
                  /  \  CP  /  \
                 / AP \    /    \
                /______\  /______\
        Availability      Partition Tolerance

| Property | Description | IoT Example |
|---|---|---|
| Consistency (C) | All nodes see same data | All dashboards show same temperature |
| Availability (A) | System responds to requests | Sensor can always write data |
| Partition Tolerance (P) | Works despite network failures | Edge works when WiFi drops |

For IoT/Edge systems, we typically choose AP (Availability + Partition Tolerance):

  • Sensors must continue collecting data during network outages
  • We accept eventual consistency: data syncs when the connection is restored

1.2 Consistency Models

| Model | Guarantee | Use Case |
|---|---|---|
| Strong Consistency | Reads always return latest write | Financial transactions |
| Eventual Consistency | Reads eventually return latest write | Sensor telemetry |
| Causal Consistency | Related events maintain order | User actions |

For sensor data, eventual consistency is usually sufficient: if no new updates arrive, all replicas eventually converge to the same value.

Section 2: Time-Series Data Modeling

2.1 Time-Series Data Characteristics

IoT sensor data is time-series data with unique properties:

| Characteristic | Description | Implication |
|---|---|---|
| Time-ordered | Data arrives in temporal sequence | Index on timestamp |
| Append-only | New data added, rarely updated | Optimize for inserts |
| High volume | Millions of data points | Compression, downsampling |
| Query patterns | Time-range queries dominate | Time-based partitioning |

2.2 Schema Design Principles

Narrow vs Wide Tables:

-- NARROW: One row per sensor per timestamp (normalized)
-- Pros: Flexible, easy to add sensors
-- Cons: More rows, joins needed
CREATE TABLE readings_narrow (
    timestamp DATETIME,
    device_id INTEGER,
    metric_name TEXT,
    value REAL
);

-- WIDE: All metrics in one row (denormalized)
-- Pros: Fewer rows, faster queries
-- Cons: Schema changes when adding sensors
CREATE TABLE readings_wide (
    timestamp DATETIME,
    device_id INTEGER,
    temperature REAL,
    humidity REAL,
    light INTEGER
);

For IoT, wide tables are generally preferred for query performance, provided each device reports the same fixed set of metrics.
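To make the trade-off concrete, the sketch below loads the same two readings into both layouts (the `readings_narrow` and `readings_wide` schemas above, trimmed to two metrics) and fetches temperature and humidity side by side. The sample values are invented for illustration, and the pivot query is only one of several ways to turn narrow rows back into columns.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE readings_narrow (timestamp DATETIME, device_id INTEGER,
                                  metric_name TEXT, value REAL);
    CREATE TABLE readings_wide (timestamp DATETIME, device_id INTEGER,
                                temperature REAL, humidity REAL);
""")

# Illustrative sample values (not from a real sensor)
samples = [("2024-01-01T00:00:00", 1, 21.5, 48.0),
           ("2024-01-01T00:05:00", 1, 21.7, 47.5)]

for ts, dev, temp, hum in samples:
    conn.execute("INSERT INTO readings_wide VALUES (?, ?, ?, ?)", (ts, dev, temp, hum))
    conn.executemany("INSERT INTO readings_narrow VALUES (?, ?, ?, ?)",
                     [(ts, dev, "temperature", temp), (ts, dev, "humidity", hum)])

# Wide: each row already holds both metrics
wide_rows = conn.execute(
    "SELECT timestamp, temperature, humidity FROM readings_wide ORDER BY timestamp"
).fetchall()

# Narrow: pivot metric rows back into columns with conditional aggregation
narrow_rows = conn.execute("""
    SELECT timestamp,
           MAX(CASE WHEN metric_name = 'temperature' THEN value END) AS temperature,
           MAX(CASE WHEN metric_name = 'humidity' THEN value END) AS humidity
    FROM readings_narrow
    GROUP BY timestamp, device_id
    ORDER BY timestamp
""").fetchall()

narrow_count = conn.execute("SELECT COUNT(*) FROM readings_narrow").fetchone()[0]
print("same result:", wide_rows == narrow_rows)
print(f"wide rows: {len(wide_rows)}, narrow rows: {narrow_count}")
conn.close()
```

Both queries return the same data, but the narrow table stores one row per metric and needs a pivot to read it back, which is the cost the "Cons" comments above refer to.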

2.3 Indexing Strategy

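A composite index on (device_id, timestamp) optimizes the most common query shape, one device over one time range: the equality match on device_id comes first, followed by the range scan on timestamp: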

\(\text{Query Time} = O(\log n) \text{ with B-tree index vs } O(n) \text{ without}\)

Section 3: Edge Data Manager

3.1 Write-Ahead Logging

To prevent data loss during crashes, databases use Write-Ahead Logging (WAL):

1. Write to log (persistent)
2. Acknowledge to client
3. Apply to database (async)

SQLite’s WAL mode is well suited to IoT workloads:

  • Readers don’t block writers
  • Better performance for write-heavy workloads
  • Crash recovery from the log
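As a minimal sketch of how WAL mode might be switched on from Python, the snippet below uses the standard `sqlite3` module and SQLite's `journal_mode` pragma. The database file name is a placeholder, and a temporary directory is used only so the example leaves nothing behind in your working folder.

```python
import os
import sqlite3
import tempfile

# WAL needs a file-backed database; an in-memory database reports "memory" instead
db_path = os.path.join(tempfile.mkdtemp(), "sensors_wal_demo.db")  # placeholder name
conn = sqlite3.connect(db_path)

# The pragma returns the journal mode that is now in effect
mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
print(f"journal_mode = {mode}")

conn.execute("CREATE TABLE sensor_readings (device_id INTEGER, timestamp TEXT, temperature REAL)")
conn.execute("INSERT INTO sensor_readings VALUES (1, '2024-01-01T00:00:00', 21.5)")
conn.commit()

# A second connection can read while the writer connection stays open
reader = sqlite3.connect(db_path)
row_count = reader.execute("SELECT COUNT(*) FROM sensor_readings").fetchone()[0]
print(f"rows visible to reader: {row_count}")

reader.close()
conn.close()
```

The journal mode is stored in the database file, so it only needs to be set once per database rather than on every connection.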

Section 4: Generating and Storing Time-Series Data

4.1 Realistic Sensor Patterns

Real sensor data follows predictable patterns:

\(T(t) = T_{base} + A_{daily} \sin\left(\frac{2\pi t}{24h}\right) + \epsilon\)

Where:

  • \(T_{base}\) = baseline temperature (location-dependent)
  • \(A_{daily}\) = daily amplitude (larger outdoors)
  • \(\epsilon \sim \mathcal{N}(0, \sigma^2)\) = measurement noise
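The model can be sketched in a few lines of standard-library Python. The function name `simulate_temperature` is made up for this example, and the default parameters (20 °C baseline, 3 °C amplitude, 0.5 °C noise) are simply the values used in the aggregation example earlier in this lab, not measured constants.

```python
import math
import random

def simulate_temperature(hours, t_base=20.0, a_daily=3.0, sigma=0.5, seed=42):
    """Return T(t) = t_base + a_daily * sin(2*pi*t / 24) + noise for each t in hours."""
    rng = random.Random(seed)  # seeded so the series is repeatable
    return [
        t_base + a_daily * math.sin(2 * math.pi * t / 24) + rng.gauss(0, sigma)
        for t in hours
    ]

# One reading every 5 minutes for two days (t measured in hours)
hours = [i * 5 / 60 for i in range(2 * 24 * 12)]
temps = simulate_temperature(hours)

mean_temp = sum(temps) / len(temps)
print(f"{len(temps)} readings, mean {mean_temp:.2f} °C, "
      f"min {min(temps):.2f} °C, max {max(temps):.2f} °C")
```

Over whole days the sine term averages out, so the mean stays close to the baseline while the minimum and maximum reflect the daily amplitude plus noise.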

Section 5: Time-Series Query Patterns

5.1 Common Query Types

| Query Type | Example | Optimization |
|---|---|---|
| Latest Value | Current temperature | Index on (device_id, timestamp DESC) |
| Time Range | Last 24 hours | Composite index |
| Aggregation | Hourly averages | Pre-computed aggregates |
| Downsampling | Daily summary | GROUP BY + date functions |

5.2 Query Optimization with EXPLAIN

Use EXPLAIN QUERY PLAN to verify index usage:

EXPLAIN QUERY PLAN
SELECT * FROM sensor_readings WHERE device_id = 1 AND timestamp > '2024-01-01';
-- Should show: USING INDEX idx_readings_device_time
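The same check can be scripted from Python. This is a sketch that builds an empty `sensor_readings` table, runs `EXPLAIN QUERY PLAN` before and after creating an index, and prints the plan text. The index name `idx_device_timestamp` is taken from the aggregation example earlier in this lab; substitute whatever name your own schema uses. The helper `plan_details` is a name invented here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE sensor_readings (
        reading_id INTEGER PRIMARY KEY AUTOINCREMENT,
        device_id INTEGER, timestamp TEXT, temperature REAL, humidity REAL
    )
""")
query = "SELECT * FROM sensor_readings WHERE device_id = 1 AND timestamp > '2024-01-01'"

def plan_details(connection, sql):
    """Return the human-readable 'detail' column of EXPLAIN QUERY PLAN."""
    return [row[3] for row in connection.execute("EXPLAIN QUERY PLAN " + sql)]

plan_before = plan_details(conn, query)   # no index yet: expect a table scan
conn.execute("CREATE INDEX idx_device_timestamp ON sensor_readings(device_id, timestamp)")
plan_after = plan_details(conn, query)    # with the index: expect an index search

print("before:", plan_before)
print("after: ", plan_after)
conn.close()
```

The exact wording of the plan differs between SQLite versions, so it is safer to look for the `USING INDEX` fragment than to compare the whole string.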

Section 6: Edge-Cloud Synchronization

6.1 Sync Strategies

| Strategy | Description | When to Use |
|---|---|---|
| Push-based | Edge sends immediately | Low latency, reliable network |
| Pull-based | Cloud requests data | Cloud-controlled, polling |
| Store-and-forward | Buffer locally, sync later | Unreliable network, batch |
| Delta sync | Send only changes | Bandwidth optimization |
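Store-and-forward is the strategy that most directly supports the AP choice, so a small sketch may help. It assumes a local `outbox` table with a `synced` flag and an `upload` callable that raises `ConnectionError` when the network is down. Both are hypothetical names chosen for this example; in a real deployment `upload` would wrap an HTTP request to the cloud endpoint.

```python
import sqlite3

def buffer_reading(conn, device_id, timestamp, temperature):
    """Always write locally first, so a network outage never loses data."""
    conn.execute(
        "INSERT INTO outbox (device_id, timestamp, temperature, synced) VALUES (?, ?, ?, 0)",
        (device_id, timestamp, temperature))
    conn.commit()

def sync_pending(conn, upload, batch_size=100):
    """Send unsynced rows in one batch; mark them synced only if upload succeeds."""
    rows = conn.execute(
        "SELECT id, device_id, timestamp, temperature FROM outbox "
        "WHERE synced = 0 ORDER BY id LIMIT ?", (batch_size,)).fetchall()
    if not rows:
        return 0
    try:
        upload(rows)  # hypothetical callable, e.g. an HTTP POST to the cloud
    except ConnectionError:
        return 0      # rows stay unsynced and are retried on the next call
    conn.executemany("UPDATE outbox SET synced = 1 WHERE id = ?", [(r[0],) for r in rows])
    conn.commit()
    return len(rows)

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE outbox (
    id INTEGER PRIMARY KEY AUTOINCREMENT, device_id INTEGER,
    timestamp TEXT, temperature REAL, synced INTEGER DEFAULT 0)""")

for i in range(3):
    buffer_reading(conn, 1, f"2024-01-01T00:0{i}:00", 20.0 + i)

def offline_upload(rows):
    raise ConnectionError("network down")   # simulated outage

received = []                                        # stands in for the cloud store
sent_offline = sync_pending(conn, offline_upload)    # nothing is marked synced
sent_online = sync_pending(conn, received.extend)    # replay once the link is back
print(sent_offline, sent_online, len(received))
```

Marking rows as synced only after a successful upload means a failure can cause a reading to be sent twice but never dropped, so the receiving side should tolerate duplicates.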

6.2 Conflict Resolution

When the same data is modified in more than one place, a resolution rule decides which version survives:

  • Last-Writer-Wins (LWW): Latest timestamp wins
  • First-Writer-Wins: Reject later updates
  • Merge: Combine changes (complex)

For sensor data, LWW is usually sufficient: because readings are append-only, two replicas rarely hold different versions of the same row.
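A Last-Writer-Wins merge can be sketched as a plain dictionary merge. The function name `lww_merge` and the device settings used as sample data are hypothetical; the sketch also assumes that both replicas write ISO-8601 timestamps in the same format and time zone, so that string comparison matches chronological order.

```python
def lww_merge(local, remote):
    """Merge two replicas; for each key keep the value with the newest timestamp.

    Each replica maps key -> (timestamp, value).
    """
    merged = dict(local)
    for key, (ts, value) in remote.items():
        if key not in merged or ts > merged[key][0]:
            merged[key] = (ts, value)
    return merged

# Hypothetical device settings edited on both the edge gateway and the cloud
edge = {"sample_interval_s": ("2024-01-01T10:00:00", 30),
        "alert_threshold_c": ("2024-01-01T09:00:00", 28.0)}
cloud = {"sample_interval_s": ("2024-01-01T08:00:00", 60),
         "alert_threshold_c": ("2024-01-01T11:00:00", 30.0)}

merged = lww_merge(edge, cloud)
for key, (ts, value) in sorted(merged.items()):
    print(f"{key}: {value} (written {ts})")
```

LWW relies on the clocks of the writers being reasonably well synchronized; if an edge device's clock is wrong, its writes can win or lose for the wrong reason.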

Section 7: Data Retention and Downsampling

7.1 The Storage Problem

Sensor data grows unbounded. At 1 reading/second per device:

| Devices | Daily Readings | Daily Storage | Yearly Storage |
|---|---|---|---|
| 10 | 864,000 | ~35 MB | ~12 GB |
| 100 | 8,640,000 | ~350 MB | ~120 GB |
| 1,000 | 86,400,000 | ~3.5 GB | ~1.2 TB |
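The arithmetic behind these figures can be reproduced with a short calculation. The 40 bytes per reading used below is an assumption inferred from the table itself (about 35 MB divided by 864,000 rows); the real figure depends on your schema, indexes, and database engine.

```python
SECONDS_PER_DAY = 24 * 60 * 60
BYTES_PER_READING = 40   # assumption inferred from the table (~35 MB / 864,000 rows)

def storage_estimate(devices, rate_hz=1.0, bytes_per_reading=BYTES_PER_READING):
    """Return (daily_readings, daily_bytes, yearly_bytes) for a fleet of devices."""
    daily_readings = int(devices * rate_hz * SECONDS_PER_DAY)
    daily_bytes = daily_readings * bytes_per_reading
    return daily_readings, daily_bytes, daily_bytes * 365

for devices in (10, 100, 1000):
    readings, daily, yearly = storage_estimate(devices)
    print(f"{devices:>5} devices: {readings:>11,} readings/day, "
          f"{daily / 1e6:8.1f} MB/day, {yearly / 1e9:8.1f} GB/year")
```

Changing `rate_hz` shows how strongly the sampling rate drives storage: one reading per minute instead of one per second reduces every figure by a factor of 60.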

7.2 Retention Policies

Tiered retention:

  • Hot: last 24 hours, full resolution
  • Warm: last 7 days, hourly aggregates
  • Cold: last 30 days, daily aggregates
  • Archive: beyond 30 days, delete or archive
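The hot-to-warm step of such a policy might look like the sketch below. It assumes a `sensor_readings` table with ISO-8601 text timestamps and a `readings_hourly` aggregate table; the latter, and the function name `apply_retention`, are hypothetical names for this example. The warm-to-cold and archive steps are left out.

```python
import sqlite3
from datetime import datetime, timedelta

def apply_retention(conn, now):
    """Move raw readings older than 24 hours into hourly aggregates."""
    # Truncate the cutoff to the hour so only complete hours are rolled up
    cutoff = (now - timedelta(hours=24)).replace(minute=0, second=0, microsecond=0)
    cutoff_iso = cutoff.isoformat()
    with conn:  # one transaction: aggregate and delete together, or not at all
        conn.execute("""
            INSERT INTO readings_hourly (device_id, hour, avg_temp, min_temp, max_temp, samples)
            SELECT device_id, strftime('%Y-%m-%dT%H:00:00', timestamp) AS hour,
                   AVG(temperature), MIN(temperature), MAX(temperature), COUNT(*)
            FROM sensor_readings
            WHERE timestamp < ?
            GROUP BY device_id, hour
        """, (cutoff_iso,))
        deleted = conn.execute(
            "DELETE FROM sensor_readings WHERE timestamp < ?", (cutoff_iso,)).rowcount
    return deleted

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE sensor_readings (device_id INTEGER, timestamp TEXT, temperature REAL);
    CREATE TABLE readings_hourly (device_id INTEGER, hour TEXT, avg_temp REAL,
                                  min_temp REAL, max_temp REAL, samples INTEGER,
                                  PRIMARY KEY (device_id, hour));
""")

now = datetime(2024, 1, 3, 12, 30)     # fixed "current time" for a repeatable demo
start = now - timedelta(hours=48)
rows = [(1, (start + timedelta(minutes=5 * i)).isoformat(), 20.0 + (i % 12) * 0.1)
        for i in range(48 * 12)]       # 48 hours at 5-minute intervals
conn.executemany("INSERT INTO sensor_readings VALUES (?, ?, ?)", rows)

deleted = apply_retention(conn, now)
raw_left = conn.execute("SELECT COUNT(*) FROM sensor_readings").fetchone()[0]
hourly = conn.execute("SELECT COUNT(*), SUM(samples) FROM readings_hourly").fetchone()
print(f"deleted {deleted} raw rows, {raw_left} remain, {hourly[0]} hourly rows")
```

Passing the cutoff from Python as an ISO string keeps the comparison in the same format as the stored timestamps. Running the aggregation and the delete in one transaction avoids a state where raw rows are gone but their summary was never written.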

7.3 Downsampling Algorithms

Largest Triangle Three Buckets (LTTB): Preserves visual shape with fewer points

Simple aggregation: Average/min/max per time bucket
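Simple aggregation is the easier of the two to sketch. The function below groups `(unix_time, value)` pairs into fixed-width buckets and keeps the average, minimum, and maximum of each bucket. The function name `downsample` and the synthetic ramp data are made up for illustration; LTTB is not shown here.

```python
def downsample(points, bucket_seconds):
    """Reduce (unix_time, value) points to one (bucket_start, avg, min, max) per bucket."""
    buckets = {}
    for t, value in points:
        bucket_start = int(t // bucket_seconds) * bucket_seconds
        buckets.setdefault(bucket_start, []).append(value)
    return [
        (start, sum(vals) / len(vals), min(vals), max(vals))
        for start, vals in sorted(buckets.items())
    ]

# One reading per second for 10 minutes (synthetic ramp, for illustration only)
raw = [(t, 20.0 + 0.01 * t) for t in range(600)]
per_minute = downsample(raw, bucket_seconds=60)

print(f"{len(raw)} points -> {len(per_minute)} buckets")
print("first bucket (start, avg, min, max):", per_minute[0])
```

Keeping min and max alongside the average preserves short spikes that an average alone would smooth away, at the cost of three stored values per bucket instead of one.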

Section 8: Edge vs Cloud Storage Trade-offs

8.1 Decision Matrix

| Factor | Edge Storage | Cloud Storage | Hybrid |
|---|---|---|---|
| Latency | <10 ms | 50-500 ms | Varies |
| Offline Access | Full | None | Partial |
| Storage Capacity | Limited (GB) | Unlimited | Combined |
| Query Power | Basic | Advanced | Tiered |
| Cost | Hardware | Per-query/storage | Optimized |
| Privacy | Data stays local | Data leaves device | Controlled |

Section 9: HTTP API and Arduino Integration

9.1 RESTful API Design

For Level 2/3 deployment, create a Flask API on Raspberry Pi:

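# sensor_api.py
from datetime import datetime
from flask import Flask, request, jsonify
import sqlite3

app = Flask(__name__)
DB_PATH = 'sensors.db'  # assumed path; the sensor_readings table must already exist

@app.route('/api/reading', methods=['POST'])
def add_reading():
    data = request.get_json(silent=True) or {}
    try:
        values = (int(data['device_id']), datetime.now().isoformat(),
                  float(data['temperature']), float(data['humidity']))
    except (KeyError, TypeError, ValueError):
        return jsonify({'error': 'device_id, temperature and humidity are required'}), 400
    conn = sqlite3.connect(DB_PATH)
    with conn:  # commits on success, rolls back on error
        cursor = conn.execute(
            "INSERT INTO sensor_readings (device_id, timestamp, temperature, humidity) "
            "VALUES (?, ?, ?, ?)", values)
    conn.close()
    return jsonify({'status': 'ok', 'id': cursor.lastrowid})

@app.route('/api/readings/<int:device_id>')
def get_readings(device_id):
    limit = request.args.get('limit', 100, type=int)  # query args arrive as strings
    conn = sqlite3.connect(DB_PATH)
    rows = conn.execute(
        "SELECT reading_id, timestamp, temperature, humidity FROM sensor_readings "
        "WHERE device_id = ? ORDER BY timestamp DESC LIMIT ?", (device_id, limit)).fetchall()
    conn.close()
    keys = ('reading_id', 'timestamp', 'temperature', 'humidity')
    return jsonify([dict(zip(keys, row)) for row in rows])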

9.2 Arduino HTTP Client

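#include <SPI.h>
#include <Ethernet.h>

// Placeholder address: replace with the IP of your Flask server.
// Ethernet.begin(...) must already have been called in setup().
IPAddress server(192, 168, 1, 100);
EthernetClient client;

void sendData(float temp, float humidity) {
    if (client.connect(server, 5000)) {
        // JSON keys match the API's field names (temperature, humidity)
        String json = "{\"device_id\":1,\"temperature\":" + String(temp) +
                      ",\"humidity\":" + String(humidity) + "}";

        client.println("POST /api/reading HTTP/1.1");
        client.println("Host: 192.168.1.100");   // Host header is required by HTTP/1.1
        client.println("Content-Type: application/json");
        client.println("Connection: close");
        client.print("Content-Length: ");
        client.println(json.length());
        client.println();
        client.print(json);   // body only, so its length equals Content-Length
        client.stop();        // close the connection so the next call can reconnect
    }
}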

Checkpoint: Self-Assessment

Conceptual Questions

  1. CAP Theorem: Why do IoT systems typically choose AP over CA? What does this mean for data consistency?

  2. Schema Design: Why is a “wide” table format often better than “narrow” for sensor data?

  3. Indexing: What columns would you index for the query: “Get all readings from device X in the last hour”?

  4. Sync Strategy: When would you prefer push-based sync over store-and-forward?

  5. Retention: If you have 100 devices sampling at 1 Hz, how much storage do you need per year for raw data? What about with 10:1 downsampling?

Hands-On Challenges

  1. Add a daily aggregates table and update the retention policy to create daily summaries from hourly data

  2. Implement LTTB downsampling that preserves visual shape of the time series

  3. Add conflict resolution to the sync system for when cloud and edge have different values

  4. Create a database view that automatically selects raw or aggregated data based on query time range


Part of the Edge Analytics Lab Book

Three-Tier Activities

Level 1 (SQLite Simulation). Environment: local Jupyter or Colab, no hardware required.

Suggested workflow:

  1. Use the notebook to create the devices and sensor_readings tables in SQLite.
  2. Insert synthetic data for 1–3 virtual devices using both single inserts and batch inserts.
  3. Practice core query patterns:
    • “latest N readings” per device
    • hourly/daily aggregates
    • simple anomaly filters based on thresholds or Z-scores
  4. Measure and compare the performance of single vs batch inserts (rows per second) using timing code in Python.
  5. Reflect on how your schema and indexes would behave if you scaled from thousands to millions of rows.
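For step 4, a timing harness along the following lines may be a useful starting point. It is a sketch that uses an in-memory SQLite database and a trimmed `sensor_readings` table without the `devices` foreign key; the row count of 5,000 is arbitrary.

```python
import sqlite3
import time

def make_db():
    """Create a fresh in-memory database with an empty readings table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE sensor_readings (device_id INTEGER, timestamp TEXT, "
                 "temperature REAL, humidity REAL)")
    return conn

rows = [(1, f"2024-01-01T00:00:{i % 60:02d}", 20.0 + i * 0.001, 50.0) for i in range(5000)]
sql = "INSERT INTO sensor_readings VALUES (?, ?, ?, ?)"

# Single-row inserts: one statement and one commit per reading
conn = make_db()
t0 = time.perf_counter()
for row in rows:
    conn.execute(sql, row)
    conn.commit()
single_s = time.perf_counter() - t0
single_count = conn.execute("SELECT COUNT(*) FROM sensor_readings").fetchone()[0]
conn.close()

# Batch insert: one executemany call and one commit for all readings
conn = make_db()
t0 = time.perf_counter()
conn.executemany(sql, rows)
conn.commit()
batch_s = time.perf_counter() - t0
batch_count = conn.execute("SELECT COUNT(*) FROM sensor_readings").fetchone()[0]
conn.close()

print(f"single: {single_count / single_s:,.0f} rows/s")
print(f"batch:  {batch_count / batch_s:,.0f} rows/s")
```

With an in-memory database the gap is smaller than on disk, where each commit has to be flushed to storage. Pointing `sqlite3.connect` at a file instead is a simple way to see that difference.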

Level 2 (Pi + Database). Here you build and exercise the ingestion API on a laptop or Raspberry Pi, using simulated clients instead of real hardware.

Baseline (no Docker required):

  1. Create sensors.db (or a MySQL database) and run the sensor_api.py Flask server from the code/notebook.
  2. Use tools like curl, Python requests, or a small load-generator script to send HTTP requests that mimic edge devices:
    • both single-row and batch ingestion
    • realistic rates (e.g., one reading every 30 seconds per device)
  3. Inspect database content with SQL queries and confirm that:
    • the schema matches the one in Chapter 13 of the PDF textbook
    • indexes on (device_id, timestamp) are used for your main queries
  4. Optionally wrap the API and database in Docker containers to simulate deployment on a Pi/edge gateway.
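A small load generator for step 2 could be sketched with the standard library alone. The URL below assumes the Flask server runs locally on port 5000, and the JSON field names follow the simulated API earlier in this lab. The function names are made up for this example. The sender is passed in as a parameter so the script can be dry-run without a server.

```python
import json
import random
import urllib.request

API_URL = "http://localhost:5000/api/reading"   # assumed address of the Flask server

def make_reading(device_id, rng):
    """Build one JSON-serializable reading with plausible values."""
    return {"device_id": device_id,
            "temperature": round(20 + rng.gauss(0, 2), 2),
            "humidity": round(50 + rng.gauss(0, 5), 2)}

def http_post(payload, url=API_URL, timeout=5):
    """POST one reading as JSON and return the HTTP status code."""
    request = urllib.request.Request(
        url, data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"}, method="POST")
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status

def run_load(device_ids, readings_per_device, send=http_post, seed=0):
    """Send readings for each device; return counts of successes and failures."""
    rng = random.Random(seed)
    ok = failed = 0
    for _ in range(readings_per_device):
        for device_id in device_ids:
            try:
                status = send(make_reading(device_id, rng))
            except OSError:      # connection refused, timeout, or HTTP error
                failed += 1
                continue
            if status == 200:
                ok += 1
            else:
                failed += 1
    return ok, failed

# Dry run with a stand-in sender, so the script works without a server
sent = []

def fake_send(payload):
    sent.append(payload)
    return 200

ok, failed = run_load([1, 2, 3], readings_per_device=10, send=fake_send)
print(f"ok={ok}, failed={failed}")
```

To exercise the real API, drop the `send=fake_send` argument so that `http_post` is used, and add a `time.sleep` between rounds to approximate a realistic rate such as one reading every 30 seconds per device.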

Outcome: a working “edge gateway” API that accepts data from simulated devices and stores it reliably.

Level 3 (Full Stack). Now replace the simulated clients with real hardware.

  1. Choose a board:
    • Arduino + Ethernet shield, or
    • ESP32 with WiFi (using HTTP client libraries instead of Ethernet).
  2. Configure the board with:
    • the IP/hostname of your Flask/MySQL/SQLite server (Pi or laptop)
    • a unique DEVICE_ID that exists in the database
  3. Port the example sketch from Chapter 13:
    • read one or more sensors (e.g., DHT11/22 for temperature/humidity)
    • build and send HTTP requests with query parameters or JSON bodies
    • print responses to Serial for debugging
  4. Monitor:
    • database rows as they arrive
    • error handling behaviour when the network or server is unavailable
  5. Reflect on storage strategy:
    • what data stays at the edge (Pi/local DB) vs what goes to the cloud?
    • how would you buffer and replay data during outages?

This lab connects directly to LAB12: the streams you processed there can now be persisted and queried using the ingestion patterns from LAB13.