Explorar el Código

feat: realtime WebSocket — Socket.io gateway + dashboard integration

Backend:
- RealtimeGateway (Socket.io @ /realtime namespace)
- JWT auth on connect
- Room subscriptions: project:X, org:X
- Emit: device.heartbeat, device.status.changed, capture.uploaded, alert.opened
- DevicesService emits events after heartbeat write

Frontend:
- useSocket hook with event subscription API
- Dashboard home auto-refetch on WS heartbeat/status change
- Live indicator badge in header
kingkong hace 2 meses
padre
commit
07180f7916

+ 2 - 0
apps/api-server/src/modules/devices/devices.module.ts

@@ -2,8 +2,10 @@ import { Module } from '@nestjs/common'
 import { DevicesController } from './devices.controller'
 import { DevicesService } from './devices.service'
 import { DevicesRepository } from './devices.repository'
+import { RealtimeModule } from '../../realtime/realtime.module'
 
 @Module({
+  imports: [RealtimeModule],
   controllers: [DevicesController],
   providers: [DevicesService, DevicesRepository],
   exports: [DevicesService],

+ 29 - 12
apps/api-server/src/modules/devices/devices.service.ts

@@ -1,11 +1,15 @@
-import { Injectable, UnauthorizedException, NotFoundException } from '@nestjs/common'
+import { Injectable, NotFoundException, UnauthorizedException } from '@nestjs/common'
 import { DevicesRepository } from './devices.repository'
 import { DeviceStatus } from '@shared/types'
 import { HeartbeatDto } from './dto/heartbeat.dto'
+import { RealtimeGateway } from '../../realtime/gateways/realtime.gateway'
 
 @Injectable()
 export class DevicesService {
-  constructor(private readonly repo: DevicesRepository) {}
+  constructor(
+    private readonly repo: DevicesRepository,
+    private readonly realtime: RealtimeGateway,
+  ) {}
 
   async createHeartbeat(dto: HeartbeatDto) {
     const device = await this.repo.findDeviceById(dto.deviceId)
@@ -13,12 +17,12 @@ export class DevicesService {
       throw new NotFoundException(`Device ${dto.deviceId} not found`)
     }
 
-    // API key check (Phase 1: simple compare — replace with hash in production)
     if (device.apiKeyHash !== dto.apiKey) {
       throw new UnauthorizedException('Invalid API key')
     }
 
     const now = new Date()
+    const previousStatus = device.status
 
     await Promise.all([
       this.repo.insertHeartbeat({
@@ -35,6 +39,25 @@ export class DevicesService {
 
     const pendingCommands = await this.repo.getPendingCommandCount(dto.deviceId)
 
+    // Emit real-time events
+    this.realtime.emitDeviceHeartbeat(
+      dto.deviceId,
+      device.projectId,
+      device.orgId,
+      dto.status as DeviceStatus,
+      dto.storageFreeGb,
+    )
+
+    // Emit status change if status actually changed
+    if (previousStatus !== dto.status) {
+      this.realtime.emitDeviceStatusChanged(
+        dto.deviceId,
+        device.projectId,
+        dto.status as DeviceStatus,
+        previousStatus as DeviceStatus,
+      )
+    }
+
     return {
       success: true,
       serverTime: now.toISOString(),
@@ -43,25 +66,19 @@ export class DevicesService {
   }
 
   async getDevices(projectId?: string) {
-    if (projectId) {
-      return this.repo.findDevicesByProjectId(projectId)
-    }
+    if (projectId) return this.repo.findDevicesByProjectId(projectId)
     return this.repo.findAllDevices()
   }
 
   async getDeviceById(id: string) {
     const device = await this.repo.findDeviceById(id)
-    if (!device) {
-      throw new NotFoundException(`Device ${id} not found`)
-    }
+    if (!device) throw new NotFoundException(`Device ${id} not found`)
     return device
   }
 
   async getDeviceHeartbeats(deviceId: string, limit = 10) {
     const device = await this.repo.findDeviceById(deviceId)
-    if (!device) {
-      throw new NotFoundException(`Device ${deviceId} not found`)
-    }
+    if (!device) throw new NotFoundException(`Device ${deviceId} not found`)
     return this.repo.getRecentHeartbeats(deviceId, limit)
   }
 

+ 123 - 0
apps/api-server/src/realtime/gateways/realtime.gateway.ts

@@ -0,0 +1,123 @@
+import {
+  WebSocketGateway,
+  WebSocketServer,
+  SubscribeMessage,
+  OnGatewayConnection,
+  OnGatewayDisconnect,
+  ConnectedSocket,
+} from '@nestjs/websockets'
+import { Server, Socket } from 'socket.io'
+import { JwtService } from '@nestjs/jwt'
+import { DeviceStatus } from '@shared/types'
+
+interface AuthenticatedSocket extends Socket {
+  userId?: string
+  orgId?: string
+}
+
+@WebSocketGateway({
+  cors: {
+    origin: process.env['CORS_ORIGIN'] || 'http://localhost:3000',
+    credentials: true,
+  },
+  namespace: '/realtime',
+})
+export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect {
+  @WebSocketServer()
+  server!: Server
+
+  // Track connected clients: socketId → { userId, orgId, subscribedDevices }
+  private clients = new Map<string, { userId: string; orgId?: string }>()
+
+  constructor(private readonly jwt: JwtService) {}
+
+  async handleConnection(client: AuthenticatedSocket) {
+    try {
+      const token = client.handshake.auth?.token ?? client.handshake.headers?.authorization?.replace('Bearer ', '')
+      if (!token) {
+        client.disconnect()
+        return
+      }
+
+      const payload = this.jwt.verify(token as string, {
+        secret: process.env['JWT_SECRET'],
+      })
+      client.userId = payload.sub
+
+      this.clients.set(client.id, { userId: payload.sub })
+      console.log(`[WS] Client connected: ${client.id} (user: ${payload.sub})`)
+    } catch {
+      client.disconnect()
+    }
+  }
+
+  handleDisconnect(client: AuthenticatedSocket) {
+    this.clients.delete(client.id)
+    console.log(`[WS] Client disconnected: ${client.id}`)
+  }
+
+  @SubscribeMessage('subscribe:project')
+  handleSubscribeProject(client: AuthenticatedSocket, projectId: string) {
+    client.join(`project:${projectId}`)
+    console.log(`[WS] Client ${client.id} joined project:${projectId}`)
+  }
+
+  @SubscribeMessage('unsubscribe:project')
+  handleUnsubscribeProject(client: AuthenticatedSocket, projectId: string) {
+    client.leave(`project:${projectId}`)
+  }
+
+  // ── Emit helpers (called from services) ────────────────────────
+
+  emitDeviceHeartbeat(deviceId: string, projectId: string, orgId: string, status: DeviceStatus, storageFreeGb: number) {
+    this.server.to(`project:${projectId}`).emit('device.heartbeat', {
+      deviceId,
+      status,
+      storageFreeGb,
+      ts: new Date().toISOString(),
+    })
+    // Also emit to org room
+    this.server.to(`org:${orgId}`).emit('device.heartbeat', {
+      deviceId,
+      status,
+      storageFreeGb,
+      ts: new Date().toISOString(),
+    })
+  }
+
+  emitDeviceStatusChanged(deviceId: string, projectId: string, status: DeviceStatus, previousStatus: DeviceStatus) {
+    this.server.to(`project:${projectId}`).emit('device.status.changed', {
+      deviceId,
+      status,
+      previousStatus,
+      ts: new Date().toISOString(),
+    })
+  }
+
+  emitCaptureUploaded(deviceId: string, projectId: string, captureId: string) {
+    this.server.to(`project:${projectId}`).emit('capture.uploaded', {
+      captureId,
+      deviceId,
+      ts: new Date().toISOString(),
+    })
+  }
+
+  emitAlert(deviceId: string | null, projectId: string | null, alertId: string, type: string, severity: string, message: string) {
+    if (projectId) {
+      this.server.to(`project:${projectId}`).emit('alert.opened', {
+        alertId,
+        deviceId,
+        type,
+        severity,
+        message,
+        ts: new Date().toISOString(),
+      })
+    }
+  }
+
+  joinOrg(client: AuthenticatedSocket, orgId: string) {
+    client.join(`org:${orgId}`)
+    const current = this.clients.get(client.id) ?? { userId: '' }
+    this.clients.set(client.id, { ...current, orgId })
+  }
+}

+ 10 - 2
apps/api-server/src/realtime/realtime.module.ts

@@ -1,7 +1,15 @@
 import { Module } from '@nestjs/common'
+import { JwtModule } from '@nestjs/jwt'
+import { RealtimeGateway } from './gateways/realtime.gateway'
 
 @Module({
-  providers: [],
-  exports: [],
+  imports: [
+    JwtModule.register({
+      secret: process.env['JWT_SECRET'] || 'dev-secret',
+      signOptions: { expiresIn: '15m' },
+    }),
+  ],
+  providers: [RealtimeGateway],
+  exports: [RealtimeGateway],
 })
 export class RealtimeModule {}

+ 4 - 3
apps/web-dashboard/package.json

@@ -10,12 +10,13 @@
     "typecheck": "tsc --noEmit"
   },
   "dependencies": {
+    "@shared/types": "workspace:*",
+    "@tanstack/react-query": "^5.17.0",
     "next": "^14.1.0",
     "react": "^18.2.0",
     "react-dom": "^18.2.0",
-    "@tanstack/react-query": "^5.17.0",
-    "zustand": "^4.4.7",
-    "@shared/types": "workspace:*"
+    "socket.io-client": "^4.8.3",
+    "zustand": "^4.4.7"
   },
   "devDependencies": {
     "@types/node": "^20.11.0",

+ 67 - 0
apps/web-dashboard/src/hooks/useSocket.ts

@@ -0,0 +1,67 @@
+import { useEffect, useRef, useCallback } from 'react'
+import { io } from 'socket.io-client'
+import { getAccessToken } from '../lib/auth'
+
+const SOCKET_URL = process.env['NEXT_PUBLIC_API_URL'] || 'http://localhost:3001'
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+type EventCallback = (data: any) => void
+
+export function useSocket(projectIds: string[]) {
+  const socketRef = useRef<ReturnType<typeof io> | null>(null)
+  const listenersRef = useRef<Map<string, Set<EventCallback>>>(new Map())
+
+  useEffect(() => {
+    const token = getAccessToken()
+    if (!token) return
+
+    const socket = io(`${SOCKET_URL}/realtime`, {
+      auth: { token },
+      transports: ['websocket', 'polling'],
+      reconnection: true,
+      reconnectionAttempts: 5,
+      reconnectionDelay: 1000,
+    })
+
+    socketRef.current = socket
+
+    socket.on('connect', () => {
+      console.log('[WS] Connected:', socket.id)
+      projectIds.forEach((pid) => socket.emit('subscribe:project', pid))
+    })
+
+    socket.on('disconnect', (reason) => {
+      console.log('[WS] Disconnected:', reason)
+    })
+
+    socket.on('connect_error', (err: Error) => {
+      console.error('[WS] Connection error:', err.message)
+    })
+
+    // Re-dispatch events to registered listeners
+    const events = ['device.heartbeat', 'device.status.changed', 'capture.uploaded', 'alert.opened']
+    events.forEach((event) => {
+      socket.on(event, (data: any) => {
+        listenersRef.current.get(event)?.forEach((fn) => fn(data))
+      })
+    })
+
+    return () => {
+      socket.disconnect()
+      socketRef.current = null
+    }
+  // eslint-disable-next-line react-hooks/exhaustive-deps
+  }, [projectIds.join(',')])
+
+  const subscribe = useCallback((event: string, callback: EventCallback) => {
+    if (!listenersRef.current.has(event)) {
+      listenersRef.current.set(event, new Set())
+    }
+    listenersRef.current.get(event)!.add(callback)
+    return () => {
+      listenersRef.current.get(event)?.delete(callback)
+    }
+  }, [])
+
+  return { subscribe }
+}

+ 38 - 6
apps/web-dashboard/src/pages/index.tsx

@@ -1,22 +1,47 @@
 import Head from 'next/head'
 import Link from 'next/link'
 import { useRouter } from 'next/router'
-import { useEffect, useState } from 'react'
+import { useEffect, useState, useCallback } from 'react'
 import { useDashboardStats } from '../hooks/useDashboardStats'
 import { useDevices } from '../hooks/useDevices'
 import { StatsCard } from '../components/StatsCard'
 import { DeviceCard } from '../components/DeviceCard'
 import { ProtectedRoute } from '../components/ProtectedRoute'
 import { getStoredUser, authApi } from '../lib/auth'
+import { useSocket } from '../hooks/useSocket'
 import type { AuthUser } from '../lib/auth'
 
 function DashboardContent() {
   const router = useRouter()
-  const { data: stats, isLoading: statsLoading } = useDashboardStats()
-  const { data: devices = [], isLoading: devicesLoading } = useDevices()
+  const { data: stats, isLoading: statsLoading, refetch: refetchStats } = useDashboardStats()
+  const { data: devices = [], isLoading: devicesLoading, refetch: refetchDevices } = useDevices()
   const [user, setUser] = useState<AuthUser | null>(null)
 
-  useEffect(() => { setUser(getStoredUser()) }, [])
+  // Collect unique project IDs from devices for WS subscription
+  const projectIds = [...new Set(devices.map((d: any) => d.projectId).filter(Boolean))] as string[]
+
+  // Subscribe to realtime events
+  const onHeartbeat = useCallback((data: { deviceId: string; status: string; storageFreeGb: number }) => {
+    refetchDevices()
+    refetchStats()
+  }, [refetchDevices, refetchStats])
+
+  const onStatusChanged = useCallback((data: { deviceId: string; status: string }) => {
+    refetchDevices()
+    refetchStats()
+  }, [refetchDevices, refetchStats])
+
+  const { subscribe } = useSocket(projectIds)
+
+  useEffect(() => {
+    const unsubHb = subscribe('device.heartbeat', onHeartbeat)
+    const unsubSc = subscribe('device.status.changed', onStatusChanged)
+    return () => { unsubHb(); unsubSc() }
+  }, [subscribe, onHeartbeat, onStatusChanged])
+
+  useEffect(() => {
+    setUser(getStoredUser())
+  }, [])
 
   async function handleLogout() {
     try { await authApi.logout() } catch {}
@@ -33,7 +58,14 @@ function DashboardContent() {
         {/* Header */}
         <header className="bg-white border-b border-gray-200 px-6 py-3">
           <div className="flex items-center justify-between">
-            <h1 className="text-lg font-bold text-gray-900">Construction Timelapse</h1>
+            <div className="flex items-center gap-3">
+              <h1 className="text-lg font-bold text-gray-900">Construction Timelapse</h1>
+              {/* Live indicator */}
+              <span className="flex items-center gap-1.5 rounded-full bg-emerald-50 px-2 py-0.5 text-xs font-medium text-emerald-600">
+                <span className="h-1.5 w-1.5 rounded-full bg-emerald-500" />
+                Live
+              </span>
+            </div>
             <div className="flex items-center gap-4">
               <nav className="flex gap-4 text-sm text-gray-500">
                 <Link href="/" className="text-blue-600 font-medium">Dashboard</Link>
@@ -98,7 +130,7 @@ function DashboardContent() {
             </div>
           ) : (
             <div className="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-4">
-              {devices.slice(0, 6).map((d) => (
+              {devices.slice(0, 6).map((d: any) => (
                 <DeviceCard
                   key={d.id}
                   id={d.id}

+ 7 - 0
bun.lock

@@ -61,6 +61,7 @@
         "next": "^14.1.0",
         "react": "^18.2.0",
         "react-dom": "^18.2.0",
+        "socket.io-client": "^4.8.3",
         "zustand": "^4.4.7",
       },
       "devDependencies": {
@@ -730,6 +731,8 @@
 
     "engine.io": ["engine.io@6.6.6", "", { "dependencies": { "@types/cors": "^2.8.12", "@types/node": ">=10.0.0", "@types/ws": "^8.5.12", "accepts": "~1.3.4", "base64id": "2.0.0", "cookie": "~0.7.2", "cors": "~2.8.5", "debug": "~4.4.1", "engine.io-parser": "~5.2.1", "ws": "~8.18.3" } }, "sha512-U2SN0w3OpjFRVlrc17E6TMDmH58Xl9rai1MblNjAdwWp07Kk+llmzX0hjDpQdrDGzwmvOtgM5yI+meYX6iZ2xA=="],
 
+    "engine.io-client": ["engine.io-client@6.6.4", "", { "dependencies": { "@socket.io/component-emitter": "~3.1.0", "debug": "~4.4.1", "engine.io-parser": "~5.2.1", "ws": "~8.18.3", "xmlhttprequest-ssl": "~2.1.1" } }, "sha512-+kjUJnZGwzewFDw951CDWcwj35vMNf2fcj7xQWOctq1F2i1jkDdVvdFG9kM/BEChymCH36KgjnW0NsL58JYRxw=="],
+
     "engine.io-parser": ["engine.io-parser@5.2.3", "", {}, "sha512-HqD3yTBfnBxIrbnM1DoD6Pcq8NECnh8d4As1Qgh0z5Gg3jRRIqijury0CL3ghu/edArpUYiYqQiDUQBIs4np3Q=="],
 
     "env-paths": ["env-paths@3.0.0", "", {}, "sha512-dtJUTepzMW3Lm/NPxRf3wP4642UWhjL2sQxc+ym2YMj1m/H2zDNQOlezafzkHwn6sMstjHTwG6iQQsctDW/b1A=="],
@@ -1198,6 +1201,8 @@
 
     "socket.io-adapter": ["socket.io-adapter@2.5.6", "", { "dependencies": { "debug": "~4.4.1", "ws": "~8.18.3" } }, "sha512-DkkO/dz7MGln0dHn5bmN3pPy+JmywNICWrJqVWiVOyvXjWQFIv9c2h24JrQLLFJ2aQVQf/Cvl1vblnd4r2apLQ=="],
 
+    "socket.io-client": ["socket.io-client@4.8.3", "", { "dependencies": { "@socket.io/component-emitter": "~3.1.0", "debug": "~4.4.1", "engine.io-client": "~6.6.1", "socket.io-parser": "~4.2.4" } }, "sha512-uP0bpjWrjQmUt5DTHq9RuoCBdFJF10cdX9X+a368j/Ft0wmaVgxlrjvK3kjvgCODOMMOz9lcaRzxmso0bTWZ/g=="],
+
     "socket.io-parser": ["socket.io-parser@4.2.6", "", { "dependencies": { "@socket.io/component-emitter": "~3.1.0", "debug": "~4.4.1" } }, "sha512-asJqbVBDsBCJx0pTqw3WfesSY0iRX+2xzWEWzrpcH7L6fLzrhyF8WPI8UaeM4YCuDfpwA/cgsdugMsmtz8EJeg=="],
 
     "source-map": ["source-map@0.6.1", "", {}, "sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g=="],
@@ -1320,6 +1325,8 @@
 
     "ws": ["ws@8.18.3", "", { "peerDependencies": { "bufferutil": "^4.0.1", "utf-8-validate": ">=5.0.2" }, "optionalPeers": ["bufferutil", "utf-8-validate"] }, "sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg=="],
 
+    "xmlhttprequest-ssl": ["xmlhttprequest-ssl@2.1.2", "", {}, "sha512-TEU+nJVUUnA4CYJFLvK5X9AOeH4KvDvhIfm0vV1GaQRtchnG0hgK5p8hw/xjv8cunWYCsiPCSDzObPyhEwq3KQ=="],
+
     "xtend": ["xtend@4.0.2", "", {}, "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ=="],
 
     "yallist": ["yallist@4.0.0", "", {}, "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A=="],

+ 6 - 0
memory.md

@@ -117,6 +117,12 @@ docker compose up -d --build api-server web-dashboard worker
 
 ---
 
+## Realtime WebSocket — ✅ ĐÃ LÀM
+- Socket.io gateway tại `src/realtime/gateways/realtime.gateway.ts`
+- Events: `device.heartbeat`, `device.status.changed`, `capture.uploaded`, `alert.opened`
+- Dashboard tự refetch khi nhận WS event (useSocket hook)
+- Client subscribe theo project room: `subscribe:project`
+
 ## TODO — Phase tiếp theo
 1. **DB migration**: drizzle-kit push + seed data (trên Pi)
 2. **Role-based access control nâng cao** (org_admin/project_manager/viewer)