Skip to content

闩锁

闩锁是一种同步工具,像门一样工作,让纤程等待直到闩锁打开后才能继续。闩锁可以是打开或关闭的:

  • 当关闭时,到达闩锁的纤程等待直到它打开。
  • 当打开时,纤程立即通过。

一旦打开,闩锁通常保持打开状态,尽管如果需要,您可以再次关闭它

想象一个应用程序,只有在完成初始设置(如加载配置数据或建立数据库连接)后才处理请求。 您可以在设置过程中创建一个处于关闭状态的闩锁。 任何传入的请求,表示为纤程,都会在闩锁处等待,直到它打开。 一旦设置完成,您调用 latch.open 以便请求可以继续。

Latch 包含几个操作,让您控制和观察其状态:

操作描述
whenOpen仅当闩锁打开时运行给定的 effect,否则等待直到它打开。
open打开闩锁,以便任何等待的纤程可以继续。
close关闭闩锁,导致纤程在将来到达此闩锁时等待。
await暂停当前纤程直到闩锁打开。如果闩锁已经打开,立即返回。
release允许等待的纤程继续而不永久打开闩锁。

使用 Effect.makeLatch 函数通过传递布尔值来创建处于打开或关闭状态的闩锁。默认值是 false,这意味着它开始时是关闭的。

示例(创建和使用闩锁)

在此示例中,闩锁开始时是关闭的。纤程仅在闩锁打开时记录 “open sesame”。等待一秒钟后,闩锁被打开,释放纤程:

import {
import Console
Console
,
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
} from "effect"
// 演示闩锁用法的生成器函数
const
const program: Effect.Effect<void, never, never>
program
=
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
.
const gen: <YieldWrap<Effect.Effect<void, never, never>>, void>(f: (resume: Effect.Adapter) => Generator<YieldWrap<Effect.Effect<void, never, never>>, void, never>) => Effect.Effect<void, never, never> (+1 overload)

Provides a way to write effectful code using generator functions, simplifying control flow and error handling.

When to Use

Effect.gen allows you to write code that looks and behaves like synchronous code, but it can handle asynchronous tasks, errors, and complex control flow (like loops and conditions). It helps make asynchronous code more readable and easier to manage.

The generator functions work similarly to async/await but with more explicit control over the execution of effects. You can yield* values from effects and return the final result at the end.

Example

import { Effect } from "effect"
const addServiceCharge = (amount: number) => amount + 1
const applyDiscount = (
total: number,
discountRate: number
): Effect.Effect<number, Error> =>
discountRate === 0
? Effect.fail(new Error("Discount rate cannot be zero"))
: Effect.succeed(total - (total * discountRate) / 100)
const fetchTransactionAmount = Effect.promise(() => Promise.resolve(100))
const fetchDiscountRate = Effect.promise(() => Promise.resolve(5))
export const program = Effect.gen(function* () {
const transactionAmount = yield* fetchTransactionAmount
const discountRate = yield* fetchDiscountRate
const discountedAmount = yield* applyDiscount(
transactionAmount,
discountRate
)
const finalAmount = addServiceCharge(discountedAmount)
return `Final amount to charge: ${finalAmount}`
})

@since2.0.0

