Преглед изворни кода

fix: move stale job recovery to master process (before workers fork)

recoverStaleJobs() was called by each worker independently on startup,
creating a race condition where multiple workers would all see the same
PROCESSING jobs and try to claim/recover them simultaneously — causing
the same jobs to be reset multiple times and never actually claimed.

Fix: run recovery once in the master process before forking workers,
using a separate PrismaClient. Workers no longer call recoverStaleJobs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
kingkong пре 1 месец
родитељ
комит
c37fefa98d
1 измењених фајлова са 23 додато и 16 уклоњено
  1. 23 16
      packages/api/src/worker/index.js

+ 23 - 16
packages/api/src/worker/index.js

@@ -41,6 +41,29 @@ if (cluster.isMaster) {
 
   fs.mkdirSync(UPLOAD_DIR, { recursive: true });
 
+  // Recovery runs ONCE in master before any workers start, so all stale
+  // PROCESSING jobs are atomically reset to PENDING before workers poll.
+  // This prevents the multi-worker race where every worker independently
+  // sees stale jobs and tries to claim/recover them simultaneously.
+  const masterPrisma = new PrismaClient({ datasources: { db: { url: process.env.DATABASE_URL } } });
+  (async () => {
+    try {
+      const stale = await masterPrisma.asset.findMany({
+        where: { transcodeStatus: 'PROCESSING', transcodePaused: false },
+        select: { id: true },
+      });
+      if (stale.length > 0) {
+        await masterPrisma.asset.updateMany({
+          where: { id: { in: stale.map(s => s.id) } },
+          data: { transcodeStatus: 'PENDING', transcodeProgress: 0 },
+        });
+        console.log(`[master] Reset ${stale.length} stale job(s) to PENDING`);
+      }
+    } finally {
+      await masterPrisma.$disconnect();
+    }
+  })();
+
   cluster.on('exit', (worker, code, signal) => {
     console.log(`[master] Worker ${worker.id} exited (${code || signal}), restarting...`);
     setTimeout(() => cluster.fork(), 2000);
@@ -421,21 +444,6 @@ async function claimOneJob() {
   });
 }
 
-/** ── Recover stale jobs on startup ────────────────────────────────────── */
-async function recoverStaleJobs() {
-  const stale = await prisma.asset.findMany({
-    where: { transcodeStatus: 'PROCESSING', transcodePaused: false },
-    select: { id: true },
-  });
-  if (stale.length > 0) {
-    console.log(`[worker:${WORKER_ID}] Recovering ${stale.length} stale job(s)...`);
-    await prisma.asset.updateMany({
-      where: { id: { in: stale.map(s => s.id) } },
-      data: { transcodeStatus: 'PENDING', transcodeProgress: 0 },
-    });
-  }
-}
-
 /** ── Poll loop ────────────────────────────────────────────────────────── */
 async function poll() {
   try {
@@ -460,7 +468,6 @@ async function main() {
   // Don't run the worker entry point in master — it only forks children
   if (cluster.isMaster) return;
   console.log(`[worker:${WORKER_ID}] Started, ENCODER=${ENCODER}`);
-  await recoverStaleJobs();
   // Start polling immediately — tight recursive loop with backoff when idle
   poll().catch(err => console.error(`[worker:${WORKER_ID}] Fatal poll error:`, err.message));
   log('ready', {});