springcloud服务注册机制(RxJava响应式编程框架)

RxJava响应式编程框架

在Spring Cloud框架中涉及的Ribbon和Hystrix两个重要的组件都使用了RxJava响应式编程框架,其作为重要的编程基础知识,特开辟一章对RxJava的使用进行详细的介绍。

Hystrix和Ribbon的代码中大量运用了RxJava的API,对于有RxJava基础的同学,学习Hystrix和Ribbon并不是一件难事。如果不懂RxJava,对于Hystrix和Ribbon的学习就会令人头疼不已。

springcloud服务注册机制(RxJava响应式编程框架)(1)

从基础原理讲起:观察者模式

本文的重要特色,从基础原理讲起。只有了解了基础原理,大家对新的知识,特别是复杂的知识才能更加容易地理解和掌握。

RxJava是基于观察者模式实现的,这里先带领大家复习一下观察者模式的基础原理和经典实现。当然,这也是Java工程师面试必备的一个重要知识点。

观察者模式的基础原理

观察者模式是常用的设计模式之一,是所有Java工程师必须掌握的设计模式。观察者模式也叫发布订阅模式。

此模式的角色中有一个可观察的主题对象Subject,有多个观察者Observer去关注它。当Subject的状态发生变化时,会自动通知这些Observer订阅者,令Observer做出响应。

在整个观察者模式中一共有4个角色:Subject(抽象主题、抽象被观察者)、Concrete Subject(具体主题、具体被观察者)、Observer(抽象观察者)以及ConcreteObserver(具体观察者)。

观察者模式的4个角色以及它们之间的关系如图4-1所示。

springcloud服务注册机制(RxJava响应式编程框架)(2)

图4-1 观察者模式的4个角色以及它们之间的关系

观察者模式中4个角色的介绍如下:

(1)Subject(抽象主题):Subject抽象主题的主要职责之一为维护Observer观察者对象的集合,集合里的所有观察者都订阅过该主题。Subject抽象主题负责提供一些接口,可以增加、删除和更新观察者对象。

(2)ConcreteSubject(具体主题):ConcreteSubject用于保持主题的状态,并且在主题的状态发生变化时给所有注册过的观察者发出通知。具体来说,ConcreteSubject需要调用Subject(抽象主题)基类的通知方法给所有注册过的观察者发出通知。

(3)Observer(抽象观察者):观察者的抽象类定义更新接口,使得被观察者可以在收到主题通知的时候更新自己的状态。

(4)ConcreteObserver(具体观察者):实现抽象观察者Observer所定义的更新接口,以便在收到主题的通知时完成自己状态的真正更新。

观察者模式的经典实现

首先来看Subject主题类的代码实现:它将所有订阅过自己的Observer观察者对象保存在一个集合中,然后提供一组方法完成Observer观察者的新增、删除和通知。

Subject主题类的参考代码实现如下:

