Schedulers - 调度器

https://beeth0ven.github.io/RxSwift-Chinese-Documentation/content/rxswift_core/schedulers.html

SchedulersRx 实现多线程的核心模块,它主要用于控制任务在哪个线程或队列运行。

如果你曾经使用过 GCD, 那你对以下代码应该不会陌生:

// 后台取得数据,主线程处理结果
DispatchQueue.global(qos: .userInitiated).async {
    let data = try? Data(contentsOf: url)
    DispatchQueue.main.async {
        self.data = data
    }
}

如果用 RxSwift 来实现,大致是这样的:

let rxData: Observable<Data> = ...

rxData
    .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { [weak self] data in
        self?.data = data
    })
    .disposed(by: disposeBag)

使用 subscribeOn

我们用 subscribeOn 来决定数据序列的构建函数在哪个 Scheduler 上运行。以上例子中,由于获取 Data 需要花很长的时间,所以用 subscribeOn 切换到 后台 Scheduler 来获取 Data。这样可以避免主线程被阻塞。

eg:

首先写一些辅助代码:

// getCurrentQueueName() -> String
func getCurrentQueueName() -> String {
    let name = __dispatch_queue_get_label(nil)
    return String.init(cString: name, encoding: .utf8) ?? ""
}

// MARK: getObservable() -> Observable<String>
func getObservable() -> Observable<String> {
    return Observable<String>.create { [weak self] (observer) -> Disposable in
        guard let strongSelf = self else { return Disposables.create() }
        print("Observable  Queue Is: " + strongSelf.getCurrentQueueName())

        // 当前线程直接发送元素
        observer.on(.next("Test 1"))
        DispatchQueue.main.async {
            // 切换到主线程发送元素
            observer.on(.next("Test 2"))
        }
        return Disposables.create()
    }
}

// MARK: getObserver() -> AnyObserver<UIImage?>
func getObserver() -> AnyObserver<String> {
    return AnyObserver<String>.init(eventHandler: { [weak self] (e) in
        guard let strongSelf = self else { return }
        print("AnyObserver Queue Is: " + strongSelf.getCurrentQueueName())
        print("\t\t" + e.debugDescription)
    })
}

测试代码如下:

func testSubscribeOn() {
    let subscribeQueue = DispatchQueue.init(label: "ink.tbd.test.subscribeQueue")
    getObservable()
        // 数据序列的构建函数在哪个Scheduler上运行
        .subscribeOn(ConcurrentDispatchQueueScheduler.init(queue: subscribeQueue))
        .subscribe(getObserver())
        .disposed(by: disposeBag)
}

输出如下:

Observable  Queue Is: ink.tbd.test.subscribeQueue
AnyObserver Queue Is: ink.tbd.test.subscribeQueue
        next(Test 1)
AnyObserver Queue Is: com.apple.main-thread
        next(Test 2)

可以知道:

  • subscribeOn(_ scheduler: ImmediateSchedulerType) 可以使 Observable 的主体在指定线程中运行
  • Observer响应事件的线程受到 Observable 代码中中发送事件所在的线程影响,在什么线程中发送,响应也就在什么线程之中

使用 observeOn

我们用 observeOn 来决定在哪个 Scheduler 监听这个数据序列。以上例子中,通过使用 observeOn 方法切换到主线程来监听并且处理结果。

一个比较典型的例子就是,在后台发起网络请求,然后解析数据,最后在主线程刷新页面。你就可以先用 subscribeOn 切到后台去发送请求并解析数据,最后用 observeOn 切换到主线程更新页面。

eg:

func testObserveOn() {
    let observeQueue = DispatchQueue.init(label: "ink.tbd.test.observeQueue")
    getObservable()
        // 在哪个Scheduler监听这个数据序列
        .observeOn(ConcurrentDispatchQueueScheduler.init(queue: observeQueue))
        .subscribe(getObserver())
        .disposed(by: disposeBag)
}

运行结果如下:

Observable  Queue Is: com.apple.main-thread
AnyObserver Queue Is: ink.tbd.test.observeQueue
        next(Test 1)
AnyObserver Queue Is: ink.tbd.test.observeQueue
        next(Test 2)

可以知道:

observeOn(_ scheduler: ImmediateSchedulerType) 可以使 Observer 的主体在指定线程中运行,不管 Observable 代码中发送事件在什么线程,都会切换到指定线程执行。

RxSwift中提供的Scheduler

MainScheduler

MainScheduler 代表主线程。如果你需要执行一些和 UI 相关的任务,就需要切换到该 Scheduler 运行。

SerialDispatchQueueScheduler

SerialDispatchQueueScheduler 抽象了串行 DispatchQueue。如果你需要执行一些串行任务,可以切换到这个 Scheduler 运行。

ConcurrentDispatchQueueScheduler

ConcurrentDispatchQueueScheduler 抽象了并行 DispatchQueue。如果你需要执行一些并发任务,可以切换到这个 Scheduler 运行。

OperationQueueScheduler

OperationQueueScheduler 抽象了 NSOperationQueue

它具备 NSOperationQueue 的一些特点,例如,你可以通过设置 maxConcurrentOperationCount,来控制同时执行并发任务的最大数量。

results matching ""

    No results matching ""