Skip to content

发布订阅

一个 PubSub 充当异步消息中心,允许发布者发送可被所有当前订阅者接收的消息。

队列不同,队列中每个值只传递给一个消费者,而 PubSub 将每个发布的消息广播给所有订阅者。这使得 PubSub 非常适合需要消息广播而不是负载分发的场景。

一个 PubSub<A> 存储类型为 A 的消息,并提供两个基本操作:

API描述
PubSub.publishPubSub 发送类型为 A 的消息,返回一个指示消息是否成功发布的 effect。
PubSub.subscribe创建一个允许订阅 PubSub 的作用域 effect,当作用域结束时自动取消订阅。订阅者通过一个保存已发布消息的 Dequeue 接收消息。

示例(向多个订阅者发布消息)

import {
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
,
import PubSub
PubSub
,
import Queue
Queue
} from "effect"
const
const program: Effect.Effect<void, never, never>
program
=
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
.
const scoped: <void, never, Scope>(effect: Effect.Effect<void, never, Scope>) => Effect.Effect<void, never, never>

Scopes all resources used in an effect to the lifetime of the effect.

Details

This function ensures that all resources used within an effect are tied to its lifetime. Finalizers for these resources are executed automatically when the effect completes, whether through success, failure, or interruption. This guarantees proper resource cleanup without requiring explicit management.

@since2.0.0

