Observable & Observer - 既是可被监听的序列也是观察者

https://beeth0ven.github.io/RxSwift-Chinese-Documentation/content/rxswift_core/observable_and_observer.html

在我们所遇到的事物中,有一部分非常特别。它们既是可被监听的序列也是观察者

例如:textField的当前文本。它可以看成是由用户输入,而产生的一个文本序列。也可以是由外部文本序列,来控制当前显示内容的观察者

// 作为可被监听的序列
let observable = textField.rx.text
observable.subscribe(onNext: { text in show(text: text) })
// 作为观察者
let observer = textField.rx.text
let text: Observable<String?> = ...
text.bind(to: observer)

看下 textFieldrx.text 的定义

public var text: ControlProperty<String?> {
    return value
}

/// Reactive wrapper for `text` property.
public var value: ControlProperty<String?> {
    return base.rx.controlPropertyWithDefaultEvents(
        getter: { textField in
            textField.text
        },
        setter: { textField, value in
            // This check is important because setting text value always clears control state
            // including marked text selection which is imporant for proper input 
            // when IME input method is used.
            if textField.text != value {
                textField.text = value
            }
        }
    )
}

看下 ControlProperty<Element> 是啥


public protocol ControlPropertyType : ObservableType, ObserverType {
    ...
}

public struct ControlProperty<PropertyType> : ControlPropertyType {
    ...
    ...
}

有许多 UI 控件都存在这种特性,例如:switch的开关状态,segmentedControl的选中索引号,datePicker的选中日期等等。

另外,框架里面定义了一些辅助类型,它们既是可被监听的序列也是观察者。如果你能合适的应用这些辅助类型,它们就可以帮助你更准确的描述事物的特征。

AsyncSubject

https://beeth0ven.github.io/RxSwift-Chinese-Documentation/content/rxswift_core/observable_and_observer/async_subject.html

public final class AsyncSubject<Element>
    : Observable<Element>
    , SubjectType
    , ObserverType
    , SynchronizedUnsubscribeType {
    ...
    ...
}
  • 如果源 Observable在发出多个元素并产生完成事件后, AsyncSubject 发出最后一个元素(仅仅只有最后一个元素);
  • 如果源 Observable 没有发出任何元素,只有一个完成事件, AsyncSubject 也只有一个完成事件。
  • 如果源 Observable 因为产生了一个 error 事件而中止, AsyncSubject 就不会发出任何元素,而是将这个 error 事件发送出来。

创建方法:

let subject = AsyncSubject<String>()

eg:

func testAsyncSubject() {
    let subject = AsyncSubject<String>()

    subject
        .subscribe({ (e) in
            print("Subscription: 1 Event:", e)
        })
        .disposed(by: disposeBag)
    // 1
    subject.onNext("🐶")
    subject.onNext("🐱")
    // 只发送 onCompleted前面最后一个
    subject.onNext("🐹")
    subject.onCompleted()
}

输出如下:

Subscription: 1 Event: next(🐹)
Subscription: 1 Event: completed

PublishSubject

https://beeth0ven.github.io/RxSwift-Chinese-Documentation/content/rxswift_core/observable_and_observer/publish_subject.html

public final class PublishSubject<Element>
    : Observable<Element>
    , SubjectType
    , Cancelable
    , ObserverType
    , SynchronizedUnsubscribeType {
    ...
    ...
}
  • 对观察者发送订阅后产生的元素
  • 在观察者订阅前发出的元素不会重新发送给观察者(只会发送订阅之后产生的元素给这个订阅者)

如果你希望观察者接收到所有的元素,你可以通过使用 Observablecreate 方法来创建 Observable,或者使用 ReplaySubject

创建方法:

let subject = PublishSubject<String>()

eg:

func testPublishSubject() {
    let disposeBag = DisposeBag()
    let subject = PublishSubject<String>()

    subject
        .subscribe({ (e) in
            print("Subscription: 1 Event:", e)
        })
        .disposed(by: disposeBag)

    subject.onNext("🐶")
    subject.onNext("🐱")

    subject
        .subscribe({ (e) in
            print("Subscription: 2 Event:", e)
        })
        .disposed(by: disposeBag)

    subject.onNext("🅰️")
    subject.onNext("🅱️")
    subject.onCompleted()
}

输出如下:

Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)
Subscription: 1 Event: completed
Subscription: 2 Event: completed

ReplaySubject

https://beeth0ven.github.io/RxSwift-Chinese-Documentation/content/rxswift_core/observable_and_observer/replay_subject.html

