Combine 是一个声明式、响应式框架,用于随着时间的推移处理异步事件。支持 iOS 13、macOS 10.15、watchOS 6 及之后的系统。这里是我学习 Combine 时的笔记,只是列出要点,没有详细的解释,权当备忘清单。

The Combine framework provides a declarative approach for how your app processes events. Rather than potentially implementing multiple delegate callbacks or completion handler closures, you can create a single processing chain for a given event source. Each part of the chain is a Combine operator that performs a distinct action on the elements received from the previous step.

Publishers & Subscribers

Publishers & Subscribers

Publisher

Publisher 协议声明类型可以随着时间的推移传输一系列值。

发布者可以发出两种事件,值和完成事件。它可以发出零个或多个值,但只能发出一个完成事件,该事件可以是正常完成事件,也可以是错误。一旦发布者发出完成事件,它就完成了并且不能再发出任何事件。

Subscriber

Subscriber 协议声明类型可以从发布者接收输入。如果没有订阅者来接收输出,则发布者不会发出任何值。

Combine 提供两个内置订阅者:

  • sink(_:_:):允许你使用闭包来处理输出值
  • assign(to:on:):将结果输出绑定到数据模型或 UI 控件上的某些属性
1
2
3
4
5
6
7
8
9
10
let just = Just("Hello world!")

_ = just
.sink(
receiveCompletion: {
print("Received completion", $0)
},
receiveValue: {
print("Received value", $0)
})
1
2
3
4
5
6
7
8
9
10
11
12
class SomeObject {
var value: String = "" {
didSet {
print(value)
}
}
}

let object = SomeObject()
let publisher = ["Hello", "world!"].publisher
_ = publisher
.assign(to: \.value, on: object)

Cancellable

Subscription 协议继承自 Cancellable 协议,当订阅者完成其工作并且不再希望从发布者接收值时,可以调用 cancel() 以取消订阅。
如果你没有在订阅上明确调用 cancel(),它将持续到发布者完成,或直到正常内存管理导致存储的订阅非初始化。到那时,它会为你取消订阅。

创建自定义订阅者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
let publisher = (1 ... 6).publisher

final class IntSubscriber: Subscriber {
typealias Input = Int
typealias Failure = Never

func receive(subscription: Subscription) {
subscription.request(.max(3))
}

func receive(_ input: Int) -> Subscribers.Demand {
print("Received value", input)
return .none
}

func receive(completion: Subscribers.Completion<Never>) {
print("Received completion", completion)
}
}

Future

就像可以使用 Just 创建向订阅者发出单个值然后完成的发布者一样, Future 可用于异步生成单个结果然后完成。

Future 是贪婪的,也就是说一旦创建就会执行。它不需要像普通发布者那样懒惰的订阅者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
var subscriptions = Set<AnyCancellable>()

func futureIncrement(
integer: Int,
afterDelay delay: TimeInterval) -> Future<Int, Never>
{
Future<Int, Never> { promise in
print("Original")
DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
promise(.success(integer + 1))
}
}
}

let future = futureIncrement(integer: 1, afterDelay: 3)

future
.sink(receiveCompletion: { print($0) },
receiveValue: { print($0) })
.store(in: &subscriptions)

// 在指定的延迟之后,第二个订阅会收到相同的值
// feature 不会重新履行诺言;相反,它共享或重放其输出
future
.sink(receiveCompletion: { print("Second", $0) },
receiveValue: { print("Second", $0) })
.store(in: &subscriptions)

类型擦除(Type erasure)

有时,你希望让订阅者订阅以接收来自发布者的事件,但无法访问有关该发布者的其他详细信息,这时可以使用类型擦除 eraseToAnyPublisher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var subscriptions = Set<AnyCancellable>()

let subject = PassthroughSubject<Int, Never>()

let publisher = subject.eraseToAnyPublisher()

publisher
.sink(receiveValue: { print($0) })
.store(in: &subscriptions)

subject.send(0)

// error: Value of type 'AnyPublisher<Int, Never>' has no member 'send'
publisher.send(1)

Operators

Transforming Operators

collect()

collect()

1
2
3
4
5
6
["A", "B", "C", "D", "E"].publisher
.collect()
// .collect(2)
.sink(receiveCompletion: { print($0) },
receiveValue: { print($0) })
.store(in: &subscriptions)

map(_:)

map(_:)

