| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299 |
- 'use strict';
- /**
- * Transcode Worker Service
- * ─────────────────────────────────────────────────────
- * Standalone Node.js process (runs in its own Docker service).
- * Polls the database for pending transcode jobs.
- *
- * DB-as-queue pattern:
- * 1. API creates asset → transcodeStatus = PENDING
- * 2. Worker polls → atomically claims one PENDING job (UPDATE ... WHERE status=PENDING)
- * 3. Worker processes thumbnail + HLS → updates DB
- * 4. Repeat
- *
- * No external queue needed — uses the existing PostgreSQL database.
- */
- const { PrismaClient } = require('@prisma/client');
- const ffmpeg = require('fluent-ffmpeg');
- const path = require('path');
- const fs = require('fs');
/** Root directory that holds uploaded source videos and generated output. */
const UPLOAD_DIR = process.env.UPLOAD_DIR || '/app/uploads';

/** Poll delay used when POLL_INTERVAL_MS is unset or unusable. */
const DEFAULT_POLL_INTERVAL_MS = 5000;

/** Largest delay Node.js timers accept before falling back to 1 ms. */
const MAX_TIMER_DELAY_MS = 2147483647;

/**
 * Turns the raw POLL_INTERVAL_MS setting into a delay that is safe to hand to
 * setInterval.
 *
 * Node.js coerces NaN, values below 1 and values above 2147483647 to a 1 ms
 * delay, which would turn a typo in the environment into a tight polling loop,
 * so anything outside that range falls back to the default.
 *
 * @param {string|undefined} raw - Value read from the environment.
 * @returns {number} Whole number of milliseconds between polls.
 */
function resolvePollInterval(raw) {
  const parsed = Number.parseInt(raw ?? '', 10);
  const isUsable = Number.isFinite(parsed) && parsed >= 1 && parsed <= MAX_TIMER_DELAY_MS;
  return isUsable ? parsed : DEFAULT_POLL_INTERVAL_MS;
}

/** Delay between queue polls, in milliseconds. */
const POLL_INTERVAL_MS = resolvePollInterval(process.env.POLL_INTERVAL_MS);
// Shared Prisma client for the whole worker; the datasource URL is taken from
// the DATABASE_URL environment variable.
const prismaClientOptions = {
  datasources: {
    db: { url: process.env.DATABASE_URL },
  },
};
const prisma = new PrismaClient(prismaClientOptions);
- /** ── Helpers ──────────────────────────────────────────────────────────────── */
/**
 * Publishes a worker event: forwards it to the parent process when an IPC
 * channel is available and mirrors it to stdout.
 *
 * The IPC message is a JSON string of the payload fields plus `type` and `ts`
 * (ISO timestamp).
 *
 * @param {string} type - Event name, e.g. 'progress' or 'error'.
 * @param {object} [data] - Payload fields merged into the message.
 */
function send(type, data) {
  const envelope = { type, ...data, ts: new Date().toISOString() };
  // A payload key called `type` must not be able to replace the event name
  // (`ts` is already protected because it is written after the spread).
  envelope.type = type;
  const msg = JSON.stringify(envelope);

  // process.send only exists when the worker was spawned with an IPC channel;
  // process.connected becomes false once that channel has been closed.
  const canNotifyParent = typeof process.send === 'function' && process.connected !== false;
  if (canNotifyParent) {
    process.send(msg);
  }

  console.log(`[worker] ${type}`, JSON.stringify(data));
}
/**
 * Builds a promise that is resolved by a `setTimeout` timer armed with `ms`.
 *
 * @param {number} ms - Delay passed to setTimeout, in milliseconds.
 * @returns {Promise<void>} Resolves (with no value) when the timer fires.
 */
function sleep(ms) {
  return new Promise((wake) => {
    setTimeout(wake, ms);
  });
}
- /** ── Thumbnail ──────────────────────────────────────────────────────────── */
/**
 * Converts an ffprobe frame-rate value ("30000/1001", "25/1", "25") to a number.
 *
 * @param {string|number|null|undefined} rate - Raw `r_frame_rate` value.
 * @param {number} [fallback=30] - Returned when the rate is missing or is not a
 *   positive finite number (for example "0/0").
 * @returns {number} Rounded frames per second for fractions, the plain number
 *   otherwise, or `fallback`.
 */
