Files
skipper/shared/work-orders.js
2026-04-05 15:28:04 +02:00

168 lines
4.0 KiB
JavaScript

const fs = require('fs/promises');
const path = require('path');
const { randomUUID } = require('crypto');
const { workOrdersPendingDir, workOrdersRunningDir, workOrdersFinishedDir } = require('./paths');
const { ensureDir, fileExists, listJsonFiles, readJson, readJsonIfExists, writeJson } = require('./fs');
function pendingPath(workOrderId) {
return path.join(workOrdersPendingDir, `${workOrderId}.json`);
}
function runningPath(workOrderId) {
return path.join(workOrdersRunningDir, `${workOrderId}.json`);
}
function finishedPath(workOrderId) {
return path.join(workOrdersFinishedDir, `${workOrderId}.json`);
}
function lockPath(workOrderId) {
return path.join(workOrdersPendingDir, `${workOrderId}.lock`);
}
async function withLock(workOrderId, callback) {
const targetPath = lockPath(workOrderId);
await ensureDir(workOrdersPendingDir);
try {
await fs.mkdir(targetPath);
} catch (error) {
if (error.code === 'EEXIST') {
return null;
}
throw error;
}
try {
return await callback();
} finally {
await fs.rm(targetPath, { recursive: true, force: true });
}
}
async function createWorkOrder({ type, target, desired_state, request_id, correlation_id, metadata }) {
const timestamp = new Date().toISOString();
const workOrder = {
id: randomUUID(),
resource_type: 'work_order',
schema_version: 'v1',
type,
target,
desired_state: desired_state || {},
status: 'pending',
result: null,
request_id: request_id || null,
correlation_id: correlation_id || null,
created_at: timestamp,
started_at: null,
finished_at: null,
metadata: metadata || {},
};
await writeJson(pendingPath(workOrder.id), workOrder);
return workOrder;
}
async function getWorkOrder(workOrderId) {
return (
(await readJsonIfExists(pendingPath(workOrderId))) ||
(await readJsonIfExists(runningPath(workOrderId))) ||
(await readJsonIfExists(finishedPath(workOrderId)))
);
}
async function listWorkOrders() {
const files = [
...(await listJsonFiles(workOrdersPendingDir)),
...(await listJsonFiles(workOrdersRunningDir)),
...(await listJsonFiles(workOrdersFinishedDir)),
];
return Promise.all(files.map((filePath) => readJson(filePath)));
}
async function claimNextWorkOrder(nodeId, context) {
const files = await listJsonFiles(workOrdersPendingDir);
for (const filePath of files) {
const current = await readJson(filePath);
if (!current.target || current.target.node_id !== nodeId) {
continue;
}
const claimed = await withLock(current.id, async () => {
if (!(await fileExists(filePath))) {
return null;
}
const latest = await readJson(filePath);
if (!latest.target || latest.target.node_id !== nodeId || latest.status !== 'pending') {
return null;
}
const running = {
...latest,
status: 'running',
started_at: new Date().toISOString(),
request_id: context.request_id,
correlation_id: context.correlation_id,
};
await writeJson(runningPath(latest.id), running);
await fs.rm(filePath, { force: true });
return running;
});
if (claimed) {
return claimed;
}
}
return null;
}
async function finishWorkOrder(workOrderId, status, result) {
const existingFinished = await readJsonIfExists(finishedPath(workOrderId));
if (existingFinished) {
return existingFinished;
}
const activePath = runningPath(workOrderId);
if (!(await fileExists(activePath))) {
return null;
}
return withLock(workOrderId, async () => {
const running = await readJsonIfExists(activePath);
if (!running) {
return readJsonIfExists(finishedPath(workOrderId));
}
const finished = {
...running,
status,
result,
finished_at: new Date().toISOString(),
};
await writeJson(finishedPath(workOrderId), finished);
await fs.rm(activePath, { force: true });
return finished;
});
}
module.exports = {
createWorkOrder,
getWorkOrder,
listWorkOrders,
claimNextWorkOrder,
finishWorkOrder,
};