Build, operate, and debug Atlas Stream Processing through the MongoDB MCP Server. Use when user says "set up a Kafka pipeline", "deploy a stream processor", "add a connection to my workspace", "why is my processor failing", "stop my processor", "delete my workspace", "show me my Streams workspaces", or any task involving Atlas Stream Processing workspaces, connections, or processors. Do NOT use for general MongoDB queries, Atlas cluster management, or non-Streams Atlas operations.
Install
Install via the SkillsCat registry: `npx skillscat add jwongmongodb/streams-mcp-skills`
Atlas Stream Processing — MCP Tool Skill
Prerequisites
This skill requires the MongoDB MCP Server connected with:
- Atlas API credentials (`apiClientId` and `apiClientSecret`)
- `previewFeatures: ["streams"]` enabled in the MCP server config
The 4 tools: atlas-streams-discover, atlas-streams-build, atlas-streams-manage, atlas-streams-teardown.
CRITICAL: Consult Official Examples Before Creating Processors
IMPORTANT: Before creating any processor, consult the official MongoDB ASP examples repo for best practices and valid patterns:
https://github.com/mongodb/ASP_example
This repo is continuously updated with high-value customer-driven examples and contains 33+ end-to-end processors, 6 quickstarts, and 15 code snippets maintained by MongoDB engineering. Use it as the authoritative source for:
- Valid pipeline stage combinations and ordering
- Correct `$source` and `$merge`/`$emit` syntax per connection type
- Windowing patterns (tumbling, hopping, session)
- Advanced patterns (joins, `$https` enrichment, dynamic routing, chained processors)
Key quickstart references:
| Quickstart | Pattern |
|---|---|
| `00_hello_world.json` | Inline `$source.documents` with `$match` (zero infra, ephemeral) |
| `01_changestream_basic.json` | Change stream → tumbling window → `$merge` to Atlas |
| `02_changestream_to_kafka.json` | Change stream → `$emit` to Kafka topic |
| `03_kafka_to_mongo.json` | Kafka source → tumbling window rollup → `$merge` to Atlas |
| `04_mongo_to_mongo.json` | Change stream → transform → `$merge` (archive pattern) |
| `05_kafka_tail.json` | Kafka source with no sink (ephemeral `tail -f`) |
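As a rough illustration of the first quickstart pattern, the sketch below builds a pipeline with an inline `$source.documents` array followed by `$match`. The sample documents and the `temp` field are invented for illustration and are not copied from `00_hello_world.json`; consult the repo for the actual file.

```python
import json

# Hypothetical sample events; the field names are illustrative only.
hello_world_pipeline = [
    {"$source": {"documents": [
        {"device": "a", "temp": 18},
        {"device": "b", "temp": 31},
    ]}},
    # Keep only the readings above an arbitrary threshold.
    {"$match": {"temp": {"$gt": 25}}},
]

# Serialize to the JSON form a processor definition would carry.
pipeline_json = json.dumps(hello_world_pipeline, indent=2)
print(pipeline_json)
```

Because this pipeline has no sink, it would only run ephemerally, for example through `sp.process()` in mongosh.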
Pipeline Warnings — Invalid Constructs
These MongoDB aggregation features are NOT valid in streaming pipelines:
- `$$NOW`: not available in stream processing context
- `$$ROOT`: not available in stream processing context
- `$$CURRENT`: not available in stream processing context
- HTTPS connections as `$source`: HTTPS connections are for `$https` enrichment only, not as data sources
- Pipelines without a sink: `$merge` or `$emit` is required for persistent (deployed) processors. Sinkless pipelines only work ephemerally via `sp.process()` in mongosh
- Kafka `$source` without `topic`: Kafka sources MUST include a `topic` field
Instructions
You are helping a user interact with Atlas Stream Processing through the MongoDB MCP Server. This skill teaches you which tools to call, what fields to fill, and how to sequence multi-step workflows.
Step 1: Select the right tool
| Tool | When to use |
|---|---|
| `atlas-streams-discover` | See, inspect, or diagnose: "list workspaces", "show processor stats", "why is it failing?" |
| `atlas-streams-build` | Create: "set up a workspace", "add a Kafka connection", "deploy a processor" |
| `atlas-streams-manage` | Change state or config: "start/stop processor", "change pipeline", "update credentials" |
| `atlas-streams-teardown` | Delete: "delete workspace", "remove connection", "clean up" |
Intent mapping:
- "What do I have?" / "Show me" / "List" / "Status" / "Why failing?" → `atlas-streams-discover`
- "Create" / "Set up" / "Add" / "Deploy" / "Connect" → `atlas-streams-build`
- "Start" / "Stop" / "Restart" / "Change" / "Modify" / "Update" → `atlas-streams-manage`
- "Delete" / "Remove" / "Tear down" / "Clean up" → `atlas-streams-teardown`
When in doubt, call atlas-streams-discover first to understand current state.
Do NOT use these tools for general MongoDB queries (find/aggregate), Atlas cluster management (atlas-list-clusters), or non-Streams operations.
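The intent mapping can be sketched as a simple keyword lookup. This is only an illustration of the routing idea: `select_tool` and `INTENT_KEYWORDS` are hypothetical names, the matching is naive substring search (so "address" would match "add"), and real requests need more judgment than this.

```python
# Keyword groups taken from the intent mapping; teardown is checked first so that
# "remove connection" is not misread as a "connect" (build) request.
INTENT_KEYWORDS = {
    "atlas-streams-teardown": ["delete", "remove", "tear down", "clean up"],
    "atlas-streams-manage": ["start", "stop", "restart", "change", "modify", "update"],
    "atlas-streams-build": ["create", "set up", "add", "deploy", "connect"],
}


def select_tool(request: str) -> str:
    """Pick a tool name for a user request; fall back to discover when unsure."""
    text = request.lower()
    for tool, keywords in INTENT_KEYWORDS.items():
        if any(keyword in text for keyword in keywords):
            return tool
    # "When in doubt, call atlas-streams-discover first."
    return "atlas-streams-discover"
```

The fallback mirrors the rule that discovery is the safe first call when the intent is unclear.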
Step 2: Fill the right fields
Every tool call requires projectId. If unknown, call atlas-list-projects first.
atlas-streams-build field mapping
CRITICAL: This tool uses a resource enum. Only fill fields for the selected resource type.
resource = "workspace":
Fill: projectId, workspaceName, cloudProvider, region, tier, includeSampleData
Leave empty: all connection and processor fields
resource = "connection":
Fill: projectId, workspaceName, connectionName, connectionType, connectionConfig
Leave empty: all workspace and processor fields
(See references/connection-configs.md for type-specific schemas)
resource = "processor":
Fill: projectId, workspaceName, processorName, pipeline, dlq (recommended), autoStart (optional)
Leave empty: all workspace and connection fields
(See references/pipeline-patterns.md for pipeline examples)
resource = "privatelink":
Fill: projectId, workspaceName, privateLinkProvider, privateLinkConfig
Leave empty: all connection and processor fields
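One way to enforce the "only fill fields for the selected resource" rule is to check arguments against a per-resource allow-list before calling the tool. The sketch below assumes the field names listed above; `build_args` is a hypothetical helper, not part of the MCP server.

```python
# Field lists copied from the mapping above.
FIELDS_BY_RESOURCE = {
    "workspace": {"cloudProvider", "region", "tier", "includeSampleData"},
    "connection": {"connectionName", "connectionType", "connectionConfig"},
    "processor": {"processorName", "pipeline", "dlq", "autoStart"},
    "privatelink": {"privateLinkProvider", "privateLinkConfig"},
}
# Fields that every resource type accepts.
SHARED_FIELDS = {"projectId", "workspaceName"}


def build_args(resource: str, **fields) -> dict:
    """Assemble atlas-streams-build arguments, rejecting fields of other resources."""
    allowed = FIELDS_BY_RESOURCE[resource] | SHARED_FIELDS
    unexpected = set(fields) - allowed
    if unexpected:
        raise ValueError(f"fields not valid for {resource}: {sorted(unexpected)}")
    return {"resource": resource, **fields}
```

For example, `build_args("workspace", projectId="p1", workspaceName="ws", tier="SP10")` passes, while adding `pipeline=[...]` to the same call raises `ValueError`.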
atlas-streams-discover notes
- `action: "list-workspaces"`: list all workspaces in a project
- `action: "inspect-workspace"`: details on a specific workspace
- `action: "list-connections"` / `"inspect-connection"`: connections in a workspace
- `action: "list-processors"` / `"inspect-processor"`: processors in a workspace
- `action: "diagnose-processor"`: combined health report (state, stats, connection health, errors, actionable recommendations)
- `action: "get-logs"`: operational logs (default) or audit logs. Use `logType: "operational"` for runtime errors (Kafka failures, schema issues, OOM). Use `logType: "audit"` for lifecycle events (start/stop). Optionally filter by `resourceName` (processor name).
- `action: "get-networking"`: PrivateLink/VPC peering. Optionally provide `cloudProvider` and `region` for account details.
Pagination (all list actions): limit (1-100, default 20), pageNum (default 1).
Response format: responseFormat — "concise" (default for list actions: names/states only) or "detailed" (default for inspect/diagnose: full config).
atlas-streams-manage field mapping
Always fill: projectId, workspaceName. Then by action:
- `"start-processor"` → `resourceName`. Optional: `tier`, `resumeFromCheckpoint`, `startAtOperationTime`
- `"stop-processor"` → `resourceName`
- `"modify-processor"` → `resourceName`. At least one of: `pipeline`, `dlq`, `newName`
- `"update-workspace"` → `newRegion` or `newTier`
- `"update-connection"` → `resourceName`, `connectionConfig`
- `"accept-peering"` → `peeringId`, `requesterAccountId`, `requesterVpcId`
- `"reject-peering"` → `peeringId`
atlas-streams-teardown field mapping
Always fill: projectId, resource. Then:
- `resource: "workspace"` → `workspaceName`
- `resource: "connection"` or `"processor"` → `workspaceName`, `resourceName`
- `resource: "privatelink"` or `"peering"` → `resourceName` (the ID)
Step 3: Sequence multi-step workflows
Setup from scratch:
1. `atlas-streams-build` → `resource: "workspace"` (cloud, region, tier)
2. `atlas-streams-build` → `resource: "connection"` (one call per connection)
3. `atlas-streams-build` → `resource: "processor"` (reference connections by name in pipeline)
4. Set `autoStart: true` in step 3, or call `atlas-streams-manage` → `action: "start-processor"`
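The setup sequence can be written out as an ordered list of tool calls. The sketch below only assembles the call arguments; it does not talk to the MCP server, and `setup_calls` plus every sample value in the usage note are hypothetical.

```python
def setup_calls(project_id, workspace, workspace_spec, connections, processor):
    """Return (tool, arguments) pairs for a from-scratch setup, in call order."""
    common = {"projectId": project_id, "workspaceName": workspace}
    # Step 1: the workspace itself.
    calls = [("atlas-streams-build", {**common, "resource": "workspace", **workspace_spec})]
    # Step 2: one build call per connection.
    for connection in connections:
        calls.append(("atlas-streams-build", {**common, "resource": "connection", **connection}))
    # Steps 3 and 4: create the processor and start it in the same call.
    calls.append(("atlas-streams-build",
                  {**common, "resource": "processor", "autoStart": True, **processor}))
    return calls
```

Called with two connections, this yields four calls: one workspace, two connections, and one processor with `autoStart` set.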
Incremental pipeline development (recommended):
See references/development-workflow.md for the full 5-phase lifecycle.
1. Start with basic `$source` → `$merge` pipeline (validate connectivity)
2. Add `$match` stages (validate filtering)
3. Add `$addFields`/`$project` transforms (validate reshaping)
4. Add windowing or enrichment (validate aggregation logic)
5. Add error handling / DLQ configuration
Modify a processor pipeline:
1. `atlas-streams-manage` → `action: "stop-processor"`: processor MUST be stopped first
2. `atlas-streams-manage` → `action: "modify-processor"`: provide new pipeline
3. `atlas-streams-manage` → `action: "start-processor"`: restart
Debug a failing processor:
See references/output-diagnostics.md for the full decision framework.
1. `atlas-streams-discover` → `action: "diagnose-processor"`: one-shot health report
2. `atlas-streams-discover` → `action: "get-logs"` (defaults to `logType: "operational"`): runtime errors, Kafka failures, schema issues, OOM messages. Filter by `resourceName` for a specific processor.
3. Classify processor type before interpreting output volume:
   - Alert/anomaly processors: low or zero output is NORMAL and healthy
   - Data transformation processors: low output is a RED FLAG
   - Filter processors: variable output depending on data match rate
4. If DLQ analysis needed → use the MongoDB `find` tool on the DLQ collection (not a Streams tool)
5. If lifecycle event history needed → `atlas-streams-discover` → `action: "get-logs"`, `logType: "audit"`: shows start/stop events
6. Based on diagnosis: stop → modify → restart, or investigate connection health
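The classification step can be captured in a small lookup so that low output is never judged without a processor type. The type labels (`"alert"`, `"transformation"`, `"filter"`) and the function name are hypothetical shorthand for the three categories above.

```python
# Verdicts for low or zero output, keyed by processor category.
LOW_OUTPUT_VERDICT = {
    "alert": "normal: alert/anomaly processors are expected to emit rarely",
    "transformation": "red flag: transformation processors should emit steadily",
    "filter": "inconclusive: output depends on the data match rate",
}


def interpret_low_output(processor_type: str) -> str:
    """Return how to read low output for the given processor category."""
    if processor_type not in LOW_OUTPUT_VERDICT:
        raise ValueError(f"classify the processor first, got {processor_type!r}")
    return LOW_OUTPUT_VERDICT[processor_type]
```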
Tear down:
Delete workspace directly (removes all contained resources), or individually: delete processors (auto-stops if running) → delete connections (fails if referenced by running processors) → delete workspace.
Chained processors:
Multiple processors can be chained: processor A writes to an Atlas collection via $merge, processor B reads from that collection via change stream $source. This enables multi-stage processing pipelines.
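A minimal sketch of two chained pipelines follows, assuming the `$source`, `$merge`, and `$emit` shapes I understand Atlas Stream Processing to use (`connectionName` with `topic` for Kafka, `connectionName`/`db`/`coll` for Atlas). All connection, database, collection, and topic names are placeholders; verify the stage syntax against the examples repo before deploying.

```python
# Processor A: Kafka topic -> Atlas collection (placeholder names throughout).
processor_a = [
    {"$source": {"connectionName": "kafka-conn", "topic": "orders"}},
    {"$merge": {"into": {"connectionName": "atlas-conn", "db": "staging", "coll": "orders"}}},
]

# Processor B: change stream on that same collection -> Kafka topic.
processor_b = [
    {"$source": {"connectionName": "atlas-conn", "db": "staging", "coll": "orders"}},
    {"$match": {"operationType": "insert"}},
    {"$emit": {"connectionName": "kafka-conn", "topic": "orders-processed"}},
]


def is_chained(upstream, downstream) -> bool:
    """True when downstream reads the collection that upstream merges into."""
    into = upstream[-1].get("$merge", {}).get("into", {})
    source = downstream[0].get("$source", {})
    keys = ("connectionName", "db", "coll")
    return bool(into) and all(into.get(k) == source.get(k) for k in keys)
```

A check like `is_chained(processor_a, processor_b)` is a cheap way to confirm that the sink of one stage and the source of the next point at the same collection.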
MCP Tool Behaviors
These are built-in behaviors of the MCP tools — do not duplicate this logic manually.
Connection creation — elicitation: When creating a connection, the build tool auto-collects missing sensitive fields (passwords, bootstrap servers, usernames) via an interactive form using the MCP elicitation protocol. Do NOT ask the user for these fields yourself — let the tool elicit them.
Connection creation — auto-normalization:
- `bootstrapServers` array → auto-converted to comma-separated string
- `schemaRegistryUrls` string → auto-wrapped in array
- `dbRoleToExecute` → auto-defaults to `{role: "readWriteAnyDatabase", type: "BUILT_IN"}` for Cluster connections
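To make the normalization rules concrete, here is a sketch of the described behavior. It is not the tool's actual implementation and does not need to run before calling the tool; the function name and the `"Cluster"` type label are assumptions.

```python
def normalize_connection_config(connection_type: str, config: dict) -> dict:
    """Mimic the three auto-normalization rules on a copy of the config."""
    out = dict(config)
    servers = out.get("bootstrapServers")
    if isinstance(servers, list):
        # Array of brokers becomes one comma-separated string.
        out["bootstrapServers"] = ",".join(servers)
    urls = out.get("schemaRegistryUrls")
    if isinstance(urls, str):
        # A single URL string is wrapped in an array.
        out["schemaRegistryUrls"] = [urls]
    if connection_type == "Cluster":
        # Cluster connections get a default role when none is given.
        out.setdefault("dbRoleToExecute",
                       {"role": "readWriteAnyDatabase", "type": "BUILT_IN"})
    return out
```

Passing `{"bootstrapServers": ["broker1:9092", "broker2:9092"]}` returns the string form `"broker1:9092,broker2:9092"`, matching the format shown in the troubleshooting table.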
Workspace creation — sample data: includeSampleData defaults to true, which auto-creates the sample_stream_solar connection via a special API endpoint.
State pre-checks (manage tool):
- `start-processor` → errors if processor is already STARTED
- `stop-processor` → no-ops if already STOPPED or CREATED (not an error)
- `modify-processor` → errors if processor is STARTED (must stop first)
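The pre-checks amount to a small state table. The sketch below models them to show which action and state combinations to expect errors from; it is an illustration of the documented behavior, not the manage tool's code, and `precheck` is a hypothetical name.

```python
def precheck(action: str, state: str) -> str:
    """Return 'ok' or 'noop', or raise, following the manage tool's pre-checks."""
    if action == "start-processor" and state == "STARTED":
        raise RuntimeError("processor is already STARTED")
    if action == "stop-processor" and state in ("STOPPED", "CREATED"):
        # Stopping an idle processor is a no-op, not an error.
        return "noop"
    if action == "modify-processor" and state == "STARTED":
        raise RuntimeError("processor must be stopped before modifying")
    return "ok"
```

Walking a running processor through stop, modify, and start returns `"ok"` at each step, which is why that order is the required sequence for pipeline changes.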
Teardown safety checks:
- Processor deletion → auto-stops the processor before deleting (no need to stop manually first)
- Connection deletion → scans all processor pipelines for references; blocks deletion if any running processor uses the connection. Stop/delete referencing processors first.
- Workspace deletion → counts connections and processors, reports the full impact before deleting
Pre-Deploy Quality Checklist
Before creating a processor, verify:
- Pipeline starts with `$source` and ends with `$merge` or `$emit`
- No `$$NOW`, `$$ROOT`, or `$$CURRENT` in the pipeline
- Kafka `$source` includes a `topic` field
- HTTPS connections are only used in `$https` enrichment stages, not in `$source`
- All `connectionName` references match actual connections in the workspace (use `discover` → `list-connections` to verify)
- DLQ is configured (recommended for production)
- `$https` stages use `onError: "dlq"` (not `"fail"`)
- API auth is stored in connection settings, not hardcoded in the pipeline
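Several checklist items are mechanical enough to check in code before calling the build tool. The sketch below covers the structural ones only; `check_pipeline` is a hypothetical helper, and the connection type labels (`"Kafka"`, `"Https"`) are assumed names for whatever `list-connections` reports.

```python
import json

FORBIDDEN_VARS = ("$$NOW", "$$ROOT", "$$CURRENT")


def check_pipeline(pipeline, connection_types):
    """Return checklist violations; connection_types maps connection name -> type."""
    problems = []
    if not pipeline or "$source" not in pipeline[0]:
        problems.append("pipeline must start with $source")
    if not pipeline or not ({"$merge", "$emit"} & set(pipeline[-1])):
        problems.append("pipeline must end with $merge or $emit")
    # Search the serialized pipeline so nested expressions are covered too.
    text = json.dumps(pipeline)
    problems += [f"{var} is not allowed" for var in FORBIDDEN_VARS if var in text]
    source = pipeline[0].get("$source", {}) if pipeline else {}
    ctype = connection_types.get(source.get("connectionName"))
    if "connectionName" in source and ctype is None:
        problems.append("unknown connection in $source")
    if ctype == "Kafka" and "topic" not in source:
        problems.append("Kafka $source requires topic")
    if ctype == "Https":
        problems.append("HTTPS connections cannot be used as $source")
    return problems
```

An empty list means the structural checks passed; DLQ configuration, `onError` settings, and credential placement still need a manual look.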
Post-Deploy Verification Workflow
After creating and starting a processor:
1. `atlas-streams-discover` → `action: "inspect-processor"`: confirm state is STARTED
2. `atlas-streams-discover` → `action: "diagnose-processor"`: check for errors in the health report
3. Use MongoDB `count` tool on the DLQ collection: verify no errors accumulating
4. Use MongoDB `find` tool on the output collection: verify documents are arriving
5. If output is low/zero, classify processor type before assuming a problem (see Debug section)
Tier Sizing & Performance
See references/sizing-and-parallelism.md for the complete guide including complexity scoring, worked examples, and cost optimization.
Tier Reference
| Tier | vCPU | RAM | Bandwidth | Max Parallelism | Kafka Partitions | Use case |
|---|---|---|---|---|---|---|
| SP2 | 0.25 | 512MB | 50 Mbps | 1 | 32 | Minimal filtering, testing |
| SP5 | 0.5 | 1GB | 125 Mbps | 2 | 64 | Simple filtering and routing |
| SP10 | 1 | 2GB | 200 Mbps | 8 | Unlimited | Moderate workloads, joins, grouping |
| SP30 | 2 | 8GB | 750 Mbps | 16 | Unlimited | Windows, JavaScript UDFs, production |
| SP50 | 8 | 32GB | 2500 Mbps | 64 | Unlimited | High throughput, large window state |
Sizing Rules
- Stream Processing reserves 20% memory for overhead — user processes are limited to 80%
- Monitor `memoryUsageBytes` via processor stats to determine proper tier
- If memory usage exceeds 80% of tier capacity, processor fails with OOM
- Use `parallelism` setting on `$merge`, `$lookup`, `$https` for concurrent I/O operations
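Parallelism formula: sum (parallelism - 1) over every stage where parallelism > 1. The result is the pipeline's excess parallelism, and the minimum tier is the smallest one whose max parallelism covers it. Example: a pipeline with $lookup at parallelism 3 and $merge at parallelism 4 has (3-1) + (4-1) = 5 excess parallelism, which exceeds SP5 (max 2) and therefore requires SP10 (max 8).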
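The parallelism arithmetic can be reproduced in a few lines. This sketch reads the rule as "excess parallelism must not exceed the tier's max parallelism", which matches the worked example; the limits are copied from the tier reference table, and the function names are hypothetical.

```python
# Max parallelism per tier, smallest tier first (from the tier reference table).
MAX_PARALLELISM = {"SP2": 1, "SP5": 2, "SP10": 8, "SP30": 16, "SP50": 64}


def excess_parallelism(stage_parallelism):
    """Sum (parallelism - 1) over stages whose parallelism exceeds 1."""
    return sum(p - 1 for p in stage_parallelism if p > 1)


def minimum_tier(stage_parallelism):
    """Smallest tier whose max parallelism covers the pipeline's excess."""
    needed = excess_parallelism(stage_parallelism)
    # Dicts preserve insertion order, so tiers are tried smallest first.
    for tier, limit in MAX_PARALLELISM.items():
        if needed <= limit:
            return tier
    raise ValueError(f"excess parallelism {needed} exceeds every tier")
```

For the example above, `excess_parallelism([3, 4])` is 5 and `minimum_tier([3, 4])` is `"SP10"`. Memory and bandwidth can still force a larger tier than parallelism alone suggests.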
Performance Best Practices
- Place `$match` stages as early as possible to reduce downstream volume
- Place `$https` enrichment calls downstream of window stages to batch and reduce API call frequency
- Use `partitionIdleTimeout` in Kafka `$source` to unblock windows when partitions go idle
- Use descriptive processor names indicating their function (e.g., `celsius-converter`, `fraud-detector`)
Troubleshooting
| Error | Cause | Solution |
|---|---|---|
| 404 on workspace | Doesn't exist or misspelled | discover → list-workspaces |
| 409 on create | Name already exists | Inspect existing resource or pick new name |
| 402 on create | No billing configured | Add payment method in Atlas → Billing. Use sp.process() in mongosh as free alternative |
| "processor must be stopped" | Tried to modify running processor | manage → stop-processor first |
| Processor FAILED | Pipeline error, connection failure, or OOM | discover → diagnose-processor |
| bootstrapServers format | Passed as array instead of string | Use comma-separated string: "broker1:9092,broker2:9092" |
| "must choose at least one role" | Cluster connection without dbRoleToExecute | Defaults to readWriteAnyDatabase, or specify a custom role |
| "No cluster named X" | Cluster doesn't exist in project | atlas-list-clusters to verify |
| IAM role ARN not found | ARN not registered in project | Register via Atlas → Cloud Provider Access |
| dataProcessRegion format | Wrong region format | Use Atlas names: VIRGINIA_USA, not US_EAST_1 |
| Low/zero processor output | May be normal for alert-type processors | Classify processor type before assuming a problem |
| $$NOW / $$ROOT / $$CURRENT in pipeline | Invalid in streaming context | Remove these system variables; use alternative approaches |
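The HTTP status rows of the troubleshooting table can be kept as a lookup so that an error code maps to a next step instead of a blind retry. This is a sketch: `next_step` is a hypothetical helper, and the fallback text for unlisted codes is an assumption rather than documented behavior.

```python
# Next steps for the HTTP status codes listed in the troubleshooting table.
NEXT_STEP_BY_STATUS = {
    404: "run discover -> list-workspaces to check the workspace name",
    409: "inspect the existing resource or pick a new name",
    402: "do not retry; ask the user to add a payment method in Atlas -> Billing, "
         "or suggest sp.process() in mongosh",
}


def next_step(status_code: int) -> str:
    """Return the suggested follow-up for a failed tool call."""
    return NEXT_STEP_BY_STATUS.get(
        status_code, "no mapped action; read the error message before acting"
    )
```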
Billing & Cost Awareness
Atlas Stream Processing has no free tier. All deployed processors incur charges while running. You MUST surface this proactively — do not silently start a processor without the user understanding cost implications.
Before creating or starting a processor
- Confirm billing is set up. Ask the user if they have a payment method on their Atlas account. If unsure, recommend they verify in Atlas → Organization → Billing before proceeding.
- Warn about ongoing costs. A running processor bills continuously, calculated per-second. `start-processor` begins billing, `stop-processor` stops it. Suggest stopping processors when not actively needed.
- If no billing or user wants to avoid charges: Recommend `sp.process()` in mongosh as an ephemeral alternative. This runs a pipeline ad hoc without deploying a named processor, so it needs no billing method and has no persistent cost. Ideal for prototyping and validating pipelines before committing to a deployed processor.
If you receive a 402 error
Do NOT retry. Instead:
- Explain that Atlas Stream Processing requires an active payment method
- Direct the user to Atlas → Organization → Billing to add a credit card
- Offer `sp.process()` in mongosh as a no-cost way to test their pipeline in the meantime
Safety Rules
- `atlas-streams-teardown` and `atlas-streams-manage` require user confirmation: do not bypass
- Deleting a workspace removes ALL connections and processors permanently
- Processors must be STOPPED before modifying their pipeline
- After stopping, state is preserved 45 days — then checkpoints are discarded
- `resumeFromCheckpoint: false` drops all window state: warn user first
- Moving processors between workspaces is not supported (must recreate)
- Dry-run / simulation is not supported — explain what you would do and ask for confirmation
- Always warn users about billing before starting processors
- Store API authentication credentials in connection settings, never hardcode in processor pipelines
Additional References
Internal Reference Files
- references/pipeline-patterns.md — Stage categories, source/sink/window/enrichment patterns, full pipeline examples
- references/connection-configs.md — Connection config schemas by type, auth patterns, elicitation behavior
- references/development-workflow.md — 5-phase lifecycle, debugging decision trees, monitoring cadence
- references/output-diagnostics.md — Processor output classification, red/green flag framework, diagnostic workflow
- references/sizing-and-parallelism.md — Tier hardware specs, parallelism formula, complexity scoring, cost optimization
External Resources
- Official ASP examples (33+ processors, continuously updated): https://github.com/mongodb/ASP_example
- ASP Claude plugin (tier sizing, development workflows, CI/CD): https://github.com/kgorman/asp_claude
- Atlas Stream Processing billing: https://www.mongodb.com/docs/atlas/billing/stream-processing-costs/