gen
(function* () {
// 创建一个闩锁,开始时处于关闭状态
const
const latch: Effect.Latch
latch
= yield*
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
.
const makeLatch: (open?: boolean | undefined) => Effect.Effect<Effect.Latch, never, never>

Creates a new Latch, starting in the specified state.

Details

This function initializes a Latch safely, ensuring proper runtime guarantees. By default, the latch starts in the closed state.

Example

import { Console, Effect } from "effect"
const program = Effect.gen(function*() {
// Create a latch, starting in the closed state
const latch = yield* Effect.makeLatch(false)
// Fork a fiber that logs "open sesame" when the latch is opened
const fiber = yield* Console.log("open sesame").pipe(
latch.whenOpen,
Effect.fork
)
yield* Effect.sleep("1 second")
// Open the latch
yield* latch.open
yield* fiber.await
})
Effect.runFork(program)
// Output: open sesame (after 1 second)

@since3.8.0

makeLatch
()
// 分叉一个仅在闩锁打开时记录 "open sesame" 的纤程
const
const fiber: RuntimeFiber<void, never>
fiber
= yield*
import Console
Console
.
const log: (...args: ReadonlyArray<any>) => Effect.Effect<void>

@since2.0.0

log
("open sesame").
Pipeable.pipe<Effect.Effect<void, never, never>, Effect.Effect<void, never, never>, Effect.Effect<RuntimeFiber<void, never>, never, never>>(this: Effect.Effect<void, never, never>, ab: (_: Effect.Effect<void, never, never>) => Effect.Effect<void, never, never>, bc: (_: Effect.Effect<void, never, never>) => Effect.Effect<RuntimeFiber<void, never>, never, never>): Effect.Effect<RuntimeFiber<void, never>, never, never> (+21 overloads)
pipe
(
const latch: Effect.Latch
latch
.
Latch.whenOpen: <A, E, R>(self: Effect.Effect<A, E, R>) => Effect.Effect<A, E, R>

Runs the given effect only when the latch is open.

Details

This function ensures that the provided effect executes only if the latch is open. If the latch is closed, the fiber will wait until it opens.

whenOpen
, // 等待闩锁打开
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
.
const fork: <A, E, R>(self: Effect.Effect<A, E, R>) => Effect.Effect<RuntimeFiber<A, E>, never, R>

Creates a new fiber to run an effect concurrently.

Details

This function takes an effect and forks it into a separate fiber, allowing it to run concurrently without blocking the original effect. The new fiber starts execution immediately after being created, and the fiber object is returned immediately without waiting for the effect to begin. This is useful when you want to run tasks concurrently while continuing other tasks in the parent fiber.

The forked fiber is attached to the parent fiber's scope. This means that when the parent fiber terminates, the child fiber will also be terminated automatically. This feature, known as "auto supervision," ensures that no fibers are left running unintentionally. If you prefer not to have this auto supervision behavior, you can use

forkDaemon

or

forkIn

.

When to Use

Use this function when you need to run an effect concurrently without blocking the current execution flow. For example, you might use it to launch background tasks or concurrent computations. However, working with fibers can be complex, so before using this function directly, you might want to explore higher-level functions like

raceWith

,

zip

, or others that can manage concurrency for you.

Example

import { Effect } from "effect"
const fib = (n: number): Effect.Effect<number> =>
n < 2
? Effect.succeed(n)
: Effect.zipWith(fib(n - 1), fib(n - 2), (a, b) => a + b)
// ┌─── Effect<RuntimeFiber<number, never>, never, never>
// ▼
const fib10Fiber = Effect.fork(fib(10))

@seeforkWithErrorHandler for a version that allows you to handle errors.

@since2.0.0

fork
// 将 effect 分叉到新纤程
)
// 等待 1 秒
yield*
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
.
const sleep: (duration: DurationInput) => Effect.Effect<void>

Suspends the execution of an effect for a specified Duration.

Details

This function pauses the execution of an effect for a given duration. It is asynchronous, meaning that it does not block the fiber executing the effect. Instead, the fiber is suspended during the delay period and can resume once the specified time has passed.

The duration can be specified using various formats supported by the Duration module, such as a string ("2 seconds") or numeric value representing milliseconds.

Example

import { Effect } from "effect"
const program = Effect.gen(function*() {
console.log("Starting task...")
yield* Effect.sleep("3 seconds") // Waits for 3 seconds
console.log("Task completed!")
})
Effect.runFork(program)
// Output:
// Starting task...
// Task completed!

@since2.0.0

sleep
("1 second")
// 打开闩锁,释放纤程
yield*
const latch: Effect.Latch
latch
.
Latch.open: Effect.Effect<void, never, never>

Opens the latch, releasing all fibers waiting on it.

Details

Once the latch is opened, it remains open. Any fibers waiting on await will be released and can continue execution.

open
// 等待分叉的纤程完成
yield*
const fiber: RuntimeFiber<void, never>
fiber
.
Fiber<void, never>.await: Effect.Effect<Exit<void, never>, never, never>

Awaits the fiber, which suspends the awaiting fiber until the result of the fiber has been determined.

await
})
import Effect

@since2.0.0

@since2.0.0

@since2.0.0

Effect
.
const runFork: <void, never>(effect: Effect.Effect<void, never, never>, options?: RunForkOptions) => RuntimeFiber<void, never>

Runs an effect in the background, returning a fiber that can be observed or interrupted.

Unless you specifically need a Promise or synchronous operation, runFork is a good default choice.

Details

This function is the foundational way to execute an effect in the background. It creates a "fiber," a lightweight, cooperative thread of execution that can be observed (to access its result), interrupted, or joined. Fibers are useful for concurrent programming and allow effects to run independently of the main program flow.

Once the effect is running in a fiber, you can monitor its progress, cancel it if necessary, or retrieve its result when it completes. If the effect fails, the fiber will propagate the failure, which you can observe and handle.

When to Use

Use this function when you need to run an effect in the background, especially if the effect is long-running or performs periodic tasks. It's suitable for tasks that need to run independently but might still need observation or management, like logging, monitoring, or scheduled tasks.

This function is ideal if you don't need the result immediately or if the effect is part of a larger concurrent workflow.

Example (Running an Effect in the Background)

import { Effect, Console, Schedule, Fiber } from "effect"
// ┌─── Effect<number, never, never>
// ▼
const program = Effect.repeat(
Console.log("running..."),
Schedule.spaced("200 millis")
)
// ┌─── RuntimeFiber<number, never>
// ▼
const fiber = Effect.runFork(program)
setTimeout(() => {
Effect.runFork(Fiber.interrupt(fiber))
}, 500)

@since2.0.0

runFork
(
const program: Effect.Effect<void, never, never>
program
)
// 输出: open sesame (1 秒后)

当您有一个一次性事件或条件来确定纤程是否可以继续时,闩锁很有用。例如,您可能使用闩锁阻塞所有纤程,直到设置步骤完成,然后打开闩锁以便每个人都可以继续。

具有一个锁的信号量(通常称为二进制信号量或互斥锁)通常用于互斥:它确保一次只有一个纤程访问共享资源或代码段。一旦纤程获得锁,在锁被释放之前,没有其他纤程可以进入受保护区域。

简而言之:

  • 如果您要在特定事件上控制一组纤程(“在这里等待,直到这变为真”),请使用闩锁
  • 如果您需要确保一次只有一个纤程在关键部分或使用共享资源,请使用信号量(具有一个锁)