| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- #!/usr/bin/env python3
- """
- heartbeat-poc.py — Proof of Concept heartbeat sender
- Run on Pi 4 to test end-to-end pipeline:
- Pi → API → WebSocket → Dashboard
- Usage:
- python3 heartbeat-poc.py
- Requirements:
- pip install requests
- """
- import os
- import sys
- import time
- import json
- import subprocess
- import argparse
- from datetime import datetime
# ── Config ──────────────────────────────────────────────────────
# Each setting is read from the environment variable of the same name,
# falling back to the development default shown here.
API_BASE = os.getenv('API_BASE', 'http://localhost:3001')
DEVICE_ID = os.getenv('DEVICE_ID', 'DEMO-001')  # matches the seed data
API_KEY = os.getenv('API_KEY', 'dev-api-key-demo-001')  # matches the seed data
INTERVAL_SECONDS = int(os.getenv('INTERVAL_SECONDS', 30))
# Upper bound for each external probe command so a hung `df` or `ping`
# can never stall the heartbeat loop indefinitely.
_PROBE_TIMEOUT_SECONDS = 5

# Value reported for free storage when it cannot be measured (POC stub).
_STORAGE_FREE_FALLBACK_GB = 50


def _read_cpu_temp_c():
    """Return the CPU temperature from thermal_zone0, or None if unreadable."""
    try:
        with open('/sys/class/thermal/thermal_zone0/temp') as f:
            temp_milli = int(f.read().strip())
    except (OSError, ValueError):
        return None
    # The raw reading is divided by 1000 and rounded to one decimal.
    return round(temp_milli / 1000, 1)


def _read_battery_pct():
    """Return the BAT0 capacity as an int, or None when there is no battery."""
    try:
        # Read the sysfs file directly instead of spawning `cat`.
        with open('/sys/class/power_supply/BAT0/capacity') as f:
            return int(f.read().strip())
    except (OSError, ValueError):
        return None


def _read_storage_free_gb():
    """Return the space available on `/` as reported by `df -BG`, or the fallback."""
    try:
        result = subprocess.run(
            ['df', '-BG', '--output=avail', '/'],
            capture_output=True, text=True,
            timeout=_PROBE_TIMEOUT_SECONDS,
        )
        lines = result.stdout.strip().split('\n')
        if len(lines) >= 2:
            # First line is the column header; second is a value such as "12G".
            return int(lines[1].strip().rstrip('G'))
    except (OSError, ValueError, subprocess.SubprocessError):
        pass
    return _STORAGE_FREE_FALLBACK_GB


def _probe_network_status():
    """Return 'online' or 'degraded' based on a single ping to 8.8.8.8."""
    try:
        result = subprocess.run(
            ['ping', '-c1', '-W1', '8.8.8.8'],
            capture_output=True,
            timeout=_PROBE_TIMEOUT_SECONDS,
        )
    except subprocess.TimeoutExpired:
        # The probe never completed, so connectivity is not healthy.
        return 'degraded'
    except (OSError, subprocess.SubprocessError):
        # `ping` could not be run at all: status is unknown, report 'online'
        # (the same fallback this stub has always used).
        return 'online'
    return 'online' if result.returncode == 0 else 'degraded'


def get_device_info() -> dict:
    """Gather best-effort device telemetry for the heartbeat payload (POC stub).

    Every probe is independent and never raises for the expected failure
    modes (missing file, missing command, unparsable output, timeout); a
    failed probe falls back to a default value instead.

    Returns:
        A dict with the keys:
        - 'tempC': float CPU temperature, or None if unreadable.
        - 'batteryPct': int battery capacity, or None if unavailable.
        - 'storageFreeGb': int free space on `/`; 50 when it cannot be measured.
        - 'networkStatus': 'online' or 'degraded'.
        - 'lastCaptureAt': current UTC time as a naive ISO-8601 string.
        - 'capturesToday': always 0 in this POC.
    """
    # Local import: the file only imports `datetime` from the datetime module.
    from datetime import timezone

    # datetime.utcnow() is deprecated; take an aware UTC "now" and drop the
    # tzinfo so the emitted string keeps the same offset-less format.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    return {
        'tempC': _read_cpu_temp_c(),
        'batteryPct': _read_battery_pct(),
        'storageFreeGb': _read_storage_free_gb(),
        'networkStatus': _probe_network_status(),
        'lastCaptureAt': now_utc.isoformat(),
        'capturesToday': 0,
    }
