Combine 是一个声明式、响应式框架,用于随着时间的推移处理异步事件。支持 iOS 13、macOS 10.15、watchOS 6 及之后的系统。这里是我学习 Combine 时的笔记,只是列出要点,没有详细的解释,权当备忘清单。
The Combine framework provides a declarative approach for how your app processes events. Rather than potentially implementing multiple delegate callbacks or completion handler closures, you can create a single processing chain for a given event source. Each part of the chain is a Combine operator that performs a distinct action on the elements received from the previous step.
Publishers & Subscribers
Publisher
Publisher
协议声明类型可以随着时间的推移传输一系列值。
发布者可以发出两种事件,值和完成事件。它可以发出零个或多个值,但只能发出一个完成事件,该事件可以是正常完成事件,也可以是错误。一旦发布者发出完成事件,它就完成了并且不能再发出任何事件。
Subscriber
Subscriber
协议声明类型可以从发布者接收输入。如果没有订阅者来接收输出,则发布者不会发出任何值。
Combine 提供两个内置订阅者:
sink(_:_:)
:允许你使用闭包来处理输出值
assign(to:on:)
:将结果输出绑定到数据模型或 UI 控件上的某些属性
1 2 3 4 5 6 7 8 9 10 let just = Just ("Hello world!" )_ = just .sink( receiveCompletion: { print ("Received completion" , $0 ) }, receiveValue: { print ("Received value" , $0 ) })
1 2 3 4 5 6 7 8 9 10 11 12 class SomeObject { var value: String = "" { didSet { print (value) } } } let object = SomeObject ()let publisher = ["Hello" , "world!" ].publisher_ = publisher .assign(to: \.value, on: object)
Cancellable
Subscription
协议继承自 Cancellable
协议,当订阅者完成其工作并且不再希望从发布者接收值时,可以调用 cancel()
以取消订阅。
如果你没有在订阅上明确调用 cancel()
,它将持续到发布者完成,或直到正常内存管理导致存储的订阅非初始化。到那时,它会为你取消订阅。
创建自定义订阅者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 let publisher = (1 ... 6 ).publisherfinal class IntSubscriber : Subscriber { typealias Input = Int typealias Failure = Never func receive (subscription : Subscription ) { subscription.request(.max(3 )) } func receive (_ input : Int ) -> Subscribers .Demand { print ("Received value" , input) return .none } func receive (completion : Subscribers .Completion <Never >) { print ("Received completion" , completion) } }
Future
就像可以使用 Just
创建向订阅者发出单个值然后完成的发布者一样, Future
可用于异步生成单个结果然后完成。
Future
是贪婪的,也就是说一旦创建就会执行。它不需要像普通发布者那样懒惰的订阅者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 var subscriptions = Set <AnyCancellable >()func futureIncrement ( integer : Int , afterDelay delay : TimeInterval ) -> Future <Int , Never > { Future <Int , Never > { promise in print ("Original" ) DispatchQueue .global().asyncAfter(deadline: .now() + delay) { promise(.success(integer + 1 )) } } } let future = futureIncrement(integer: 1 , afterDelay: 3 )future .sink(receiveCompletion: { print ($0 ) }, receiveValue: { print ($0 ) }) .store(in: & subscriptions) future .sink(receiveCompletion: { print ("Second" , $0 ) }, receiveValue: { print ("Second" , $0 ) }) .store(in: & subscriptions)
类型擦除(Type erasure)
有时,你希望让订阅者订阅以接收来自发布者的事件,但无法访问有关该发布者的其他详细信息,这时可以使用类型擦除 eraseToAnyPublisher
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 var subscriptions = Set <AnyCancellable >()let subject = PassthroughSubject <Int , Never >() let publisher = subject.eraseToAnyPublisher() publisher .sink(receiveValue: { print ($0 ) }) .store(in: & subscriptions) subject.send(0 ) publisher.send(1 )
Operators
collect()
1 2 3 4 5 6 ["A" , "B" , "C" , "D" , "E" ].publisher .collect() .sink(receiveCompletion: { print ($0 ) }, receiveValue: { print ($0 ) }) .store(in: & subscriptions)
map(_:)
Mapping key paths
map<T>(_:)
map<T0, T1>(_:_:)
map<T0, T1, T2>(_:_:_:)
1 2 3 4 5 6 7 8 9 10 11 12 publisher .map(\.x, \.y) .sink(receiveValue: { x, y in print ( "The coordinate at (\(x) , \(y) ) is in quadrant" , quadrantOf(x: x, y: y) ) }) .store(in: & subscriptions) publisher.send(Coordinate (x: 10 , y: - 8 )) publisher.send(Coordinate (x: 0 , y: 5 ))
tryMap(_:)
1 2 3 4 5 Just ("Directory name that does not exist" ) .tryMap { try FileManager .default.contentsOfDirectory(atPath: $0 ) } .sink(receiveCompletion: { print ($0 ) }, receiveValue: { print ($0 ) }) .store(in: & subscriptions)
flatMap(maxPublishers:_:)
flatMap
接收三个发布者: P1
、 P2
和 P3
。 flatMap
从 P1
和 P2
发出发布者的值,但忽略 P3
因为 maxPublishers
设置为 2 。
replaceNil(with:)
1 2 3 4 5 ["A" , nil , "C" ].publisher .eraseToAnyPublisher() .replaceNil(with: "-" ) .sink(receiveValue: { print ($0 ) }) .store(in: & subscriptions)
replaceEmpty(with:)
scan(_:_:)
Filtering Operators
Filtering
filter
removeDuplicates
Compacting and ignoring
compactMap
ignoreOutput
Finding values
firstWhere
lastWhere
Dropping values
dropFirst
dropWhile
dropUntilOutputFrom
Limiting values
prefix
prefixWhile
prefixUntilOutputFrom
Combining Operators
Prepending
prepend(Output…)
The last prepend affects the upstream first. The same below.
1 2 3 4 5 6 .prepend(1 , 2 ) .prepend(- 1 , 0 )
prepend(Sequence)
prepend(Publisher)
Appending
append(Output…)
append(Sequence)
append(Publisher)
Advanced combining
switchToLatest
merge(with:)
combineLatest
zip
Time Manipulation Operators
Shifting time
delay(for:tolerance:scheduler:options)
运算符对整个值序列进行时间偏移。
Collecting values
collect(_ strategy:options:)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 let collectMaxCount = 2 let collectTimeStride = 4 let sourcePublisher = PassthroughSubject <Date , Never >()let collectedPublisher = sourcePublisher .collect(.byTime(DispatchQueue .main, .seconds(collectTimeStride))) .flatMap { dates in dates.publisher } let collectedPublisher2 = sourcePublisher .collect(.byTimeOrCount(DispatchQueue .main, .seconds(collectTimeStride), collectMaxCount)) .flatMap { dates in dates.publisher }
Holding off on events
Debounce
.debounce(for:scheduler:options:)
1 2 3 4 5 6 let subject = PassthroughSubject <String , Never >()let debounced = subject .debounce(for: .seconds(1.0 ), scheduler: DispatchQueue .main) .share()
Note: One thing to watch out for is the publisher’s completion. If your publisher completes right after the last value was emitted, but before the time configured for debounce
elapses, you will never see the last value in the debounced publisher!
Throttle
1 2 3 4 5 let throttleDelay = 1.0 let subject = PassthroughSubject <String , Never >()let throttled = subject .throttle(for: .seconds(throttleDelay), scheduler: DispatchQueue .main, latest: false ) .share()
和 debounce
的区别是:
debounce
等待接收到的值暂停,然后在指定的时间间隔后发出最新的值。
throttle
等待指定的时间间隔,然后发出在该时间间隔内收到的第一个或最新的值。它不关心暂停。
Timing out
timeout(_:scheduler:options:customError:)
1 2 let subject = PassthroughSubject <Void , Never >()let timedOutSubject = subject.timeout(.seconds(5 ), scheduler: DispatchQueue .main)
1 2 3 4 5 6 7 8 enum TimeoutError : Error { case timedOut } let subject = PassthroughSubject <Void , TimeoutError >()let timedOutSubject = subject.timeout(.seconds(5 ), scheduler: DispatchQueue .main, customError: { .timedOut })
Measuring time
measureInterval(using:)
1 2 3 4 5 let subject = PassthroughSubject <String , Never >()let measureSubject = subject.measureInterval(using: DispatchQueue .main)let measureSubject2 = subject.measureInterval(using: RunLoop .main)
Sequence Operators
Finding values
min
min
运算符可让你找到发布者发出的最小值。它是贪婪的,这意味着它必须等待发布者发送 .finished
完成事件。发布者完成后,运算符仅发出最小值。
max
first
不会等待上游发布者完成,而是在收到第一个发出的值时取消订阅。
还可以使用 first(where:)
。
last
output(at:)
output(in:)
该运算符发出索引范围内的各个值,而不是它们的集合。
Querying the publisher
count
contains
如果上游发布者发出指定的值,则 contains
运算符将发出 true
并取消订阅。
还可以使用 contains(where:)
。
allSatisfy
一旦不满足条件,allSatisfy
发出 false
就立即取消订阅。
reduce
scan
和 reduce
具有相同的功能,主要区别在于 scan
为每个发出的值发出累积值,而 reduce
一旦上游发布者发送 .finished
完成事件,就会发出单个累积值。
Action
Debugging
Printing events
print(:to:)
1 2 3 let subscription = (1 ... 3 ).publisher .print("publisher" ) .sink { _ in }
除了打印信息之外,对特定事件执行操作通常也很有用。我们称之为执行副作用 ,因为您“在侧面”采取的操作不会直接影响下游的其他发布者,但可能会产生类似于修改外部变量的效果。
handleEvents(receiveSubscription:receiveOutput:receiveCompletion:receiveCancel:receiveRequest:)
Using the debugger as a last resort
breakpointOnError()
breakpoint(receiveSubscription:receiveOutput:receiveCompletion:)
1 2 3 .breakpoint(receiveOutput: { value in return value > 10 && value < 15 })
Timers
Using RunLoop
1 2 3 4 5 6 7 8 9 10 11 12 13 let runLoop = RunLoop .mainlet subscription = runLoop.schedule( after: runLoop.now, interval: .seconds(1 ), tolerance: .milliseconds(100 ) ) { print ("Timer fired" ) } runLoop.schedule(after: .init (Date (timeIntervalSinceNow: 3.0 ))) { subscription.cancel() }
Using the Timer class
1 2 3 4 5 6 7 let subscription = Timer .publish(every: 1.0 , on: .main, in: .common) .autoconnect() .scan(0 ) { counter, _ in counter + 1 } .sink { counter in print ("Counter is \(counter) " ) }
Using DispatchQueue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 let queue = DispatchQueue .mainlet source = PassthroughSubject <Int , Never >()var counter = 0 let cancellable = queue.schedule( after: queue.now, interval: .seconds(1 ) ) { source.send(counter) counter += 1 } let subscription = source.sink { print ("Timer emitted \($0 ) " ) }
Key-Value Observing
publisher(for:options:)
1 2 3 4 5 let queue = OperationQueue ()let subscription = queue.publisher(for: \.operationCount) .sink { print ("Outstanding operations in queue: \($0 ) " ) }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class TestObject : NSObject { @objc dynamic var integerProperty: Int = 0 @objc dynamic var stringProperty: String = "" @objc dynamic var arrayProperty: [Float ] = [] } let obj = TestObject ()let subscription = obj.publisher(for: \.integerProperty) .sink { print ("integerProperty changes to \($0 ) " ) } let subscription2 = obj.publisher(for: \.stringProperty) .sink { print ("stringProperty changes to \($0 ) " ) } let subscription3 = obj.publisher(for: \.arrayProperty) .sink { print ("arrayProperty changes to \($0 ) " ) } obj.integerProperty = 100 obj.integerProperty = 200 obj.stringProperty = "Hello" obj.arrayProperty = [1.0 ] obj.stringProperty = "World" obj.arrayProperty = [1.0 , 2.0 ]
Observation options
options
参数是一个具有四个值的选项集: .initial
、 .prior
、 .old
和 .new
。默认值为 [.initial]
。
Resource Management
The share() operator
share()
运算符的目的是让你通过引用而不是通过值获取发布者。
share()
运算符返回 Publishers.Share
类的实例。这个新发布者“共享”上游发布者。
例如,你正在执行一个网络请求,希望多个订阅者接收结果而不需要多次请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 let shared = URLSession .shared .dataTaskPublisher(for: URL (string: "https://www.kodeco.com" )! ) .map(\.data) .print("shared" ) .share() print ("subscribing first" )let subscription1 = shared.sink( receiveCompletion: { _ in }, receiveValue: { print ("subscription1 received: '\($0 ) '" ) } ) print ("subscribing second" )let subscription2 = shared.sink( receiveCompletion: { _ in }, receiveValue: { print ("subscription2 received: '\($0 ) '" ) } )
The multicast(_:) operator
multicast(_:)
的独特特征是它返回的发布者是 ConnectablePublisher
。这意味着在调用其 connect()
方法之前,它不会订阅上游发布者。这使你有足够的时间来设置所需的所有订阅者,然后再让它连接到上游发布者并开始工作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 let subject = PassthroughSubject <Data , URLError >()let multicasted = URLSession .shared .dataTaskPublisher(for: URL (string: "https://www.raywenderlich.com" )! ) .map(\.data) .print("multicast" ) .multicast(subject: subject) let subscription1 = multicasted .sink( receiveCompletion: { _ in }, receiveValue: { print ("subscription1 received: '\($0 ) '" ) } ) let subscription2 = multicasted .sink( receiveCompletion: { _ in }, receiveValue: { print ("subscription2 received: '\($0 ) '" ) } ) let cancellable = multicasted.connect()
Future
Future
是一个类,而不是一个结构。创建后,立即执行,存储已完成的 Promise
的结果并将其传递给当前和未来的订阅者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func performSomeWork () throws -> Int { print ("Performing some work and returning a result" ) return 5 } let future = Future <Int , Error > { fulfill in do { let result = try performSomeWork() fulfill(.success(result)) } catch { fulfill(.failure(error)) } } print ("Subscribing to future..." )let subscription1 = future .sink( receiveCompletion: { _ in print ("subscription1 completed" ) }, receiveValue: { print ("subscription1 received: '\($0 ) '" ) } ) let subscription2 = future .sink( receiveCompletion: { _ in print ("subscription2 completed" ) }, receiveValue: { print ("subscription2 received: '\($0 ) '" ) } )
Advanced Combine
Error Handling
Never
Failure
类型为 Never
的发布者表明该发布者永远不会失败。
Just
始终声明 Failure
为 Never
。
setFailureType
将绝对正确的发布者转变为会发出错误的发布者的第一种方法是使用 setFailureType
。这是另一个仅适用于失败类型为 Never
的发布者的运算符。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 enum MyError : Error { case ohNo } Just ("Hello" ) .setFailureType(to: MyError .self ) .sink( receiveCompletion: { completion in switch completion { case .failure(.ohNo): print ("Finished with Oh No!" ) case .finished: print ("Finished successfully!" ) } }, receiveValue: { value in print ("Got value: \(value) " ) } ) .store(in: & subscriptions)
assign(to:on:)
assign
运算符仅适用于不会失败的发布者,与 setFailureType
相同。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class Person { let id = UUID () var name = "Unknown" } let person = Person ()print ("1" , person.name)Just ("Shai" ) .handleEvents( receiveCompletion: { _ in print ("2" , person.name) } ) .assign(to: \.name, on: person) .store(in: & subscriptions)
assign(to:)
assign(to:on:)
会强引用 on
参数的对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class MyViewModel : ObservableObject { @Published var currentDate = Date () init () { Timer .publish(every: 1 , on: .main, in: .common) .autoconnect() .prefix(3 ) .assign(to: & $currentDate ) } } let vm = MyViewModel ()vm.$currentDate .sink(receiveValue: { print ($0 ) }) .store(in: & subscriptions)
assertNoFailure
在开发过程中,为确认发布者无法以失败事件完成时,assertNoFailure
运算符非常有用。它不会阻止上游发出故障事件。但是,如果检测到错误,它会崩溃并显示 fatalError
。
1 2 3 4 5 6 7 8 9 10 enum MyError : Error { case ohNo } Just ("Hello" ) .setFailureType(to: MyError .self ) .tryMap { _ in throw MyError .ohNo } .assertNoFailure() .sink(receiveValue: { print ("Got value: \($0 ) " ) }) .store(in: & subscriptions)
Catching and retrying
retry
运算符接受一个数字。如果发布者失败,它将重新订阅上游并重试最多您指定的次数。如果所有重试都失败,它只会将错误推送到下游。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 let photoService = PhotoService ()photoService .fetchPhoto(quality: .high, failingTimes: 2 ) .handleEvents( receiveSubscription: { _ in print ("Trying ..." ) }, receiveCompletion: { guard case .failure(let error) = $0 else { return } print ("Got error: \(error) " ) } ) + .retry(3 )+ .catch { _ -> PhotoService .Publisher in print ("Failed fetching high quality, falling back to low quality" ) return photoService.fetchPhoto(quality: .low) } + .replaceError(with: UIImage (named: "na.jpg" )! ) .sink( receiveCompletion: { print ("\($0 ) " ) }, receiveValue: { image in image print ("Got image: \(image) " ) } ) .store(in: & subscriptions)
Schedulers
Combine 框架提供了两个与调度程序一起使用的基本运算符:
subscribe(on:)
和 subscribe(on:options:)
在指定的调度程序上创建订阅(开始工作)。
receive(on:)
和 receive(on:options:)
在指定的调度程序上传递值。
此外,以下运算符将调度程序和调度程序选项作为参数。
debounce(for:scheduler:options:)
delay(for:tolerance:scheduler:options:)
measureInterval(using:options:)
throttle(for:scheduler:latest:)
timeout(_:scheduler:options:customError:)
subscribe(on:) 和 receive(on:)
Publisher
接收订阅者并创建一个 Subscription
Subscriber
接收订阅并请求来自发布者的值(虚线)
Publisher
开始工作(通过 Subscription
)
Publisher
发出值(通过 Subscription
)
运算符转换值
Subscriber
接收最终值
步骤一、二和三通常发生在代码订阅发布者时当前的线程上。但是当您使用 subscribe(on:)
运算符时,所有这些操作都在您指定的调度程序上运行。例如,发布者在后台执行一些昂贵的计算时,以避免阻塞主线程,执行此操作的简单方法是使用 subscribe(on:)
。
receive(on:)
允许你指定应使用哪个调度程序来向订阅者传递值。例如在主线程接收值以更新 UI。
Scheduler implementations
ImmediateScheduler
:立即在当前线程上执行代码,这是默认的执行上下文
RunLoop
:绑定到 Foundation
的 Thread
对象
DispatchQueue
:可以是串行的也可以是并发的
OperationQueue
:调节工作项执行的队列
TestScheduler
:一个虚拟的、模拟的调度程序,用于测试
RunLoop.main vs DispatchQueue.main
RunLoop 是管理输入源(例如应用程序的触摸)的对象的编程接口。 RunLoop 由系统创建和管理,系统还负责为每个线程对象创建一个 RunLoop 对象。系统还负责创建代表主线程的主运行循环。
DispatchQueue.main
是与当前进程的主线程关联的调度队列。系统负责生成代表主线程的队列。调度队列在其关联的线程上串行或并发地执行任务。
RunLoop.main
和 DispatchQueue.main
都在主线程上执行其代码,这意味着您可以使用两者来更新用户界面。
RunLoop.main
和 DispatchQueue.main
之间最显着的区别是后者即使在 RunLoop 繁忙时也能直接执行。例如,在使用 DispatchQueue.main
作为调度程序时,下载的图像即使在滚动时也会立即显示,而使用 RunLoop.main
时图像仅在滚动后显示。换句话说:只要发生用户交互,主运行循环上计划的闭包的执行就会被延迟执行。
另见