public class ReplaySubject<Element>
    : Observable<Element>
    , SubjectType
    , ObserverType
    , Disposable {
    ...
    ...
}
  • 对观察者发送全部的元素,无论观察者是何时进行订阅的。会按顺序补发订阅之前产生的元素
  • 这里存在多个版本的 ReplaySubject,有的只会将最新的 n 个元素发送给观察者,有的只会将限制时间段内最新的元素发送给观察者。
  • 如果把 ReplaySubject 当作观察者来使用,注意不要在多个线程调用 onNext, onErroronCompleted。这样会导致无序调用,将造成意想不到的结果。

创建方法:

// 缓存所有已产生的元素
let subject = ReplaySubject<String>.createUnbounded()
// 缓存指定数量的已产生的元素
let subject = ReplaySubject<String>.create(bufferSize: 1)

eg:

func testReplaySubject() {
    // let subject = ReplaySubject<String>.createUnbounded()
    let subject = ReplaySubject<String>.create(bufferSize: 1)

    subject
        .subscribe { print("Subscription: 1 Event:", $0) }
        .disposed(by: disposeBag)

    subject.onNext("🐶")
    subject.onNext("🐱")

    subject
        .subscribe { print("Subscription: 2 Event:", $0) }
        .disposed(by: disposeBag)

    subject.onNext("🅰️")
    subject.onNext("🅱️")
}

输出如下:

Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 2 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)

BehaviorSubject

https://beeth0ven.github.io/RxSwift-Chinese-Documentation/content/rxswift_core/observable_and_observer/behavior_subject.html

public final class BehaviorSubject<Element>
    : Observable<Element>
    , SubjectType
    , ObserverType
    , SynchronizedUnsubscribeType
    , Disposable {
    ...
    ...
}
  • 当观察者对 BehaviorSubject 进行订阅时,它会将源 Observable 中最新的元素发送出来
  • 如果不存在最新的元素,就发出默认元素。
  • 每当产生新的元素,都会发送给观察者。
  • 如果源 Observable 因为产生了一个 error 事件而中止, BehaviorSubject 就不会发出任何元素,而是将这个 error 事件发送出来。

eg:

func testBehaviorSubject() {
    let subject = BehaviorSubject(value: "🔴")

    subject
        .subscribe { print("Subscription: 1 Event:", $0) }
        .disposed(by: disposeBag)

    subject.onNext("🐶")
    subject.onNext("🐱")

    subject
        .subscribe { print("Subscription: 2 Event:", $0) }
        .disposed(by: disposeBag)

    subject.onNext("🅰️")
    subject.onNext("🅱️")

    subject
        .subscribe { print("Subscription: 3 Event:", $0) }
        .disposed(by: disposeBag)

    subject.onNext("🍐")
    subject.onNext("🍊")
}

输出如下:

Subscription: 1 Event: next(🔴)
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 2 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)
Subscription: 3 Event: next(🅱️)
Subscription: 1 Event: next(🍐)
Subscription: 2 Event: next(🍐)
Subscription: 3 Event: next(🍐)
Subscription: 1 Event: next(🍊)
Subscription: 2 Event: next(🍊)
Subscription: 3 Event: next(🍊)

Variable

https://beeth0ven.github.io/RxSwift-Chinese-Documentation/content/rxswift_core/observable_and_observer/variable.html

public final class Variable<Element> {
    ...
    ...
}

Swift 中我们经常会用 var 关键字来声明变量。RxSwift 提供的 Variable 实际上是 varRx 版本,你可以将它看作是 RxVar。

我们来对比一下 var 以及 Variable 的用法:

使用 var:

// 在 ViewController 中
var model: Model? = nil {
    didSet { updateUI(with: model) }
}

override func viewDidLoad() {
    super.viewDidLoad()

    model = getModel()
}

func updateUI(with model: Model?) { ... }
func getModel() -> Model { ... }

使用 Variable:

// 在 ViewController 中
let model: Variable<Model?> = Variable(nil)

override func viewDidLoad() {
    super.viewDidLoad()

    model.asObservable()
        .subscribe(onNext: { [weak self] model in
            self?.updateUI(with: model)
        })
        .disposed(by: disposeBag)

    model.value = getModel()
}

func updateUI(with model: Model?) { ... }
func getModel() -> Model { ... }

第一种使用 var 的方式十分常见,在 ViewController 中监听 Model 的变化,然后刷新页面。

第二种使用 Variable 则是 RxSwift 独有的。Variable 几乎提供了 var 的所有功能。另外,加上一条非常重要的特性,就是可以通过调用 asObservable() 方法转换成序列。然后你可以对这个序列应用操作符,来合成其他的序列。所以,如果我们声明的变量需要提供 Rx 支持,那就选用 Variable 这个类型。

说明:

Variable 封装了一个 BehaviorSubject,所以它会持有当前值,并且 Variable 会对新的观察者发送当前值。它不会产生 error 事件。Variabledeinit 时,会发出一个 completed 事件。