function parseFrameRate(rate, fallback = 30) {
  if (rate === null || rate === undefined || rate === '') return fallback;
  const [num, den] = String(rate).split('/').map(Number);
  const fps = den ? Math.round(num / den) : num;
  return Number.isFinite(fps) && fps > 0 ? fps : fallback;
}

/**
 * Coerces a probe value to a finite number.
 *
 * @param {*} value - Raw value from the probe output.
 * @returns {number|null} The number, or null when it is absent or non-numeric
 *   (ffprobe can print placeholders such as "N/A").
 */
function toFiniteNumberOrNull(value) {
  if (value === null || value === undefined || value === '') return null;
  const parsed = Number(value);
  return Number.isFinite(parsed) ? parsed : null;
}

/**
 * Probes a video for duration, frame rate and codec, then grabs one thumbnail.
 *
 * Probing and thumbnailing are best-effort: a failed probe yields default
 * metadata and a failed screenshot yields `thumbnailPath: null`; neither
 * rejects the promise. Only a failure to create `outputDir` rejects.
 *
 * @param {string} videoPath - Path of the source video.
 * @param {string} outputDir - Directory that receives `<basename>_thumb.jpg`.
 * @returns {Promise<{thumbnailPath: string|null, duration: number|null, fps: number, codec: string}>}
 *   `thumbnailPath` is the thumbnail's file name (not a full path).
 */
function probeAndThumbnail(videoPath, outputDir) {
  return new Promise((resolve) => {
    const baseName = path.basename(videoPath, path.extname(videoPath));
    const thumbFilename = `${baseName}_thumb.jpg`;
    fs.mkdirSync(outputDir, { recursive: true });

    ffmpeg.ffprobe(videoPath, (err, metadata) => {
      const info = { duration: null, fps: 30, codec: 'unknown' };
      const finish = (thumbnailPath) => resolve({ thumbnailPath, ...info });

      // Anything thrown in this callback would otherwise escape as an uncaught
      // exception and leave the promise pending forever.
      try {
        if (err) {
          console.warn('[worker] ffprobe failed, using default metadata:', err.message);
        }

        info.duration = toFiniteNumberOrNull(metadata?.format?.duration);
        const videoStream = metadata?.streams?.find((s) => s.codec_type === 'video');
        if (videoStream) {
          info.codec = videoStream.codec_name || 'unknown';
          info.fps = parseFrameRate(videoStream.r_frame_rate, 30);
        }
        send('metadata', { codec: info.codec, duration: info.duration, fps: info.fps });

        // Seeking to 1s lands past the end of clips that are 1s or shorter, so
        // take the frame from the middle of such clips instead.
        const isShortClip = info.duration !== null && info.duration <= 1;
        const timemark = isShortClip ? info.duration / 2 : '1';

        ffmpeg(videoPath)
          .on('error', () => finish(null))
          .on('end', () => finish(thumbFilename))
          .screenshots({
            count: 1,
            folder: outputDir,
            filename: thumbFilename,
            size: '320x?',
            timemarks: [timemark],
          });
      } catch (probeErr) {
        console.warn('[worker] Thumbnail step failed:', probeErr.message);
        finish(null);
      }
    });
  });
}
- /** ── HLS Transcode ─────────────────────────────────────────────────────── */
/**
 * Re-encodes a video to H.264/AAC and writes it as a VOD HLS playlist with
 * 2-second segments under `<outputDir>/hls/<assetId>/`.
 *
 * Progress (capped at 99) is emitted through `send` and persisted to the asset
 * row. Progress writes are serialized, only touch rows that are still
 * PROCESSING, and are awaited before the promise settles, so a late write can
 * never land on top of the caller's final COMPLETED/FAILED update.
 *
 * @param {string} videoPath - Path of the source video.
 * @param {string} outputDir - Root directory; output goes to `hls/<assetId>` inside it.
 * @param {string} assetId - Asset id; used as directory name and DB key.
 * @param {number|null} duration - Currently unused; kept for the existing call signature.
 * @returns {Promise<string>} The playlist location `/hls/<assetId>/master.m3u8`.
 * @throws {Error} Rejects with `HLS_TRANSCODE_FAILED: <reason>` when ffmpeg fails.
 */