scoped
(
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
.
const gen: <YieldWrap<Effect.Effect<PubSub.PubSub<string>, never, never>> | YieldWrap<Effect.Effect<Queue.Dequeue<string>, never, Scope>> | YieldWrap<Effect.Effect<boolean, never, never>> | YieldWrap<Effect.Effect<string, never, never>>, void>(f: (resume: Effect.Adapter) => Generator<YieldWrap<Effect.Effect<PubSub.PubSub<string>, never, never>> | YieldWrap<Effect.Effect<Queue.Dequeue<string>, never, Scope>> | YieldWrap<...> | YieldWrap<...>, void, never>) => Effect.Effect<...> (+1 overload)

Provides a way to write effectful code using generator functions, simplifying control flow and error handling.

When to Use

Effect.gen allows you to write code that looks and behaves like synchronous code, but it can handle asynchronous tasks, errors, and complex control flow (like loops and conditions). It helps make asynchronous code more readable and easier to manage.

The generator functions work similarly to async/await but with more explicit control over the execution of effects. You can yield* values from effects and return the final result at the end.

Example

import { Effect } from "effect"
const addServiceCharge = (amount: number) => amount + 1
const applyDiscount = (
total: number,
discountRate: number
): Effect.Effect<number, Error> =>
discountRate === 0
? Effect.fail(new Error("Discount rate cannot be zero"))
: Effect.succeed(total - (total * discountRate) / 100)
const fetchTransactionAmount = Effect.promise(() => Promise.resolve(100))
const fetchDiscountRate = Effect.promise(() => Promise.resolve(5))
export const program = Effect.gen(function* () {
const transactionAmount = yield* fetchTransactionAmount
const discountRate = yield* fetchDiscountRate
const discountedAmount = yield* applyDiscount(
transactionAmount,
discountRate
)
const finalAmount = addServiceCharge(discountedAmount)
return `Final amount to charge: ${finalAmount}`
})

@since2.0.0

gen
(function* () {
const
const pubsub: PubSub.PubSub<string>
pubsub
= yield*
import PubSub
PubSub
.
const bounded: <string>(capacity: number | {
readonly capacity: number;
readonly replay?: number | undefined;
}) => Effect.Effect<PubSub.PubSub<string>, never, never>

Creates a bounded PubSub with the back pressure strategy. The PubSub will retain messages until they have been taken by all subscribers, applying back pressure to publishers if the PubSub is at capacity.

For best performance use capacities that are powers of two.

@since2.0.0

bounded
<string>(2)
// 两个订阅者
const
const dequeue1: Queue.Dequeue<string>
dequeue1
= yield*
import PubSub
PubSub
.
const subscribe: <string>(self: PubSub.PubSub<string>) => Effect.Effect<Queue.Dequeue<string>, never, Scope>

Subscribes to receive messages from the PubSub. The resulting subscription can be evaluated multiple times within the scope to take a message from the PubSub each time.

@since2.0.0

subscribe
(
const pubsub: PubSub.PubSub<string>
pubsub
)
const
const dequeue2: Queue.Dequeue<string>
dequeue2
= yield*
import PubSub
PubSub
.
const subscribe: <string>(self: PubSub.PubSub<string>) => Effect.Effect<Queue.Dequeue<string>, never, Scope>

Subscribes to receive messages from the PubSub. The resulting subscription can be evaluated multiple times within the scope to take a message from the PubSub each time.

@since2.0.0

subscribe
(
const pubsub: PubSub.PubSub<string>
pubsub
)
// 向 pubsub 发布消息
yield*
import PubSub
PubSub
.
const publish: <string>(self: PubSub.PubSub<string>, value: string) => Effect.Effect<boolean> (+1 overload)

Publishes a message to the PubSub, returning whether the message was published to the PubSub.

@since2.0.0

publish
(
const pubsub: PubSub.PubSub<string>
pubsub
, "Hello from a PubSub!")
// 每个订阅者都接收到消息
var console: Console

The console module provides a simple debugging console that is similar to the JavaScript console mechanism provided by web browsers.

The module exports two specific components:

  • A Console class with methods such as console.log(), console.error() and console.warn() that can be used to write to any Node.js stream.
  • A global console instance configured to write to process.stdout and process.stderr. The global console can be used without importing the node:console module.

Warning: The global console object's methods are neither consistently synchronous like the browser APIs they resemble, nor are they consistently asynchronous like all other Node.js streams. See the note on process I/O for more information.

Example using the global console:

console.log('hello world');
// Prints: hello world, to stdout
console.log('hello %s', 'world');
// Prints: hello world, to stdout
console.error(new Error('Whoops, something bad happened'));
// Prints error message and stack trace to stderr:
// Error: Whoops, something bad happened
// at [eval]:5:15
// at Script.runInThisContext (node:vm:132:18)
// at Object.runInThisContext (node:vm:309:38)
// at node:internal/process/execution:77:19
// at [eval]-wrapper:6:22
// at evalScript (node:internal/process/execution:76:60)
// at node:internal/main/eval_string:23:3
const name = 'Will Robinson';
console.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to stderr

Example using the Console class:

const out = getStreamSomehow();
const err = getStreamSomehow();
const myConsole = new console.Console(out, err);
myConsole.log('hello world');
// Prints: hello world, to out
myConsole.log('hello %s', 'world');
// Prints: hello world, to out
myConsole.error(new Error('Whoops, something bad happened'));
// Prints: [Error: Whoops, something bad happened], to err
const name = 'Will Robinson';
myConsole.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to err

@seesource

console
.
Console.log(message?: any, ...optionalParams: any[]): void

Prints to stdout with newline. Multiple arguments can be passed, with the first used as the primary message and all additional used as substitution values similar to printf(3) (the arguments are all passed to util.format()).

const count = 5;
console.log('count: %d', count);
// Prints: count: 5, to stdout
console.log('count:', count);
// Prints: count: 5, to stdout

See util.format() for more information.

@sincev0.1.100

log
("订阅者 1: " + (yield*
import Queue
Queue
.
const take: <string>(self: Queue.Dequeue<string>) => Effect.Effect<string, never, never>

Takes the oldest value in the queue. If the queue is empty, this will return a computation that resumes when an item has been added to the queue.

@since2.0.0

take
(
const dequeue1: Queue.Dequeue<string>
dequeue1
)))
var console: Console

The console module provides a simple debugging console that is similar to the JavaScript console mechanism provided by web browsers.

The module exports two specific components:

  • A Console class with methods such as console.log(), console.error() and console.warn() that can be used to write to any Node.js stream.
  • A global console instance configured to write to process.stdout and process.stderr. The global console can be used without importing the node:console module.

Warning: The global console object's methods are neither consistently synchronous like the browser APIs they resemble, nor are they consistently asynchronous like all other Node.js streams. See the note on process I/O for more information.

Example using the global console:

console.log('hello world');
// Prints: hello world, to stdout
console.log('hello %s', 'world');
// Prints: hello world, to stdout
console.error(new Error('Whoops, something bad happened'));
// Prints error message and stack trace to stderr:
// Error: Whoops, something bad happened
// at [eval]:5:15
// at Script.runInThisContext (node:vm:132:18)
// at Object.runInThisContext (node:vm:309:38)
// at node:internal/process/execution:77:19
// at [eval]-wrapper:6:22
// at evalScript (node:internal/process/execution:76:60)
// at node:internal/main/eval_string:23:3
const name = 'Will Robinson';
console.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to stderr

Example using the Console class:

const out = getStreamSomehow();
const err = getStreamSomehow();
const myConsole = new console.Console(out, err);
myConsole.log('hello world');
// Prints: hello world, to out
myConsole.log('hello %s', 'world');
// Prints: hello world, to out
myConsole.error(new Error('Whoops, something bad happened'));
// Prints: [Error: Whoops, something bad happened], to err
const name = 'Will Robinson';
myConsole.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to err

@seesource

console
.
Console.log(message?: any, ...optionalParams: any[]): void

Prints to stdout with newline. Multiple arguments can be passed, with the first used as the primary message and all additional used as substitution values similar to printf(3) (the arguments are all passed to util.format()).

const count = 5;
console.log('count: %d', count);
// Prints: count: 5, to stdout
console.log('count:', count);
// Prints: count: 5, to stdout

See util.format() for more information.

@sincev0.1.100

log
("订阅者 2: " + (yield*
import Queue
Queue
.
const take: <string>(self: Queue.Dequeue<string>) => Effect.Effect<string, never, never>

Takes the oldest value in the queue. If the queue is empty, this will return a computation that resumes when an item has been added to the queue.

@since2.0.0

take
(
const dequeue2: Queue.Dequeue<string>
dequeue2
)))
})
)
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
.
const runFork: <void, never>(effect: Effect.Effect<void, never, never>, options?: RunForkOptions) => RuntimeFiber<void, never>

