Skip to content

流的介绍

在本指南中,我们将探索 Stream<A, E, R> 的概念。Stream 是一个程序描述,当执行时,可以发出类型为 A零个或多个值,处理类型为 E 的错误,并在类型为 R 的上下文中操作。

流在处理随时间变化的值序列时特别有用。它们可以作为 observables、node streams 和 AsyncIterables 的替代品。

Stream 视为 Effect 的扩展。虽然 Effect<A, E, R> 表示一个需要类型为 R 的上下文、可能遇到类型为 E 的错误、并且总是产生类型为 A 的单个结果的程序,但 Stream<A, E, R> 通过允许发出零个或多个类型为 A 的值来进一步扩展这一概念。

为了澄清这一点,让我们看一些使用 Effect 的示例:

import {
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
,
import Chunk
Chunk
,
import Option

@since2.0.0

@since2.0.0

Option
} from "effect"
// 一个失败并返回字符串错误的 Effect
const
const failedEffect: Effect.Effect<never, string, never>
failedEffect
=
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
.
const fail: <string>(error: string) => Effect.Effect<never, string, never>

Creates an Effect that represents a recoverable error.

When to Use

Use this function to explicitly signal an error in an Effect. The error will keep propagating unless it is handled. You can handle the error with functions like

catchAll

or

catchTag

.

Example (Creating a Failed Effect)

import { Effect } from "effect"
// ┌─── Effect<never, Error, never>
// ▼
const failure = Effect.fail(
new Error("Operation failed due to network error")
)

@seesucceed to create an effect that represents a successful value.

@since2.0.0

fail
("fail!")
// 一个产生单个数字的 Effect
const
const oneNumberValue: Effect.Effect<number, never, never>
oneNumberValue
=
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
.
const succeed: <number>(value: number) => Effect.Effect<number, never, never>

Creates an Effect that always succeeds with a given value.

When to Use

Use this function when you need an effect that completes successfully with a specific value without any errors or external dependencies.

Example (Creating a Successful Effect)

import { Effect } from "effect"
// Creating an effect that represents a successful scenario
//
// ┌─── Effect<number, never, never>
// ▼
const success = Effect.succeed(42)

@seefail to create an effect that represents a failure.

@since2.0.0

succeed
(3)
// 一个产生数字块的 Effect
const
const oneListValue: Effect.Effect<Chunk.NonEmptyChunk<number>, never, never>
oneListValue
=
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
.
const succeed: <Chunk.NonEmptyChunk<number>>(value: Chunk.NonEmptyChunk<number>) => Effect.Effect<Chunk.NonEmptyChunk<number>, never, never>

Creates an Effect that always succeeds with a given value.

When to Use

Use this function when you need an effect that completes successfully with a specific value without any errors or external dependencies.

Example (Creating a Successful Effect)

import { Effect } from "effect"
// Creating an effect that represents a successful scenario
//
// ┌─── Effect<number, never, never>
// ▼
const success = Effect.succeed(42)

@seefail to create an effect that represents a failure.

@since2.0.0

succeed
(
import Chunk
Chunk
.
const make: <[number, number, number]>(as_0: number, as_1: number, as_2: number) => Chunk.NonEmptyChunk<number>

Builds a NonEmptyChunk from an non-empty collection of elements.

@since2.0.0

make
(1, 2, 3))
// 一个产生可选数字的 Effect
const
const oneOption: Effect.Effect<Option.Option<number>, never, never>
oneOption
=
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
.
const succeed: <Option.Option<number>>(value: Option.Option<number>) => Effect.Effect<Option.Option<number>, never, never>

Creates an Effect that always succeeds with a given value.

When to Use

Use this function when you need an effect that completes successfully with a specific value without any errors or external dependencies.

Example (Creating a Successful Effect)

import { Effect } from "effect"
// Creating an effect that represents a successful scenario
//
// ┌─── Effect<number, never, never>
// ▼
const success = Effect.succeed(42)

@seefail to create an effect that represents a failure.

@since2.0.0

succeed
(
import Option

@since2.0.0

@since2.0.0

Option
.
const some: <number>(value: number) => Option.Option<number>

Wraps the given value into an Option to represent its presence.

Example (Creating an Option with a Value)

import { Option } from "effect"
// An Option holding the number 1
//
// ┌─── Option<number>
// ▼
const value = Option.some(1)
console.log(value)
// Output: { _id: 'Option', _tag: 'Some', value: 1 }

@seenone for the opposite operation.

@since2.0.0

some
(1))

在每种情况下,Effect 总是以恰好一个值结束。没有变化性;你总是得到一个结果。

现在,让我们将注意力转向 StreamStream 表示一个与 Effect 有相似之处的程序描述,它需要类型为 R 的上下文,可能发出类型为 E 的错误信号,并产生类型为 A 的值。然而,关键区别在于它可以产生零个或多个值

以下是 Stream 的可能场景:

  • 空流:它可以是空的,表示没有值的流。
  • 单元素流:它可以表示只有一个值的流。
  • 有限元素流:它可以表示具有有限数量值的流。
  • 无限元素流:它可以表示无限期继续的流,本质上是一个无限流。

让我们看看这些场景的实际应用:

import {
import Stream
Stream
} from "effect"
// 一个空流
const
const emptyStream: Stream.Stream<never, never, never>
emptyStream
=
import Stream
Stream
.
const empty: Stream.Stream<never, never, never>

The empty stream.

@example

import { Effect, Stream } from "effect"
const stream = Stream.empty
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [] }

@since2.0.0

empty
// 一个包含单个数字的流
const
const oneNumberValueStream: Stream.Stream<number, never, never>
oneNumberValueStream
=
import Stream
Stream
.
const succeed: <number>(value: number) => Stream.Stream<number, never, never>

Creates a single-valued pure stream.

@example

import { Effect, Stream } from "effect"
// A Stream with a single number
const stream = Stream.succeed(3)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 3 ] }

@since2.0.0

succeed
(3)
// 一个包含从 1 到 10 数字范围的流
const
const finiteNumberStream: Stream.Stream<number, never, never>
finiteNumberStream
=
import Stream
Stream
.
const range: (min: number, max: number, chunkSize?: number) => Stream.Stream<number>

Constructs a stream from a range of integers, including both endpoints.

@example

import { Effect, Stream } from "effect"
// A Stream with a range of numbers from 1 to 5
const stream = Stream.range(1, 5)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }

@since2.0.0

range
(1, 10)
// 一个从 1 开始递增的无限数字流
const
const infiniteNumberStream: Stream.Stream<number, never, never>
infiniteNumberStream
=
import Stream
Stream
.
const iterate: <number>(value: number, next: (value: number) => number) => Stream.Stream<number, never, never>

The infinite stream of iterative function application: a, f(a), f(f(a)), f(f(f(a))), ...

@example

import { Effect, Stream } from "effect"
// An infinite Stream of numbers starting from 1 and incrementing
const stream = Stream.iterate(1, (n) => n + 1)
Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(10)))).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ] }

@since2.0.0

iterate
(1, (
n: number
n
) =>
n: number
n
+ 1)

总之,Stream 是一个多功能的工具,用于表示可能产生多个值的程序,使其适用于广泛的任务,从处理有限列表到处理无限序列。