index.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. 'use strict';
  2. /**
  3. * Transcode Worker Service
  4. * ─────────────────────────────────────────────────────
  5. * Standalone Node.js process (runs in its own Docker service).
  6. * Polls the database for pending transcode jobs.
  7. *
  8. * DB-as-queue pattern:
  9. * 1. API creates asset → transcodeStatus = PENDING
  10. * 2. Worker polls → atomically claims one PENDING job (UPDATE ... WHERE status=PENDING)
  11. * 3. Worker processes thumbnail + HLS → updates DB
  12. * 4. Repeat
  13. *
  14. * No external queue needed — uses the existing PostgreSQL database.
  15. */
  16. const { PrismaClient } = require('@prisma/client');
  17. const ffmpeg = require('fluent-ffmpeg');
  18. const path = require('path');
  19. const fs = require('fs');
  20. const UPLOAD_DIR = process.env.UPLOAD_DIR || '/app/uploads';
  21. const POLL_INTERVAL_MS = parseInt(process.env.POLL_INTERVAL_MS || '5000', 10);
  22. const prisma = new PrismaClient({
  23. datasources: { db: { url: process.env.DATABASE_URL } },
  24. });
  25. /** ── Helpers ──────────────────────────────────────────────────────────────── */
  26. function send(type, data) {
  27. const msg = JSON.stringify({ type, ...data, ts: new Date().toISOString() });
  28. process.send && process.send(msg);
  29. console.log(`[worker] ${type}`, JSON.stringify(data));
  30. }
  31. function sleep(ms) {
  32. return new Promise(r => setTimeout(r, ms));
  33. }
  34. /** ── Thumbnail ──────────────────────────────────────────────────────────── */
  35. function probeAndThumbnail(videoPath, outputDir) {
  36. return new Promise((resolve) => {
  37. const videoFilename = path.basename(videoPath, path.extname(videoPath));
  38. const thumbFilename = videoFilename + '_thumb.jpg';
  39. fs.mkdirSync(outputDir, { recursive: true });
  40. ffmpeg.ffprobe(videoPath, (err, metadata) => {
  41. const duration = metadata?.format?.duration ?? null;
  42. let fps = 30;
  43. let codecName = 'unknown';
  44. const videoStream = metadata?.streams?.find(s => s.codec_type === 'video');
  45. if (videoStream) {
  46. codecName = videoStream.codec_name || 'unknown';
  47. if (videoStream.r_frame_rate) {
  48. const [num, den] = videoStream.r_frame_rate.split('/').map(Number);
  49. fps = den ? Math.round(num / den) : num;
  50. }
  51. }
  52. send('metadata', { codec: codecName, duration, fps });
  53. ffmpeg(videoPath)
  54. .on('error', () => resolve({ thumbnailPath: null, duration, fps, codec: codecName }))
  55. .on('end', () => resolve({ thumbnailPath: thumbFilename, duration, fps, codec: codecName }))
  56. .screenshots({
  57. count: 1,
  58. folder: outputDir,
  59. filename: thumbFilename,
  60. size: '320x?',
  61. timemarks: ['1'],
  62. });
  63. });
  64. });
  65. }
  66. /** ── HLS Transcode ─────────────────────────────────────────────────────── */
  67. function transcodeToHLS(videoPath, outputDir, assetId, duration) {
  68. return new Promise((resolve, reject) => {
  69. const hlsDir = path.join(outputDir, 'hls', assetId);
  70. fs.mkdirSync(hlsDir, { recursive: true });
  71. const playlistPath = path.join(hlsDir, 'master.m3u8');
  72. const segmentPattern = path.join(hlsDir, 'segment_%03d.ts');
  73. let lastPct = 0;
  74. ffmpeg(videoPath)
  75. // ── Re-encode to H.264 + AAC (universal browser support) ─────────────
  76. .outputOptions([
  77. '-c:v libx264', // H.264 — every browser supports via MSE
  78. '-c:a aac', // AAC — universal audio codec
  79. '-movflags +faststart', // moov atom at front → fast playback start
  80. '-preset fast',
  81. '-crf 23',
  82. // HLS output
  83. '-f hls',
  84. '-hls_time 6',
  85. '-hls_playlist_type vod',
  86. '-hls_segment_filename', segmentPattern,
  87. ])
  88. .output(playlistPath)
  89. .on('progress', ({ percent }) => {
  90. const pct = Math.round(Math.min(99, percent ?? lastPct));
  91. if (pct > lastPct) {
  92. lastPct = pct;
  93. send('progress', { progress: pct });
  94. // Also update DB progress periodically
  95. prisma.asset.update({
  96. where: { id: assetId },
  97. data: { transcodeProgress: pct, transcodeStatus: 'PROCESSING' },
  98. }).catch(() => {}); // ignore errors
  99. }
  100. })
  101. .on('error', (err) => reject(new Error('HLS_TRANSCODE_FAILED: ' + err.message)))
  102. .on('end', () => resolve('/hls/' + assetId + '/master.m3u8'))
  103. .run();
  104. });
  105. }
  106. /** ── Process one job ───────────────────────────────────────────────────── */
  107. async function processJob(asset) {
  108. const { id: assetId, filePath, transcodePaused } = asset;
  109. const videoPath = path.join(UPLOAD_DIR, filePath);
  110. // Respect pause flag — skip if user paused before worker picked it up
  111. if (transcodePaused) {
  112. send('paused', { assetId, reason: 'transcodePaused flag set by user, re-queuing' });
  113. // Keep transcodePaused=true so it won't be re-claimed until user resumes
  114. await prisma.asset.update({
  115. where: { id: assetId },
  116. data: { transcodeStatus: 'PENDING' },
  117. });
  118. return;
  119. }
  120. send('started', { assetId, filePath });
  121. // Check file exists
  122. if (!fs.existsSync(videoPath)) {
  123. send('error', { assetId, error: 'Video file not found on disk: ' + videoPath });
  124. await prisma.asset.update({
  125. where: { id: assetId },
  126. data: { transcodeStatus: 'FAILED', transcodeError: 'Video file not found on server', transcodeProgress: 0 },
  127. });
  128. return;
  129. }
  130. try {
  131. // Phase 1: thumbnail + probe
  132. await prisma.asset.update({ where: { id: assetId }, data: { transcodeStatus: 'PROCESSING', transcodeProgress: 0 } });
  133. const thumbResult = await probeAndThumbnail(videoPath, UPLOAD_DIR);
  134. // Check pause flag between phases
  135. const check1 = await prisma.asset.findUnique({ where: { id: assetId }, select: { transcodePaused: true } });
  136. if (check1?.transcodePaused) {
  137. send('paused', { assetId, reason: 'paused between thumbnail and HLS phases' });
  138. await prisma.asset.update({ where: { id: assetId }, data: { transcodeStatus: 'PENDING', transcodePaused: false } });
  139. return;
  140. }
  141. // Update DB with metadata
  142. await prisma.asset.update({
  143. where: { id: assetId },
  144. data: {
  145. thumbnail: thumbResult.thumbnailPath ?? null,
  146. codec: thumbResult.codec ?? null,
  147. duration: thumbResult.duration ?? null,
  148. fps: thumbResult.fps ?? 30,
  149. transcodeProgress: 10,
  150. },
  151. });
  152. // Phase 2: HLS
  153. const hlsPath = await transcodeToHLS(videoPath, UPLOAD_DIR, assetId, thumbResult.duration);
  154. // Done!
  155. await prisma.asset.update({
  156. where: { id: assetId },
  157. data: {
  158. transcodeStatus: 'COMPLETED',
  159. transcodeProgress: 100,
  160. transcodeError: null,
  161. hlsPath,
  162. codec: thumbResult.codec ?? null,
  163. duration: thumbResult.duration ?? null,
  164. fps: thumbResult.fps ?? null,
  165. },
  166. });
  167. send('done', { assetId, hlsPath });
  168. } catch (err) {
  169. send('error', { assetId, error: err.message });
  170. // Only mark failed if not already deleted or completed by another process
  171. try {
  172. await prisma.asset.update({
  173. where: { id: assetId },
  174. data: { transcodeStatus: 'FAILED', transcodeError: err.message, transcodeProgress: 0 },
  175. });
  176. } catch {}
  177. }
  178. }
  179. /** ── Claim one job (atomic) ─────────────────────────────────────────────── */
  180. async function claimOneJob() {
  181. const result = await prisma.$executeRaw`
  182. UPDATE "Asset"
  183. SET "transcodeStatus" = 'PROCESSING',
  184. "transcodeProgress" = 0,
  185. "updatedAt" = NOW()
  186. WHERE id = (
  187. SELECT id FROM "Asset"
  188. WHERE "transcodeStatus" = 'PENDING'
  189. AND "transcodePaused" = false
  190. ORDER BY "createdAt" ASC
  191. LIMIT 1
  192. FOR UPDATE SKIP LOCKED
  193. )
  194. RETURNING id, "filePath", "transcodeStatus", "transcodePaused"
  195. `;
  196. if (!result || result === 0) return null;
  197. // Re-fetch the claimed asset (result doesn't return full row with $executeRaw)
  198. return prisma.asset.findFirst({
  199. where: { transcodeStatus: 'PROCESSING' },
  200. orderBy: { updatedAt: 'asc' },
  201. take: 1,
  202. });
  203. }
  204. /** ── Poll loop (runs on interval AND after every job) ───────────────────── */
  205. async function poll() {
  206. try {
  207. const claimed = await claimOneJob();
  208. if (!claimed) return;
  209. await processJob(claimed);
  210. // Immediately poll again — don't wait for the next interval tick
  211. // This prevents the 5-second gap between back-to-back jobs
  212. poll().catch(err => console.error('[worker] Recursive poll error:', err.message));
  213. } catch (err) {
  214. console.error('[worker] Poll error:', err.message);
  215. }
  216. }
  217. /** ── Main ──────────────────────────────────────────────────────────────── */
  218. async function main() {
  219. console.log('[worker] Starting transcode worker...');
  220. console.log('[worker] UPLOAD_DIR:', UPLOAD_DIR);
  221. console.log('[worker] DATABASE_URL:', process.env.DATABASE_URL ? '(set)' : 'MISSING!');
  222. console.log('[worker] Poll interval:', POLL_INTERVAL_MS, 'ms');
  223. // Make sure upload dir exists
  224. fs.mkdirSync(UPLOAD_DIR, { recursive: true });
  225. send('ready', { UPLOAD_DIR, POLL_INTERVAL_MS });
  226. // Process any stale PROCESSING jobs (worker crashed mid-job) on startup
  227. await recoverStaleJobs();
  228. // Main poll loop
  229. setInterval(poll, POLL_INTERVAL_MS);
  230. }
  231. /** Recover stale jobs — assets stuck in PROCESSING from a crashed worker */
  232. async function recoverStaleJobs() {
  233. try {
  234. const stale = await prisma.asset.findMany({
  235. where: { transcodeStatus: 'PROCESSING', transcodePaused: false },
  236. select: { id: true },
  237. });
  238. if (stale.length > 0) {
  239. console.log(`[worker] Recovering ${stale.length} stale job(s)...`);
  240. await prisma.asset.updateMany({
  241. where: { id: { in: stale.map(s => s.id) } },
  242. data: { transcodeStatus: 'PENDING', transcodeProgress: 0 },
  243. });
  244. }
  245. } catch (err) {
  246. console.warn('[worker] recoverStaleJobs error:', err.message);
  247. }
  248. }
  249. main().catch(err => {
  250. console.error('[worker] Fatal error:', err);
  251. process.exit(1);
  252. });
  253. // Graceful shutdown
  254. process.on('SIGTERM', async () => {
  255. console.log('[worker] SIGTERM received, shutting down...');
  256. await prisma.$disconnect();
  257. process.exit(0);
  258. });
  259. process.on('SIGINT', async () => {
  260. console.log('[worker] SIGINT received, shutting down...');
  261. await prisma.$disconnect();
  262. process.exit(0);
  263. });