Runs an effect in the background, returning a fiber that can be observed or interrupted.

Unless you specifically need a Promise or synchronous operation, runFork is a good default choice.

Details

This function is the foundational way to execute an effect in the background. It creates a "fiber," a lightweight, cooperative thread of execution that can be observed (to access its result), interrupted, or joined. Fibers are useful for concurrent programming and allow effects to run independently of the main program flow.

Once the effect is running in a fiber, you can monitor its progress, cancel it if necessary, or retrieve its result when it completes. If the effect fails, the fiber will propagate the failure, which you can observe and handle.

When to Use

Use this function when you need to run an effect in the background, especially if the effect is long-running or performs periodic tasks. It's suitable for tasks that need to run independently but might still need observation or management, like logging, monitoring, or scheduled tasks.

This function is ideal if you don't need the result immediately or if the effect is part of a larger concurrent workflow.

Example (Running an Effect in the Background)

import { Effect, Console, Schedule, Fiber } from "effect"
// ┌─── Effect<number, never, never>
// ▼
const program = Effect.repeat(
Console.log("running..."),
Schedule.spaced("200 millis")
)
// ┌─── RuntimeFiber<number, never>
// ▼
const fiber = Effect.runFork(program)
setTimeout(() => {
Effect.runFork(Fiber.interrupt(fiber))
}, 500)

@since2.0.0

runFork
(
const program: Effect.Effect<void, never, never>
program
)
/*
输出:
订阅者 1: Hello from a PubSub!
订阅者 2: Hello from a PubSub!
*/

有界 PubSub 在达到容量时对发布者应用背压,暂停额外的发布直到有空间可用。

背压确保所有订阅者在订阅期间接收所有消息。但是,如果订阅者较慢,可能导致消息传递变慢。

示例(有界 PubSub 创建)

import {
import PubSub
PubSub
} from "effect"
// 创建一个容量为 2 的有界 PubSub
const
const boundedPubSub: Effect<PubSub.PubSub<string>, never, never>
boundedPubSub
=
import PubSub
PubSub
.
const bounded: <string>(capacity: number | {
readonly capacity: number;
readonly replay?: number | undefined;
}) => Effect<PubSub.PubSub<string>, never, never>

Creates a bounded PubSub with the back pressure strategy. The PubSub will retain messages until they have been taken by all subscribers, applying back pressure to publishers if the PubSub is at capacity.

For best performance use capacities that are powers of two.

@since2.0.0