Mapping key paths
  • map<T>(_:)
  • map<T0, T1>(_:_:)
  • map<T0, T1, T2>(_:_:_:)
1
2
3
4
5
6
7
8
9
10
11
12
publisher
.map(\.x, \.y)
.sink(receiveValue: { x, y in
print(
"The coordinate at (\(x), \(y)) is in quadrant",
quadrantOf(x: x, y: y)
)
})
.store(in: &subscriptions)

publisher.send(Coordinate(x: 10, y: -8))
publisher.send(Coordinate(x: 0, y: 5))
tryMap(_:)
1
2
3
4
5
Just("Directory name that does not exist")
.tryMap { try FileManager.default.contentsOfDirectory(atPath: $0) }
.sink(receiveCompletion: { print($0) },
receiveValue: { print($0) })
.store(in: &subscriptions)

flatMap(maxPublishers:_:)

flatMap(maxPublishers:_:)

flatMap 接收三个发布者: P1P2P3flatMapP1P2 发出发布者的值,但忽略 P3 因为 maxPublishers 设置为 2 。

replaceNil(with:)

replaceNil(with:)

1
2
3
4
5
["A", nil, "C"].publisher
.eraseToAnyPublisher()
.replaceNil(with: "-")
.sink(receiveValue: { print($0) })
.store(in: &subscriptions)

replaceEmpty(with:)

replaceEmpty(with:)

scan(_:_:)

scan

Filtering Operators

Filtering

filter

filter

removeDuplicates

removeDuplicates

Compacting and ignoring

compactMap

compactMap

ignoreOutput

ignoreOutput

Finding values

firstWhere

firstWhere

lastWhere

lastWhere

Dropping values

dropFirst

dropFirst

dropWhile

dropWhile

dropUntilOutputFrom

dropUntilOutputFrom

Limiting values

prefix

prefix

prefixWhile

prefixWhile

prefixUntilOutputFrom

prefixUntilOutputFrom

Combining Operators

Prepending

prepend(Output…)

prepend(Output…)

The last prepend affects the upstream first. The same below.

1
2
3
4
5
6
// ...
.prepend(1, 2)
.prepend(-1, 0)
// ...

// output: -1 0 1 2
prepend(Sequence)

prepend(Sequence)

prepend(Publisher)

prepend(Publisher)

Appending

append(Output…)

append(Output…)

append(Sequence)

append(Sequence)

append(Publisher)

append(Publisher)

Advanced combining

switchToLatest

switchToLatest

merge(with:)

merge(with:)

combineLatest

combineLatestPublisher

zip

zip

Time Manipulation Operators

Shifting time

delay(for:tolerance:scheduler:options) 运算符对整个值序列进行时间偏移。

1.5s delay

Collecting values

collect(_ strategy:options:)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
let collectMaxCount = 2
let collectTimeStride = 4

let sourcePublisher = PassthroughSubject<Date, Never>()

let collectedPublisher = sourcePublisher
// 按指定的时间间隔从发布者收集值
.collect(.byTime(DispatchQueue.main, .seconds(collectTimeStride)))
.flatMap { dates in dates.publisher }

let collectedPublisher2 = sourcePublisher
// 按指定的时间间隔从发布者收集值并限制收集的值的数量
.collect(.byTimeOrCount(DispatchQueue.main,
.seconds(collectTimeStride),
collectMaxCount))
.flatMap { dates in dates.publisher }

Holding off on events

Debounce

.debounce(for:scheduler:options:)

1
2
3
4
5
6
let subject = PassthroughSubject<String, Never>()

let debounced = subject
// 每秒最多允许发送一个值,发送该一秒间隔内发送的最后一个值(如果有)
.debounce(for: .seconds(1.0), scheduler: DispatchQueue.main)
.share()

Note: One thing to watch out for is the publisher’s completion. If your publisher completes right after the last value was emitted, but before the time configured for debounce elapses, you will never see the last value in the debounced publisher!

Throttle
1
2
3
4
5
let throttleDelay = 1.0
let subject = PassthroughSubject<String, Never>()
let throttled = subject
.throttle(for: .seconds(throttleDelay), scheduler: DispatchQueue.main, latest: false)
.share()

debounce 的区别是:

  • debounce 等待接收到的值暂停,然后在指定的时间间隔后发出最新的值。
  • throttle 等待指定的时间间隔,然后发出在该时间间隔内收到的第一个或最新的值。它不关心暂停。