function transcodeToHLS(videoPath, outputDir, assetId, duration) {
  return new Promise((resolve, reject) => {
    const hlsDir = path.join(outputDir, 'hls', assetId);
    fs.mkdirSync(hlsDir, { recursive: true });
    const playlistPath = path.join(hlsDir, 'master.m3u8');
    const segmentPattern = path.join(hlsDir, 'segment_%03d.ts');
    let lastPct = 0;
    // Tail of the chain of progress writes; never rejects.
    let progressWrites = Promise.resolve();

    const persistProgress = (pct) => {
      progressWrites = progressWrites
        .then(() => prisma.asset.updateMany({
          // The status guard makes this a no-op once the job has left PROCESSING.
          where: { id: assetId, transcodeStatus: 'PROCESSING' },
          data: { transcodeProgress: pct },
        }))
        .catch((err) => {
          // Progress is informational only; keep transcoding if a write fails.
          console.warn('[worker] Progress update failed:', err.message);
        });
    };

    ffmpeg(videoPath)
      // ── Re-encode to H.264 + AAC ────────────────────────────────────────
      .outputOptions([
        '-c:v libx264',
        '-c:a aac',
        '-movflags +faststart',
        '-preset fast',
        '-crf 23',
        '-f hls',
        '-hls_time 2',
        '-hls_playlist_type vod',
        '-hls_list_size 0',
      ])
      // Passed as two separate arguments so the path is forwarded verbatim:
      // array-form options are split on a space, which would break a segment
      // path that happens to contain exactly one space.
      .addOutputOption('-hls_segment_filename', segmentPattern)
      .output(playlistPath)
      .on('progress', ({ percent }) => {
        const pct = Math.round(Math.min(99, percent ?? lastPct));
        // `pct > lastPct` is also false for NaN, which skips unusable reports.
        if (pct > lastPct) {
          lastPct = pct;
          send('progress', { progress: pct });
          persistProgress(pct);
        }
      })
      .on('error', (err) => {
        const failure = new Error('HLS_TRANSCODE_FAILED: ' + err.message, { cause: err });
        progressWrites.then(() => reject(failure));
      })
      .on('end', () => {
        progressWrites.then(() => resolve('/hls/' + assetId + '/master.m3u8'));
      })
      .run();
  });
}
- /** ── Process one job ───────────────────────────────────────────────────── */
/**
 * Runs one claimed transcode job end to end: probe + thumbnail, then HLS.
 *
 * Outcomes written to the asset row:
 *  - paused (before start or between phases): status back to PENDING while the
 *    pause flag is left untouched, so the job is not claimed again until the
 *    flag is cleared elsewhere;
 *  - source file missing or any phase throwing: status FAILED with an error text;
 *  - success: status COMPLETED, progress 100, `hlsPath` and media metadata.
 *
 * Errors from the two early re-queue / missing-file updates propagate to the
 * caller; errors inside the processing phases are caught and recorded.
 *
 * @param {{id: string, filePath: string, transcodePaused: boolean}} asset -
 *   The claimed asset row; `filePath` is resolved relative to UPLOAD_DIR.
 * @returns {Promise<void>}
 */