bounded
<string>(2)

丢弃 PubSub 在满时丢弃新值。如果消息被丢弃,PubSub.publish 操作返回 false

在丢弃 pubsub 中,发布者可以继续发布新值,但不保证订阅者接收所有消息。

示例(丢弃 PubSub 创建)

import {
import PubSub
PubSub
} from "effect"
// 创建一个容量为 2 的丢弃 PubSub
const
const droppingPubSub: Effect<PubSub.PubSub<string>, never, never>
droppingPubSub
=
import PubSub
PubSub
.
const dropping: <string>(capacity: number | {
readonly capacity: number;
readonly replay?: number | undefined;
}) => Effect<PubSub.PubSub<string>, never, never>

Creates a bounded PubSub with the dropping strategy. The PubSub will drop new messages if the PubSub is at capacity.

For best performance use capacities that are powers of two.

@since2.0.0

dropping
<string>(2)

滑动 PubSub 移除最旧的消息以为新消息腾出空间,确保发布永不阻塞。

滑动 pubsub 防止慢订阅者影响消息传递速率。但是,慢订阅者仍有可能错过一些消息。

示例(滑动 PubSub 创建)

import {
import PubSub
PubSub
} from "effect"
// 创建一个容量为 2 的滑动 PubSub
const
const slidingPubSub: Effect<PubSub.PubSub<string>, never, never>
slidingPubSub
=
import PubSub
PubSub
.
const sliding: <string>(capacity: number | {
readonly capacity: number;
readonly replay?: number | undefined;
}) => Effect<PubSub.PubSub<string>, never, never>

Creates a bounded PubSub with the sliding strategy. The PubSub will add new messages and drop old messages if the PubSub is at capacity.

For best performance use capacities that are powers of two.

@since2.0.0

sliding
<string>(2)

无界 PubSub 没有容量限制,因此发布总是立即成功。

无界 pubsub 保证所有订阅者接收所有消息而不减慢消息传递。但是,如果消息发布速度快于消费速度,它们可能无限增长。

通常,建议使用有界、丢弃或滑动 pubsub,除非您有无界 pubsub 的特定用例。

示例

import {
import PubSub
PubSub
} from "effect"
// 创建一个具有无限容量的无界 PubSub
const
const unboundedPubSub: Effect<PubSub.PubSub<string>, never, never>
unboundedPubSub
=
import PubSub
PubSub
.
const unbounded: <string>(options?: {
readonly replay?: number | undefined;
}) => Effect<PubSub.PubSub<string>, never, never>

Creates an unbounded PubSub.

@since2.0.0

unbounded
<string>()

PubSub.publishAll 函数让您一次向 pubsub 发布多个值。

示例(发布多个消息)

import {
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
,
import PubSub
PubSub
,
import Queue
Queue
} from "effect"
const
const program: Effect.Effect<void, never, never>
program
=
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
.
const scoped: <void, never, Scope>(effect: Effect.Effect<void, never, Scope>) => Effect.Effect<void, never, never>

Scopes all resources used in an effect to the lifetime of the effect.

Details

This function ensures that all resources used within an effect are tied to its lifetime. Finalizers for these resources are executed automatically when the effect completes, whether through success, failure, or interruption. This guarantees proper resource cleanup without requiring explicit management.

@since2.0.0

