编辑推荐: |
本文详细的阐述了All-Reative全响应式架构以及在spring框架下使用WebSocket配置环境,希望可以为您的学习带来帮助。
本文来自于简书,由火龙果软件Alice编辑、推荐。 |
|
什么是响应式
目前响应式是一个极度被使用的名词。牛津词典定义响应式是”对刺激作出响应”,因此,响应式软件基于它接受到刺激作出响应、调节它的行为。百度百科给出的答案是响应式编程是一种面向数据流和变化传播的编程范式。而维基百科上对响应式编程是这样解释的:
In computing, reactive programming is a programming
paradigm oriented around data flows and the propagation
of change.
响应式编程:面向订阅数据流、响应变化并传播的一种编程范型。
响应式系统:一种基于异步消息传递的构建响应式、健壮的分布式系统的构架风格。
说起响应式编程,其实我们遇到过。在Excel中,我们让B1=A1/5,那么一旦我们修改了A1单元格的值,B1就会随着改变。即,B1响应了A1的变化,这就是响应式。
在Web应用中,这样的场景更多,UI上的每个事件,都会造成一系列改变。我们可以通过注册回调函数来解决这个问题,可是,异步编程控制起来是一个难题,例如callback
hell,于是,ES6加入了Promise,ES7提案了Await/Async等解决方案。
而Rx采用了另外的方式,它结合了观察者模式和迭代器模式,用了一种称为Observable的数据结构。Observable是一层抽象,封装了同步数据和异步数据,通过Observable之间进行变换,将最终结果反映到观察者那里。
(摘自:你所不知道的响应式编程,用Java构建响应式微服务)
全响应式架构实现
整个架构从数据到后端到前端全部使用响应式编程风格,核心在于Redis支持的发布订阅模式、后端的响应式编程框架、双工通讯的WebSocket连接、支持响应式编程的前端框架。
Redis
PUBLISH命令发送消息,其返回值为接收到该消息的订阅者的数量。
SUBSCRIBE命令订阅某个频道,其返回值包括客户端订阅的频道,目前已订阅的频道数量,以及接收到的消息,其中subscribe表示已经成功订阅了某个频道。
PSUBSCRIBE订阅符合某个模式所有频道,用“”表示模式,“”可以被任意值代替。 假设客户端同时订阅了某种模式和符合该模式的某个频道,那么发送给这个频道的消息将被客户端接收到两次,只不过这两条消息的类型不同,一个是message类型,一个是pmessage类型,但其内容相同。
UNSUBSCRIBE和PUNSUBSCRIBE命令取消订阅,其返回值与订阅类似。
这里使用Jedis实现Redis与后端通信
首先Maven中引入依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency> |
然后通过继承JedisPubSub类实现一个订阅者
public class
Subscriber extends JedisPubSub {
public Subscriber(){}
@Override
public void onMessage(String channel, String
message) { //收到消息会调用
System.out.println(String.format("receive
redis published message, channel %s, message
%s", channel, message));
}
@Override
public void onSubscribe(String channel, int
subscribedChannels) { //订阅了频道会调用
System.out.println(String.format("subscribe
redis channel success, channel %s, subscribedChannels
%d",
channel, subscribedChannels));
}
@Override
public void onUnsubscribe(String channel, int
subscribedChannels) { //取消订阅 会调用
System.out.println(String.format("unsubscribe
redis channel, channel %s, subscribedChannels
%d",
channel, subscribedChannels));
}
} |
最后再实现一个发布者
public class
Publisher {
private final JedisPool jedisPool;
public Publisher(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
public void send() {
Jedis jedis = jedisPool.getResource();
jedis.publish("one channel", line);
}
} |
RxJava
由于响应式编程和流式编程是一对好基友,所以后端采用ReactiveStream架构:
同时后端需要维护两个映射关系,用于确定将消息推送到哪些用户的浏览器前端:
Websocket
Websocket是HTML5规范中新增的一个持久化的协议。HTTP的生命周期通过 Request
来界定,也就是一个 Request 一个 Response ,那么在 HTTP1.0 中,这次HTTP请求就结束了。
在HTTP1.1中进行了改进,使得有一个keep-alive,也就是说,在一个HTTP连接中,可以发送多个Request,接收多个Response。但是请记住
Request = Response, 在HTTP中永远是这样,也就是说一个request只能有一个response。而且这个response也是被动的,不能主动发起。
首先Websocket是基于HTTP协议的,或者说借用了HTTP的协议来完成一部分握手。
首先我们来看个典型的 Websocket 握手:
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13
Origin: http://example.com |
熟悉HTTP的同学可能发现了,这段类似HTTP协议的握手请求中,多了几个东西。我会顺便讲解下作用。
Upgrade: websocket
Connection: Upgrade |
这个就是Websocket的核心了,告诉 Apache 、 Nginx 等服务器发起的是Websocket协议。
Sec-WebSocket-Key:
x3JJHMbDL1EzLkh9GBhXDw==
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13 |
首先, Sec-WebSocket-Key 是一个 Base64 encode 的值,这个是浏览器随机生成的,告诉服务器:我要验证服务器是不是真的是Websocket助理。
然后, Sec_WebSocket-Protocol 是一个用户定义的字符串,用来区分同URL下,不同的服务所需要的协议。
最后, Sec-WebSocket-Version 是告诉服务器所使用的 Websocket Draft
(协议版本),在最初的时候,Websocket协议还在 Draft 阶段,各种奇奇怪怪的协议都有,而且还有很多期奇奇怪怪不同的东西,什么Firefox和Chrome用的不是一个版本之类的,当初Websocket协议太多可是一个大难题。
然后服务器会返回下列东西,表示已经接受到请求, 成功建立Websocket。
HTTP/1.1 101
Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: HSmrc0sMlYUkAGmm5OPpG2HaGWk=
Sec-WebSocket-Protocol: chat |
这里开始就是HTTP最后负责的区域了,告诉客户,我已经成功切换协议啦~
Upgrade: websocket
Connection: Upgrade |
依然是固定的,告诉客户端即将升级的是 Websocket 协议,而不是mozillasocket,lurnarsocket或者shitsocket。
然后, Sec-WebSocket-Accept 这个则是经过服务器确认,并且加密过后的 Sec-WebSocket-Key
。 服务器:好啦好啦,知道啦,给你看我的ID CARD来证明行了吧。。
后面的, Sec-WebSocket-Protocol 则是表示最终使用的协议。
至此,HTTP已经完成它所有工作了,接下来就是完全按照Websocket协议进行了。
前端使用WebSocket
// 初始化一个 WebSocket
对象
var ws = new WebSocket("ws://localhost/websocket");
var wss = new WebSocket("wss://localhost/websocket");
// 建立 web socket 连接成功触发事件
ws.onopen = function () {
// 使用 send() 方法发送数据
ws.send("发送数据");
alert("数据发送中...");
};
// 接收服务端数据时触发事件
ws.onmessage = function (evt) {
var received_msg = evt.data;
alert("数据已接收...");
}; // 断开 web socket 连接成功触发事件
ws.onclose = function () {
alert("连接已关闭...");
}; |
假定我们使用了以上代码创建了 Socket 对象,那么该对象有以下属性:
Socket.readyState 只读属性 readyState 表示连接状态,可以是以下值:0
- 表示连接尚未建立。1 - 表示连接已建立,可以进行通信。2 - 表示连接正在进行关闭。3 - 表示连接已经关闭或者连接不能打开。
Socket.bufferedAmount 只读属性 bufferedAmount 已被 send()
放入正在队列中等待传输,但是还没有发出的 UTF-8 文本字节数。
后端使用WebSocket
这里假设在spring框架下使用WebSocket,那么首先添加依赖:
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
<version>${spring.version}</version>
</dependency> |
然后创建 WebSocket 处理器:
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.TextMessage;
public class MyHandler extends TextWebSocketHandler
{ @Override
public void handleTextMessage(WebSocketSession
session, TextMessage message) {
// ...
} } |
WebSocketHandler 源码如下,这意味着你的处理器大概可以处理哪些 WebSocket
事件:
public interface
WebSocketHandler {
/**
* 建立连接后触发的回调
*/
void afterConnectionEstablished(WebSocketSession
session) throws Exception;
/**
* 收到消息时触发的回调
*/
void handleMessage(WebSocketSession session,
WebSocketMessage<?> message) throws Exception; /**
* 传输消息出错时触发的回调
*/
void handleTransportError(WebSocketSession session,
Throwable exception) throws Exception;
/**
* 断开连接后触发的回调
*/
void afterConnectionClosed(WebSocketSession
session, CloseStatus closeStatus) throws Exception; /**
* 是否处理分片消息
*/
boolean supportsPartialMessages();
} |
最后配置WebSocket服务接口:
import org.springframework.web.socket.config. annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation. WebSocketConfigurer;
import org.springframework.web.socket.config.annotation. WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer
{ @Override
public void registerWebSocketHandlers(WebSocket HandlerRegistry
registry) {
registry.addHandler(myHandler(), "/myHandler");
} @Bean
public WebSocketHandler myHandler() {
return new MyHandler();
} } |
RxJS
RxJS是ReactiveX编程理念的JavaScript版本。ReactiveX来自微软,它是一种针对异步数据流的编程。简单来说,它将一切数据,包括HTTP请求,DOM事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处理,使你能以同步编程的方式处理异步数据,并组合不同的操作符来轻松优雅的实现你所需要的功能。
这里我们使用RxJS实现一个前端的订阅发布中心服务,用来接收并处理来自WebSocket的onmessage事件传入的消息,然后根据消息的topic去通知订阅了这类消息的组件。事实上就是实现了一个简易的前后端消息队列。响应式架构前端.png
总结
在全响应式架构中,从前端到后端再到数据库都是基于事件驱动的,因此避免了大量的状态轮询操作;同时通过订阅的方式精确限定了事件传播的范围,
|