Guides understanding and working with Apache Beam runners (Direct, Dataflow, Flink, Spark, etc.). Use when configuring pipelines for different execution environments or debugging runner-specific issues.
Apache Beam Runners
Overview
Runners execute Beam pipelines on distributed processing backends. Each runner translates the portable Beam model to its native execution engine.
Available Runners
| Runner | Location | Description |
|---|---|---|
| Direct | runners/direct-java/ | Local execution for testing |
| Prism | runners/prism/ | Portable local runner |
| Dataflow | runners/google-cloud-dataflow-java/ | Google Cloud Dataflow |
| Flink | runners/flink/ | Apache Flink |
| Spark | runners/spark/ | Apache Spark |
| Samza | runners/samza/ | Apache Samza |
| Jet | runners/jet/ | Hazelcast Jet |
| Twister2 | runners/twister2/ | Twister2 |
Direct Runner
For local development and testing.
Java
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class);
Pipeline p = Pipeline.create(options);
Python
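import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DirectRunner'
p = beam.Pipeline(options=options)
Command Line
--runner=DirectRunner
Dataflow Runner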
Prerequisites
- GCP project with Dataflow API enabled
- Service account with Dataflow Admin role
- GCS bucket for staging
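The prerequisites above translate into a few flags that every Dataflow submission needs. As a rough illustration, the following stdlib-only sketch checks a flag list before it is handed to the SDK. The function name `check_dataflow_args` and its rules are assumptions made for this example and are not part of Beam; it cannot verify that the Dataflow API is enabled or that the service account holds the right role.

```python
def check_dataflow_args(args):
    """Return a list of problems found in a list of '--key=value' flags.

    Hypothetical helper for illustration; not a Beam API.
    """
    # Parse '--key=value' strings into a dict; flags without '=' are ignored.
    parsed = {}
    for arg in args:
        if arg.startswith("--") and "=" in arg:
            key, value = arg[2:].split("=", 1)
            parsed[key] = value

    problems = []
    # These three flags appear in the Python usage example in this document.
    for required in ("project", "region", "temp_location"):
        if not parsed.get(required):
            problems.append(f"missing --{required}")

    # The temp location is expected to live in a GCS bucket.
    temp = parsed.get("temp_location", "")
    if temp and not temp.startswith("gs://"):
        problems.append("--temp_location should be a gs:// path")
    return problems
```

An empty list means the basic flags are present; anything else is worth fixing before submitting the job.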
Java Usage
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject("my-project");
options.setRegion("us-central1");
options.setTempLocation("gs://my-bucket/temp");
Python Usage
options = PipelineOptions([
'--runner=DataflowRunner',
'--project=my-project',
'--region=us-central1',
'--temp_location=gs://my-bucket/temp'
])
Runner v2
--experiments=use_runner_v2
Custom SDK Container
--sdkContainerImage=gcr.io/project/beam_java11_sdk:custom
Flink Runner
Embedded Mode
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("[local]");
Cluster Mode
options.setFlinkMaster("host:port");
Portable Mode (Python)
options = PipelineOptions([
'--runner=FlinkRunner',
'--flink_master=host:port',
'--environment_type=LOOPBACK' # or DOCKER, EXTERNAL
])
Spark Runner
Java
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("local[*]"); // or spark://host:port
Python (Portable)
options = PipelineOptions([
'--runner=SparkRunner',
'--spark_master_url=local[*]'
])
Testing with Runners
ValidatesRunner Tests
Tests that validate runner correctness:
# Direct Runner
./gradlew :runners:direct-java:validatesRunner
# Flink Runner
./gradlew :runners:flink:1.18:validatesRunner
# Spark Runner
./gradlew :runners:spark:3:validatesRunner
# Dataflow Runner
./gradlew :runners:google-cloud-dataflow-java:validatesRunner
TestPipeline with Runners
@Rule public TestPipeline pipeline = TestPipeline.create();
// Set runner via system property
-DbeamTestPipelineOptions='["--runner=TestDataflowRunner"]'
Portable Runners
Concept
- SDK-independent execution via Fn API
- SDK runs in container, communicates via gRPC
Environment Types
- DOCKER: SDK in Docker container
- LOOPBACK: SDK in same process (testing)
- EXTERNAL: SDK at specified address
- PROCESS: SDK in subprocess
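To make the choice of environment type concrete, here is a minimal sketch that builds the flag list for a portable job and rejects unknown environment types. The helper `portable_flags` and its parameter names are hypothetical, chosen for this example; only the flag names and the four environment types come from this document.

```python
# Environment types listed in this document.
ENVIRONMENT_TYPES = ("DOCKER", "LOOPBACK", "EXTERNAL", "PROCESS")


def portable_flags(runner, master_flag, master, environment_type="DOCKER"):
    """Build '--key=value' flags for a portable runner job.

    Hypothetical helper for illustration; not a Beam API.
    """
    env = environment_type.upper()
    if env not in ENVIRONMENT_TYPES:
        raise ValueError(f"unknown environment type: {environment_type}")
    return [
        f"--runner={runner}",
        f"--{master_flag}={master}",
        f"--environment_type={env}",
    ]


# Reproduces the Flink portable-mode flags shown earlier.
flink_flags = portable_flags("FlinkRunner", "flink_master", "host:port", "LOOPBACK")
```

The resulting list can be passed to `PipelineOptions(...)` in the same way as the hand-written lists in the Flink and Spark sections.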
Job Server
Start Flink job server:
./gradlew :runners:flink:1.18:job-server:runShadow
Start Spark job server:
./gradlew :runners:spark:3:job-server:runShadow
Runner-Specific Options
Dataflow
| Option | Description |
|---|---|
| --project | GCP project |
| --region | GCP region |
| --tempLocation | GCS temp location |
| --stagingLocation | GCS staging location |
| --numWorkers | Initial number of workers |
| --maxNumWorkers | Maximum number of workers |
| --workerMachineType | Worker VM machine type |
Flink
| Option | Description |
|---|---|
| --flinkMaster | Flink master address |
| --parallelism | Default parallelism |
| --checkpointingInterval | Checkpoint interval |
Spark
| Option | Description |
|---|---|
| --sparkMaster | Spark master URL |
| --sparkConf | Additional Spark config |
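The option tables use the Java camelCase spelling, while the Python examples in this document use snake_case (`--temp_location`, `--flink_master`). The sketch below shows one way to translate between the two. The helper `to_python_flag` is hypothetical, and the mapping is not always mechanical: this document itself pairs `--sparkMaster` with `--spark_master_url`, so that case is handled as an explicit override. Treat the output as a name to verify against the SDK documentation, not as proof that the flag exists.

```python
import re

# Flags whose Python name is not a mechanical translation of the Java name.
# Only the one pair shown in this document is listed here.
OVERRIDES = {"sparkMaster": "spark_master_url"}


def to_python_flag(java_flag):
    """Convert a Java-style flag like '--tempLocation' to '--temp_location'.

    Hypothetical helper for illustration; not a Beam API.
    """
    name = java_flag.lstrip("-")
    if name in OVERRIDES:
        return "--" + OVERRIDES[name]
    # Insert '_' before each uppercase letter (except at the start),
    # then lowercase the whole name.
    snake = re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()
    return "--" + snake
```

For example, `to_python_flag("--maxNumWorkers")` yields `--max_num_workers`, and flags that are already lowercase, such as `--project`, pass through unchanged.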
Building Runner Artifacts
Dataflow Worker Jar
./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar
Flink Job Server
./gradlew :runners:flink:1.18:job-server:shadowJar
Spark Job Server
./gradlew :runners:spark:3:job-server:shadowJar
Debugging
Direct Runner
- Enable logging: -Dorg.slf4j.simpleLogger.defaultLogLevel=debug
- Use --targetParallelism=1 for deterministic execution
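The two tips above are written for the Java SDK. For Python pipelines, a comparable setup might look like the sketch below, which assumes the Python Direct Runner flags `--direct_num_workers` and `--direct_running_mode` and uses the standard `logging` module for verbosity. The helper name `direct_debug_flags` is hypothetical.

```python
import logging


def direct_debug_flags(num_workers=1):
    """Flags for a small, in-memory Direct Runner run (Python SDK).

    Hypothetical helper for illustration; not a Beam API.
    """
    if num_workers < 1:
        raise ValueError("num_workers must be at least 1")
    return [
        "--runner=DirectRunner",
        f"--direct_num_workers={num_workers}",
        "--direct_running_mode=in_memory",
    ]


# Raise the root logger to DEBUG so log messages from the pipeline are shown.
logging.getLogger().setLevel(logging.DEBUG)
```

Running with a single worker keeps the execution order easier to follow while stepping through a failing transform.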
Dataflow
- Check the Dataflow UI: console.cloud.google.com/dataflow
- Use --experiments=upload_graph when job submission fails because the job graph is too large
- Find worker logs in Cloud Logging
Portable Runners
- Enable debug logging on job server
- Check SDK harness logs in worker containers