scoped
(
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
.
const gen: <YieldWrap<Effect.Effect<PubSub.PubSub<string>, never, never>> | YieldWrap<Effect.Effect<Queue.Dequeue<string>, never, Scope>> | YieldWrap<Effect.Effect<boolean, never, never>> | YieldWrap<Effect.Effect<Chunk<string>, never, never>>, void>(f: (resume: Effect.Adapter) => Generator<YieldWrap<Effect.Effect<PubSub.PubSub<string>, never, never>> | YieldWrap<Effect.Effect<Queue.Dequeue<string>, never, Scope>> | YieldWrap<...> | YieldWrap<...>, void, never>) => Effect.Effect<...> (+1 overload)

Provides a way to write effectful code using generator functions, simplifying control flow and error handling.

When to Use

Effect.gen allows you to write code that looks and behaves like synchronous code, but it can handle asynchronous tasks, errors, and complex control flow (like loops and conditions). It helps make asynchronous code more readable and easier to manage.

The generator functions work similarly to async/await but with more explicit control over the execution of effects. You can yield* values from effects and return the final result at the end.

Example

import { Effect } from "effect"
const addServiceCharge = (amount: number) => amount + 1
const applyDiscount = (
total: number,
discountRate: number
): Effect.Effect<number, Error> =>
discountRate === 0
? Effect.fail(new Error("Discount rate cannot be zero"))
: Effect.succeed(total - (total * discountRate) / 100)
const fetchTransactionAmount = Effect.promise(() => Promise.resolve(100))
const fetchDiscountRate = Effect.promise(() => Promise.resolve(5))
export const program = Effect.gen(function* () {
const transactionAmount = yield* fetchTransactionAmount
const discountRate = yield* fetchDiscountRate
const discountedAmount = yield* applyDiscount(
transactionAmount,
discountRate
)
const finalAmount = addServiceCharge(discountedAmount)
return `Final amount to charge: ${finalAmount}`
})

@since2.0.0

gen
(function* () {
const
const pubsub: PubSub.PubSub<string>
pubsub
= yield*
import PubSub
PubSub
.
const bounded: <string>(capacity: number | {
readonly capacity: number;
readonly replay?: number | undefined;
}) => Effect.Effect<PubSub.PubSub<string>, never, never>

Creates a bounded PubSub with the back pressure strategy. The PubSub will retain messages until they have been taken by all subscribers, applying back pressure to publishers if the PubSub is at capacity.

For best performance use capacities that are powers of two.

@since2.0.0

bounded
<string>(2)
const
const dequeue: Queue.Dequeue<string>
dequeue
= yield*
import PubSub
PubSub
.
const subscribe: <string>(self: PubSub.PubSub<string>) => Effect.Effect<Queue.Dequeue<string>, never, Scope>

Subscribes to receive messages from the PubSub. The resulting subscription can be evaluated multiple times within the scope to take a message from the PubSub each time.

@since2.0.0

subscribe
(
const pubsub: PubSub.PubSub<string>
pubsub
)
yield*
import PubSub
PubSub
.
const publishAll: <string>(self: PubSub.PubSub<string>, elements: Iterable<string>) => Effect.Effect<boolean> (+1 overload)

Publishes all of the specified messages to the PubSub, returning whether they were published to the PubSub.

@since2.0.0

publishAll
(
const pubsub: PubSub.PubSub<string>
pubsub
, ["Message 1", "Message 2"])
var console: Console

The console module provides a simple debugging console that is similar to the JavaScript console mechanism provided by web browsers.

The module exports two specific components:

  • A Console class with methods such as console.log(), console.error() and console.warn() that can be used to write to any Node.js stream.
  • A global console instance configured to write to process.stdout and process.stderr. The global console can be used without importing the node:console module.

Warning: The global console object's methods are neither consistently synchronous like the browser APIs they resemble, nor are they consistently asynchronous like all other Node.js streams. See the note on process I/O for more information.

Example using the global console:

console.log('hello world');
// Prints: hello world, to stdout
console.log('hello %s', 'world');
// Prints: hello world, to stdout
console.error(new Error('Whoops, something bad happened'));
// Prints error message and stack trace to stderr:
// Error: Whoops, something bad happened
// at [eval]:5:15
// at Script.runInThisContext (node:vm:132:18)
// at Object.runInThisContext (node:vm:309:38)
// at node:internal/process/execution:77:19
// at [eval]-wrapper:6:22
// at evalScript (node:internal/process/execution:76:60)
// at node:internal/main/eval_string:23:3
const name = 'Will Robinson';
console.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to stderr

Example using the Console class:

const out = getStreamSomehow();
const err = getStreamSomehow();
const myConsole = new console.Console(out, err);
myConsole.log('hello world');
// Prints: hello world, to out
myConsole.log('hello %s', 'world');
// Prints: hello world, to out
myConsole.error(new Error('Whoops, something bad happened'));
// Prints: [Error: Whoops, something bad happened], to err
const name = 'Will Robinson';
myConsole.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to err

@seesource

console
.
Console.log(message?: any, ...optionalParams: any[]): void

Prints to stdout with newline. Multiple arguments can be passed, with the first used as the primary message and all additional used as substitution values similar to printf(3) (the arguments are all passed to util.format()).

const count = 5;
console.log('count: %d', count);
// Prints: count: 5, to stdout
console.log('count:', count);
// Prints: count: 5, to stdout

See util.format() for more information.

@sincev0.1.100

log
(yield*
import Queue
Queue
.
const takeAll: <string>(self: Queue.Dequeue<string>) => Effect.Effect<Chunk<string>, never, never>

Takes all the values in the queue and returns the values. If the queue is empty returns an empty collection.

@since2.0.0

takeAll
(
const dequeue: Queue.Dequeue<string>
dequeue
))
})
)
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
.
const runFork: <void, never>(effect: Effect.Effect<void, never, never>, options?: RunForkOptions) => RuntimeFiber<void, never>