async function processJob(asset) {
  const { id: assetId, filePath, transcodePaused } = asset;
  const videoPath = path.join(UPLOAD_DIR, filePath);

  // Respect pause flag — skip if user paused before worker picked it up
  if (transcodePaused) {
    send('paused', { assetId, reason: 'transcodePaused flag set by user, re-queuing' });
    // Keep transcodePaused=true so it won't be re-claimed until user resumes
    await prisma.asset.update({
      where: { id: assetId },
      data: { transcodeStatus: 'PENDING' },
    });
    return;
  }

  send('started', { assetId, filePath });

  if (!fs.existsSync(videoPath)) {
    send('error', { assetId, error: 'Video file not found on disk: ' + videoPath });
    await prisma.asset.update({
      where: { id: assetId },
      data: { transcodeStatus: 'FAILED', transcodeError: 'Video file not found on server', transcodeProgress: 0 },
    });
    return;
  }

  try {
    // Phase 1: thumbnail + probe
    await prisma.asset.update({
      where: { id: assetId },
      data: { transcodeStatus: 'PROCESSING', transcodeProgress: 0 },
    });
    const thumbResult = await probeAndThumbnail(videoPath, UPLOAD_DIR);

    // Check pause flag between phases
    const pauseCheck = await prisma.asset.findUnique({
      where: { id: assetId },
      select: { transcodePaused: true },
    });
    if (pauseCheck?.transcodePaused) {
      send('paused', { assetId, reason: 'paused between thumbnail and HLS phases' });
      // Only the status is reset. Clearing transcodePaused here would silently
      // undo the user's pause and let the very next poll claim the job again.
      await prisma.asset.update({
        where: { id: assetId },
        data: { transcodeStatus: 'PENDING' },
      });
      return;
    }

    // Same metadata (and the same fps fallback) is written in both updates.
    const mediaInfo = {
      codec: thumbResult.codec ?? null,
      duration: thumbResult.duration ?? null,
      fps: thumbResult.fps ?? 30,
    };

    await prisma.asset.update({
      where: { id: assetId },
      data: {
        thumbnail: thumbResult.thumbnailPath ?? null,
        ...mediaInfo,
        transcodeProgress: 10,
      },
    });

    // Phase 2: HLS
    const hlsPath = await transcodeToHLS(videoPath, UPLOAD_DIR, assetId, thumbResult.duration);

    await prisma.asset.update({
      where: { id: assetId },
      data: {
        transcodeStatus: 'COMPLETED',
        transcodeProgress: 100,
        transcodeError: null,
        hlsPath,
        ...mediaInfo,
      },
    });
    send('done', { assetId, hlsPath });
  } catch (err) {
    send('error', { assetId, error: err.message });
    // Best effort: the row may have been deleted in the meantime, in which
    // case the update fails and there is nothing left to mark.
    try {
      await prisma.asset.update({
        where: { id: assetId },
        data: { transcodeStatus: 'FAILED', transcodeError: err.message, transcodeProgress: 0 },
      });
    } catch (updateErr) {
      console.warn('[worker] Could not mark asset as FAILED:', updateErr.message);
    }
  }
}
- /** ── Claim one job (atomic) ─────────────────────────────────────────────── */
/**
 * Atomically claims the oldest PENDING, non-paused asset by flipping it to
 * PROCESSING, and returns that asset.
 *
 * The UPDATE's RETURNING clause reports the id of the row that was actually
 * claimed, and the full row is then loaded by that id. (Looking up "some
 * PROCESSING asset" instead could return a row that is not the one just
 * claimed whenever more than one asset is in PROCESSING.)
 *
 * @returns {Promise<object|null>} The claimed asset row, or null when nothing
 *   is waiting (or the claimed row disappeared before it could be loaded).
 */
async function claimOneJob() {
  // $queryRaw is used because it yields the RETURNING rows; $executeRaw only
  // reports how many rows were affected.
  const claimedRows = await prisma.$queryRaw`
    UPDATE "Asset"
    SET "transcodeStatus" = 'PROCESSING',
        "transcodeProgress" = 0,
        "updatedAt" = NOW()
    WHERE id = (
      SELECT id FROM "Asset"
      WHERE "transcodeStatus" = 'PENDING'
        AND "transcodePaused" = false
      ORDER BY "createdAt" ASC
      LIMIT 1
      FOR UPDATE SKIP LOCKED
    )
    RETURNING id
  `;

  if (!Array.isArray(claimedRows) || claimedRows.length === 0) return null;

  const [{ id: claimedId }] = claimedRows;
  return prisma.asset.findUnique({ where: { id: claimedId } });
}
- /** ── Poll loop (runs on interval AND after every job) ───────────────────── */
/** True while a poll() call is claiming or processing jobs. */
let pollInFlight = false;