eg:

func testVariable() {
    struct Model {
        var text: String?
        var image: UIImage?
    }

    func updateView(with model: Model?) {
        guard let m = model else { return }
        DispatchQueue.main.async {
            self.imageView.image = m.image
            print(m.text ?? "none")
        }
    }

    let model: Variable<Model?> = Variable.init(nil)

    model
        .asObservable()
        .subscribe(onNext: { (m) in
            updateView(with: m)
        })
        .disposed(by: disposeBag)

    getImage()
        .subscribe(onNext: { (image) in
            model.value = Model.init(text: image.description, image: image)
        }, onError: { (err) in
            if let err = err as? TError {
                err.printLog()
                return
            }
            print(err.localizedDescription)
        })
        .disposed(by: disposeBag)
}

ControlProperty

https://beeth0ven.github.io/RxSwift-Chinese-Documentation/content/rxswift_core/observable_and_observer/control_property.html

public protocol ControlPropertyType : ObservableType, ObserverType {

    /// - returns: `ControlProperty` interface
    func asControlProperty() -> ControlProperty<E>
}

public struct ControlProperty<PropertyType> : ControlPropertyType {
    ...
    ...
}

ControlProperty 专门用于描述 UI 控件属性的,它具有以下特征:

  • 不会产生 error 事件
  • 一定在 MainScheduler 订阅(主线程订阅)
  • 一定在 MainScheduler 监听(主线程监听)
  • 共享状态变化

eg:

func testControlProperty() {
    // 仅仅用于测试,主要用于UI控件,例如 textField.rx.text

    // Creat:
    var c_value: String = ""
    let c_observable = Observable<String>.create({ (observer) -> Disposable in
        observer.onNext(c_value)
        observer.onCompleted()
        return Disposables.create()
    })
    let c_observer = AnyObserver<String>.init { (e) in
        switch e {
        case .next(let el):
            c_value = el
        default:
            break
        }
        print("controlProperty is Changed: " + e.debugDescription)
    }
    let controlProperty = ControlProperty<String>.init(values: c_observable, valueSink: c_observer)


    // USE:

    let observable = Observable<String>.create({ (observer) -> Disposable in
        observer.onNext("测试1")
        observer.onNext("测试2")
        observer.onCompleted()
        return Disposables.create()
    })

    let observer = AnyObserver<String>.init { (e) in
        print("controlProperty Value Is: " + e.debugDescription)
    }


    observable
        .bind(to: controlProperty)
        .disposed(by: disposeBag)

    controlProperty
        .bind(to: observer)
        .disposed(by: disposeBag)
}

附录

Subject相关的一些 Protocol | Class | Struct 定义

  • ObservableType
public protocol ObservableConvertibleType {
    /// Type of elements in sequence.
    associatedtype E

    /// Converts `self` to `Observable` sequence.
    ///
    /// - returns: Observable sequence that represents `self`.
    func asObservable() -> Observable<E>
}

public protocol ObservableType : ObservableConvertibleType {
    /**
    Subscribes `observer` to receive events for this sequence.

    ### Grammar

    **Next\* (Error | Completed)?**

    * sequences can produce zero or more elements so zero or more `Next` events can be sent to `observer`
    * once an `Error` or `Completed` event is sent, the sequence terminates and can't produce any other elements

    It is possible that events are sent from different threads, but no two events can be sent concurrently to
    `observer`.

    ### Resource Management

    When sequence sends `Complete` or `Error` event all internal resources that compute sequence elements
    will be freed.

    To cancel production of sequence elements and free resources immediately, call `dispose` on returned
    subscription.

    - returns: Subscription for `observer` that can be used to cancel production of sequence elements and free resources.
    */
    func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
}
  • Observable<Element>
public class Observable<Element> : ObservableType {
    ...
    ...
}
  • SubjectType
public protocol SubjectType : ObservableType {
    /// The type of the observer that represents this subject.
    ///
    /// Usually this type is type of subject itself, but it doesn't have to be.
    associatedtype SubjectObserverType : ObserverType

    /// Returns observer interface for subject.
    ///
    /// - returns: Observer interface for subject.
    func asObserver() -> SubjectObserverType

}
  • ObserverType
public protocol ObserverType {
    /// The type of elements in sequence that observer can observe.
    associatedtype E

    /// Notify observer about sequence event.
    ///
    /// - parameter event: Event that occurred.
    func on(_ event: Event<E>)
}
  • SynchronizedUnsubscribeType
protocol SynchronizedUnsubscribeType : class {
    associatedtype DisposeKey

    func synchronizedUnsubscribe(_ disposeKey: DisposeKey)
}
  • Disposable
public protocol Disposable {
    /// Dispose resource.
    func dispose()
}

results matching ""

    No results matching ""