def send_heartbeat(device_id: str, api_key: str) -> bool:
    """Send one heartbeat for *device_id* to the API and print the outcome.

    The JSON payload combines the device id, the API key, a fixed status and
    firmware version, and the telemetry returned by ``get_device_info()``.
    It is POSTed to ``{API_BASE}/v1/devices/{device_id}/heartbeat`` with a
    10-second timeout, and a timestamped one-line result is printed.

    Args:
        device_id: Device identifier; used in the URL and in the payload.
        api_key: API key, sent in the payload body.

    Returns:
        True when the API answered with a successful status (``resp.ok``),
        even if the response body is not a JSON object; False on an HTTP
        error status or when the request itself failed.
    """
    # Imported lazily so the module can be loaded without `requests` installed.
    import requests

    payload = {
        'deviceId': device_id,
        'apiKey': api_key,
        'status': 'online',
        'firmwareVersion': 'poc-1.0.0',
        **get_device_info(),
    }
    url = f'{API_BASE}/v1/devices/{device_id}/heartbeat'

    # Only the request itself is guarded here, so the "connection error"
    # message is never printed for a problem in the response body.
    try:
        resp = requests.post(url, json=payload, timeout=10)
    except Exception as e:  # best-effort: a failed send must not kill the loop
        print(f"[{datetime.now().strftime('%H:%M:%S')}] ✗ connection error: {e}")
        return False

    if not resp.ok:
        print(f"[{datetime.now().strftime('%H:%M:%S')}] "
              f"✗ API error {resp.status_code}: {resp.text[:100]}")
        return False

    # The heartbeat was accepted. The body is only used for the pending-command
    # count, so an empty, non-JSON, or non-object body counts as zero.
    try:
        data = resp.json()
    except ValueError:
        data = None
    pending = data.get('pendingCommands', 0) if isinstance(data, dict) else 0

    print(f"[{datetime.now().strftime('%H:%M:%S')}] "
          f"✓ heartbeat sent | pending commands: {pending}")
    return True
def main():
    """Parse command-line options and send heartbeats once or in a loop.

    Options (defaults come from the module-level configuration):
        --device-id  Device ID to report as.
        --api-key    API key sent with each heartbeat.
        --interval   Seconds between heartbeats; must be positive when looping.
        --once       Send a single heartbeat and exit.

    Without ``--once`` the function loops until interrupted with Ctrl+C,
    which ends the loop cleanly instead of producing a traceback.
    """
    parser = argparse.ArgumentParser(description='POC Heartbeat Sender')
    parser.add_argument('--device-id', default=DEVICE_ID, help='Device ID')
    parser.add_argument('--api-key', default=API_KEY, help='API Key')
    parser.add_argument('--interval', type=int, default=INTERVAL_SECONDS,
                        help='Seconds between heartbeats')
    parser.add_argument('--once', action='store_true', help='Send once and exit')
    args = parser.parse_args()

    # A zero interval would hammer the API in a busy loop and a negative one
    # makes time.sleep() raise. Checked after parsing so the environment-
    # provided default is covered too; irrelevant for a single send.
    if not args.once and args.interval <= 0:
        parser.error('--interval must be a positive number of seconds')

    print('=== Heartbeat POC ===')
    print(f' Device: {args.device_id}')
    print(f' API: {API_BASE}')
    print(f' Interval: {args.interval}s')
    print(' Dashboard: http://localhost:3000')
    print()

    if args.once:
        send_heartbeat(args.device_id, args.api_key)
        return

    print('Press Ctrl+C to stop.\n')
    try:
        while True:
            send_heartbeat(args.device_id, args.api_key)
            time.sleep(args.interval)
    except KeyboardInterrupt:
        # Ctrl+C is the documented way to stop; exit quietly.
        print('\nStopped.')
# Start the sender only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|