#!/usr/bin/env python3 """ heartbeat-poc.py — Proof of Concept heartbeat sender Run on Pi 4 to test end-to-end pipeline: Pi → API → WebSocket → Dashboard Usage: python3 heartbeat-poc.py Requirements: pip install requests """ import os import sys import time import json import subprocess import argparse from datetime import datetime # ── Config ────────────────────────────────────────────────────── API_BASE = os.environ.get('API_BASE', 'http://localhost:3001') DEVICE_ID = os.environ.get('DEVICE_ID', 'DEMO-001') # khớp với seed API_KEY = os.environ.get('API_KEY', 'dev-api-key-demo-001') # khớp với seed INTERVAL_SECONDS = int(os.environ.get('INTERVAL_SECONDS', 30)) def get_device_info() -> dict: """Gather device telemetry — stub cho POC.""" try: # Nhiệt độ CPU (Linux) with open('/sys/class/thermal/thermal_zone0/temp') as f: temp_milli = int(f.read().strip()) temp_c = round(temp_milli / 1000, 1) except Exception: temp_c = None try: # Battery (nếu có) result = subprocess.run( ['cat', '/sys/class/power_supply/BAT0/capacity'], capture_output=True, text=True ) battery = int(result.stdout.strip()) if result.returncode == 0 else None except Exception: battery = None try: # Disk free (GB) result = subprocess.run( ['df', '-BG', '--output=avail', '/'], capture_output=True, text=True ) lines = result.stdout.strip().split('\n') if len(lines) >= 2: free_str = lines[1].strip().rstrip('G') storage_free = int(free_str) else: storage_free = 50 except Exception: storage_free = 50 try: # Network status result = subprocess.run( ['ping', '-c1', '-W1', '8.8.8.8'], capture_output=True ) network_status = 'online' if result.returncode == 0 else 'degraded' except Exception: network_status = 'online' return { 'tempC': temp_c, 'batteryPct': battery, 'storageFreeGb': storage_free, 'networkStatus': network_status, 'lastCaptureAt': datetime.utcnow().isoformat(), 'capturesToday': 0, } def send_heartbeat(device_id: str, api_key: str) -> bool: """Gửi heartbeat lên API.""" import requests payload = { 'deviceId': device_id, 'apiKey': api_key, 'status': 'online', 'firmwareVersion': 'poc-1.0.0', **get_device_info(), } url = f'{API_BASE}/v1/devices/{device_id}/heartbeat' try: resp = requests.post(url, json=payload, timeout=10) if resp.ok: data = resp.json() print(f"[{datetime.now().strftime('%H:%M:%S')}] " f"✓ heartbeat sent | pending commands: {data.get('pendingCommands', 0)}") return True else: print(f"[{datetime.now().strftime('%H:%M:%S')}] " f"✗ API error {resp.status_code}: {resp.text[:100]}") return False except Exception as e: print(f"[{datetime.now().strftime('%H:%M:%S')}] ✗ connection error: {e}") return False def main(): parser = argparse.ArgumentParser(description='POC Heartbeat Sender') parser.add_argument('--device-id', default=DEVICE_ID, help='Device ID') parser.add_argument('--api-key', default=API_KEY, help='API Key') parser.add_argument('--interval', type=int, default=INTERVAL_SECONDS, help='Seconds between heartbeats') parser.add_argument('--once', action='store_true', help='Send once and exit') args = parser.parse_args() print(f'=== Heartbeat POC ===') print(f' Device: {args.device_id}') print(f' API: {API_BASE}') print(f' Interval: {args.interval}s') print(f' Dashboard: http://localhost:3000') print() if args.once: send_heartbeat(args.device_id, args.api_key) return print('Press Ctrl+C to stop.\n') while True: send_heartbeat(args.device_id, args.api_key) time.sleep(args.interval) if __name__ == '__main__': main()