This skill should be used when the user asks about "Effect Stream", "Stream.from", "Stream.map", "Stream.filter", "Stream.run", "streaming data", "async iteration", "Sink", "Channel", "Stream.concat", "Stream.merge", "backpressure", "Stream.fromIterable", "chunked processing", "real-time data", or needs to understand how Effect handles streaming data processing.
Install
Install via the SkillsCat registry:
npx skillscat add andrueandersoncs/claude-skill-effect-ts/streams
Streams in Effect
Overview
Effect Streams provide:
- Lazy evaluation - Elements produced on demand
- Resource safety - Automatic cleanup
- Backpressure - Producer/consumer coordination
- Composition - Transform, filter, merge streams
- Error handling - Typed errors in stream pipeline
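The properties listed above can be seen in a small sketch. It assumes the `effect` package is installed; the stream contents are illustrative only. The source stream is infinite, yet the program terminates because elements are produced only on demand.

```typescript
import { Chunk, Effect, Stream } from "effect"

// An infinite stream: 1, 2, 3, ... Nothing is computed at definition time.
const naturals = Stream.iterate(1, (n) => n + 1)

// Composition: each operator returns a new stream description.
const firstEvenSquares = naturals.pipe(
  Stream.filter((n) => n % 2 === 0),
  Stream.map((n) => n * n),
  Stream.take(3) // only three elements are ever demanded
)

// Running a stream yields an Effect; runCollect gathers elements into a Chunk.
const program = Stream.runCollect(firstEvenSquares).pipe(
  Effect.map((chunk) => Chunk.toReadonlyArray(chunk))
)

Effect.runPromise(program).then(console.log)
// [ 4, 16, 36 ]
```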
Stream<A, E, R>
// Produces values of type A
// May fail with error E
// Requires environment R
Creating Streams
From Values
import { Stream } from "effect"
const numbers = Stream.make(1, 2, 3, 4, 5)
const fromArray = Stream.fromIterable([1, 2, 3])
const empty = Stream.empty
const single = Stream.succeed(42)
const infinite = Stream.iterate(1, (n) => n + 1)
From Effects
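import { Schedule, Stream } from "effect"
// fetchData and checkStatus are assumed to be Effect-returning functions defined elsewhere
const fromEffect = Stream.fromEffect(fetchData())
const polling = Stream.repeatEffect(checkStatus())
const scheduled = Stream.repeatEffectWithSchedule(
  checkStatus(),
  Schedule.spaced("5 seconds")
)
From Async Sources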
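As a self-contained sketch of the async-iterable case, the example here defines its own generator and error type. Both `asyncGenerator` and `StreamError` are hypothetical stand-ins for whatever the application provides; the second argument to `Stream.fromAsyncIterable` converts a thrown value into the stream's typed error.

```typescript
import { Chunk, Data, Effect, Stream } from "effect"

// Hypothetical error type used to wrap anything the iterable throws
class StreamError extends Data.TaggedError("StreamError")<{
  readonly cause: unknown
}> {}

// Hypothetical async source
async function* asyncGenerator() {
  yield 1
  yield 2
  yield 3
}

const fromAsyncIterable = Stream.fromAsyncIterable(
  asyncGenerator(),
  (error) => new StreamError({ cause: error })
)

const program = Stream.runCollect(fromAsyncIterable).pipe(
  Effect.map((chunk) => Chunk.toReadonlyArray(chunk))
)

Effect.runPromise(program).then(console.log)
// [ 1, 2, 3 ]
```

Because an async generator can only be iterated once, the stream in this sketch should also be run only once.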
// From async iterable
const fromAsyncIterable = Stream.fromAsyncIterable(
asyncGenerator(),
(error) => new StreamError({ cause: error })
)
// From callback/event emitter
const fromCallback = Stream.async<number, never>((emit) => {
const handler = (value: number) => emit.single(value)
eventEmitter.on("data", handler)
return Effect.sync(() => eventEmitter.off("data", handler))
})
// From queue
const fromQueue = Stream.fromQueue(queue)
Generating Streams
const naturals = Stream.unfold(1, (n) => Option.some([n, n + 1]))
const range = Stream.range(1, 100)
// Stream.repeat requires a Schedule; Stream.repeatValue repeats a single value forever
const repeated = Stream.repeatValue("ping").pipe(
  Stream.take(5)
)
Transforming Streams
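Transformations compose in order inside `pipe`. The following sketch chains `filter`, `mapEffect` with a concurrency limit, and `map`. `fetchProfile` is a hypothetical lookup that simulates latency with `Effect.sleep`; it is not part of Effect.

```typescript
import { Chunk, Duration, Effect, Stream } from "effect"

// Hypothetical effectful lookup; later ids finish sooner to exercise ordering
const fetchProfile = (id: number) =>
  Effect.sleep(Duration.millis((4 - id) * 10)).pipe(
    Effect.as({ id, name: `user-${id}` })
  )

const names = Stream.make(1, 2, 3).pipe(
  Stream.filter((id) => id !== 2),
  // Up to two lookups run at once; results are still emitted in input order
  Stream.mapEffect(fetchProfile, { concurrency: 2 }),
  Stream.map((profile) => profile.name)
)

const program = Stream.runCollect(names).pipe(
  Effect.map((chunk) => Chunk.toReadonlyArray(chunk))
)

Effect.runPromise(program).then(console.log)
// [ 'user-1', 'user-3' ]
```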
map - Transform Elements
const doubled = numbers.pipe(
Stream.map((n) => n * 2)
)
const enriched = users.pipe(
Stream.mapEffect((user) => fetchProfile(user.id))
)
const parallel = items.pipe(
Stream.mapEffect(process, { concurrency: 10 })
)
filter - Select Elements
const evens = numbers.pipe(
Stream.filter((n) => n % 2 === 0)
)
const valid = items.pipe(
Stream.filterEffect((item) => validate(item))
)
flatMap - Nested Streams
const expanded = numbers.pipe(
Stream.flatMap((n) => Stream.make(n, n * 10, n * 100))
)
// 1, 10, 100, 2, 20, 200, ...
take/drop
const first5 = numbers.pipe(Stream.take(5))
const skip5 = numbers.pipe(Stream.drop(5))
const firstWhile = numbers.pipe(Stream.takeWhile((n) => n < 10))
const dropWhile = numbers.pipe(Stream.dropWhile((n) => n < 10))
Combining Streams
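The combinators in this section differ mainly in how they order elements from the two inputs. A minimal sketch with deterministic inputs (the stream contents are illustrative) compares `concat`, `zip`, and `interleave`; `merge` is left out because its output order depends on timing.

```typescript
import { Chunk, Effect, Stream } from "effect"

const letters = Stream.make("a", "b", "c")
const digits = Stream.make(1, 2, 3)
const others = Stream.make("x", "y", "z")

const program = Effect.gen(function* () {
  // concat: all of the first stream, then all of the second
  const concatenated = yield* Stream.runCollect(Stream.concat(letters, others))
  // zip: pairs elements position by position
  const zipped = yield* Stream.runCollect(Stream.zip(letters, digits))
  // interleave: strictly alternates between the two streams
  const interleaved = yield* Stream.runCollect(Stream.interleave(letters, others))
  return {
    concatenated: Chunk.toReadonlyArray(concatenated),
    zipped: Chunk.toReadonlyArray(zipped),
    interleaved: Chunk.toReadonlyArray(interleaved)
  }
})

Effect.runPromise(program).then(console.log)
// concatenated: a, b, c, x, y, z
// zipped:       [a, 1], [b, 2], [c, 3]
// interleaved:  a, x, b, y, c, z
```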
concat - Sequential
const combined = Stream.concat(stream1, stream2)
// or, in pipeable form
const combinedPiped = stream1.pipe(Stream.concat(stream2))
merge - Interleaved
// Interleave elements from both
const merged = Stream.merge(stream1, stream2)
// Merge multiple
const allMerged = Stream.mergeAll([s1, s2, s3], { concurrency: 3 })
zip - Pair Elements
const zipped = Stream.zip(names, ages)
// Stream<[string, number]>
// With function
const people = Stream.zipWith(
  names,
  ages,
  (name, age) => ({ name, age })
)
interleave
const interleaved = Stream.interleave(stream1, stream2)
// a1, b1, a2, b2, ...
Consuming Streams
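The snippets in this section use `yield*`, which is only valid inside a generator such as the one passed to `Effect.gen`. As a minimal sketch of a complete program (the `numbers` stream is illustrative), each `run*` function returns an Effect that must itself be executed:

```typescript
import { Chunk, Effect, Option, Stream } from "effect"

const numbers = Stream.make(1, 2, 3, 4, 5)

const program = Effect.gen(function* () {
  const all = yield* Stream.runCollect(numbers) // Chunk<number>
  const first = yield* Stream.runHead(numbers) // Option<number>
  const sum = yield* Stream.runFold(numbers, 0, (acc, n) => acc + n)
  return {
    all: Chunk.toReadonlyArray(all),
    first: Option.getOrNull(first),
    sum
  }
})

Effect.runPromise(program).then(console.log)
// { all: [ 1, 2, 3, 4, 5 ], first: 1, sum: 15 }
```

Note that the same stream value is run three times here; a stream is a description, so each run starts from the beginning.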
Running to Collection
const collected = yield* Stream.runCollect(numbers) // Chunk<number>, not a plain array
const first = yield* Stream.runHead(numbers) // Option<number>
const sum = yield* Stream.runFold(
  numbers,
  0,
  (acc, n) => acc + n
)
Running for Effects
yield* numbers.pipe(
Stream.runForEach((n) => Effect.log(`Got: ${n}`))
)
yield* numbers.pipe(Stream.runDrain)
Running to Sink
import { Sink } from "effect"
const sum = yield* numbers.pipe(
Stream.run(Sink.sum)
)
const collected = yield* numbers.pipe(
  Stream.run(Sink.collectAll()) // yields a Chunk<number>
)
Chunking
Streams process elements in chunks for efficiency:
const chunked = numbers.pipe(
Stream.grouped(10)
)
const processed = numbers.pipe(
Stream.mapChunks((chunk) => Chunk.map(chunk, (n) => n * 2))
)
const rechunked = numbers.pipe(
Stream.rechunk(100)
)
Error Handling
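Recovery operators switch to a fallback stream at the point of failure, so elements emitted before the error are kept. A minimal sketch, assuming a hypothetical `NetworkError` class whose `_tag` field is what `Stream.catchTag` matches on:

```typescript
import { Chunk, Data, Effect, Stream } from "effect"

// Hypothetical tagged error for illustration
class NetworkError extends Data.TaggedError("NetworkError")<{
  readonly url: string
}> {}

// Emits two elements, then fails with a typed error
const flaky = Stream.make(1, 2).pipe(
  Stream.concat(Stream.fail(new NetworkError({ url: "https://example.invalid" })))
)

// After recovery the error channel no longer contains NetworkError
const recovered = flaky.pipe(
  Stream.catchTag("NetworkError", () => Stream.make(-1))
)

const program = Stream.runCollect(recovered).pipe(
  Effect.map((chunk) => Chunk.toReadonlyArray(chunk))
)

Effect.runPromise(program).then(console.log)
// [ 1, 2, -1 ]
```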
const safe = stream.pipe(
Stream.catchAll((error) => Stream.succeed(fallbackValue))
)
const handled = stream.pipe(
Stream.catchTag("NetworkError", (error) =>
Stream.succeed(cachedValue)
)
)
const resilient = stream.pipe(
Stream.retry(Schedule.exponential("1 second"))
)
const withFallback = stream.pipe(
Stream.orElse(() => fallbackStream)
)
Resource Management
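The release action attached to a stream should run even when the consumer stops early. The sketch below records events in an array instead of touching a real resource; the `"handle"` value and the `log` helper are hypothetical.

```typescript
import { Effect, Stream } from "effect"

const program = Effect.gen(function* () {
  const events: string[] = []
  // Hypothetical helper that records what happened, in order
  const log = (event: string) =>
    Effect.sync(() => {
      events.push(event)
    })

  const resource = Stream.acquireRelease(
    log("acquire").pipe(Effect.as("handle")),
    () => log("release")
  )

  const lines = resource.pipe(
    Stream.flatMap((handle) =>
      Stream.make(1, 2, 3).pipe(Stream.map((n) => `${handle}:${n}`))
    ),
    Stream.take(2) // stop early; the finalizer should still run
  )

  yield* Stream.runForEach(lines, log)
  return events
})

Effect.runPromise(program).then(console.log)
```

The recorded events should start with `acquire` and end with `release`, with the two consumed lines in between.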
// Stream with resource lifecycle
const fileStream = Stream.acquireRelease(
  Effect.sync(() => fs.openSync("data.txt", "r")),
  (fd) => Effect.sync(() => fs.closeSync(fd))
).pipe(
  Stream.flatMap((fd) =>
    // repeatEffectOption ends the stream when the effect fails with Option.none()
    Stream.repeatEffectOption(
      Effect.suspend(() => {
        const buffer = Buffer.alloc(1024)
        const bytes = fs.readSync(fd, buffer)
        return bytes > 0
          ? Effect.succeed(buffer.subarray(0, bytes))
          : Effect.fail(Option.none())
      })
    )
  )
)
// Scoped streams
const scoped = Stream.scoped(
Effect.acquireRelease(openConnection, closeConnection)
)
Sinks
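A sink is run by passing it to `Stream.run`, which returns an Effect producing the sink's result. As a minimal sketch (the `readings` stream is illustrative), two built-in sinks and one custom fold are run against the same stream:

```typescript
import { Effect, Sink, Stream } from "effect"

const readings = Stream.make(3, 9, 4)

// Custom sink: fold every element into a running maximum
const maxSink = Sink.foldLeft(
  Number.NEGATIVE_INFINITY,
  (max, n: number) => Math.max(max, n)
)

const program = Effect.gen(function* () {
  const total = yield* Stream.run(readings, Sink.sum)
  const count = yield* Stream.run(readings, Sink.count)
  const max = yield* Stream.run(readings, maxSink)
  return { total, count, max }
})

Effect.runPromise(program).then(console.log)
// { total: 16, count: 3, max: 9 }
```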
Sinks consume stream elements:
import { Sink } from "effect"
Sink.sum
Sink.count
Sink.head() // a function, unlike Sink.sum and Sink.count
Sink.last()
Sink.collectAll()
Sink.forEach(f)
const maxSink = Sink.foldLeft(
Number.NEGATIVE_INFINITY,
(max, n: number) => Math.max(max, n)
)
Common Patterns
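Several of the patterns in this section build on `Stream.grouped`, which emits fixed-size `Chunk`s with a possibly shorter final batch. A small sketch makes the batch boundaries visible; `processBatch` is a hypothetical stand-in for a real batch API call.

```typescript
import { Chunk, Effect, Stream } from "effect"

// Hypothetical batch handler: sums the batch instead of calling an API
const processBatch = (batch: ReadonlyArray<number>) =>
  Effect.succeed(batch.reduce((acc, n) => acc + n, 0))

const batchSums = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7]).pipe(
  Stream.grouped(3), // Chunk(1, 2, 3), Chunk(4, 5, 6), Chunk(7)
  Stream.mapEffect((batch) => processBatch(Chunk.toReadonlyArray(batch)))
)

const program = Stream.runCollect(batchSums).pipe(
  Effect.map((chunk) => Chunk.toReadonlyArray(chunk))
)

Effect.runPromise(program).then(console.log)
// [ 6, 15, 7 ]
```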
Batched Processing
const batchProcess = stream.pipe(
Stream.grouped(100),
Stream.mapEffect((batch) =>
Effect.tryPromise(() => api.processBatch(Chunk.toArray(batch)))
)
)
Rate Limiting
const rateLimited = stream.pipe(
  Stream.throttle({
    cost: Chunk.size, // required: each element costs one unit
    units: 1,
    duration: "100 millis",
    strategy: "shape"
  })
)
Debouncing
const debounced = stream.pipe(
Stream.debounce("500 millis")
)
Windowing
// Time-based windows
const windows = stream.pipe(
Stream.groupedWithin(1000, "1 second")
)
Best Practices
- Use chunking for efficiency - Batch operations when possible
- Handle backpressure - Use appropriate buffer strategies
- Clean up resources - Use acquireRelease for external resources
- Process in parallel - Use concurrency option in mapEffect
- Handle errors early - Catch/retry before final consumption
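As one possible reading of the backpressure and parallelism advice above, the following sketch places a bounded buffer between a fast producer and a slow consumer. `slowStep` is a hypothetical consumer step; the capacity and concurrency values are arbitrary. With the `"suspend"` strategy no elements are dropped, so the producer waits once the buffer is full.

```typescript
import { Chunk, Duration, Effect, Stream } from "effect"

const inputs = Array.from({ length: 10 }, (_, i) => i + 1)

// Hypothetical slow consumer step
const slowStep = (n: number) =>
  Effect.sleep(Duration.millis(5)).pipe(Effect.as(n * 2))

const pipeline = Stream.fromIterable(inputs).pipe(
  // Bounded buffer: the producer is suspended when 4 elements are waiting
  Stream.buffer({ capacity: 4, strategy: "suspend" }),
  // Process up to two elements at a time, preserving input order
  Stream.mapEffect(slowStep, { concurrency: 2 })
)

const program = Stream.runCollect(pipeline).pipe(
  Effect.map((chunk) => Chunk.toReadonlyArray(chunk))
)

Effect.runPromise(program).then(console.log)
// [ 2, 4, 6, 8, 10, 12, 14, 16, 18, 20 ]
```

Switching the strategy to `"sliding"` or `"dropping"` trades completeness for freshness, which can suit real-time feeds where stale elements have little value.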
Additional Resources
For comprehensive stream documentation, consult ${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt.
Search for these sections:
- "Creating Streams" for stream construction
- "Consuming Streams" for running streams
- "Operations" for transformations
- "Error Handling in Streams" for error patterns
- "Resourceful Streams" for resource management
- "Sink" for custom sinks