Implements the Pipeline design pattern in Node.js for data transformation. Use when the user mentions the pipeline pattern, or when you need a fixed sequence of stages that each transform data and pass the result to the next: ETL, parsing, data processing, or any linear transformation flow that runs to completion.
Install
Install via the SkillsCat registry:
```shell
npx skillscat add progmichaelkibenko/top-coder-agent-skills/pipeline-pattern-nodejs
```
Pipeline (Node.js)
Why: Pipeline runs data through a fixed sequence of stages. Each stage receives input, transforms it, and passes the result to the next. All stages run in order; there is no conditional “skip” or early exit (barring errors). You avoid one big function and keep each transformation in its own stage.
Hard constraints: Stages share a single interface (e.g. process(data) returning transformed data). A pipeline composes stages in a fixed order and runs them one after another. Flow is linear—no branching or handler-driven termination.
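The constraints above can also be enforced when the pipeline is built. This is a minimal sketch, not the runner used later in this skill: the `TypeError` checks and the `Object.freeze` call are additions that reject any stage without a `process` function and keep the order fixed after construction. The `double` and `increment` stages are illustrative only.

```javascript
// A Pipeline variant that enforces the single-stage interface up front.
class Pipeline {
  constructor(stages) {
    if (!Array.isArray(stages)) {
      throw new TypeError('stages must be an array');
    }
    stages.forEach((stage, i) => {
      // Duck typing: any object with a process(data) method is a valid stage.
      if (!stage || typeof stage.process !== 'function') {
        throw new TypeError(`stage at index ${i} has no process(data) method`);
      }
    });
    // Freeze a copy so the order cannot change after construction.
    this.stages = Object.freeze([...stages]);
  }

  run(input) {
    // Linear flow: every stage runs, in order, on the previous stage's output.
    return this.stages.reduce((data, stage) => stage.process(data), input);
  }
}

// Usage: plain objects satisfy the interface; no base class is required.
const double = { process: (n) => n * 2 };
const increment = { process: (n) => n + 1 };
const result = new Pipeline([double, increment]).run(5); // 11

let rejected = false;
try {
  new Pipeline([double, {}]); // second stage has no process()
} catch (err) {
  rejected = err instanceof TypeError;
}
```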
When to use
- Data transformation: Parse → normalize → enrich → serialize, where every step must run.
- ETL / data processing: Ingest, transform, load; or parse log lines, extract fields, aggregate.
- Raw JSON sanitization: Transform/sanitize JSON with one stage per field (or concern) to remove; build the pipeline conditionally from the request (e.g. add removal stages by response type or `includeX` flags).
- Parsing pipelines: Raw input → tokenize → parse → build AST → emit.
- You need a fixed, mandatory sequence (unlike Chain of Responsibility, where handlers can short-circuit).
Structure
| Role | Responsibility |
|---|---|
| Stage (interface) | Declares process(data) (or process(input) => output). Each stage receives data, returns transformed data. |
| Concrete stages | Implement the interface; one transformation per stage. No “pass or stop” decision—always pass result to next. |
| Pipeline | Holds an ordered list of stages; runs them in sequence, passing each stage’s output as the next stage’s input. |
| Client | Builds the pipeline (e.g. new Pipeline([parse, normalize, enrich, serialize])) and runs it with initial input. |
Data flows in one direction; each stage’s output is the next stage’s input. The pipeline runs to completion (or fails at a stage).
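To make the one-directional flow concrete, here is a small sketch with three hypothetical string stages (`TrimStage`, `UpperCaseStage`, and `ExclaimStage` are illustration names, not part of this skill). The `runWithTrace` method is also an assumption: it runs the same loop as `run` but records every intermediate value, so you can check that each stage received the previous stage's output.

```javascript
// Three single-purpose stages sharing the process(data) interface.
class TrimStage {
  process(text) {
    return text.trim();
  }
}
class UpperCaseStage {
  process(text) {
    return text.toUpperCase();
  }
}
class ExclaimStage {
  process(text) {
    return `${text}!`;
  }
}

class Pipeline {
  constructor(stages) {
    this.stages = stages;
  }
  run(input) {
    return this.stages.reduce((data, stage) => stage.process(data), input);
  }
  // Same loop as run(), but also returns every intermediate value for debugging.
  runWithTrace(input) {
    const trace = [input];
    let data = input;
    for (const stage of this.stages) {
      data = stage.process(data);
      trace.push(data);
    }
    return { output: data, trace };
  }
}

const pipeline = new Pipeline([new TrimStage(), new UpperCaseStage(), new ExclaimStage()]);
const { output, trace } = pipeline.runWithTrace('  hello  ');
// output: 'HELLO!'
// trace:  ['  hello  ', 'hello', 'HELLO', 'HELLO!']
```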
Real example: access-log ETL pipeline
A pipeline that justifies the pattern: raw access log text → parse → filter → enrich → normalize → aggregate → serialize. Each stage has real logic; combining them in one function would be hard to test and extend.
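Pipeline payload shape (evolves per stage; each stage spreads `...data`, so fields from earlier stages are carried forward):
- Input: `{ raw: string }` (log file content).
- After Parse: `{ raw, records: Array<{ ip, method, path, status, timestamp, ... }> }`.
- After Filter/Enrich/Normalize: the same `records` array, filtered and refined (Enrich adds `country` and `endpointGroup`; Normalize renames the latter to `endpoint_group`).
- After Aggregate: `{ raw, records, summary: { byEndpoint, totalRequests, errorCount, errorRate } }`.
- After Serialize: string (JSON report of `summary`).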
❌ ANTI-PATTERN: One big function
```javascript
// One function: parsing, filtering, enrichment, aggregation, and output all mixed.
// lookupGeo is an app-provided geo-IP helper (not defined here).
function processAccessLogs(raw) {
  const lines = raw.trim().split('\n');
  const records = [];
  for (const line of lines) {
    // Path capture stops before the HTTP version ("GET /x HTTP/1.1" -> "/x").
    const m = line.match(/^(\S+) \S+ \S+ \[([^\]]+)\] "(\w+) ([^\s"]+)[^"]*" (\d{3})/);
    if (!m) continue;
    const [, ip, time, method, path, status] = m;
    if (Number(status) < 400) continue; // filter
    const geo = lookupGeo(ip); // enrich
    records.push({ ip, method, path, status: Number(status), timestamp: time, country: geo?.country });
  }
  const byPath = {};
  for (const r of records) {
    const key = r.path.split('/')[1] || 'root';
    byPath[key] = (byPath[key] || 0) + 1;
  }
  return JSON.stringify({ byEndpoint: byPath, total: records.length });
}
```
Problems: parse/filter/enrich/aggregate/serialize are tangled; you can’t unit-test parsing or aggregation alone; adding a stage (e.g. normalize timestamps) forces edits here; reuse is impossible.
✅ TOP-CODER PATTERN: One stage per concern, pipeline runs all
Pipeline runner and stage contract:
```javascript
// pipeline/Pipeline.js
class Pipeline {
  constructor(stages) {
    this.stages = stages;
  }

  // Each stage's output becomes the next stage's input.
  run(input) {
    return this.stages.reduce((data, stage) => stage.process(data), input);
  }
}

module.exports = { Pipeline };
```
ParseStage: split lines, parse the format (e.g. Apache combined or JSONL), and skip malformed lines:
```javascript
// pipeline/stages/ParseStage.js
// Captures ip, timestamp, method, path (without the HTTP version), and status.
const COMBINED_LOG = /^(\S+) \S+ \S+ \[([^\]]+)\] "(\w+) ([^\s"]+)[^"]*" (\d{3})/;

class ParseStage {
  process(data) {
    const lines = (data.raw || '').trim().split(/\r?\n/).filter(Boolean);
    const records = [];
    for (const line of lines) {
      const m = line.match(COMBINED_LOG);
      if (m) {
        records.push({ ip: m[1], timestamp: m[2], method: m[3], path: m[4], status: parseInt(m[5], 10) });
      }
      // else: try JSON.parse for JSONL, or skip
    }
    return { ...data, records };
  }
}

module.exports = { ParseStage };
```
FilterStage: e.g. only errors, or only certain paths (configurable):
```javascript
// pipeline/stages/FilterStage.js
class FilterStage {
  constructor(options = {}) {
    this.minStatus = options.minStatus ?? 400;
    this.pathPrefix = options.pathPrefix ?? null;
  }

  process(data) {
    let records = data.records || [];
    records = records.filter((r) => r.status >= this.minStatus);
    if (this.pathPrefix) records = records.filter((r) => r.path.startsWith(this.pathPrefix));
    return { ...data, records };
  }
}

module.exports = { FilterStage };
```
EnrichStage: add geo from IP, derive endpoint group:
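```javascript
// pipeline/stages/EnrichStage.js
class EnrichStage {
  // geoLookup(ip) should return an object such as { country }; the default returns {}.
  constructor(geoLookup = () => ({})) {
    this.geoLookup = geoLookup;
  }

  process(data) {
    const records = (data.records || []).map((r) => ({
      ...r,
      country: this.geoLookup(r.ip)?.country ?? null,
      // First path segment, e.g. '/api/users' -> 'api'; '/' -> 'root'.
      endpointGroup: r.path.split('/').filter(Boolean)[0] || 'root',
    }));
    return { ...data, records };
  }
}

module.exports = { EnrichStage };
```
NormalizeStage: consistent types and field names: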
```javascript
// pipeline/stages/NormalizeStage.js
class NormalizeStage {
  process(data) {
    // Emits a fixed set of fields; anything not listed here is dropped.
    const records = (data.records || []).map((r) => ({
      ip: String(r.ip),
      method: (r.method || 'GET').toUpperCase(),
      path: String(r.path),
      status: Number(r.status) || 0,
      timestamp: r.timestamp || null,
      country: r.country ?? null,
      endpoint_group: r.endpointGroup ?? 'root',
    }));
    return { ...data, records };
  }
}

module.exports = { NormalizeStage };
```
AggregateStage: group by endpoint, compute counts and error rate:
```javascript
// pipeline/stages/AggregateStage.js
class AggregateStage {
  process(data) {
    const records = data.records || [];
    const byEndpoint = {};
    let errorCount = 0;
    for (const r of records) {
      const key = r.endpoint_group ?? 'root';
      byEndpoint[key] = (byEndpoint[key] || 0) + 1;
      if (r.status >= 400) errorCount++;
    }
    const totalRequests = records.length;
    return {
      ...data,
      summary: {
        byEndpoint,
        totalRequests,
        errorCount,
        errorRate: totalRequests ? errorCount / totalRequests : 0,
      },
    };
  }
}

module.exports = { AggregateStage };
```
SerializeStage: format the final output:
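```javascript
// pipeline/stages/SerializeStage.js
class SerializeStage {
  process(data) {
    // Report only the summary when present; otherwise serialize the whole payload.
    return JSON.stringify(data.summary ?? data, null, 2);
  }
}

module.exports = { SerializeStage };
```
The client wires the pipeline and runs it: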
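```javascript
// services/accessLogPipeline.js
const { Pipeline } = require('../pipeline/Pipeline');
const { ParseStage } = require('../pipeline/stages/ParseStage');
const { FilterStage } = require('../pipeline/stages/FilterStage');
const { EnrichStage } = require('../pipeline/stages/EnrichStage');
const { NormalizeStage } = require('../pipeline/stages/NormalizeStage');
const { AggregateStage } = require('../pipeline/stages/AggregateStage');
const { SerializeStage } = require('../pipeline/stages/SerializeStage');

// lookupGeoByIP (ip -> { country }) and logFileContent are supplied by your application.
const pipeline = new Pipeline([
  new ParseStage(),
  // minStatus: 400 keeps only errors, so errorRate is always 1; use 0 for a real rate.
  new FilterStage({ minStatus: 400 }),
  new EnrichStage(lookupGeoByIP),
  new NormalizeStage(),
  new AggregateStage(),
  new SerializeStage(),
]);
const report = pipeline.run({ raw: logFileContent });
```
Benefits: parse/filter/enrich/normalize/aggregate/serialize are separate and testable; you can add stages (e.g. dedupe, rate-limit detection) or swap implementations without touching the others; the fixed order and “run to completion” semantics match the Pipeline pattern.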
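As an example of adding a stage without touching the others, here is a sketch of the dedupe stage mentioned above. `DedupeStage` and its default key fields are assumptions: it treats two records as duplicates when `ip`, `timestamp`, `method`, `path`, and `status` all match, and it would slot in between `NormalizeStage` and `AggregateStage`. The usage also shows the testability benefit: the stage is exercised alone, with no parser or aggregator.

```javascript
// pipeline/stages/DedupeStage.js (hypothetical): drop exact duplicate log records.
class DedupeStage {
  constructor(keyFields = ['ip', 'timestamp', 'method', 'path', 'status']) {
    this.keyFields = keyFields;
  }

  process(data) {
    const seen = new Set();
    const records = (data.records || []).filter((r) => {
      // Build a composite key from the configured fields.
      const key = JSON.stringify(this.keyFields.map((f) => r[f]));
      if (seen.has(key)) return false;
      seen.add(key);
      return true;
    });
    // Return a new payload; the input object and array are left untouched.
    return { ...data, records };
  }
}

// Usage in isolation: just the stage under test.
const input = {
  records: [
    { ip: '10.0.0.1', timestamp: 't1', method: 'GET', path: '/a', status: 500 },
    { ip: '10.0.0.1', timestamp: 't1', method: 'GET', path: '/a', status: 500 },
    { ip: '10.0.0.2', timestamp: 't1', method: 'GET', path: '/a', status: 500 },
  ],
};
const deduped = new DedupeStage().process(input);
// deduped.records.length === 2
```

In the client, the only change is one extra entry in the stage list, `new DedupeStage()`, placed after `new NormalizeStage()`.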
Dynamic composition: raw JSON sanitization
Another real case: transform raw JSON by removing or redacting fields, with one stage per removal, and add stages conditionally based on the request (e.g. response type, user role, or `includeX` flags).
Idea: Build the pipeline at runtime from the request. Each stage does one thing (remove one field, or one category of fields). Order is fixed once the pipeline is built; composition is dynamic.
Example — one stage per field to remove, conditional stages by request type:
```javascript
// pipeline/stages/RemoveFieldStage.js: reusable stage that drops one path
class RemoveFieldStage {
  constructor(path) {
    this.path = path; // e.g. 'internalId', 'metadata.audit', 'password'
  }

  process(data) {
    const out = { ...data };
    const parts = this.path.split('.');
    let cur = out;
    // Copy each object along the path so the caller's input is not mutated.
    for (let i = 0; i < parts.length - 1; i++) {
      const key = parts[i];
      // Path missing or not an object: nothing to remove.
      if (cur[key] === null || typeof cur[key] !== 'object') return data;
      cur = cur[key] = { ...cur[key] };
    }
    delete cur[parts[parts.length - 1]];
    return out;
  }
}

// Or dedicated stages per concern (easier to test and name)
class RemoveInternalIdStage {
  process(data) {
    const { internalId, ...rest } = data;
    return rest;
  }
}

class RemovePasswordStage {
  process(data) {
    const { password, ...rest } = data;
    return rest;
  }
}

class RemoveAuditFieldsStage {
  process(data) {
    const { metadata, ...rest } = data;
    const { audit, ...metaRest } = metadata || {};
    return metadata ? { ...rest, metadata: metaRest } : rest;
  }
}

module.exports = { RemoveFieldStage, RemoveInternalIdStage, RemovePasswordStage, RemoveAuditFieldsStage };
```
Build the pipeline conditionally from the request:
```javascript
// services/sanitizeJson.js
const { Pipeline } = require('../pipeline/Pipeline');
const {
  RemoveFieldStage,
  RemoveInternalIdStage,
  RemovePasswordStage,
  RemoveAuditFieldsStage,
} = require('../pipeline/stages/RemoveFieldStage');

function buildSanitizePipeline(options = {}) {
  const { isUser = false, isPayment = false, includeAudit = true } = options;
  const stages = [
    new RemoveInternalIdStage(),
    new RemoveFieldStage('_rev'),
  ];
  if (isUser) {
    stages.push(new RemovePasswordStage());
    stages.push(new RemoveFieldStage('tokens'));
  } else if (isPayment) {
    stages.push(new RemoveFieldStage('cardNumber'));
    stages.push(new RemoveFieldStage('cvv'));
  }
  if (!includeAudit) {
    stages.push(new RemoveAuditFieldsStage());
  }
  return new Pipeline(stages);
}

// rawJsonFromDb must be a parsed object (call JSON.parse first if you hold a string).
const pipeline = buildSanitizePipeline({ isUser: true, includeAudit: false });
const sanitized = pipeline.run(rawJsonFromDb);
```
Benefits: one stage per removal, easy to test; add or remove stages without touching the others; only the list of stages is conditional, not the flow inside.
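The stages above only remove fields. For the redacting case mentioned at the start of this example, a stage can replace a value instead of deleting it. A minimal sketch, assuming a hypothetical `RedactFieldStage` that takes a dot path and a placeholder (`'[REDACTED]'` is an arbitrary default). It copies each object it walks through, so the input is never mutated, and it leaves absent paths alone rather than creating them. The small `Pipeline` here follows the same `run` contract as the runner in this skill.

```javascript
// Hypothetical stage: replace the value at a dot path with a placeholder.
class RedactFieldStage {
  constructor(path, placeholder = '[REDACTED]') {
    this.parts = path.split('.');
    this.placeholder = placeholder;
  }

  process(data) {
    const out = { ...data };
    let cur = out;
    // Copy each intermediate object so the caller's input stays untouched.
    for (const key of this.parts.slice(0, -1)) {
      if (cur[key] === null || typeof cur[key] !== 'object') return data;
      cur = cur[key] = { ...cur[key] };
    }
    const last = this.parts[this.parts.length - 1];
    // Only redact fields that exist; never add new keys.
    if (Object.prototype.hasOwnProperty.call(cur, last)) {
      cur[last] = this.placeholder;
    }
    return out;
  }
}

// Minimal runner with the same run() contract as Pipeline in this skill.
class Pipeline {
  constructor(stages) {
    this.stages = stages;
  }
  run(input) {
    return this.stages.reduce((data, stage) => stage.process(data), input);
  }
}

const raw = { email: 'a@example.com', profile: { phone: '555-0100', city: 'Oslo' } };
const redacted = new Pipeline([
  new RedactFieldStage('email'),
  new RedactFieldStage('profile.phone'),
  new RedactFieldStage('profile.missing'), // absent path: left as-is, not created
]).run(raw);
// redacted: { email: '[REDACTED]', profile: { phone: '[REDACTED]', city: 'Oslo' } }
```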
Node.js notes
- Sync vs async: For async stages, use `async process(data)` and `await` in the pipeline runner (e.g. `for (const stage of this.stages) { data = await stage.process(data); }`); `run` then returns a Promise.
- Error handling: A pipeline typically fails fast; if a stage throws, the run stops and the error propagates to the caller. Handle errors in the runner or in individual stages.
- Files: One file per stage when logic is non-trivial (e.g. `pipeline/stages/NormalizeStage.js`); put the pipeline runner in a shared file.
- No overkill: For two or three fixed steps, a simple function composition may be enough; use Pipeline when you have many steps or need to reuse/reorder stages.
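The async variant from the notes above can be sketched as follows. `AsyncPipeline` and `PipelineStageError` are hypothetical names. The sketch keeps fail-fast semantics but wraps the thrown error with the failing stage's index and class name, so the caller can tell where the run stopped. Because the runner awaits every result, sync and async stages can be mixed freely.

```javascript
// Hypothetical error type that records which stage failed.
class PipelineStageError extends Error {
  constructor(index, stageName, cause) {
    const reason = cause instanceof Error ? cause.message : String(cause);
    super(`Stage ${index} (${stageName}) failed: ${reason}`);
    this.name = 'PipelineStageError';
    this.stageIndex = index;
    this.stageName = stageName;
    this.cause = cause; // keep the original error for logging
  }
}

class AsyncPipeline {
  constructor(stages) {
    this.stages = stages;
  }

  // Stages may return a value or a Promise; await handles both.
  async run(input) {
    let data = input;
    for (const [i, stage] of this.stages.entries()) {
      try {
        data = await stage.process(data);
      } catch (err) {
        // Fail fast: stop at the first error, but report where it happened.
        throw new PipelineStageError(i, stage.constructor.name, err);
      }
    }
    return data;
  }
}

// Usage: one sync stage, one async stage, and one stage that always fails.
class AddOneStage {
  process(n) {
    return n + 1;
  }
}
class DelayedDoubleStage {
  async process(n) {
    await new Promise((resolve) => setTimeout(resolve, 10));
    return n * 2;
  }
}
class FailingStage {
  process() {
    throw new Error('boom');
  }
}

async function demo() {
  const ok = await new AsyncPipeline([new AddOneStage(), new DelayedDoubleStage()]).run(1); // 4
  try {
    await new AsyncPipeline([new AddOneStage(), new FailingStage()]).run(1);
  } catch (err) {
    return { ok, failedAt: err.stageIndex, name: err.stageName };
  }
  return { ok, failedAt: null, name: null };
}
```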
Chain of Responsibility vs Pipeline
| Feature | Pipeline | Chain of Responsibility |
|---|---|---|
| Execution | Fixed, mandatory sequence | Conditional; handler decides whether to pass to the next |
| Flow | Linear, no branching | Allows flexible termination and branching |
| Termination | Runs to completion (barring errors) | Can be terminated early by a handler |
| Use cases | Data processing, parsing, ETL | Event handling, approval workflows, validation, message filtering |
Use Pipeline when every stage must run in a fixed order (e.g. data transformation). Use CoR when handlers can short-circuit or decide not to pass (e.g. validation, approval chains).
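The difference shows up directly in the runners. In this sketch all names are illustrative, and stages and handlers are plain functions for brevity rather than objects with `process(data)`. The pipeline calls every stage, while the chain passes each handler a `next` callback it may choose not to call. The `next` callback is one common way to write Chain of Responsibility in JavaScript, not the only one.

```javascript
// Pipeline: every stage runs; no stage can stop the flow.
function runPipeline(stages, input) {
  return stages.reduce((data, stage) => stage(data), input);
}

// Chain of Responsibility: each handler gets next() and may skip calling it.
function runChain(handlers, request) {
  const dispatch = (i, req) => {
    if (i === handlers.length) return { handled: false, req };
    return handlers[i](req, (nextReq) => dispatch(i + 1, nextReq));
  };
  return dispatch(0, request);
}

const calls = [];

// Pipeline stages: trim and lowercase an email; both always run.
const normalized = runPipeline(
  [
    (s) => { calls.push('trim'); return s.trim(); },
    (s) => { calls.push('lower'); return s.toLowerCase(); },
  ],
  '  Ada@Example.COM ',
);

// Chain handlers: the first one rejects empty input and short-circuits.
const rejectEmpty = (req, next) => (req.email ? next(req) : { handled: true, error: 'email required' });
const accept = (req) => { calls.push('accept'); return { handled: true, req }; };
const chainResult = runChain([rejectEmpty, accept], { email: '' });
// normalized === 'ada@example.com'; chainResult.error === 'email required'
// calls === ['trim', 'lower']: the accept handler never ran.
```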
Reference
- Pipeline is a common architectural pattern; related to Unix pipes, middleware chains (with fixed flow), and ETL pipelines.
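The Unix-pipe analogy has a built-in counterpart in Node.js: `pipeline` from `node:stream/promises` connects a source, transform streams, and a sink, and propagates errors and cleanup across all of them. The sketch below assumes log lines arrive as an in-memory array (via `Readable.from`); in a real ETL job the source would more likely be a file stream split into lines. Each object-mode `Transform` plays the role of one stage, handling records one at a time instead of materializing the whole `records` array.

```javascript
const { Readable, Transform, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

const COMBINED_LOG = /^(\S+) \S+ \S+ \[([^\]]+)\] "(\w+) ([^\s"]+)[^"]*" (\d{3})/;

// Stage 1: parse each line; malformed lines are dropped by not pushing anything.
const parse = new Transform({
  objectMode: true,
  transform(line, _enc, callback) {
    const m = COMBINED_LOG.exec(line);
    if (m) this.push({ ip: m[1], timestamp: m[2], method: m[3], path: m[4], status: Number(m[5]) });
    callback();
  },
});

// Stage 2: keep only error responses.
const onlyErrors = new Transform({
  objectMode: true,
  transform(record, _enc, callback) {
    if (record.status >= 400) this.push(record);
    callback();
  },
});

// Sink: collect the surviving records.
const results = [];
const collect = new Writable({
  objectMode: true,
  write(record, _enc, callback) {
    results.push(record);
    callback();
  },
});

const lines = [
  '10.0.0.1 - - [10/Oct/2024:13:55:36 +0000] "GET /api/users HTTP/1.1" 500 12',
  'not a log line',
  '10.0.0.2 - - [10/Oct/2024:13:55:37 +0000] "GET /health HTTP/1.1" 200 2',
];

async function main() {
  // An error in any stream rejects this promise and destroys the others.
  await pipeline(Readable.from(lines), parse, onlyErrors, collect);
  return results; // one record: the 500 on /api/users
}
```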