Timing out

timeout(_:scheduler:options:customError:)

1
2
let subject = PassthroughSubject<Void, Never>()
let timedOutSubject = subject.timeout(.seconds(5), scheduler: DispatchQueue.main)
1
2
3
4
5
6
7
8
enum TimeoutError: Error {
case timedOut
}

let subject = PassthroughSubject<Void, TimeoutError>()
let timedOutSubject = subject.timeout(.seconds(5),
scheduler: DispatchQueue.main,
customError: { .timedOut })

Measuring time

measureInterval(using:)

1
2
3
4
5
let subject = PassthroughSubject<String, Never>()
// 在 `DispatchQueue` 的情况下, `TimeInterval` 被定义为“使用该类型的值(以纳秒为单位)创建的 `DispatchTimeInterval` ”
let measureSubject = subject.measureInterval(using: DispatchQueue.main)
// 在 `RunLoop` 调度程序的输出,其大小直接以秒表示
let measureSubject2 = subject.measureInterval(using: RunLoop.main)

Sequence Operators

Finding values

min

min 运算符可让你找到发布者发出的最小值。它是贪婪的,这意味着它必须等待发布者发送 .finished 完成事件。发布者完成后,运算符仅发出最小值。

min

max

max

first

不会等待上游发布者完成,而是在收到第一个发出的值时取消订阅。
还可以使用 first(where:)

first

last

last

output(at:)

output(at:)

output(in:)

该运算符发出索引范围内的各个值,而不是它们的集合。

output(in:)

Querying the publisher

count

count

contains

如果上游发布者发出指定的值,则 contains 运算符将发出 true 并取消订阅。
还可以使用 contains(where:)

contains

allSatisfy

一旦不满足条件,allSatisfy 发出 false 就立即取消订阅。

allSatisfy

reduce

scanreduce 具有相同的功能,主要区别在于 scan 为每个发出的值发出累积值,而 reduce 一旦上游发布者发送 .finished 完成事件,就会发出单个累积值。

reduce

Action

Debugging

Printing events

print(:to:)

1
2
3
let subscription = (1...3).publisher
.print("publisher")
.sink { _ in }

Acting on events — performing side effects

除了打印信息之外,对特定事件执行操作通常也很有用。我们称之为执行副作用,因为您“在侧面”采取的操作不会直接影响下游的其他发布者,但可能会产生类似于修改外部变量的效果。

handleEvents(receiveSubscription:receiveOutput:receiveCompletion:receiveCancel:receiveRequest:)

Using the debugger as a last resort

breakpointOnError()
breakpoint(receiveSubscription:receiveOutput:receiveCompletion:)

1
2
3
.breakpoint(receiveOutput: { value in
return value > 10 && value < 15
})

Timers

Using RunLoop

1
2
3
4
5
6
7
8
9
10
11
12
13
let runLoop = RunLoop.main

let subscription = runLoop.schedule(
after: runLoop.now,
interval: .seconds(1),
tolerance: .milliseconds(100)
) {
print("Timer fired")
}

runLoop.schedule(after: .init(Date(timeIntervalSinceNow: 3.0))) {
subscription.cancel()
}

Using the Timer class

1
2
3
4
5
6
7
let subscription = Timer
.publish(every: 1.0, on: .main, in: .common)
.autoconnect()
.scan(0) { counter, _ in counter + 1 }
.sink { counter in
print("Counter is \(counter)")
}

Using DispatchQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
let queue = DispatchQueue.main
let source = PassthroughSubject<Int, Never>()
var counter = 0

let cancellable = queue.schedule(
after: queue.now,
interval: .seconds(1)
) {
source.send(counter)
counter += 1
}

let subscription = source.sink {
print("Timer emitted \($0)")
}

Key-Value Observing

publisher(for:options:)

