'use strict'; /** * Transcode Worker Service * ───────────────────────────────────────────────────── * Standalone Node.js process (runs in its own Docker service). * Polls the database for pending transcode jobs. * * DB-as-queue pattern: * 1. API creates asset → transcodeStatus = PENDING * 2. Worker polls → atomically claims one PENDING job (UPDATE ... WHERE status=PENDING) * 3. Worker processes thumbnail + HLS → updates DB * 4. Repeat * * No external queue needed — uses the existing PostgreSQL database. */ const { PrismaClient } = require('@prisma/client'); const ffmpeg = require('fluent-ffmpeg'); const path = require('path'); const fs = require('fs'); const UPLOAD_DIR = process.env.UPLOAD_DIR || '/app/uploads'; const POLL_INTERVAL_MS = parseInt(process.env.POLL_INTERVAL_MS || '5000', 10); const prisma = new PrismaClient({ datasources: { db: { url: process.env.DATABASE_URL } }, }); /** ── Helpers ──────────────────────────────────────────────────────────────── */ function send(type, data) { const msg = JSON.stringify({ type, ...data, ts: new Date().toISOString() }); process.send && process.send(msg); console.log(`[worker] ${type}`, JSON.stringify(data)); } function sleep(ms) { return new Promise(r => setTimeout(r, ms)); } /** ── Thumbnail ──────────────────────────────────────────────────────────── */ function probeAndThumbnail(videoPath, outputDir) { return new Promise((resolve) => { const videoFilename = path.basename(videoPath, path.extname(videoPath)); const thumbFilename = videoFilename + '_thumb.jpg'; fs.mkdirSync(outputDir, { recursive: true }); ffmpeg.ffprobe(videoPath, (err, metadata) => { const duration = metadata?.format?.duration ?? null; let fps = 30; let codecName = 'unknown'; const videoStream = metadata?.streams?.find(s => s.codec_type === 'video'); if (videoStream) { codecName = videoStream.codec_name || 'unknown'; if (videoStream.r_frame_rate) { const [num, den] = videoStream.r_frame_rate.split('/').map(Number); fps = den ? Math.round(num / den) : num; } } send('metadata', { codec: codecName, duration, fps }); ffmpeg(videoPath) .on('error', () => resolve({ thumbnailPath: null, duration, fps, codec: codecName })) .on('end', () => resolve({ thumbnailPath: thumbFilename, duration, fps, codec: codecName })) .screenshots({ count: 1, folder: outputDir, filename: thumbFilename, size: '320x?', timemarks: ['1'], }); }); }); } /** ── HLS Transcode ─────────────────────────────────────────────────────── */ function transcodeToHLS(videoPath, outputDir, assetId, duration) { return new Promise((resolve, reject) => { const hlsDir = path.join(outputDir, 'hls', assetId); fs.mkdirSync(hlsDir, { recursive: true }); const playlistPath = path.join(hlsDir, 'master.m3u8'); const segmentPattern = path.join(hlsDir, 'segment_%03d.ts'); let lastPct = 0; ffmpeg(videoPath) // ── Re-encode to H.264 + AAC ──────────────────────────────────────── .outputOptions([ '-c:v libx264', '-c:a aac', '-movflags +faststart', '-preset fast', '-crf 23', '-f hls', '-hls_time 2', '-hls_playlist_type vod', '-hls_segment_filename', segmentPattern, '-hls_list_size 0', ]) .output(playlistPath) .on('progress', ({ percent }) => { const pct = Math.round(Math.min(99, percent ?? lastPct)); if (pct > lastPct) { lastPct = pct; send('progress', { progress: pct }); // Also update DB progress periodically prisma.asset.update({ where: { id: assetId }, data: { transcodeProgress: pct, transcodeStatus: 'PROCESSING' }, }).catch(() => {}); // ignore errors } }) .on('error', (err) => reject(new Error('HLS_TRANSCODE_FAILED: ' + err.message))) .on('end', () => resolve('/hls/' + assetId + '/master.m3u8')) .run(); }); } /** ── Process one job ───────────────────────────────────────────────────── */ async function processJob(asset) { const { id: assetId, filePath, transcodePaused } = asset; const videoPath = path.join(UPLOAD_DIR, filePath); // Respect pause flag — skip if user paused before worker picked it up if (transcodePaused) { send('paused', { assetId, reason: 'transcodePaused flag set by user, re-queuing' }); // Keep transcodePaused=true so it won't be re-claimed until user resumes await prisma.asset.update({ where: { id: assetId }, data: { transcodeStatus: 'PENDING' }, }); return; } send('started', { assetId, filePath }); // Check file exists if (!fs.existsSync(videoPath)) { send('error', { assetId, error: 'Video file not found on disk: ' + videoPath }); await prisma.asset.update({ where: { id: assetId }, data: { transcodeStatus: 'FAILED', transcodeError: 'Video file not found on server', transcodeProgress: 0 }, }); return; } try { // Phase 1: thumbnail + probe await prisma.asset.update({ where: { id: assetId }, data: { transcodeStatus: 'PROCESSING', transcodeProgress: 0 } }); const thumbResult = await probeAndThumbnail(videoPath, UPLOAD_DIR); // Check pause flag between phases const check1 = await prisma.asset.findUnique({ where: { id: assetId }, select: { transcodePaused: true } }); if (check1?.transcodePaused) { send('paused', { assetId, reason: 'paused between thumbnail and HLS phases' }); await prisma.asset.update({ where: { id: assetId }, data: { transcodeStatus: 'PENDING', transcodePaused: false } }); return; } // Update DB with metadata await prisma.asset.update({ where: { id: assetId }, data: { thumbnail: thumbResult.thumbnailPath ?? null, codec: thumbResult.codec ?? null, duration: thumbResult.duration ?? null, fps: thumbResult.fps ?? 30, transcodeProgress: 10, }, }); // Phase 2: HLS const hlsPath = await transcodeToHLS(videoPath, UPLOAD_DIR, assetId, thumbResult.duration); // Done! await prisma.asset.update({ where: { id: assetId }, data: { transcodeStatus: 'COMPLETED', transcodeProgress: 100, transcodeError: null, hlsPath, codec: thumbResult.codec ?? null, duration: thumbResult.duration ?? null, fps: thumbResult.fps ?? null, }, }); send('done', { assetId, hlsPath }); } catch (err) { send('error', { assetId, error: err.message }); // Only mark failed if not already deleted or completed by another process try { await prisma.asset.update({ where: { id: assetId }, data: { transcodeStatus: 'FAILED', transcodeError: err.message, transcodeProgress: 0 }, }); } catch {} } } /** ── Claim one job (atomic) ─────────────────────────────────────────────── */ async function claimOneJob() { const result = await prisma.$executeRaw` UPDATE "Asset" SET "transcodeStatus" = 'PROCESSING', "transcodeProgress" = 0, "updatedAt" = NOW() WHERE id = ( SELECT id FROM "Asset" WHERE "transcodeStatus" = 'PENDING' AND "transcodePaused" = false ORDER BY "createdAt" ASC LIMIT 1 FOR UPDATE SKIP LOCKED ) RETURNING id, "filePath", "transcodeStatus", "transcodePaused" `; if (!result || result === 0) return null; // Re-fetch the claimed asset (result doesn't return full row with $executeRaw) return prisma.asset.findFirst({ where: { transcodeStatus: 'PROCESSING' }, orderBy: { updatedAt: 'asc' }, take: 1, }); } /** ── Poll loop (runs on interval AND after every job) ───────────────────── */ async function poll() { try { const claimed = await claimOneJob(); if (!claimed) return; await processJob(claimed); // Immediately poll again — don't wait for the next interval tick // This prevents the 5-second gap between back-to-back jobs poll().catch(err => console.error('[worker] Recursive poll error:', err.message)); } catch (err) { console.error('[worker] Poll error:', err.message); } } /** ── Main ──────────────────────────────────────────────────────────────── */ async function main() { console.log('[worker] Starting transcode worker...'); console.log('[worker] UPLOAD_DIR:', UPLOAD_DIR); console.log('[worker] DATABASE_URL:', process.env.DATABASE_URL ? '(set)' : 'MISSING!'); console.log('[worker] Poll interval:', POLL_INTERVAL_MS, 'ms'); // Make sure upload dir exists fs.mkdirSync(UPLOAD_DIR, { recursive: true }); send('ready', { UPLOAD_DIR, POLL_INTERVAL_MS }); // Process any stale PROCESSING jobs (worker crashed mid-job) on startup await recoverStaleJobs(); // Main poll loop setInterval(poll, POLL_INTERVAL_MS); } /** Recover stale jobs — assets stuck in PROCESSING from a crashed worker */ async function recoverStaleJobs() { try { const stale = await prisma.asset.findMany({ where: { transcodeStatus: 'PROCESSING', transcodePaused: false }, select: { id: true }, }); if (stale.length > 0) { console.log(`[worker] Recovering ${stale.length} stale job(s)...`); await prisma.asset.updateMany({ where: { id: { in: stale.map(s => s.id) } }, data: { transcodeStatus: 'PENDING', transcodeProgress: 0 }, }); } } catch (err) { console.warn('[worker] recoverStaleJobs error:', err.message); } } main().catch(err => { console.error('[worker] Fatal error:', err); process.exit(1); }); // Graceful shutdown process.on('SIGTERM', async () => { console.log('[worker] SIGTERM received, shutting down...'); await prisma.$disconnect(); process.exit(0); }); process.on('SIGINT', async () => { console.log('[worker] SIGINT received, shutting down...'); await prisma.$disconnect(); process.exit(0); });