package com.crazymaker.demo.observerPattern; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.List; @Slf4j public class Subject { //保存订阅过自己的观察者对象 private List<Observer> observers = new ArrayList<>(); //观察者对象订阅 public void add(Observer observer) { observers.add(observer); log.info( "add an observer"); } //观察者对象注销 public void remove(Observer observer) { observers.remove(observer); log.info( "remove an observer"); } //通知所有注册的观察者对象 public void notifyObservers(String newState) { for (Observer observer : observers) { observer.update(newState); } } }

接着来看ConcreteSubject具体主题类:它首先拥有一个成员用于保持主题的状态,并且在主题的状态变化时调用基类Subject(抽象主题)的通知方法给所有注册过的观察者发出通知。

package com.crazymaker.demo.observerPattern; import lombok.extern.slf4j.Slf4j; @Data @Slf4jpublic class ConcreteSubject extends Subject { private String state; //保持主题的状态 public void change(String newState) { state = newState; log.info( "change state :" newState); //状态发生改变,通知观察者 notifyObservers(newState); } }

然后来看一下观察者Observer接口,它抽象出了一个观察者自身的状态更新方法。

package com.crazymaker.demo.observerPattern; public interface Observer { void update(String newState); //状态更新的方法 }

接着来看ConcreteObserver具体观察者类:它首先接收主题的通知,实现抽象观察者Observer所定义的update接口,以便在收到主题的状态发生变化时完成自己的状态更新。

package com.crazymaker.demo.observerPattern; import lombok.extern.slf4j.Slf4j; @Slf4j public class ObserverA implements Observer { //观察者状态 private String observerState; @Override public void update(String newState) { //更新观察者状态,让它与主题的状态一致 observerState = newState; log.info( "观察者的当前状态为:" observerState); } }

4个角色的实现代码已经介绍完了。如何使用观察者模式呢?步骤如下:

package com.crazymaker.demo.observerPattern; public class ObserverPatternDemo { public static void main(String[] args) { //第一步:创建主题 ConcreteSubject mConcreteSubject = new ConcreteSubject(); //第二步:创建观察者 Observer observerA = new ObserverA(); Observer ObserverB = new ObserverA(); //第三步:主题订阅 mConcreteSubject.add(observerA); mConcreteSubject.add(ObserverB); //第四步:主题状态变更 mConcreteSubject.change("倒计时结束,开始秒杀"); } }

运行示例程序,结果如下:

22:46:03.548 [main] INFO c.c.d.o.ConcreteSubject - change state:倒计时结束,开始秒杀 22:46:03.548 [main] INFO c.c.d.o.ObserverA -观察者的当前状态为:倒计时结束,开始秒杀 22:46:03.548 [main] INFO c.c.d.o.ObserverA - 观察者的当前状态为:倒计时结束,开始秒杀

RxJava中的观察者模式

RxJava是基于观察者模式设计的。RxJava中的Observable类和Subscriber类分别对应观察者模式中的Subject(抽象主题)和Observer(抽象观察者)两个角色。

在RxJava中,Observable和Subscriber通过subscribe()方法实现订阅关系,如图4-2所示。

springcloud服务注册机制(RxJava响应式编程框架)(3)

图4-2 RxJava通过subscribe()方法实现订阅关系

在RxJava中,Observable和Subscriber之间通过emitter.onNext(...)弹射的方式实现主题的消息发布,如图4-3所示。

springcloud服务注册机制(RxJava响应式编程框架)(4)

图4-3 RxJava通过emitter.onNext()弹射主题消息

RxJava中主题的消息发布方式之一是通过内部的弹射器Emitter完成。Emitter除了使用onNext()方法弹射消息之外,还定义了两个特殊的通知方法:onCompleted()和onError()。

(1)onCompleted():表示消息序列弹射完结。

RxJava主题(可观察者)中的Emitter可以不只发布(弹射)一个消息,可以重复使用其onNext()方法弹射一系列消息(或事件),这一系列消息组成一个序列。在绝大部分场景下,Observable内部有一个专门的Queue(队列)来负责缓存消息序列。当Emitter明确不会再有新的消息弹射出来时,需要触发onCompleted()方法,作为消息序列的结束标志。

RxJava主题(可观察者)的Emitter弹射器所弹出的消息序列也可以称为消息流。

(2)onError():表示主题的消息序列异常终止。

如果Observable在事件处理过程中出现异常,Emitter的onError()就会被触发,同时消息序列自动终止,不允许再有消息弹射出来。

RxJava的一个简单使用示例代码如下:

package com.crazymaker.demo.observerPattern; //省略import @Slf4j public class RxJavaObserverDemo { /** *演示RxJava中的Observer模式 */ @Test public void rxJavaBaseUse() { //被观察者(主题) Observable observable = Observable.create( new Action1<Emitter<String>>() { @Override public void call(Emitter<String> emitter) { emitter.onNext("apple"); emitter.onNext("banana"); emitter.onNext("pear"); emitter.onCompleted(); } },Emitter.BackpressureMode.NONE); //订阅者(观察者) Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String s) { log.info("onNext: {}", s); } @Override public void onCompleted() { log.info("onCompleted"); } @Override public void onError(Throwable e) { log.info("onError"); } }; //订阅:Observable与Subscriber之间依然通过subscribe()进行关联 observable.subscribe(subscriber); } }

运行这个示例程序,结果如下:

11:29:07.555 [main] INFO c.c.d.o.RxJavaObserverDemo - onNext: apple 11:29:07.564 [main] INFO c.c.d.o.RxJavaObserverDemo - onNext: banana 11:29:07.564 [main] INFO c.c.d.o.RxJavaObserverDemo - onNext: pear 11:29:07.564 [main] INFO c.c.d.o.RxJavaObserverDemo - onCompleted

通过代码和运行接口可以看出:被观察者Observable与观察者Subscriber产生关联通过subscribe()方法完成。当订阅开始时,Observable主题便开始发送事件。

通过代码还可以看出:Subscriber有3个回调方法,其中onNext(String s)回调方法用于响应Observable主题正常的弹射消息,onCompleted()回调方法用于响应Observable主题的结束消息,onError(Throwable e)回调方法用于响应Observable主题的异常消息。在一个消息序列中,Emitter弹射器的onCompleted()正常结束和onError()异常终止只能调用一个,并且必须是消息序列中最后一个被发送的消息。换句话说,Emitter的onCompleted()和onError()两个方法是互斥的,在消息序列中调用了其中一个,就不可以再调用另一个。

通过示例可以看出,RxJava与经典的观察者模式不同。在RxJava中,主题内部有一个弹射器的角色,而经典的观察者模式中,主题所发送的是单个消息,并不是一个消息序列。

在RxJava中,Observable主题还会负责消息序列缓存,这一点像经典的生产者/消费者模式。在经典的生产者/消费者模式中,生产者生产数据后放入缓存队列,自己不进行处理,而消费者从缓存队列里拿到所要处理的数据,完成逻辑处理。从这一点来说,RxJava借鉴了生产者消费者模式的思想。

RxJava的不完整回调

Java 8引入函数式编程方式大大地提高了编码效率。但是,Java8的函数式编程有一个非常重要的要求:需要函数式接口作为支撑。什么是函数式接口呢?指的是有且只有一个抽象方法的接口,比如Java中内置的Runnable接口。

RxJava的一大特色是支持函数式的编程。由于标准的Subscriber观察者接口有3个抽象方法,当然就不是一个函数式接口,因此直接使用Subscriber观察者接口是不支持函数式编程的。

RxJava为了支持函数式编程,另外定义了几个函数式接口,比较重要的有Action0和Action1。

1.Action0回调接口

这是一个无参数、无返回值的函数式接口,源码如下:

package rx.functions; /** *A zero-argument action. */ public interface Action0 extends Action { void call(); }

Action0接口的call()方法无参数、无返回值,它的具体使用场景对应于Subscriber观察者中的onCompleted()回调方法的使用场景,因为Subscriber的onCompleted()回调方法也是无参数、无返回值的。

2.Action1回调接口

这是一个有一个参数、泛型、无返回值的函数式接口,源码如下:

package rx.functions; /** *A one-argument action. *@param <T> the first argument type */ public interface Action1<T> extends Action { void call(T t); }

Action1回调接口主要有以下两种用途:

(1)作为函数式编程替代使用Subscriber的onNext()方法的传统编程,前提是Action1回调接口的泛型类型与Subscriber的onNext()回调方法的参数类型保持一致。

(2)作为函数式编程替代使用Subscriber的onErrorAction(Throwable e)方法的传统编程,前提是Action1回调接口的泛型类型与Subscriber的onErrorAction()回调方法的参数类型保持一致。

Action1接口承担的主要是观察者(订阅者)角色,所以RxJava为主题类提供了重载的subscribe(Action1 action)订阅方法,可以接收一个Action1回调接口的实现对象作为弹射消息序列的订阅者。

下面使用不完整回调实现4.1.3节的例子,大家可以对比一下。具体的源码如下:

package com.crazymaker.demo.observerPattern; //省略import @Slf4j public class RxJavaObserverDemo { /** 演示 中的不完整观察者 *演示RxJava中的不完整观察者 */ @Test public void rxJavaActionDemo() { //被观察者(主题) Observable observable = Observable.create( new Action1<Emitter<String>>() { @Override public void call(Emitter<String> emitter) { emitter.onNext("apple"); emitter.onNext("banana"); emitter.onNext("pear"); emitter.onCompleted(); } },Emitter.BackpressureMode.NONE); Action1<String> onNextAction = new Action1<String>() { @Override public void call(String s) { log.info(s); } }; Action1<Throwable> onErrorAction = new Action1<Throwable>() { @Override public void call(Throwable throwable) { log.info("onError,Error Info is:" throwable.getMessage()); } }; Action0 onCompletedAction = new Action0() { @Override public void call() { log.info("onCompleted"); } }; log.info("第1次订阅:"); //根据onNextAction来定义onNext() observable.subscribe(onNextAction); log.info("第2次订阅:"); //根据onNextAction来定义onNext(),根据onErrorAction来定义onError() observable.subscribe(onNextAction, onErrorAction); log.info("第3次订阅:"); //根据onNextAction来定义onNext(),根据onErrorAction来定义onError() //根据onCompletedAction来定义onCompleted() observable.subscribe(onNextAction, onErrorAction, onCompletedAction); } }

运行这个示例程序,结果如下:

11:06:22.015 [main] INFO c.c.d.o.RxJavaObserverDemo - 第1次订阅: 11:06:22.015 [main] INFO c.c.d.o.RxJavaObserverDemo - apple 11:06:22.015 [main] INFO c.c.d.o.RxJavaObserverDemo - banana 11:06:22.015 [main] INFO c.c.d.o.RxJavaObserverDemo - pear 11:06:22.015 [main] INFO c.c.d.o.RxJavaObserverDemo - 第2次订阅: 11:06:22.015 [main] INFO c.c.d.o.RxJavaObserverDemo - apple 11:06:22.016 [main] INFO c.c.d.o.RxJavaObserverDemo - banana 11:06:22.016 [main] INFO c.c.d.o.RxJavaObserverDemo - pear 11:06:22.016 [main] INFO c.c.d.o.RxJavaObserverDemo - 第3次订阅: 11:06:22.016 [main] INFO c.c.d.o.RxJavaObserverDemo - apple 11:06:22.016 [main] INFO c.c.d.o.RxJavaObserverDemo - banana 11:06:22.016 [main] INFO c.c.d.o.RxJavaObserverDemo - pear 11:06:22.016 [main] INFO c.c.d.o.RxJavaObserverDemo – onCompleted

在上面的代码中,observable被订阅了3次,由于没有异常消息,因此从输出中只能看到正常消息和结束消息。

总之,RxJava提供的Action0回调接口和Action1回调接口可以看作Subscriber观察者接口的阉割版本和函数式编程版本。使用RxJava的不完整回调观察者接口并结合Java 8的函数式编程,能够编写出更为简洁和灵动的代码。

RxJava的函数式编程

有了Action0和Action1这两个函数式接口,就可以使用RxJava进行函数式编程了。下面使用函数式编程的风格实现上节的例子,大家对比一下。

public class RxJavaObserverDemo { ... /** *演示RxJava中的Lamda表达式实现 */ @Test public void rxJavaActionLamda() { Observable<String> observable = Observable.just("apple", "banana", "pear"); log.info("第1次订阅:"); //使用Action1 函数式接口来实现onNext回调 observable.subscribe(s -> log.info(s)); log.info("第2次订阅:"); //使用Action1 函数式接口来实现onNext回调 //使用Action1 函数式接口来实现onError回调 observable.subscribe( s -> log.info(s), e -> log.info("Error Info is:" e.getMessage())); log.info("第3次订阅:"); //使用Action1 函数式接口来实现onNext回调 //使用Action1 函数式接口来实现onError回调 //使用Action0 函数式接口来实现onCompleted回调 observable.subscribe( s -> log.info(s), e -> log.info("Error Info is:" e.getMessage()), () -> log.info("onCompleted弹射结束")); } }

运行这个示例程序,输出的结果和4.1.4节的示例程序的输出结果是一致的,所以这里不再赘述。对比4.1.4节的程序可以看出,RxJava的函数式编程比普通的Java编程简洁很多。

实际上,在RxJava源码中,Observable类的subscribe()订阅方法的重载版本中使用的是一个ActionSubscriber包装类实例,对3个函数式接口实例进行包装。所以,最终的消息订阅者还是一个Subscriber类型的实例。下面是Observable类的一个重载的subscribe(...)订阅方法的源码,具体如下:

public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onCompleted) { if (onNext == null) { throw new IllegalArgumentException("onNext can not be null"); } if (onError == null) { throw new IllegalArgumentException("onError can not be null"); } if (onCompleted == null) { throw new IllegalArgumentException("onComplete can not be null"); } //通过包装类进行包装 return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted)); }

上面的源码中用到的ActionSubscriber类是Subscriber接口的一个实现类,主要用于包装3个函数式接口的实现。

RxJava的操作符

RxJava的操作符实质上是为了方便数据流的操作,是RxJava为Observable主题所定义的一系列函数。

RxJava的操作符按照其作用具体可以分为以下几类:

(1)创建型操作符:创建一个可观察对象Observable主题对象,并根据输入参数弹射数据。

(2)过滤型操作符:从Observable弹射的消息流中过滤出满足条件的消息。

(3)转换型操作符:对Observable弹射的消息执行转换操作。

(4)聚合型操作符:对Observable弹射的消息流进行聚合操作,比如统计数量等。

本文给大家讲解的内容是SpringCloudRPC远程调用核心原理: RxJava响应式编程框架,从基础原理讲起:观察者模式
  1. 下篇文章给大家讲解的是SpringCloudRPC远程调用核心原理: RxJava响应式编程框架,创建型操作符;
  2. 觉得文章不错的朋友可以转发此文关注小编;
  3. 感谢大家的支持!
,

免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com

    分享
    投诉
    首页