1
2
3
4
5
let queue = OperationQueue()
let subscription = queue.publisher(for: \.operationCount)
.sink {
print("Outstanding operations in queue: \($0)")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class TestObject: NSObject {
@objc dynamic var integerProperty: Int = 0
@objc dynamic var stringProperty: String = ""
@objc dynamic var arrayProperty: [Float] = []
}

let obj = TestObject()
let subscription = obj.publisher(for: \.integerProperty)
.sink {
print("integerProperty changes to \($0)")
}
let subscription2 = obj.publisher(for: \.stringProperty)
.sink {
print("stringProperty changes to \($0)")
}
let subscription3 = obj.publisher(for: \.arrayProperty)
.sink {
print("arrayProperty changes to \($0)")
}

obj.integerProperty = 100
obj.integerProperty = 200
obj.stringProperty = "Hello"
obj.arrayProperty = [1.0]
obj.stringProperty = "World"
obj.arrayProperty = [1.0, 2.0]

Observation options

options 参数是一个具有四个值的选项集: .initial.prior.old.new 。默认值为 [.initial]

Resource Management

The share() operator

share() 运算符的目的是让你通过引用而不是通过值获取发布者。
share() 运算符返回 Publishers.Share 类的实例。这个新发布者“共享”上游发布者。

例如,你正在执行一个网络请求,希望多个订阅者接收结果而不需要多次请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
let shared = URLSession.shared
.dataTaskPublisher(for: URL(string: "https://www.kodeco.com")!)
.map(\.data)
.print("shared")
.share()

print("subscribing first")

let subscription1 = shared.sink(
receiveCompletion: { _ in },
receiveValue: { print("subscription1 received: '\($0)'") }
)

print("subscribing second")

let subscription2 = shared.sink(
receiveCompletion: { _ in },
receiveValue: { print("subscription2 received: '\($0)'") }
)

The multicast(_:) operator

multicast(_:) 的独特特征是它返回的发布者是 ConnectablePublisher 。这意味着在调用其 connect() 方法之前,它不会订阅上游发布者。这使你有足够的时间来设置所需的所有订阅者,然后再让它连接到上游发布者并开始工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
let subject = PassthroughSubject<Data, URLError>()

let multicasted = URLSession.shared
.dataTaskPublisher(for: URL(string: "https://www.raywenderlich.com")!)
.map(\.data)
.print("multicast")
.multicast(subject: subject)

let subscription1 = multicasted
.sink(
receiveCompletion: { _ in },
receiveValue: { print("subscription1 received: '\($0)'") }
)

let subscription2 = multicasted
.sink(
receiveCompletion: { _ in },
receiveValue: { print("subscription2 received: '\($0)'") }
)

let cancellable = multicasted.connect()

Future

Future 是一个类,而不是一个结构。创建后,立即执行,存储已完成的 Promise 的结果并将其传递给当前和未来的订阅者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func performSomeWork() throws -> Int {
print("Performing some work and returning a result")
return 5
}

let future = Future<Int, Error> { fulfill in
do {
let result = try performSomeWork()
fulfill(.success(result))
} catch {
fulfill(.failure(error))
}
}

print("Subscribing to future...")

let subscription1 = future
.sink(
receiveCompletion: { _ in print("subscription1 completed") },
receiveValue: { print("subscription1 received: '\($0)'") }
)
let subscription2 = future
.sink(
receiveCompletion: { _ in print("subscription2 completed") },
receiveValue: { print("subscription2 received: '\($0)'") }
)

Advanced Combine

Error Handling

Never

Failure 类型为 Never 的发布者表明该发布者永远不会失败。

Never

Just 始终声明 FailureNever

setFailureType

将绝对正确的发布者转变为会发出错误的发布者的第一种方法是使用 setFailureType 。这是另一个仅适用于失败类型为 Never 的发布者的运算符。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
enum MyError: Error {
case ohNo
}

Just("Hello")
.setFailureType(to: MyError.self)
.sink(
receiveCompletion: { completion in
switch completion {
case .failure(.ohNo):
print("Finished with Oh No!")
case .finished:
print("Finished successfully!")
}
},
receiveValue: { value in
print("Got value: \(value)")
}
)
.store(in: &subscriptions)

assign(to:on:)

assign 运算符仅适用于不会失败的发布者,与 setFailureType 相同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Person {
let id = UUID()
var name = "Unknown"
}

let person = Person()
print("1", person.name)

Just("Shai")
.handleEvents(
receiveCompletion: { _ in print("2", person.name) }
)
.assign(to: \.name, on: person)
.store(in: &subscriptions)

assign(to:)

assign(to:on:) 会强引用 on 参数的对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class MyViewModel: ObservableObject {
@Published var currentDate = Date()

init() {
Timer.publish(every: 1, on: .main, in: .common)
.autoconnect()
.prefix(3)
.assign(to: &$currentDate)
// 下面两行会造成循环引用
// .assign(to: \.currentDate, on: self)
// .store(in: &subscriptions)
}
}

let vm = MyViewModel()
vm.$currentDate
.sink(receiveValue: { print($0) })
.store(in: &subscriptions)

assertNoFailure

在开发过程中,为确认发布者无法以失败事件完成时,assertNoFailure 运算符非常有用。它不会阻止上游发出故障事件。但是,如果检测到错误,它会崩溃并显示 fatalError

1
2
3
4
5
6
7
8
9
10
enum MyError: Error {
case ohNo
}

Just("Hello")
.setFailureType(to: MyError.self)
.tryMap { _ in throw MyError.ohNo }
.assertNoFailure()
.sink(receiveValue: { print("Got value: \($0) ") })
.store(in: &subscriptions)

Catching and retrying

retry 运算符接受一个数字。如果发布者失败,它将重新订阅上游并重试最多您指定的次数。如果所有重试都失败,它只会将错误推送到下游。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
let photoService = PhotoService()

photoService
.fetchPhoto(quality: .high, failingTimes: 2)
.handleEvents(
receiveSubscription: { _ in print("Trying ...") },
receiveCompletion: {
guard case .failure(let error) = $0 else { return }
print("Got error: \(error)")
}
)
+ .retry(3)
+ .catch { _ -> PhotoService.Publisher in
print("Failed fetching high quality, falling back to low quality")
return photoService.fetchPhoto(quality: .low)
}
+ .replaceError(with: UIImage(named: "na.jpg")!)
.sink(
receiveCompletion: { print("\($0)") },
receiveValue: { image in
image
print("Got image: \(image)")
}
)
.store(in: &subscriptions)

Schedulers

Combine 框架提供了两个与调度程序一起使用的基本运算符:

  • subscribe(on:)subscribe(on:options:) 在指定的调度程序上创建订阅(开始工作)。
  • receive(on:)receive(on:options:) 在指定的调度程序上传递值。

此外,以下运算符将调度程序和调度程序选项作为参数。

  • debounce(for:scheduler:options:)
  • delay(for:tolerance:scheduler:options:)
  • measureInterval(using:options:)
  • throttle(for:scheduler:latest:)
  • timeout(_:scheduler:options:customError:)

subscribe(on:) 和 receive(on:)

subscribe(on:) 和 receive(on:)

  1. Publisher 接收订阅者并创建一个 Subscription
  2. Subscriber 接收订阅并请求来自发布者的值(虚线)
  3. Publisher 开始工作(通过 Subscription
  4. Publisher 发出值(通过 Subscription
  5. 运算符转换值
  6. Subscriber 接收最终值

步骤一、二和三通常发生在代码订阅发布者时当前的线程上。但是当您使用 subscribe(on:) 运算符时,所有这些操作都在您指定的调度程序上运行。例如,发布者在后台执行一些昂贵的计算时,以避免阻塞主线程,执行此操作的简单方法是使用 subscribe(on:)

receive(on:) 允许你指定应使用哪个调度程序来向订阅者传递值。例如在主线程接收值以更新 UI。

Scheduler implementations

  • ImmediateScheduler:立即在当前线程上执行代码,这是默认的执行上下文
  • RunLoop:绑定到 FoundationThread 对象
  • DispatchQueue:可以是串行的也可以是并发的
  • OperationQueue:调节工作项执行的队列
  • TestScheduler:一个虚拟的、模拟的调度程序,用于测试
RunLoop.main vs DispatchQueue.main

RunLoop 是管理输入源(例如应用程序的触摸)的对象的编程接口。 RunLoop 由系统创建和管理,系统还负责为每个线程对象创建一个 RunLoop 对象。系统还负责创建代表主线程的主运行循环。

DispatchQueue.main 是与当前进程的主线程关联的调度队列。系统负责生成代表主线程的队列。调度队列在其关联的线程上串行或并发地执行任务。

RunLoop.mainDispatchQueue.main 都在主线程上执行其代码,这意味着您可以使用两者来更新用户界面。

RunLoop.mainDispatchQueue.main 之间最显着的区别是后者即使在 RunLoop 繁忙时也能直接执行。例如,在使用 DispatchQueue.main 作为调度程序时,下载的图像即使在滚动时也会立即显示,而使用 RunLoop.main 时图像仅在滚动后显示。换句话说:只要发生用户交互,主运行循环上计划的闭包的执行就会被延迟执行。

另见