|
|
@@ -3,142 +3,292 @@
|
|
|
/**
|
|
|
* Transcode Worker Service
|
|
|
* ─────────────────────────────────────────────────────
|
|
|
- * Standalone Node.js process (runs in its own Docker service).
|
|
|
- * Polls the database for pending transcode jobs.
|
|
|
+ * Runs in its own Docker service. Uses the `cluster` module to
|
|
|
+ * fork N concurrent worker processes (N = WORKER_CONCURRENCY, default 4).
|
|
|
+ *
|
|
|
+ * Each forked worker independently polls the DB using FOR UPDATE SKIP LOCKED,
|
|
|
+ * so no coordination is needed — the DB acts as the queue.
|
|
|
*
|
|
|
* DB-as-queue pattern:
|
|
|
* 1. API creates asset → transcodeStatus = PENDING
|
|
|
- * 2. Worker polls → atomically claims one PENDING job (UPDATE ... WHERE status=PENDING)
|
|
|
+ * 2. Workers poll → atomically claim one PENDING job each
|
|
|
* 3. Worker processes thumbnail + HLS → updates DB
|
|
|
* 4. Repeat
|
|
|
*
|
|
|
- * No external queue needed — uses the existing PostgreSQL database.
|
|
|
+ * Stream-copy optimization: if source is already H.264+AAC in MP4/MOV container,
|
|
|
+ * it is remuxed (not re-encoded) which is near-instant.
|
|
|
+ *
|
|
|
+ * Hardware encoding: set ENCODER env var to h264_nvenc | h264_qsv | h264_videotoolbox
|
|
|
*/
|
|
|
const { PrismaClient } = require('@prisma/client');
|
|
|
const ffmpeg = require('fluent-ffmpeg');
|
|
|
const path = require('path');
|
|
|
const fs = require('fs');
|
|
|
+const cluster = require('cluster');
|
|
|
|
|
|
const UPLOAD_DIR = process.env.UPLOAD_DIR || '/app/uploads';
|
|
|
-const POLL_INTERVAL_MS = parseInt(process.env.POLL_INTERVAL_MS || '5000', 10);
|
|
|
+const POLL_INTERVAL_MS = parseInt(process.env.POLL_INTERVAL_MS || '2000', 10);
|
|
|
+const BACKOFF_MS = parseInt(process.env.BACKOFF_MS || '1000', 10); // idle sleep between polls
|
|
|
+const WORKER_CONCURRENCY = parseInt(process.env.WORKER_CONCURRENCY || '4', 10);
|
|
|
+const ENCODER = process.env.ENCODER || 'libx264'; // libx264 | h264_nvenc | h264_qsv | h264_videotoolbox
|
|
|
+
|
|
|
+// ─── Master: fork N workers and manage them ────────────────────────────────
|
|
|
+if (cluster.isMaster) {
|
|
|
+ console.log(`[master] Starting ${WORKER_CONCURRENCY} transcode workers...`);
|
|
|
+ console.log(`[master] UPLOAD_DIR: ${UPLOAD_DIR}`);
|
|
|
+ console.log(`[master] ENCODER: ${ENCODER}`);
|
|
|
+ console.log(`[master] Poll interval: ${POLL_INTERVAL_MS}ms`);
|
|
|
+
|
|
|
+ fs.mkdirSync(UPLOAD_DIR, { recursive: true });
|
|
|
+
|
|
|
+ cluster.on('exit', (worker, code, signal) => {
|
|
|
+ console.log(`[master] Worker ${worker.id} exited (${code || signal}), restarting...`);
|
|
|
+ setTimeout(() => cluster.fork(), 2000);
|
|
|
+ });
|
|
|
+
|
|
|
+ for (let i = 0; i < WORKER_CONCURRENCY; i++) {
|
|
|
+ cluster.fork({ WORKER_ID: String(i + 1) });
|
|
|
+ }
|
|
|
+
|
|
|
+ cluster.on('online', (worker) => {
|
|
|
+ console.log(`[master] Worker ${worker.id} online`);
|
|
|
+ });
|
|
|
|
|
|
+ process.on('SIGTERM', () => {
|
|
|
+ console.log('[master] SIGTERM — killing all workers...');
|
|
|
+ for (const id in cluster.workers) cluster.workers[id].kill('SIGTERM');
|
|
|
+ process.exit(0);
|
|
|
+ });
|
|
|
+}
|
|
|
+
|
|
|
+// ─── Worker: each process runs its own poll loop ─────────────────────────
|
|
|
const prisma = new PrismaClient({
|
|
|
datasources: { db: { url: process.env.DATABASE_URL } },
|
|
|
});
|
|
|
|
|
|
-/** ── Helpers ──────────────────────────────────────────────────────────────── */
|
|
|
-function send(type, data) {
|
|
|
- const msg = JSON.stringify({ type, ...data, ts: new Date().toISOString() });
|
|
|
+const WORKER_ID = process.env.WORKER_ID || '?';
|
|
|
+
|
|
|
+function log(type, data) {
|
|
|
+ const msg = JSON.stringify({ type, ...data, worker: WORKER_ID, ts: new Date().toISOString() });
|
|
|
process.send && process.send(msg);
|
|
|
- console.log(`[worker] ${type}`, JSON.stringify(data));
|
|
|
+ console.log(`[worker:${WORKER_ID}] ${type}`, JSON.stringify(data));
|
|
|
}
|
|
|
|
|
|
-function sleep(ms) {
|
|
|
- return new Promise(r => setTimeout(r, ms));
|
|
|
+function sleep(ms) { return new Promise(r => setTimeout(r, ms)); }
|
|
|
+
|
|
|
+/** Probe container + codec + bitrate (used for stream-copy decision) */
|
|
|
+function probeContainer(videoPath) {
|
|
|
+ return new Promise((resolve) => {
|
|
|
+ ffmpeg.ffprobe(videoPath, (err, metadata) => {
|
|
|
+ if (err) return resolve({ container: null, videoCodec: null, audioCodec: null, duration: null, bitrate: null });
|
|
|
+ const streams = metadata?.streams || [];
|
|
|
+ const videoStream = streams.find(s => s.codec_type === 'video');
|
|
|
+ const audioStream = streams.find(s => s.codec_type === 'audio');
|
|
|
+ // bit_rate from video stream is in bits/s; format bit_rate is overall in bits/s
|
|
|
+ const bitrate = videoStream?.bit_rate
|
|
|
+ ? parseInt(videoStream.bit_rate)
|
|
|
+ : (metadata?.format?.bit_rate ? parseInt(metadata.format.bit_rate) : null);
|
|
|
+ resolve({
|
|
|
+ container: metadata?.format?.format_name || null,
|
|
|
+ videoCodec: videoStream?.codec_name || null,
|
|
|
+ audioCodec: audioStream?.codec_name || null,
|
|
|
+ duration: metadata?.format?.duration || null,
|
|
|
+ bitrate,
|
|
|
+ });
|
|
|
+ });
|
|
|
+ });
|
|
|
+}
|
|
|
+
|
|
|
+/** Returns true if source can be stream-copied (remuxed, no re-encode) */
|
|
|
+function canStreamCopy(probe) {
|
|
|
+ if (!probe) return false;
|
|
|
+ const c = probe.container || '';
|
|
|
+ const mp4Container = /^(mov|mp4|m4a|quicktime)$/.test(c) || c.includes('mov') || c.includes('mp4');
|
|
|
+ return (
|
|
|
+ mp4Container &&
|
|
|
+ (probe.videoCodec === 'h264' || probe.videoCodec === 'hevc') &&
|
|
|
+ (probe.audioCodec === 'aac' || probe.audioCodec === 'mp3' || probe.audioCodec === 'ac3' || probe.audioCodec === null)
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
-/** ── Thumbnail ──────────────────────────────────────────────────────────── */
|
|
|
+/** Get encoder-specific ffmpeg output options */
|
|
|
+function getEncoderOptions(encoder, duration) {
|
|
|
+ const opts = [];
|
|
|
+
|
|
|
+ if (encoder === 'h264_nvenc') {
|
|
|
+ // NVIDIA GPU encoder
|
|
|
+ opts.push(
|
|
|
+ '-c:v', 'h264_nvenc',
|
|
|
+ '-preset', 'p4', // p1=fastest … p7=slowest
|
|
|
+ '-rc', 'vbr', '-cq', '23',
|
|
|
+ '-gpu', '0',
|
|
|
+ );
|
|
|
+ } else if (encoder === 'h264_qsv') {
|
|
|
+ // Intel Quick Sync Video
|
|
|
+ opts.push(
|
|
|
+ '-c:v', 'h264_qsv',
|
|
|
+ '-preset', 'fast',
|
|
|
+ '-global_quality', '23',
|
|
|
+ );
|
|
|
+ } else if (encoder === 'h264_videotoolbox') {
|
|
|
+ // Apple VideoToolbox (macOS)
|
|
|
+ opts.push(
|
|
|
+ '-c:v', 'h264_videotoolbox',
|
|
|
+ '-realtime',
|
|
|
+ );
|
|
|
+ } else {
|
|
|
+ // Default: software libx264
|
|
|
+ opts.push(
|
|
|
+ '-c:v', 'libx264',
|
|
|
+ '-preset', 'fast',
|
|
|
+ '-crf', '23',
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ opts.push('-pix_fmt', 'yuv420p');
|
|
|
+ opts.push('-c:a', 'aac', '-b:a', '128k');
|
|
|
+ return opts;
|
|
|
+}
|
|
|
+
|
|
|
+/** ── Thumbnail + metadata extraction ─────────────────────────────────── */
|
|
|
function probeAndThumbnail(videoPath, outputDir) {
|
|
|
return new Promise((resolve) => {
|
|
|
const videoFilename = path.basename(videoPath, path.extname(videoPath));
|
|
|
const thumbFilename = videoFilename + '_thumb.jpg';
|
|
|
-
|
|
|
fs.mkdirSync(outputDir, { recursive: true });
|
|
|
|
|
|
ffmpeg.ffprobe(videoPath, (err, metadata) => {
|
|
|
- const duration = metadata?.format?.duration ?? null;
|
|
|
- let fps = 30;
|
|
|
- let codecName = 'unknown';
|
|
|
-
|
|
|
+ let duration = null, fps = 30, codecName = 'unknown', bitrate = null;
|
|
|
const videoStream = metadata?.streams?.find(s => s.codec_type === 'video');
|
|
|
+ if (metadata?.format?.duration) duration = parseFloat(metadata.format.duration);
|
|
|
if (videoStream) {
|
|
|
codecName = videoStream.codec_name || 'unknown';
|
|
|
+ bitrate = videoStream?.bit_rate
|
|
|
+ ? parseInt(videoStream.bit_rate)
|
|
|
+ : (metadata?.format?.bit_rate ? parseInt(metadata.format.bit_rate) : null);
|
|
|
if (videoStream.r_frame_rate) {
|
|
|
const [num, den] = videoStream.r_frame_rate.split('/').map(Number);
|
|
|
fps = den ? Math.round(num / den) : num;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- send('metadata', { codec: codecName, duration, fps });
|
|
|
+ log('metadata', { codec: codecName, duration, fps, bitrate });
|
|
|
|
|
|
ffmpeg(videoPath)
|
|
|
- .on('error', () => resolve({ thumbnailPath: null, duration, fps, codec: codecName }))
|
|
|
- .on('end', () => resolve({ thumbnailPath: thumbFilename, duration, fps, codec: codecName }))
|
|
|
+ .on('error', () => resolve({ thumbnailPath: null, duration, fps, codec: codecName, bitrate }))
|
|
|
+ .on('end', () => resolve({ thumbnailPath: thumbFilename, duration, fps, codec: codecName, bitrate }))
|
|
|
.screenshots({
|
|
|
- count: 1,
|
|
|
- folder: outputDir,
|
|
|
- filename: thumbFilename,
|
|
|
- size: '320x?',
|
|
|
- timemarks: ['1'],
|
|
|
+ count: 1, folder: outputDir, filename: thumbFilename,
|
|
|
+ size: '320x?', timemarks: ['1'],
|
|
|
});
|
|
|
});
|
|
|
});
|
|
|
}
|
|
|
|
|
|
-/** ── HLS Transcode ─────────────────────────────────────────────────────── */
|
|
|
-function transcodeToHLS(videoPath, outputDir, assetId, duration) {
|
|
|
+/** ── HLS Transcode ────────────────────────────────────────────────────── */
|
|
|
+function transcodeToHLS(videoPath, outputDir, assetId, duration, encoder) {
|
|
|
return new Promise((resolve, reject) => {
|
|
|
const hlsDir = path.join(outputDir, 'hls', assetId);
|
|
|
fs.mkdirSync(hlsDir, { recursive: true });
|
|
|
-
|
|
|
- const playlistPath = path.join(hlsDir, 'master.m3u8');
|
|
|
- const segmentPattern = path.join(hlsDir, 'segment_%03d.ts');
|
|
|
let lastPct = 0;
|
|
|
|
|
|
- ffmpeg(videoPath)
|
|
|
- // ── Re-encode to H.264 + AAC ────────────────────────────────────────
|
|
|
- .outputOptions([
|
|
|
- '-c:v libx264',
|
|
|
- '-c:a aac',
|
|
|
- '-movflags +faststart',
|
|
|
- '-preset fast',
|
|
|
- '-crf 23',
|
|
|
- '-f hls',
|
|
|
- '-hls_time 2',
|
|
|
- '-hls_playlist_type vod',
|
|
|
- '-hls_segment_filename', segmentPattern,
|
|
|
- '-hls_list_size 0',
|
|
|
- ])
|
|
|
- .output(playlistPath)
|
|
|
+ const cmd = ffmpeg(videoPath);
|
|
|
+
|
|
|
+ const encOpts = getEncoderOptions(encoder, duration);
|
|
|
+ cmd.outputOptions([
|
|
|
+ // Explicit stream mapping: video + optional audio (safe for silent files)
|
|
|
+ '-map', '0:v',
|
|
|
+ '-map', '0:a?',
|
|
|
+ ...encOpts,
|
|
|
+ // HLS muxer options
|
|
|
+ '-f', 'hls',
|
|
|
+ '-hls_time', '6',
|
|
|
+ '-hls_playlist_type', 'vod',
|
|
|
+ '-hls_segment_filename', path.join(hlsDir, 'segments_%03d.ts'),
|
|
|
+ '-hls_list_size', '0',
|
|
|
+ // Variant playlist output (not .ts — avoid collision with segment files)
|
|
|
+ '-master_pl_name', 'master.m3u8',
|
|
|
+ ]);
|
|
|
+
|
|
|
+ cmd.output(path.join(hlsDir, 'variant.m3u8'))
|
|
|
.on('progress', ({ percent }) => {
|
|
|
const pct = Math.round(Math.min(99, percent ?? lastPct));
|
|
|
if (pct > lastPct) {
|
|
|
lastPct = pct;
|
|
|
- send('progress', { progress: pct });
|
|
|
- // Also update DB progress periodically
|
|
|
+ log('progress', { assetId, progress: pct });
|
|
|
prisma.asset.update({
|
|
|
where: { id: assetId },
|
|
|
data: { transcodeProgress: pct, transcodeStatus: 'PROCESSING' },
|
|
|
- }).catch(() => {}); // ignore errors
|
|
|
+ }).catch(() => {});
|
|
|
}
|
|
|
})
|
|
|
.on('error', (err) => reject(new Error('HLS_TRANSCODE_FAILED: ' + err.message)))
|
|
|
- .on('end', () => resolve('/hls/' + assetId + '/master.m3u8'))
|
|
|
+ .on('end', () => {
|
|
|
+ // fluent-ffmpeg with %03d pattern creates segments + variant playlist but no master.m3u8;
|
|
|
+ // write master.m3u8 pointing to variant.m3u8 using awk (no python needed)
|
|
|
+ const masterPath = path.join(hlsDir, 'master.m3u8');
|
|
|
+ const variantPath = path.join(hlsDir, 'variant.m3u8');
|
|
|
+ const { execSync } = require('child_process');
|
|
|
+ execSync(
|
|
|
+ `awk 'BEGIN{print "#EXTM3U\\n#EXT-X-VERSION:3\\n#EXT-X-STREAM-INF:BANDWIDTH=2000000,RESOLUTION=1280x720\\nvariant.m3u8"}' > ${masterPath}`,
|
|
|
+ { cwd: hlsDir }
|
|
|
+ );
|
|
|
+ resolve('/hls/' + assetId + '/master.m3u8');
|
|
|
+ })
|
|
|
+ .run();
|
|
|
+ });
|
|
|
+}
|
|
|
+
|
|
|
+/** ── Stream-copy HLS (remux, no re-encode) ────────────────────────────── */
|
|
|
+function streamCopyToHLS(videoPath, outputDir, assetId, duration) {
|
|
|
+ return new Promise((resolve, reject) => {
|
|
|
+ const hlsDir = path.join(outputDir, 'hls', assetId);
|
|
|
+ fs.mkdirSync(hlsDir, { recursive: true });
|
|
|
+ const segmentPattern = path.join(hlsDir, 'segments_%03d.ts');
|
|
|
+
|
|
|
+ ffmpeg(videoPath)
|
|
|
+ .outputOptions([
|
|
|
+ '-map', '0:v',
|
|
|
+ '-map', '0:a?',
|
|
|
+ '-c:v', 'copy',
|
|
|
+ '-c:a', 'copy',
|
|
|
+ '-f', 'hls',
|
|
|
+ '-hls_time', '6',
|
|
|
+ '-hls_playlist_type', 'vod',
|
|
|
+ '-hls_segment_filename', segmentPattern,
|
|
|
+ '-hls_list_size', '0',
|
|
|
+ '-master_pl_name', 'master.m3u8',
|
|
|
+ ])
|
|
|
+ .output(path.join(hlsDir, 'variant.m3u8'))
|
|
|
+ .on('error', (err) => reject(new Error('STREAM_COPY_FAILED: ' + err.message)))
|
|
|
+ .on('end', () => {
|
|
|
+ const masterPath = path.join(hlsDir, 'master.m3u8');
|
|
|
+ fs.writeFileSync(masterPath, [
|
|
|
+ '#EXTM3U',
|
|
|
+ '#EXT-X-VERSION:3',
|
|
|
+ '#EXT-X-STREAM-INF:BANDWIDTH=2000000,RESOLUTION=1280x720',
|
|
|
+ 'variant.m3u8',
|
|
|
+ ].join('\n') + '\n');
|
|
|
+ resolve('/hls/' + assetId + '/master.m3u8');
|
|
|
+ })
|
|
|
.run();
|
|
|
});
|
|
|
}
|
|
|
|
|
|
-/** ── Process one job ───────────────────────────────────────────────────── */
|
|
|
+/** ── Process one job ──────────────────────────────────────────────────── */
|
|
|
async function processJob(asset) {
|
|
|
const { id: assetId, filePath, transcodePaused } = asset;
|
|
|
const videoPath = path.join(UPLOAD_DIR, filePath);
|
|
|
|
|
|
- // Respect pause flag — skip if user paused before worker picked it up
|
|
|
if (transcodePaused) {
|
|
|
- send('paused', { assetId, reason: 'transcodePaused flag set by user, re-queuing' });
|
|
|
- // Keep transcodePaused=true so it won't be re-claimed until user resumes
|
|
|
- await prisma.asset.update({
|
|
|
- where: { id: assetId },
|
|
|
- data: { transcodeStatus: 'PENDING' },
|
|
|
- });
|
|
|
+ log('paused', { assetId, reason: 'transcodePaused flag set, re-queuing' });
|
|
|
+ await prisma.asset.update({ where: { id: assetId }, data: { transcodeStatus: 'PENDING' } });
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- send('started', { assetId, filePath });
|
|
|
+ log('started', { assetId, filePath, encoder: ENCODER });
|
|
|
|
|
|
- // Check file exists
|
|
|
if (!fs.existsSync(videoPath)) {
|
|
|
- send('error', { assetId, error: 'Video file not found on disk: ' + videoPath });
|
|
|
+ log('error', { assetId, error: 'Video file not found: ' + videoPath });
|
|
|
await prisma.asset.update({
|
|
|
where: { id: assetId },
|
|
|
data: { transcodeStatus: 'FAILED', transcodeError: 'Video file not found on server', transcodeProgress: 0 },
|
|
|
@@ -147,14 +297,15 @@ async function processJob(asset) {
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
- // Phase 1: thumbnail + probe
|
|
|
+ // Phase 1: probe + thumbnail
|
|
|
await prisma.asset.update({ where: { id: assetId }, data: { transcodeStatus: 'PROCESSING', transcodeProgress: 0 } });
|
|
|
+ const probe = await probeContainer(videoPath);
|
|
|
const thumbResult = await probeAndThumbnail(videoPath, UPLOAD_DIR);
|
|
|
|
|
|
- // Check pause flag between phases
|
|
|
- const check1 = await prisma.asset.findUnique({ where: { id: assetId }, select: { transcodePaused: true } });
|
|
|
- if (check1?.transcodePaused) {
|
|
|
- send('paused', { assetId, reason: 'paused between thumbnail and HLS phases' });
|
|
|
+ // Check pause between phases
|
|
|
+ const check = await prisma.asset.findUnique({ where: { id: assetId }, select: { transcodePaused: true } });
|
|
|
+ if (check?.transcodePaused) {
|
|
|
+ log('paused', { assetId, reason: 'paused between phases' });
|
|
|
await prisma.asset.update({ where: { id: assetId }, data: { transcodeStatus: 'PENDING', transcodePaused: false } });
|
|
|
return;
|
|
|
}
|
|
|
@@ -167,31 +318,38 @@ async function processJob(asset) {
|
|
|
codec: thumbResult.codec ?? null,
|
|
|
duration: thumbResult.duration ?? null,
|
|
|
fps: thumbResult.fps ?? 30,
|
|
|
+ bitrate: thumbResult.bitrate ?? null,
|
|
|
transcodeProgress: 10,
|
|
|
},
|
|
|
});
|
|
|
|
|
|
- // Phase 2: HLS
|
|
|
- const hlsPath = await transcodeToHLS(videoPath, UPLOAD_DIR, assetId, thumbResult.duration);
|
|
|
+ // Phase 2: HLS — decide between stream-copy or re-encode
|
|
|
+ const isStreamCopy = canStreamCopy(probe);
|
|
|
+ if (isStreamCopy) {
|
|
|
+ log('stream_copy', { assetId, reason: `container=${probe.container} video=${probe.videoCodec} audio=${probe.audioCodec}` });
|
|
|
+ await streamCopyToHLS(videoPath, UPLOAD_DIR, assetId, thumbResult.duration);
|
|
|
+ } else {
|
|
|
+ log('re_encode', { assetId, reason: `video=${probe?.videoCodec} audio=${probe?.audioCodec} container=${probe?.container}`, encoder: ENCODER });
|
|
|
+ await transcodeToHLS(videoPath, UPLOAD_DIR, assetId, thumbResult.duration, ENCODER);
|
|
|
+ }
|
|
|
|
|
|
- // Done!
|
|
|
+ // Done
|
|
|
await prisma.asset.update({
|
|
|
where: { id: assetId },
|
|
|
data: {
|
|
|
transcodeStatus: 'COMPLETED',
|
|
|
transcodeProgress: 100,
|
|
|
transcodeError: null,
|
|
|
- hlsPath,
|
|
|
+ hlsPath: '/hls/' + assetId + '/master.m3u8',
|
|
|
codec: thumbResult.codec ?? null,
|
|
|
duration: thumbResult.duration ?? null,
|
|
|
fps: thumbResult.fps ?? null,
|
|
|
+ bitrate: thumbResult.bitrate ?? null,
|
|
|
},
|
|
|
});
|
|
|
-
|
|
|
- send('done', { assetId, hlsPath });
|
|
|
+ log('done', { assetId });
|
|
|
} catch (err) {
|
|
|
- send('error', { assetId, error: err.message });
|
|
|
- // Only mark failed if not already deleted or completed by another process
|
|
|
+ log('error', { assetId, error: err.message });
|
|
|
try {
|
|
|
await prisma.asset.update({
|
|
|
where: { id: assetId },
|
|
|
@@ -201,7 +359,7 @@ async function processJob(asset) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-/** ── Claim one job (atomic) ─────────────────────────────────────────────── */
|
|
|
+/** ── Claim one job (atomic, skip locked) ─────────────────────────────── */
|
|
|
async function claimOneJob() {
|
|
|
const result = await prisma.$executeRaw`
|
|
|
UPDATE "Asset"
|
|
|
@@ -216,12 +374,10 @@ async function claimOneJob() {
|
|
|
LIMIT 1
|
|
|
FOR UPDATE SKIP LOCKED
|
|
|
)
|
|
|
- RETURNING id, "filePath", "transcodeStatus", "transcodePaused"
|
|
|
`;
|
|
|
|
|
|
if (!result || result === 0) return null;
|
|
|
|
|
|
- // Re-fetch the claimed asset (result doesn't return full row with $executeRaw)
|
|
|
return prisma.asset.findFirst({
|
|
|
where: { transcodeStatus: 'PROCESSING' },
|
|
|
orderBy: { updatedAt: 'asc' },
|
|
|
@@ -229,71 +385,63 @@ async function claimOneJob() {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
-/** ── Poll loop (runs on interval AND after every job) ───────────────────── */
|
|
|
+/** ── Recover stale jobs on startup ────────────────────────────────────── */
|
|
|
+async function recoverStaleJobs() {
|
|
|
+ const stale = await prisma.asset.findMany({
|
|
|
+ where: { transcodeStatus: 'PROCESSING', transcodePaused: false },
|
|
|
+ select: { id: true },
|
|
|
+ });
|
|
|
+ if (stale.length > 0) {
|
|
|
+ console.log(`[worker:${WORKER_ID}] Recovering ${stale.length} stale job(s)...`);
|
|
|
+ await prisma.asset.updateMany({
|
|
|
+ where: { id: { in: stale.map(s => s.id) } },
|
|
|
+ data: { transcodeStatus: 'PENDING', transcodeProgress: 0 },
|
|
|
+ });
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+/** ── Poll loop ────────────────────────────────────────────────────────── */
|
|
|
async function poll() {
|
|
|
try {
|
|
|
const claimed = await claimOneJob();
|
|
|
- if (!claimed) return;
|
|
|
+ if (!claimed) {
|
|
|
+ // No job — sleep then poll again with backoff (max 5s)
|
|
|
+ await sleep(BACKOFF_MS);
|
|
|
+ return poll();
|
|
|
+ }
|
|
|
+ // Job claimed — process immediately, then poll again without delay
|
|
|
await processJob(claimed);
|
|
|
- // Immediately poll again — don't wait for the next interval tick
|
|
|
- // This prevents the 5-second gap between back-to-back jobs
|
|
|
- poll().catch(err => console.error('[worker] Recursive poll error:', err.message));
|
|
|
+ poll().catch(err => console.error(`[worker:${WORKER_ID}] poll error:`, err.message));
|
|
|
} catch (err) {
|
|
|
- console.error('[worker] Poll error:', err.message);
|
|
|
+ console.error(`[worker:${WORKER_ID}] poll error:`, err.message);
|
|
|
+ await sleep(BACKOFF_MS);
|
|
|
+ poll().catch(err => console.error(`[worker:${WORKER_ID}] poll error:`, err.message));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-/** ── Main ──────────────────────────────────────────────────────────────── */
|
|
|
+/** ─── Worker entry point ─────────────────────────────────────────────── */
|
|
|
async function main() {
|
|
|
- console.log('[worker] Starting transcode worker...');
|
|
|
- console.log('[worker] UPLOAD_DIR:', UPLOAD_DIR);
|
|
|
- console.log('[worker] DATABASE_URL:', process.env.DATABASE_URL ? '(set)' : 'MISSING!');
|
|
|
- console.log('[worker] Poll interval:', POLL_INTERVAL_MS, 'ms');
|
|
|
-
|
|
|
- // Make sure upload dir exists
|
|
|
- fs.mkdirSync(UPLOAD_DIR, { recursive: true });
|
|
|
-
|
|
|
- send('ready', { UPLOAD_DIR, POLL_INTERVAL_MS });
|
|
|
-
|
|
|
- // Process any stale PROCESSING jobs (worker crashed mid-job) on startup
|
|
|
+ // Don't run the worker entry point in master — it only forks children
|
|
|
+ if (cluster.isMaster) return;
|
|
|
+ console.log(`[worker:${WORKER_ID}] Started, ENCODER=${ENCODER}`);
|
|
|
await recoverStaleJobs();
|
|
|
-
|
|
|
- // Main poll loop
|
|
|
- setInterval(poll, POLL_INTERVAL_MS);
|
|
|
-}
|
|
|
-
|
|
|
-/** Recover stale jobs — assets stuck in PROCESSING from a crashed worker */
|
|
|
-async function recoverStaleJobs() {
|
|
|
- try {
|
|
|
- const stale = await prisma.asset.findMany({
|
|
|
- where: { transcodeStatus: 'PROCESSING', transcodePaused: false },
|
|
|
- select: { id: true },
|
|
|
- });
|
|
|
- if (stale.length > 0) {
|
|
|
- console.log(`[worker] Recovering ${stale.length} stale job(s)...`);
|
|
|
- await prisma.asset.updateMany({
|
|
|
- where: { id: { in: stale.map(s => s.id) } },
|
|
|
- data: { transcodeStatus: 'PENDING', transcodeProgress: 0 },
|
|
|
- });
|
|
|
- }
|
|
|
- } catch (err) {
|
|
|
- console.warn('[worker] recoverStaleJobs error:', err.message);
|
|
|
- }
|
|
|
+ // Start polling immediately — tight recursive loop with backoff when idle
|
|
|
+ poll().catch(err => console.error(`[worker:${WORKER_ID}] Fatal poll error:`, err.message));
|
|
|
+ log('ready', {});
|
|
|
}
|
|
|
|
|
|
main().catch(err => {
|
|
|
- console.error('[worker] Fatal error:', err);
|
|
|
+ console.error(`[worker:${WORKER_ID}] Fatal:`, err);
|
|
|
process.exit(1);
|
|
|
});
|
|
|
|
|
|
-// Graceful shutdown
|
|
|
process.on('SIGTERM', async () => {
|
|
|
- console.log('[worker] SIGTERM received, shutting down...');
|
|
|
+ console.log(`[worker:${WORKER_ID}] SIGTERM, exiting...`);
|
|
|
await prisma.$disconnect();
|
|
|
process.exit(0);
|
|
|
});
|
|
|
process.on('SIGINT', async () => {
|
|
|
- console.log('[worker] SIGINT received, shutting down...');
|
|
|
+ console.log(`[worker:${WORKER_ID}] SIGINT, exiting...`);
|
|
|
await prisma.$disconnect();
|
|
|
process.exit(0);
|
|
|
});
|