Runs an effect in the background, returning a fiber that can be observed or interrupted.

Unless you specifically need a Promise or synchronous operation, runFork is a good default choice.

Details

This function is the foundational way to execute an effect in the background. It creates a "fiber," a lightweight, cooperative thread of execution that can be observed (to access its result), interrupted, or joined. Fibers are useful for concurrent programming and allow effects to run independently of the main program flow.

Once the effect is running in a fiber, you can monitor its progress, cancel it if necessary, or retrieve its result when it completes. If the effect fails, the fiber will propagate the failure, which you can observe and handle.

When to Use

Use this function when you need to run an effect in the background, especially if the effect is long-running or performs periodic tasks. It's suitable for tasks that need to run independently but might still need observation or management, like logging, monitoring, or scheduled tasks.

This function is ideal if you don't need the result immediately or if the effect is part of a larger concurrent workflow.

Example (Running an Effect in the Background)

import { Effect, Console, Schedule, Fiber } from "effect"
// ┌─── Effect<number, never, never>
// ▼
const program = Effect.repeat(
Console.log("running..."),
Schedule.spaced("200 millis")
)
// ┌─── RuntimeFiber<number, never>
// ▼
const fiber = Effect.runFork(program)
setTimeout(() => {
Effect.runFork(Fiber.interrupt(fiber))
}, 500)

@since2.0.0

runFork
(
const program: Effect.Effect<void, never, never>
program
)
/*
输出:
{ _id: 'Chunk', values: [ 'Message 1', 'Message 2' ] }
*/

您可以分别使用 PubSub.capacityPubSub.size 检查 pubsub 的容量和当前大小。

注意 PubSub.capacity 返回一个 number,因为容量在 pubsub 创建时设置且永不改变。 相比之下,PubSub.size 返回一个确定 pubsub 当前大小的 effect,因为 pubsub 中的消息数量可能随时间变化。

示例(检索 PubSub 容量和大小)

import {
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
,
import PubSub
PubSub
} from "effect"
const
const program: Effect.Effect<void, never, never>
program
=
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
.
const gen: <YieldWrap<Effect.Effect<PubSub.PubSub<number>, never, never>> | YieldWrap<Effect.Effect<number, never, never>>, void>(f: (resume: Effect.Adapter) => Generator<YieldWrap<Effect.Effect<PubSub.PubSub<number>, never, never>> | YieldWrap<Effect.Effect<number, never, never>>, void, never>) => Effect.Effect<void, never, never> (+1 overload)

Provides a way to write effectful code using generator functions, simplifying control flow and error handling.

When to Use

Effect.gen allows you to write code that looks and behaves like synchronous code, but it can handle asynchronous tasks, errors, and complex control flow (like loops and conditions). It helps make asynchronous code more readable and easier to manage.

The generator functions work similarly to async/await but with more explicit control over the execution of effects. You can yield* values from effects and return the final result at the end.

Example

import { Effect } from "effect"
const addServiceCharge = (amount: number) => amount + 1
const applyDiscount = (
total: number,
discountRate: number
): Effect.Effect<number, Error> =>
discountRate === 0
? Effect.fail(new Error("Discount rate cannot be zero"))
: Effect.succeed(total - (total * discountRate) / 100)
const fetchTransactionAmount = Effect.promise(() => Promise.resolve(100))
const fetchDiscountRate = Effect.promise(() => Promise.resolve(5))
export const program = Effect.gen(function* () {
const transactionAmount = yield* fetchTransactionAmount
const discountRate = yield* fetchDiscountRate
const discountedAmount = yield* applyDiscount(
transactionAmount,
discountRate
)
const finalAmount = addServiceCharge(discountedAmount)
return `Final amount to charge: ${finalAmount}`
})

