|
|
@@ -420,6 +420,8 @@ async function processJob(asset) {
|
|
|
}
|
|
|
|
|
|
/** ── Watchdog: re-reset jobs that have been PROCESSING too long without progress ── */
|
|
|
+// Runs independently of the job queue on a timer — catches stuck jobs even when
|
|
|
+// the queue is perpetually full and claimOneJob() never returns null.
|
|
|
async function resetStuckJobs() {
|
|
|
const cutoff = new Date(Date.now() - STUCK_TIMEOUT_MS);
|
|
|
try {
|
|
|
@@ -436,14 +438,39 @@ async function resetStuckJobs() {
|
|
|
where: { id: { in: stuck.map(s => s.id) } },
|
|
|
data: { transcodeStatus: 'PENDING', transcodeProgress: 0 },
|
|
|
});
|
|
|
+ // Put reset jobs into grace period so this worker doesn't immediately re-claim them
|
|
|
+ for (const job of stuck) {
|
|
|
+ recentlyReset.set(job.id, Date.now() + WATCHDOG_GRACE_MS);
|
|
|
+ }
|
|
|
log('watchdog_reset', { count: stuck.length, cutoffSeconds: STUCK_TIMEOUT_MS / 1000 });
|
|
|
}
|
|
|
} catch {}
|
|
|
}
|
|
|
|
|
|
+// Grace period (ms) after a watchdog reset before the job can be re-claimed.
|
|
|
+// Prevents a tight reset → re-claim → fail → reset loop on the worker that performed the reset.
|
|
|
+const WATCHDOG_GRACE_MS = Math.max(STUCK_TIMEOUT_MS * 0.5, 30_000); // 50% of timeout or 30s min
|
|
|
+
|
|
|
+// Map of job ID → expiry timestamp (ms since epoch) for jobs recently reset by the watchdog.
|
|
|
+// Must be a Map, not a Set: resetStuckJobs() calls .set(id, expiry) and the sweep below reads [jobId, expiresAt] pairs. In-memory only, so it starts empty on each worker restart.
|
|
|
+const recentlyReset = new Map();
|
|
|
+setInterval(() => {
|
|
|
+ // Drop expired entries; claimOneJob() excludes every key still present, so this sweep is what ends a job's grace period.
|
|
|
+ for (const [jobId, expiresAt] of recentlyReset) {
|
|
|
+ if (Date.now() > expiresAt) recentlyReset.delete(jobId);
|
|
|
+ }
|
|
|
+}, STUCK_TIMEOUT_MS); // clean up roughly once per timeout period
|
|
|
+
|
|
|
/** ── Claim one job (atomic, skip locked) ─────────────────────────────── */
|
|
|
+// recentlyReset jobs are excluded for WATCHDOG_GRACE_MS to prevent tight
|
|
|
+// reset → re-claim → fail → reset loops.
|
|
|
async function claimOneJob() {
|
|
|
- const result = await prisma.$executeRaw`
|
|
|
+ const resetIds = Array.from(recentlyReset.keys());
|
|
|
+ const resetCondition = resetIds.length > 0
|
|
|
+ ? `AND id NOT IN (${resetIds.map(id => `'${id}'`).join(',')})`
|
|
|
+ : '';
|
|
|
+
|
|
|
+ const result = await prisma.$executeRawUnsafe(`
|
|
|
UPDATE "Asset"
|
|
|
SET "transcodeStatus" = 'PROCESSING',
|
|
|
"transcodeProgress" = 0,
|
|
|
@@ -452,11 +479,12 @@ async function claimOneJob() {
|
|
|
SELECT id FROM "Asset"
|
|
|
WHERE "transcodeStatus" = 'PENDING'
|
|
|
AND "transcodePaused" = false
|
|
|
+ ${resetCondition}
|
|
|
ORDER BY "createdAt" ASC
|
|
|
LIMIT 1
|
|
|
FOR UPDATE SKIP LOCKED
|
|
|
)
|
|
|
- `;
|
|
|
+ `);
|
|
|
|
|
|
if (!result || result === 0) return null;
|
|
|
|
|
|
@@ -472,8 +500,8 @@ async function poll() {
|
|
|
try {
|
|
|
const claimed = await claimOneJob();
|
|
|
if (!claimed) {
|
|
|
- // No job — check for stuck jobs and sleep with backoff
|
|
|
- await resetStuckJobs();
|
|
|
+ // Queue is empty — sleep and retry.
|
|
|
+ // Watchdog runs on its own timer (see main()), so nothing to do here.
|
|
|
await sleep(BACKOFF_MS);
|
|
|
return poll();
|
|
|
}
|
|
|
@@ -492,6 +520,10 @@ async function main() {
|
|
|
// Don't run the worker entry point in master — it only forks children
|
|
|
if (cluster.isMaster) return;
|
|
|
console.log(`[worker:${WORKER_ID}] Started, ENCODER=${ENCODER}`);
|
|
|
+ // Watchdog timer: independent of poll loop — catches stuck jobs even when queue is full
|
|
|
+ setInterval(async () => {
|
|
|
+ await resetStuckJobs();
|
|
|
+ }, STUCK_TIMEOUT_MS);
|
|
|
// Start polling immediately — tight recursive loop with backoff when idle
|
|
|
poll().catch(err => console.error(`[worker:${WORKER_ID}] Fatal poll error:`, err.message));
|
|
|
log('ready', {});
|