Initial commit
This commit is contained in:
472
skipper-api/src/index.js
Normal file
472
skipper-api/src/index.js
Normal file
@@ -0,0 +1,472 @@
|
||||
const express = require('express');
|
||||
const { AppError } = require('../../shared/errors');
|
||||
const { createContext } = require('../../shared/context');
|
||||
const { verifyNodeToken } = require('../../shared/auth');
|
||||
const { emitEvent } = require('../../shared/events');
|
||||
const { createLogger } = require('../../shared/logs');
|
||||
const { bootstrapDataLayout } = require('../../shared/bootstrap');
|
||||
const { createSnapshot, getLatestSystemSnapshot, getLatestTenantSnapshot } = require('../../shared/snapshots');
|
||||
const { createResourceDocument, getResource, listAllResources, saveResource, patchResourceState } = require('../../shared/resources');
|
||||
const { getIdempotencyRecord, saveIdempotencyRecord } = require('../../shared/idempotency');
|
||||
const { claimNextWorkOrder, createWorkOrder, finishWorkOrder, getWorkOrder, listWorkOrders } = require('../../shared/work-orders');
|
||||
const { dataDir } = require('../../shared/paths');
|
||||
|
||||
const PORT = Number(process.env.PORT || 3000);
|
||||
const HOST = process.env.HOST || '0.0.0.0';
|
||||
const ADMIN_TOKEN = process.env.ADMIN_TOKEN || 'dev-admin-token';
|
||||
|
||||
const app = express();
|
||||
|
||||
function envelope(context, data, error) {
|
||||
return {
|
||||
schema_version: 'v1',
|
||||
request_id: context.request_id,
|
||||
correlation_id: context.correlation_id,
|
||||
data: data === undefined ? null : data,
|
||||
error: error || null,
|
||||
metadata: {
|
||||
timestamp: new Date().toISOString(),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function parseBearerToken(req) {
|
||||
const value = req.headers.authorization || '';
|
||||
|
||||
if (!value.startsWith('Bearer ')) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return value.slice('Bearer '.length).trim();
|
||||
}
|
||||
|
||||
function asyncHandler(handler) {
|
||||
return (req, res, next) => {
|
||||
Promise.resolve(handler(req, res, next)).catch(next);
|
||||
};
|
||||
}
|
||||
|
||||
function requireAdmin(req, res, next) {
|
||||
if (req.headers['x-admin-token'] !== ADMIN_TOKEN) {
|
||||
return next(new AppError(401, 'UNAUTHORIZED', 'Admin token required'));
|
||||
}
|
||||
|
||||
return next();
|
||||
}
|
||||
|
||||
async function requireNodeAuth(req, res, next) {
|
||||
let nodeId = req.params.nodeId;
|
||||
|
||||
if (!nodeId && req.params.workOrderId) {
|
||||
const workOrder = await getWorkOrder(req.params.workOrderId);
|
||||
nodeId = workOrder && workOrder.target ? workOrder.target.node_id : null;
|
||||
}
|
||||
|
||||
const token = parseBearerToken(req);
|
||||
|
||||
if (!(await verifyNodeToken(nodeId, token))) {
|
||||
return next(new AppError(401, 'UNAUTHORIZED', 'Node token invalid', { node_id: nodeId }));
|
||||
}
|
||||
|
||||
return next();
|
||||
}
|
||||
|
||||
app.use(express.json({ limit: '2mb' }));
|
||||
|
||||
app.use((req, res, next) => {
|
||||
req.context = createContext({
|
||||
request_id: req.headers['x-request-id'] || (req.body && req.body.request_id),
|
||||
correlation_id: req.headers['x-correlation-id'] || (req.body && req.body.correlation_id),
|
||||
});
|
||||
|
||||
res.setHeader('x-request-id', req.context.request_id);
|
||||
res.setHeader('x-correlation-id', req.context.correlation_id);
|
||||
next();
|
||||
});
|
||||
|
||||
app.use((req, res, next) => {
|
||||
const startedAt = Date.now();
|
||||
|
||||
res.on('finish', () => {
|
||||
createLogger({ service: 'skipper' }).info('http.request', res.statusCode < 400 ? 'success' : 'error', {
|
||||
method: req.method,
|
||||
path: req.path,
|
||||
status_code: res.statusCode,
|
||||
duration_ms: Date.now() - startedAt,
|
||||
request_id: req.context.request_id,
|
||||
correlation_id: req.context.correlation_id,
|
||||
}).catch(() => {});
|
||||
});
|
||||
|
||||
next();
|
||||
});
|
||||
|
||||
app.get('/health', asyncHandler(async (req, res) => {
|
||||
res.json(envelope(req.context, { ok: true, data_dir: dataDir }));
|
||||
}));
|
||||
|
||||
app.get('/v1/health', asyncHandler(async (req, res) => {
|
||||
res.json(envelope(req.context, { ok: true, data_dir: dataDir }));
|
||||
}));
|
||||
|
||||
app.get('/v1/resources', requireAdmin, asyncHandler(async (req, res) => {
|
||||
res.json(envelope(req.context, await listAllResources()));
|
||||
}));
|
||||
|
||||
app.get('/v1/resources/:resourceType/:resourceId', requireAdmin, asyncHandler(async (req, res) => {
|
||||
const resource = await getResource(req.params.resourceType, req.params.resourceId);
|
||||
|
||||
if (!resource) {
|
||||
throw new AppError(404, 'RESOURCE_NOT_FOUND', 'Resource not found', {
|
||||
resource_type: req.params.resourceType,
|
||||
resource_id: req.params.resourceId,
|
||||
});
|
||||
}
|
||||
|
||||
res.json(envelope(req.context, resource));
|
||||
}));
|
||||
|
||||
app.get('/v1/work-orders', requireAdmin, asyncHandler(async (req, res) => {
|
||||
res.json(envelope(req.context, await listWorkOrders()));
|
||||
}));
|
||||
|
||||
app.get('/v1/work-orders/:workOrderId', requireAdmin, asyncHandler(async (req, res) => {
|
||||
const workOrder = await getWorkOrder(req.params.workOrderId);
|
||||
|
||||
if (!workOrder) {
|
||||
throw new AppError(404, 'RESOURCE_NOT_FOUND', 'Work order not found', {
|
||||
resource_type: 'work_order',
|
||||
resource_id: req.params.workOrderId,
|
||||
});
|
||||
}
|
||||
|
||||
res.json(envelope(req.context, workOrder));
|
||||
}));
|
||||
|
||||
app.post('/v1/nodes/:nodeId/heartbeat', requireNodeAuth, asyncHandler(async (req, res) => {
|
||||
const nodeId = req.params.nodeId;
|
||||
const payload = req.body.data || {};
|
||||
const node = await getResource('node', nodeId);
|
||||
|
||||
if (!node) {
|
||||
throw new AppError(404, 'RESOURCE_NOT_FOUND', 'Node not found', {
|
||||
resource_type: 'node',
|
||||
resource_id: nodeId,
|
||||
});
|
||||
}
|
||||
|
||||
const updated = await saveResource({
|
||||
...node,
|
||||
current_state: {
|
||||
...node.current_state,
|
||||
heartbeat_at: new Date().toISOString(),
|
||||
hostname: payload.hostname || node.current_state.hostname || nodeId,
|
||||
capabilities: Array.isArray(payload.capabilities) ? payload.capabilities : node.current_state.capabilities || [],
|
||||
agent_version: payload.agent_version || node.current_state.agent_version || null,
|
||||
},
|
||||
});
|
||||
|
||||
await emitEvent({
|
||||
type: 'node_heartbeat_received',
|
||||
resource_type: 'node',
|
||||
resource_id: nodeId,
|
||||
request_id: req.context.request_id,
|
||||
correlation_id: req.context.correlation_id,
|
||||
payload: {
|
||||
hostname: updated.current_state.hostname,
|
||||
capabilities: updated.current_state.capabilities || [],
|
||||
},
|
||||
});
|
||||
|
||||
res.json(envelope(req.context, updated));
|
||||
}));
|
||||
|
||||
app.get('/v1/nodes/:nodeId/work-orders/next', requireNodeAuth, asyncHandler(async (req, res) => {
|
||||
const workOrder = await claimNextWorkOrder(req.params.nodeId, req.context);
|
||||
|
||||
if (workOrder) {
|
||||
await emitEvent({
|
||||
type: 'work_order_started',
|
||||
resource_type: 'work_order',
|
||||
resource_id: workOrder.id,
|
||||
request_id: req.context.request_id,
|
||||
correlation_id: req.context.correlation_id,
|
||||
payload: {
|
||||
node_id: req.params.nodeId,
|
||||
tenant_id: workOrder.target.tenant_id || null,
|
||||
status: 'running',
|
||||
},
|
||||
});
|
||||
|
||||
if (workOrder.desired_state.deployment_id) {
|
||||
await emitEvent({
|
||||
type: 'deployment_started',
|
||||
resource_type: 'deployment',
|
||||
resource_id: workOrder.desired_state.deployment_id,
|
||||
request_id: req.context.request_id,
|
||||
correlation_id: req.context.correlation_id,
|
||||
payload: {
|
||||
node_id: req.params.nodeId,
|
||||
work_order_id: workOrder.id,
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
res.json(envelope(req.context, workOrder));
|
||||
}));
|
||||
|
||||
app.post('/v1/work-orders/:workOrderId/result', requireNodeAuth, asyncHandler(async (req, res) => {
|
||||
const workOrderId = req.params.workOrderId;
|
||||
const payload = req.body.data || {};
|
||||
const existing = await getWorkOrder(workOrderId);
|
||||
|
||||
if (!existing) {
|
||||
throw new AppError(404, 'RESOURCE_NOT_FOUND', 'Work order not found', {
|
||||
resource_type: 'work_order',
|
||||
resource_id: workOrderId,
|
||||
});
|
||||
}
|
||||
|
||||
const status = payload.status === 'success' ? 'success' : 'failed';
|
||||
const completed = await finishWorkOrder(workOrderId, status, payload.result || null);
|
||||
|
||||
if (!completed) {
|
||||
throw new AppError(409, 'WORK_ORDER_NOT_CLAIMABLE', 'Work order is not claimable', {
|
||||
resource_id: workOrderId,
|
||||
});
|
||||
}
|
||||
|
||||
for (const stateUpdate of payload.state_report && Array.isArray(payload.state_report.resources)
|
||||
? payload.state_report.resources
|
||||
: []) {
|
||||
await patchResourceState(stateUpdate.resource_type, stateUpdate.resource_id, {
|
||||
current_state: stateUpdate.current_state,
|
||||
last_applied_state: stateUpdate.last_applied_state,
|
||||
});
|
||||
}
|
||||
|
||||
if (completed.desired_state.deployment_id) {
|
||||
const deployment = await getResource('deployment', completed.desired_state.deployment_id);
|
||||
|
||||
if (deployment) {
|
||||
await saveResource({
|
||||
...deployment,
|
||||
current_state: {
|
||||
...deployment.current_state,
|
||||
status,
|
||||
work_order_id: completed.id,
|
||||
finished_at: completed.finished_at,
|
||||
},
|
||||
last_applied_state: completed.desired_state,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
await emitEvent({
|
||||
type: status === 'success' ? 'work_order_succeeded' : 'work_order_failed',
|
||||
resource_type: 'work_order',
|
||||
resource_id: completed.id,
|
||||
request_id: req.context.request_id,
|
||||
correlation_id: req.context.correlation_id,
|
||||
payload: {
|
||||
node_id: completed.target.node_id,
|
||||
tenant_id: completed.target.tenant_id || null,
|
||||
result: completed.result,
|
||||
},
|
||||
});
|
||||
|
||||
if (completed.desired_state.deployment_id) {
|
||||
await emitEvent({
|
||||
type: status === 'success' ? 'deployment_succeeded' : 'deployment_failed',
|
||||
resource_type: 'deployment',
|
||||
resource_id: completed.desired_state.deployment_id,
|
||||
request_id: req.context.request_id,
|
||||
correlation_id: req.context.correlation_id,
|
||||
payload: {
|
||||
node_id: completed.target.node_id,
|
||||
work_order_id: completed.id,
|
||||
result: completed.result,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
res.json(envelope(req.context, completed));
|
||||
}));
|
||||
|
||||
app.post('/v1/deployments/:tenantId/apply', requireAdmin, asyncHandler(async (req, res) => {
|
||||
const tenantId = req.params.tenantId;
|
||||
const idempotencyKey = req.headers['x-idempotency-key'] || (req.body && req.body.idempotency_key);
|
||||
const tenant = await getResource('tenant', tenantId);
|
||||
|
||||
if (!tenant) {
|
||||
throw new AppError(404, 'RESOURCE_NOT_FOUND', 'Tenant not found', {
|
||||
resource_type: 'tenant',
|
||||
resource_id: tenantId,
|
||||
});
|
||||
}
|
||||
|
||||
if (!idempotencyKey) {
|
||||
throw new AppError(400, 'INVALID_REQUEST', 'Idempotency key required');
|
||||
}
|
||||
|
||||
const replay = await getIdempotencyRecord('deployment_apply', idempotencyKey);
|
||||
|
||||
if (replay) {
|
||||
return res.json(envelope(req.context, replay.value));
|
||||
}
|
||||
|
||||
const targetNodeId = tenant.desired_state.deployment_policy && tenant.desired_state.deployment_policy.target_node_id;
|
||||
const composeSpec = tenant.desired_state.compose;
|
||||
|
||||
if (!targetNodeId || !composeSpec || !composeSpec.compose_file) {
|
||||
throw new AppError(400, 'INVALID_REQUEST', 'Tenant desired state is incomplete', {
|
||||
tenant_id: tenant.id,
|
||||
});
|
||||
}
|
||||
|
||||
const deployment = await saveResource(createResourceDocument({
|
||||
id: `deployment-${Date.now()}`,
|
||||
resource_type: 'deployment',
|
||||
desired_state: {
|
||||
tenant_id: tenant.id,
|
||||
service_ids: tenant.desired_state.service_ids || [],
|
||||
target_node_id: targetNodeId,
|
||||
strategy: 'apply',
|
||||
compose: composeSpec,
|
||||
},
|
||||
current_state: {
|
||||
status: 'pending',
|
||||
},
|
||||
last_applied_state: {},
|
||||
metadata: {},
|
||||
}));
|
||||
|
||||
const workOrder = await createWorkOrder({
|
||||
type: 'deploy_service',
|
||||
target: {
|
||||
tenant_id: tenant.id,
|
||||
node_id: targetNodeId,
|
||||
},
|
||||
desired_state: {
|
||||
deployment_id: deployment.id,
|
||||
tenant_id: tenant.id,
|
||||
service_ids: tenant.desired_state.service_ids || [],
|
||||
compose_project: composeSpec,
|
||||
},
|
||||
request_id: req.context.request_id,
|
||||
correlation_id: req.context.correlation_id,
|
||||
metadata: {},
|
||||
});
|
||||
|
||||
await emitEvent({
|
||||
type: 'resource_created',
|
||||
resource_type: 'deployment',
|
||||
resource_id: deployment.id,
|
||||
request_id: req.context.request_id,
|
||||
correlation_id: req.context.correlation_id,
|
||||
payload: {
|
||||
tenant_id: tenant.id,
|
||||
target_node_id: targetNodeId,
|
||||
},
|
||||
});
|
||||
|
||||
await emitEvent({
|
||||
type: 'work_order_created',
|
||||
resource_type: 'work_order',
|
||||
resource_id: workOrder.id,
|
||||
request_id: req.context.request_id,
|
||||
correlation_id: req.context.correlation_id,
|
||||
payload: {
|
||||
tenant_id: tenant.id,
|
||||
node_id: targetNodeId,
|
||||
deployment_id: deployment.id,
|
||||
},
|
||||
});
|
||||
|
||||
const responseData = {
|
||||
deployment,
|
||||
work_order: workOrder,
|
||||
};
|
||||
|
||||
await saveIdempotencyRecord('deployment_apply', idempotencyKey, responseData);
|
||||
res.status(201).json(envelope(req.context, responseData));
|
||||
}));
|
||||
|
||||
app.get('/v1/snapshots/system/latest', requireAdmin, asyncHandler(async (req, res) => {
|
||||
const snapshot = (await getLatestSystemSnapshot()) || (await createSnapshot('system', req.context));
|
||||
|
||||
await emitEvent({
|
||||
type: 'snapshot_created',
|
||||
resource_type: 'snapshot',
|
||||
resource_id: snapshot.snapshot_id,
|
||||
request_id: req.context.request_id,
|
||||
correlation_id: req.context.correlation_id,
|
||||
payload: {
|
||||
scope: 'system',
|
||||
},
|
||||
});
|
||||
|
||||
res.json(envelope(req.context, snapshot));
|
||||
}));
|
||||
|
||||
app.get('/v1/snapshots/tenants/:tenantId/latest', requireAdmin, asyncHandler(async (req, res) => {
|
||||
const scope = `tenant:${req.params.tenantId}`;
|
||||
const snapshot = (await getLatestTenantSnapshot(req.params.tenantId)) || (await createSnapshot(scope, req.context));
|
||||
|
||||
await emitEvent({
|
||||
type: 'snapshot_created',
|
||||
resource_type: 'snapshot',
|
||||
resource_id: snapshot.snapshot_id,
|
||||
request_id: req.context.request_id,
|
||||
correlation_id: req.context.correlation_id,
|
||||
payload: {
|
||||
scope,
|
||||
},
|
||||
});
|
||||
|
||||
res.json(envelope(req.context, snapshot));
|
||||
}));
|
||||
|
||||
app.use((error, req, res, next) => {
|
||||
createLogger({
|
||||
service: 'skipper',
|
||||
request_id: req.context ? req.context.request_id : null,
|
||||
correlation_id: req.context ? req.context.correlation_id : null,
|
||||
}).error('http.error', 'failed', {
|
||||
code: error.code || 'INTERNAL_ERROR',
|
||||
message: error.message,
|
||||
details: error.details || {},
|
||||
}).catch(() => {});
|
||||
|
||||
if (error instanceof AppError) {
|
||||
return res.status(error.statusCode).json(envelope(req.context || createContext(), null, {
|
||||
code: error.code,
|
||||
message: error.message,
|
||||
details: error.details,
|
||||
}));
|
||||
}
|
||||
|
||||
return res.status(500).json(envelope(req.context || createContext(), null, {
|
||||
code: 'INTERNAL_ERROR',
|
||||
message: 'Internal server error',
|
||||
details: {},
|
||||
}));
|
||||
});
|
||||
|
||||
async function start() {
|
||||
await bootstrapDataLayout();
|
||||
|
||||
app.listen(PORT, HOST, () => {
|
||||
createLogger({ service: 'skipper' }).info('server.start', 'success', {
|
||||
host: HOST,
|
||||
port: PORT,
|
||||
data_dir: dataDir,
|
||||
}).catch(() => {});
|
||||
});
|
||||
}
|
||||
|
||||
start().catch((error) => {
|
||||
process.stderr.write(`${error.stack || error.message}\n`);
|
||||
process.exit(1);
|
||||
});
|
||||
Reference in New Issue
Block a user