@since2.0.0

gen
(function* () {
const
const pubsub: PubSub.PubSub<number>
pubsub
= yield*
import PubSub
PubSub
.
const bounded: <number>(capacity: number | {
readonly capacity: number;
readonly replay?: number | undefined;
}) => Effect.Effect<PubSub.PubSub<number>, never, never>

Creates a bounded PubSub with the back pressure strategy. The PubSub will retain messages until they have been taken by all subscribers, applying back pressure to publishers if the PubSub is at capacity.

For best performance use capacities that are powers of two.

@since2.0.0

bounded
<number>(2)
var console: Console

The console module provides a simple debugging console that is similar to the JavaScript console mechanism provided by web browsers.

The module exports two specific components:

  • A Console class with methods such as console.log(), console.error() and console.warn() that can be used to write to any Node.js stream.
  • A global console instance configured to write to process.stdout and process.stderr. The global console can be used without importing the node:console module.

Warning: The global console object's methods are neither consistently synchronous like the browser APIs they resemble, nor are they consistently asynchronous like all other Node.js streams. See the note on process I/O for more information.

Example using the global console:

console.log('hello world');
// Prints: hello world, to stdout
console.log('hello %s', 'world');
// Prints: hello world, to stdout
console.error(new Error('Whoops, something bad happened'));
// Prints error message and stack trace to stderr:
// Error: Whoops, something bad happened
// at [eval]:5:15
// at Script.runInThisContext (node:vm:132:18)
// at Object.runInThisContext (node:vm:309:38)
// at node:internal/process/execution:77:19
// at [eval]-wrapper:6:22
// at evalScript (node:internal/process/execution:76:60)
// at node:internal/main/eval_string:23:3
const name = 'Will Robinson';
console.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to stderr

Example using the Console class:

const out = getStreamSomehow();
const err = getStreamSomehow();
const myConsole = new console.Console(out, err);
myConsole.log('hello world');
// Prints: hello world, to out
myConsole.log('hello %s', 'world');
// Prints: hello world, to out
myConsole.error(new Error('Whoops, something bad happened'));
// Prints: [Error: Whoops, something bad happened], to err
const name = 'Will Robinson';
myConsole.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to err

@seesource

console
.
Console.log(message?: any, ...optionalParams: any[]): void

Prints to stdout with newline. Multiple arguments can be passed, with the first used as the primary message and all additional used as substitution values similar to printf(3) (the arguments are all passed to util.format()).

const count = 5;
console.log('count: %d', count);
// Prints: count: 5, to stdout
console.log('count:', count);
// Prints: count: 5, to stdout

See util.format() for more information.

@sincev0.1.100

log
(`capacity: ${
import PubSub
PubSub
.
const capacity: <number>(self: PubSub.PubSub<number>) => number

Returns the number of elements the queue can hold.

@since2.0.0

capacity
(
const pubsub: PubSub.PubSub<number>
pubsub
)}`)
var console: Console

The console module provides a simple debugging console that is similar to the JavaScript console mechanism provided by web browsers.

The module exports two specific components:

  • A Console class with methods such as console.log(), console.error() and console.warn() that can be used to write to any Node.js stream.
  • A global console instance configured to write to process.stdout and process.stderr. The global console can be used without importing the node:console module.

Warning: The global console object's methods are neither consistently synchronous like the browser APIs they resemble, nor are they consistently asynchronous like all other Node.js streams. See the note on process I/O for more information.

Example using the global console:

console.log('hello world');
// Prints: hello world, to stdout
console.log('hello %s', 'world');
// Prints: hello world, to stdout
console.error(new Error('Whoops, something bad happened'));
// Prints error message and stack trace to stderr:
// Error: Whoops, something bad happened
// at [eval]:5:15
// at Script.runInThisContext (node:vm:132:18)
// at Object.runInThisContext (node:vm:309:38)
// at node:internal/process/execution:77:19
// at [eval]-wrapper:6:22
// at evalScript (node:internal/process/execution:76:60)
// at node:internal/main/eval_string:23:3
const name = 'Will Robinson';
console.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to stderr

Example using the Console class:

const out = getStreamSomehow();
const err = getStreamSomehow();
const myConsole = new console.Console(out, err);
myConsole.log('hello world');
// Prints: hello world, to out
myConsole.log('hello %s', 'world');
// Prints: hello world, to out
myConsole.error(new Error('Whoops, something bad happened'));
// Prints: [Error: Whoops, something bad happened], to err
const name = 'Will Robinson';
myConsole.warn(`Danger ${name}! Danger!`);
// Prints: Danger Will Robinson! Danger!, to err

@seesource

console
.
Console.log(message?: any, ...optionalParams: any[]): void

Prints to stdout with newline. Multiple arguments can be passed, with the first used as the primary message and all additional used as substitution values similar to printf(3) (the arguments are all passed to util.format()).

const count = 5;
console.log('count: %d', count);
// Prints: count: 5, to stdout
console.log('count:', count);
// Prints: count: 5, to stdout

See util.format() for more information.

@sincev0.1.100

log
(`size: ${yield*
import PubSub
PubSub
.
const size: <number>(self: PubSub.PubSub<number>) => Effect.Effect<number>

Retrieves the size of the queue, which is equal to the number of elements in the queue. This may be negative if fibers are suspended waiting for elements to be added to the queue.

@since2.0.0

size
(
const pubsub: PubSub.PubSub<number>
pubsub
)}`)
})
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
.
const runFork: <void, never>(effect: Effect.Effect<void, never, never>, options?: RunForkOptions) => RuntimeFiber<void, never>