/**
 * Drains the queue: claims and processes jobs one after another until none
 * are left, then returns.
 *
 * Calls that arrive while a previous call is still running return at once.
 * The interval timer fires regardless of whether an earlier call has
 * finished, so without this guard every tick during a long transcode would
 * claim and start yet another job in parallel.
 *
 * Never rejects; failures are logged and the next call starts fresh.
 *
 * @returns {Promise<void>}
 */
async function poll() {
  if (pollInFlight) return;
  pollInFlight = true;
  try {
    // Keep going without waiting for the next tick so back-to-back jobs are
    // not separated by an idle poll interval.
    for (;;) {
      const claimed = await claimOneJob();
      if (!claimed) break;
      await processJob(claimed);
    }
  } catch (err) {
    console.error('[worker] Poll error:', err.message);
  } finally {
    pollInFlight = false;
  }
}
- /** ── Main ──────────────────────────────────────────────────────────────── */
/**
 * Worker entry point: prints the effective configuration, makes sure the
 * upload directory exists, announces readiness, re-queues jobs left in
 * PROCESSING, and finally schedules `poll` every POLL_INTERVAL_MS.
 *
 * @returns {Promise<void>}
 */
async function main() {
  let databaseUrlState;
  if (process.env.DATABASE_URL) {
    databaseUrlState = '(set)';
  } else {
    databaseUrlState = 'MISSING!';
  }

  const startupLines = [
    ['[worker] Starting transcode worker...'],
    ['[worker] UPLOAD_DIR:', UPLOAD_DIR],
    ['[worker] DATABASE_URL:', databaseUrlState],
    ['[worker] Poll interval:', POLL_INTERVAL_MS, 'ms'],
  ];
  for (const parts of startupLines) {
    console.log(...parts);
  }

  // The upload directory must exist before any job touches it.
  fs.mkdirSync(UPLOAD_DIR, { recursive: true });
  send('ready', { UPLOAD_DIR, POLL_INTERVAL_MS });

  // Jobs still marked PROCESSING at startup are put back in the queue first.
  await recoverStaleJobs();

  setInterval(poll, POLL_INTERVAL_MS);
}
/**
 * Re-queues assets that are stuck in PROCESSING (and not paused) by setting
 * them back to PENDING with progress 0.
 *
 * Done as a single conditional UPDATE so that the set of rows that is checked
 * is exactly the set that is changed; a separate SELECT followed by an UPDATE
 * by id could reset a row whose status or pause flag changed in between.
 *
 * Best effort: failures are logged and never thrown.
 *
 * @returns {Promise<void>}
 */
async function recoverStaleJobs() {
  try {
    const { count } = await prisma.asset.updateMany({
      where: { transcodeStatus: 'PROCESSING', transcodePaused: false },
      data: { transcodeStatus: 'PENDING', transcodeProgress: 0 },
    });
    if (count > 0) {
      console.log(`[worker] Recovering ${count} stale job(s)...`);
    }
  } catch (err) {
    console.warn('[worker] recoverStaleJobs error:', err.message);
  }
}
// Start the worker; a failure during startup is fatal.
main().catch((err) => {
  console.error('[worker] Fatal error:', err);
  process.exit(1);
});

/**
 * Graceful shutdown: closes the database connection, then exits.
 *
 * The process exits even when disconnecting fails (with exit code 1 in that
 * case) instead of leaving the rejection unhandled.
 *
 * @param {string} signal - Name of the signal that triggered the shutdown.
 * @returns {Promise<void>}
 */
async function shutdown(signal) {
  console.log(`[worker] ${signal} received, shutting down...`);
  let exitCode = 0;
  try {
    await prisma.$disconnect();
  } catch (err) {
    console.error('[worker] Error while disconnecting from the database:', err.message);
    exitCode = 1;
  }
  process.exit(exitCode);
}

// Both termination signals share the same shutdown path.
for (const signal of ['SIGTERM', 'SIGINT']) {
  process.on(signal, () => shutdown(signal));
}
|