andrueandersoncs

Streams

This skill should be used when the user asks about "Effect Stream", "Stream.from", "Stream.map", "Stream.filter", "Stream.run", "streaming data", "async iteration", "Sink", "Channel", "Stream.concat", "Stream.merge", "backpressure", "Stream.fromIterable", "chunked processing", "real-time data", or needs to understand how Effect handles streaming data processing.

andrueandersoncs 8 Updated 4mo ago
GitHub

Install

npx skillscat add andrueandersoncs/claude-skill-effect-ts/streams

Install via the SkillsCat registry.

SKILL.md

Streams in Effect

Overview

Effect Streams provide:

  • Lazy evaluation - Elements produced on demand
  • Resource safety - Automatic cleanup
  • Backpressure - Producer/consumer coordination
  • Composition - Transform, filter, merge streams
  • Error handling - Typed errors in stream pipeline
Stream<A, E, R>
// Produces values of type A
// May fail with error E
// Requires environment R

Creating Streams

From Values

import { Stream } from "effect"

const numbers = Stream.make(1, 2, 3, 4, 5)

const fromArray = Stream.fromIterable([1, 2, 3])

const empty = Stream.empty

const single = Stream.succeed(42)

const infinite = Stream.iterate(1, (n) => n + 1)

From Effects

const fromEffect = Stream.fromEffect(fetchData())

const polling = Stream.repeatEffect(checkStatus())

const scheduled = Stream.repeatEffectWithSchedule(
  checkStatus(),
  Schedule.spaced("5 seconds")
)

From Async Sources

// From async iterable
const fromAsyncIterable = Stream.fromAsyncIterable(
  asyncGenerator(),
  (error) => new StreamError({ cause: error })
)

// From callback/event emitter
const fromCallback = Stream.async<number, never>((emit) => {
  const handler = (value: number) => emit.single(value)
  eventEmitter.on("data", handler)
  return Effect.sync(() => eventEmitter.off("data", handler))
})

// From queue
const fromQueue = Stream.fromQueue(queue)

Generating Streams

const naturals = Stream.unfold(1, (n) => Option.some([n, n + 1]))

const range = Stream.range(1, 100)

const repeated = Stream.repeat(Stream.succeed("ping")).pipe(
  Stream.take(5)
)

Transforming Streams

map - Transform Elements

const doubled = numbers.pipe(
  Stream.map((n) => n * 2)
)

const enriched = users.pipe(
  Stream.mapEffect((user) => fetchProfile(user.id))
)

const parallel = items.pipe(
  Stream.mapEffect(process, { concurrency: 10 })
)

filter - Select Elements

const evens = numbers.pipe(
  Stream.filter((n) => n % 2 === 0)
)

const valid = items.pipe(
  Stream.filterEffect((item) => validate(item))
)

flatMap - Nested Streams

const expanded = numbers.pipe(
  Stream.flatMap((n) => Stream.make(n, n * 10, n * 100))
)
// 1, 10, 100, 2, 20, 200, ...

take/drop

const first5 = numbers.pipe(Stream.take(5))
const skip5 = numbers.pipe(Stream.drop(5))
const firstWhile = numbers.pipe(Stream.takeWhile((n) => n < 10))
const dropWhile = numbers.pipe(Stream.dropWhile((n) => n < 10))

Combining Streams

concat - Sequential

const combined = Stream.concat(stream1, stream2)
// or
const combined = stream1.pipe(Stream.concat(stream2))

merge - Interleaved

// Interleave elements from both
const merged = Stream.merge(stream1, stream2)

// Merge multiple
const allMerged = Stream.mergeAll([s1, s2, s3], { concurrency: 3 })

zip - Pair Elements

const zipped = Stream.zip(names, ages)
// Stream<[string, number]>

// With function
const combined = Stream.zipWith(
  names,
  ages,
  (name, age) => ({ name, age })
)

interleave

const interleaved = Stream.interleave(stream1, stream2)
// a1, b1, a2, b2, ...

Consuming Streams

Running to Collection

const array = yield* Stream.runCollect(numbers)

const first = yield* Stream.runHead(numbers)

const sum = yield* Stream.runFold(
  numbers,
  0,
  (acc, n) => acc + n
)

Running for Effects

yield* numbers.pipe(
  Stream.runForEach((n) => Effect.log(`Got: ${n}`))
)

yield* numbers.pipe(Stream.runDrain)

Running to Sink

import { Sink } from "effect"

const sum = yield* numbers.pipe(
  Stream.run(Sink.sum)
)

const array = yield* numbers.pipe(
  Stream.run(Sink.collectAll())
)

Chunking

Streams process elements in chunks for efficiency:

const chunked = numbers.pipe(
  Stream.grouped(10)
)

const processed = numbers.pipe(
  Stream.mapChunks((chunk) => Chunk.map(chunk, (n) => n * 2))
)

const rechunked = numbers.pipe(
  Stream.rechunk(100)
)

Error Handling

const safe = stream.pipe(
  Stream.catchAll((error) => Stream.succeed(fallbackValue))
)

const handled = stream.pipe(
  Stream.catchTag("NetworkError", (error) =>
    Stream.succeed(cachedValue)
  )
)

const resilient = stream.pipe(
  Stream.retry(Schedule.exponential("1 second"))
)

const withFallback = stream.pipe(
  Stream.orElse(() => fallbackStream)
)

Resource Management

// Stream with resource lifecycle
const fileStream = Stream.acquireRelease(
  Effect.sync(() => fs.openSync("data.txt", "r")),
  (fd) => Effect.sync(() => fs.closeSync(fd))
).pipe(
  Stream.flatMap((fd) =>
    Stream.repeatEffectOption(
      Effect.sync(() => {
        const buffer = Buffer.alloc(1024)
        const bytes = fs.readSync(fd, buffer)
        return bytes > 0 ? Option.some(buffer.slice(0, bytes)) : Option.none()
      })
    )
  )
)

// Scoped streams
const scoped = Stream.scoped(
  Effect.acquireRelease(openConnection, closeConnection)
)

Sinks

Sinks consume stream elements:

import { Sink } from "effect"

Sink.sum
Sink.count
Sink.head
Sink.last
Sink.collectAll()
Sink.forEach(f)

const maxSink = Sink.foldLeft(
  Number.NEGATIVE_INFINITY,
  (max, n: number) => Math.max(max, n)
)

Common Patterns

Batched Processing

const batchProcess = stream.pipe(
  Stream.grouped(100),
  Stream.mapEffect((batch) =>
    Effect.tryPromise(() => api.processBatch(Chunk.toArray(batch)))
  )
)

Rate Limiting

const rateLimited = stream.pipe(
  Stream.throttle({
    units: 1,
    duration: "100 millis",
    strategy: "shape"
  })
)

Debouncing

const debounced = stream.pipe(
  Stream.debounce("500 millis")
)

Windowing

// Time-based windows
const windows = stream.pipe(
  Stream.groupedWithin(1000, "1 second")
)

Best Practices

  1. Use chunking for efficiency - Batch operations when possible
  2. Handle backpressure - Use appropriate buffer strategies
  3. Clean up resources - Use acquireRelease for external resources
  4. Process in parallel - Use concurrency option in mapEffect
  5. Handle errors early - Catch/retry before final consumption

Additional Resources

For comprehensive stream documentation, consult ${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt.

Search for these sections:

  • "Creating Streams" for stream construction
  • "Consuming Streams" for running streams
  • "Operations" for transformations
  • "Error Handling in Streams" for error patterns
  • "Resourceful Streams" for resource management
  • "Sink" for custom sinks