Runs an effect in the background, returning a fiber that can be observed or interrupted.

Unless you specifically need a Promise or synchronous operation, runFork is a good default choice.

Details

This function is the foundational way to execute an effect in the background. It creates a "fiber," a lightweight, cooperative thread of execution that can be observed (to access its result), interrupted, or joined. Fibers are useful for concurrent programming and allow effects to run independently of the main program flow.

Once the effect is running in a fiber, you can monitor its progress, cancel it if necessary, or retrieve its result when it completes. If the effect fails, the fiber will propagate the failure, which you can observe and handle.

When to Use

Use this function when you need to run an effect in the background, especially if the effect is long-running or performs periodic tasks. It's suitable for tasks that need to run independently but might still need observation or management, like logging, monitoring, or scheduled tasks.

This function is ideal if you don't need the result immediately or if the effect is part of a larger concurrent workflow.

Example (Running an Effect in the Background)

import { Effect, Console, Schedule, Fiber } from "effect"
// ┌─── Effect<number, never, never>
// ▼
const program = Effect.repeat(
Console.log("running..."),
Schedule.spaced("200 millis")
)
// ┌─── RuntimeFiber<number, never>
// ▼
const fiber = Effect.runFork(program)
setTimeout(() => {
Effect.runFork(Fiber.interrupt(fiber))
}, 500)

@since2.0.0

runFork
(
const program: Effect.Effect<void, never, never>
program
)
/*
输出:
capacity: 2
size: 0
*/

要关闭 pubsub,请使用 PubSub.shutdown。您还可以使用 PubSub.isShutdown 验证它是否已关闭,或使用 PubSub.awaitShutdown 等待关闭完成。关闭 pubsub 还会终止所有关联的队列,确保关闭信号得到有效传达。

PubSub 操作符与队列的操作符相似,主要区别是使用 PubSub.publishPubSub.subscribe 代替 Queue.offerQueue.take。如果您已经熟悉使用 Queue,您会发现 PubSub 很简单。

本质上,PubSub 可以看作是只允许写入的 Enqueue

import type {
import Queue
Queue
} from "effect"
interface
interface PubSub<A>
PubSub
<
function (type parameter) A in PubSub<A>
A
> extends
import Queue
Queue
.
interface Enqueue<in A>

@since2.0.0

Enqueue
<
function (type parameter) A in PubSub<A>
A
> {}

这里,Enqueue 类型指的是只接受入队(或写入)的队列。在这里入队的任何值都会发布到 pubsub,关闭等操作也会影响 pubsub。

这种设计使 PubSub 高度灵活,让您可以在任何需要只接受发布值的 Enqueue 的地方使用它。