分布式系统中保持网络稳定的五种方式
- 重试模式
-
超时模式
-
断路器模式
-
握手模式
-
隔离壁模式
倘若分布式系统的可靠性由一个极弱的控件决定,那么一个很小的内部功能都可能导致整个系统不稳定。了解稳定模式如何预知分布式网络热点,进而了解应用于Jersey和RESTEasy RESTFUL事务中的五种模式。
要实现高可用、高可靠分布式系统,需要预测一些可不预测的状况。假设你运行规模更大的软件系统,产品发布之后迟早会面临的各种突发状况,一般你会发现两个重要的漏洞。第一个和功能相关,比如计算错误、或者处理和解释数据的错误。这类漏洞很容易产生,通常产品上线前这些bug都会被检测到并得到处理。
第二类漏洞更具挑战性,只有在特定的底层框架下这些漏洞才会显现。因此这些漏洞很难检测和重现,通常情况下不易在测试中发现。相反的,在产品上线运行时几乎总会遇到这些漏洞。更好的测试以及软件质量控制可以提高漏洞移除的机率,然而这并不能确保你的代码没有漏洞。
最坏的情况下,代码中的漏洞会触发系统级联错误,进而导致系统致命的失败。特别是在分布式系统中,其服务位于其它服务与客户端之间。
稳定分布式操作系统的网络行为
系统致命失败热点首要是网络通信。不幸的是,分布式系统的架构师和设计者常常以错误的方式假设网络行为。二十年前,L. Peter Deutsch和其他Sun公司同事就撰文分布式错误,直到今天依然普遍存在。
1.网络是可靠的
2.零延迟
3.无限宽带
4.网络是安全
5.不变的拓扑结构
6.只有一个管理员
7.传输成本为零
8.同质化的网络
今天的多数开发人员依赖RESTFUL系统解决分布式系统网络通信带来的诸多挑战。REST最重要的特点是,它不会隐藏存在高层的RPC桩(Stub)后面的网络通信限制。但RESTful接口和终端不能单独确保系统内在的稳定性,还需要做一些额外的工作。
本文介绍了四种稳定模式来解决分布式系统中常见的失败。本文关注REStful终端,不过这些模式也能应用于其他通信终端。本文应用的模式来自Michael Nygard的书,Release It! Design and Deploy Production-Ready Software。示例代码和demo是自己的。
下载本文源代码,Gregor Roth在2014年10月JavaWorld大会上关于稳定模式在RESTful架构中的应用的源代码。
应用稳定模式
稳定模式(Stability Pattern)用来提升分布式系统的弹性,利用我们熟知的网络行为热点去保护系统免遭失败。本文所引用的模式用来保护分布式系统在网络通信中常见的失败,网络通信中的集成点比如Socket、远程方法调用、数据库调用(数据库驱动隐藏了远程调用)是第一个系统稳定风险。用这些模式能避免一个分布式系统仅仅因为系统的一部分失败而宕机。
网店demo
在线电子支付系统通常没有新的客户数据。相反,这些系统常常基于新用户住址信息为外部在线信用评分检查。基于用户信用得分,网店demo应用决定采用哪种支付手段(信用卡、PayPal账户、预付款或者发票)。
这个demo解决了一个关键场景:如果信用检测失败会发生什么?订单应该被拒绝么?多数情况下,支付系统回退接收一个更加可靠的支付方式。处理这种外部控件失败即是一种技术也是一种业务决策,它需要在失去订单和一个爽约支付可能之间做出权衡。
图1显示了网店系统蓝图
图1 电子支付系统流程图
网店应用采用内部支付服务决定选用何种支付方式,即支付服务提供针对某个用户支付信息以及采用何种支付方式。本例中服务采用RESTful方式实现,意味着诸如GET或者POST的HTTP方法会被显示调用,进而由URI对服务资源进行处理。此方法在JAX-RS 2.0特殊注解所在代码样品中同样有体现。JAX-RS 2.0文档实现了REST与Java的绑定,并作为Java企业版本平台。
列表1、采用何种支付手段
@Singleton 2.@Path("/") 3.public class PaymentService { 4. // ... 5. private final PaymentDao paymentDao; 6. private final URI creditScoreURI; 7. private final static Function<Score, ImmutableSet<PaymentMethod>> SCORE_TO_PAYMENTMETHOD = score -> { 8. switch (score) { 9. case Score.POSITIVE: 10. return ImmutableSet.of(CREDITCARD, PAYPAL, PREPAYMENT, INVOCE); 11. case Score.NEGATIVE: 12. return ImmutableSet.of(PREPAYMENT); 13. default: 14. return ImmutableSet.of(CREDITCARD, PAYPAL, PREPAYMENT); 15. } 16. }; 17. @Path("paymentmethods") 18. @GET 19. @Produces(MediaType.APPLICATION_JSON) 20. public ImmutableSet<PaymentMethod> getPaymentMethods(@QueryParam("addr") String address) { 21. Score score = Score.NEUTRAL; 22. try { 23. ImmutableList<Payment> payments = paymentDao.getPayments(address, 50); 24. score = payments.isEmpty() 25. ? restClient.target(creditScoreURI). queryParam("addr", address).request().get(Score.class) 26. : (payments.stream().filter(payment -> payment.isDelayed()).count() >= 1) ? Score.NEGATIVE : Score.POSITIVE; 27. } catch (RuntimeException rt) { 28. LOG.fine("error occurred by calculating score. Fallback to " + score + " " + rt.toString()); 29. } 30. return SCORE_TO_PAYMENTMETHOD.apply(score); 31. } 32. @Path("payments") 33. @GET 34. @Produces(MediaType.APPLICATION_JSON) 35. public ImmutableList<Payment> getPayments(@QueryParam("userid") String userid) { 36. // ... 37. } 38. // ... 39.}
|
列表1中 getPaymentMethods() 方法绑定了URI路径片段paymentmethods,这样就会得到诸如 http: //myserver/paymentservice/paymentmethods的URI。@GET注解定义了注解方法,若一个HTTP GET请求过来,就会被这个URI所接收。网店应用调用 getPaymentMethods(),借助用户过往的信用历史,为用户的可靠性打分。倘若没有历史数据,一个信用评级服务会被调用。对于本例集成点的异常,系统采用getPaymentMethods() 来降级。即便这样会从一个未知或授信度低客户那里接收到一个更不稳定的支付方法。如果内部的 paymentDao 查询或者 creditScoreURI 查询失败,getPaymentMethods() 会返回缺省的支付方式。
重试模式
Apache HttpClient以及其它的网络客户端实现了一些稳定特性。比如,客户端在某些场景内部反复执行。这个策略有助于处理瞬时网络错误,比如断掉连接,或者服务器宕机。但重试无助于解决永久性错误,这会浪费客户端和服务器双方的资源和时间。
现在来看如何应用四种常用稳定模式解决存在外部信用评分组件中的不稳定错误。
使用超时模式
一种简单却极其有效的稳定模式就是利用合适的超时,Socket编程是一种基础技术,使得软件可以在TCP/IP网络上通信。本质上说,Socket API定义了两种超时类型:
1.连接超时 指建立连接或者错误发生前消耗的最长时间。
2.Socket超时表示,连接建立以后两个连续数据包抵达客户端之间非活跃的最大周期。
列表1中,我用JAX-RS 2.0客户端接口调用信用评分服务。但怎样的超时周期才算合理呢?这个取决于你的JAX-RS供应商。比如,眼下的Jersey版本采用HttpURLConnection。但缺省的Jersey设定连接超时或者Socket超时为0毫秒,即超时是无限的,倘若你不认为这样设置有问题,请三思。
考虑到JAX-RS客户端会在一个服务器/servlet引擎中得到处理,利用一个工作线程池处理进来的HTTP请求。若我们利用经典的阻塞请求处理方法,列表1中的 getPaymentMethods() 会被线程池中一个排他的线程调用。在整个请求处理过程中,一个既定线程与请求处理绑定。如果内在的信用评分服务(由thecreditScoreURI提供地址)调用相应很慢,所有工作池中的线程最终会被挂起。接着,支付服务其它方法,比如getPayments() 会被调用。因为所有线程都在等待信用评分响应,这个请求没有被处理。最糟糕的可能是,一个不好的信用评分服务行为可能拖累整个支付服务功能。
实现超时:线程池 vs 连接池
合理的超时是可用性的基础。但JAX-RS 2.0客户端接口并没有定一个设置超时的接口,所以你不得不利用供应商提供的接口。下面的代码,我为Jersey设置了客户属性。
ClientConfig clientConfig = new ClientConfig(); // jersey specific 2. ClientConfig.connectorProvider(new ApacheConnectorProvider()); // jersey specific 3. RequestConfig reqConfig = RequestConfig.custom() // apache HttpClient specific 4. .setConnectTimeout(2000) 5. .setSocketTimeout(2000) 6. .setConnectionRequestTimeout(200) 7. .build(); 8. clientConfig.property(ApacheClientProperties.REQUEST_CONFIG, reqConfig); // jersey specific 9. restClient = ClientBuilder.newClient(clientConfig);
|
与Jersey不同,RESTEasy采用Apache HttpClient,比HttpURLConnection更加有效,Apache HttpClient支持连接池。连接池确保连接在处理完了一个HTTP事务之后,可以用来处理其它HTTP事务,假设该连接可以被看作持久连接。这个方式能减少建立新TCP/IP连接的开销,这一点很重要。
一个高负载系统内,单个HTTP客户端实例每秒处理成千上万的HTTP传出事务也并不罕见。
为了在Jersey中能够利用Apache HttpClient,如列表2所示,你需要设置ApacheConnectorProvider。注意在request-config定义中设置超时。
列表2、在Jersey中使用Apache HttpClient
ClientConfig clientConfig = new ClientConfig(); // jersey specific 2. ClientConfig.connectorProvider(new ApacheConnectorProvider()); // jersey specific 3. RequestConfig reqConfig = RequestConfig.custom()// apache HttpClient specific 4. .setConnectTimeout(2000) 5. .setSocketTimeout(2000) 6. .setConnectionRequestTimeout(200) 7. .build(); 8. clientConfig.property(ApacheClientProperties.REQUEST_CONFIG, reqConfig); // jersey specific 9. restClient = ClientBuilder.newClient(clientConfig);
|
注意,连接池特定连接请求超时同在上面的例子也有设置。连接请求超时表示,从发起一个连接请求到在HttpClient内在连接池管理返回一个请求连接花费的时间。缺省状态不限制超时,意味着连接请求调用时会一直阻塞直到连接变为可用,效果和无限连接、Socket超时一样。
利用Jersey的另一种方式,你可以间接通过RESTEasy设置连接请求超时,参见列表3。
列表3、在RESTEasy中设置连接超时
RequestConfig reqConfig = RequestConfig.custom() // apache HttpClient specific 2. .setConnectTimeout(2000) 3. .setSocketTimeout(2000) 4. .setConnectionRequestTimeout(200) 5. .build(); 6. CloseableHttpClient httpClient = HttpClientBuilder.create() 7. .setDefaultRequestConfig(reqConfig) 8. .build(); 9. Client restClient = new ResteasyClientBuilder(). httpEngine(new ApacheHttpClient4Engine(httpClient, true)).build(); // RESTEasy specific
|
我所展示的超时模式实现都是基于RESTEasy和Jersey,这两种RESTful网络服务框架都实现了JAX-RS 2.0。同时,我也展示了两种超时设置方法,JAX-RS 2.0供应商利用标准线程池或者连接池管理外部请求。
断路器模式
与超时限制系统资源消费不同,断路器模式(Circuit Breaker)更加积极主动。断路器诊断失败并防止应用尝试执行注定失败的活动。与HttpClient的重试模式不同,断路器模式可以解决持续化错误。
利用断路器存储客户端资源中那些注定失败的调用,如同存储服务器端资源一样。若服务器处在错误状态,比如过高的负载状态,多数情形下,服务器添加额外的负载就不太明智。
图2 断路器模式状态图
一个断路器可以装饰并且检测了一个受保护的功能调用。根据当前的状态决定调用时被执行还是回退。通常情况下,一个断路器实现三种类型的状态:open、half-open以及closed:
1.closed状态的调用被执行,事务度量被存储,这些度量是实现一个健康策略所必备的。
2.倘若系统健康状况变差,断路器就处在open状态。此种状态下,所有调用会被立即回退并且不会产生新的调用。open状态的目的是给服务器端回复和处理问题的时间。
3.一旦断路器进入一个open状态,超时计时器开始计时。如果计时器超时,断路器切换到half-open状态。在half-open状态调用间歇性执行以确定问题是否已解决。如果解决,状态切换回closed状态。
客户端断路器
图3展示了如何利用JAX-RS过滤器接口实现一个断路器,注意有好几处拦截请求的地方,比如HttpClient底层一个拦截器接口同样适用整合一个断路器。
图3 利用JAX-RS过滤器接口实现断路器
在客户端调用JAX-RS客户端接口register方法,设置一个断路器过滤器:
client.register(new ClientCircutBreakerFilter());
断路器过滤器实现了前置处理(pre-execution)和后置处理(post-execution)方法。在前置处理方法中,系统会检测请求执行是否允许。一个目标主机会用一个专门的断路器实例对应,避免产生副作用。如果调用允许,HTTP事务就会被记录在度量中。存在于后执行方法中事务度量对象分派结果给事务被关闭。一个5xx状态响应会被处理为成错误。
列表4、断路器模式中的前置和后置执行方法
public class ClientCircutBreakerFilter implements ClientRequestFilter, ClientResponseFilter { 2. // .. 3. @Override 4. public void filter(ClientRequestContext requestContext) throws IOException, CircuitOpenedException { 5. String scope = requestContext.getUri().getHost(); 6. if (!circuitBreakerRegistry.get(scope).isRequestAllowed()) { 7. throw new CircuitOpenedException("circuit is open"); 8. } 9. Transaction transaction = metricsRegistry.transactions(scope).openTransaction(); 10. requestContext.setProperty(TRANSACTION, transaction); 11. } 12. @Override 13. public void filter(ClientRequestContext requestContext, ClientResponseContext responseContext) throws IOException { 14. boolean isFailed = (responseContext.getStatus() >= 500); 15. Transaction.close(requestContext.getProperty(TRANSACTION), isFailed); 16. } 17.}
|
系统健康实现策略
基于列表4事务记录,一个断路器系统健康策略实现能够得到 totalRate/errorRate比率的度量。特别的是,逻辑健康同样需要考虑异常行为,比如在请求率极低的时候,健康策略可以忽视totalRate/errorRate比率。
列表5、健康策略逻辑
public class ErrorRateBasedHealthPolicy implements HealthPolicy { 2. // ... 3. @Override 4. public boolean isHealthy(String scope) { 5. Transactions recorded = metricsRegistry.transactions(scope).ofLast(Duration.ofMinutes(60)); 6. return ! ((recorded.size() > thresholdMinReqPerMin) && // check threshold reached? 7. (recorded.failed().size() == recorded.size()) && // every call failed? 8. (... )); // client connection pool limit almost reached? 9. } 10.}
|
倘若健康策略返回值为负,断路器会进入open、half-open状态。在这个简单的例子中百分之二的调用会检测服务器端是否处在正常状态。
列表6、健康响应策略
public class CircuitBreaker { 2. private final AtomicReference<CircuitBreakerState> state = new AtomicReference<>(new ClosedState()); 3. private final String scope; 4. // .. 5. public boolean isRequestAllowed() { 6. return state.get().isRequestAllowed(); 7. } 8. private final class ClosedState implements CircuitBreakerState { 9. @Override 10. public boolean isRequestAllowed() { 11. return (policy.isHealthy(scope)) ? true 12. : changeState(new OpenState()).isRequestAllowed(); 13. } 14. } 15. private final class OpenState implements CircuitBreakerState { 16. private final Instant exitDate = Instant.now().plus(openStateTimeout); 17. @Override 18. public boolean isRequestAllowed() { 19. return (Instant.now().isAfter(exitDate)) ? changeState(new HalfOpenState()).isRequestAllowed() 20. : false; 21. } 22. } 23. private final class HalfOpenState implements CircuitBreakerState { 24. private double chance = 0.02; // 2% will be passed through 25. @Override 26. public boolean isRequestAllowed() { 27. return (policy.isHealthy(scope)) ? changeState(new ClosedState()).isRequestAllowed() 28. : (random.nextDouble() <= chance); 29. } 30. } 31. // .. 32.}
|
服务器端断路器实现
断路器也可以在服务器端实现。服务器端过滤器范围作为目标运算取代目标主机。若目标运算处理出错,调用会携带一个错误状态立即回退。用服务端过滤器可以避免某个错误运算消耗过多资源。
列表1的 getPaymentMethods() 方法实现中,信用评分服务会被 creditScoreURI 在内部调用。然而,一旦内部信用评级服务调用响应很慢(设置了不恰当的超时),信用评分服务调用可能会在后台消耗掉Servlet引擎线程池中所有可用线程。这样,即便 getPayments() 不再查询信用评分服务,其它支付服务的远程运算,比如 getPayments() 都无法调用。
列表7、服务端断路器的过滤器
@Provider 2.public class ContainerCircutBreakerFilter implements ContainerRequestFilter, ContainerResponseFilter { 3. //.. 4. @Override 5. public void filter(ContainerRequestContext requestContext) throws IOException { 6. String scope = resourceInfo.getResourceClass().getName() + "#" + resourceInfo.getResourceClass().getName(); 7. if (!circuitBreakerRegistry.get(scope).isRequestAllowed()) { 8. throw new CircuitOpenedException("circuit is open"); 9. } 10. Transaction transaction = metricsRegistry.transactions(scope).openTransaction(); 11. requestContext.setProperty(TRANSACTION, transaction); 12. } 13. //.. 14.}
|
注意,与客户端的HealthPolicy不一样,服务端例子采用OverloadBasedHealthPolicy。本例中,一旦所有工作池中线程都处于活跃状态,超过百分之八十的线程被既定运算消费,并且超过最大慢速延迟阈值。接下来,运算会被认为有误。OverloadBasedHealthPolicy如下所示:
列表8、服务端OverloadBasedHealthPolicy
public class OverloadBasedHealthPolicy implements HealthPolicy { 2. private final Environment environment; 3. //... 4. @Override 5. public boolean isHealthy(String scope) { 6. // [1] all servlet container threads busy? 7. Threadpool pool = environment.getThreadpoolUsage(); 8. if (pool.getCurrentThreadsBusy() >= pool.getMaxThreads()) { 9. TransactionMetrics metrics = metricsRegistry.transactions(scope); 10. // [2] more than 80% currently consumed by this operation? 11. if (metrics.running().size() > (pool.getMaxThreads() * 0.8)) { 12. // [3] is 50percentile higher than slow threshold? 13. Duration current50percentile = metrics.ofLast(Duration.ofMinutes(3)).percentile(50); 14. if (thresholdSlowTransaction.minus(current50percentile).isNegative()) { 15. return false; 16. } 17. } 18. } 19. return true; 20. } 21.}
|
握手模式
断路器模式要么全部使用要么完全不用。根据记录指标的质量和粒度,另一种替代方法是提前检测过量负载状态。若检测到一个即将发生的过载,客户端能够被通知减少请求。在握手模式( Handshaking pattern)中,服务器会与客户端通信以便掌控自身工作负载。
握手模式通过一个负载均衡器为服务器提供常规系统健康更新。负载均衡器利用诸如 http: //myserver/paymentservice/~health 这样的健康检查URI决定那个服务器请求可以转发。出于安全的原因,健康检查页通常不提供公共因特网接入,所以健康检测的范围仅仅局限于公司内部通信。
与pull方式不同,另一种方式是添加一个流程控制头信息(header)给响应以实现一个服务器push方式。这样能够帮助服务器控制每个客户端的负载,当然需要对客户端做甄别。我在列表9添加了一个私有的客户端ID请求头信息,这个跟一个恰当的流控制响应头信息一样。
列表9、握手过滤器的流程控制头信息
@Provider 2.public class HandshakingFilter implements ContainerRequestFilter, ContainerResponseFilter { 3. // ... 4. @Override 5. public void filter(ContainerRequestContext requestContext) throws IOException { 6. String clientId = requestContext.getHeaderString("X-Client-Id"); 7. requestContext.setProperty(TRANSACTION, metricsRegistry.transactions(clientId).openTransaction()); 8. } 9. @Override 10. public void filter(ContainerRequestContext requestContext, ContainerResponseContext responseContext) throws IOException { 11. String clientId = requestContext.getHeaderString("X-Client-Id"); 12. if (flowController.isVeryHighRequestRate(clientId)) { 13. responseContext.getHeaders().add("X-FlowControl-Request", "reduce"); 14. } 15. Transaction.close(requestContext.getProperty(TRANSACTION), responseContext.getStatus() >= 500); 16. } 17.}
|
本例中,一旦某个度量超出阈值,服务器就会通知客户端减少请求。度量以客户端ID形式被记录下来,方便我们为某个特定客户端作配备定额资源。通常客户端会关闭诸如预获取或者暗示功能直接减少请求响应,这些功能需要后台请求。
隔离壁模式
在工业界隔离壁(Bulkhead)常常用来将船只或者飞机分割成几部件,一旦部件有裂缝部件可以进行加封。同理,在软件系统中利用隔离壁分割系统可以应对系统的级联错误。重要的是,隔离壁分派有限的资源给特定的客户端、应用、运算和客户终端等。
RESTful系统中的隔离壁
建立隔离壁或者系统分区方式有很多种,接下来我会一一展示。
每客户资源(Resources-per-client)是一种为特定客户端建立单个集群的隔离壁模式。比如图4是一个新的移动网店应用版本示意图。分割这些移动网店App可以确保蜂拥而来的移动状态请求不会对原始的网店应用产生副面影响。任何由移动App新请求引发的系统失败,都应该被限制在移动通道里面。
图4 移动网店应用
每应用资源(Resources-per-application)。如图5展示的那样,一个排他的隔离壁实现方式,比如,支付服务不仅利用信用评分服务,同时也利用汇率服务。如果这两种方式放在同一个容器中,不好的信用评分服务行为可能拆分汇率服务。从隔离壁的角度看,将每个应用放在各自的容器中,这样可以保护彼此不受干扰。
图5 应用分区
此种方式不好的地方就是一个既定资源池添加海量资源开销很大。不过虚拟化可以减少这种开销。
每操作资源(Resources-per-operation)是一种更加细粒度方式,分派单个系统资源给运算。比如,支付服务中的getAcceptedPaymentMethods() 运算运行有漏洞,getPayments() 运算依旧能处理。Netflix的Hystrix框架是支持这种细粒度隔离壁典型系统。
每终端资源(Resources-per-endpoint)为既定客户终端管理资源,比如在电子支付系统中单个客户端实例对应单个服务终端,如图6所示。
图6 终端分区
在本例中Apache HttpClient缺省状态最大可以利用20个网络连接,单个HTTP事务消费一个连接。利用经典的阻塞方式,最大连接数等于HttpClient 实例可以利用的最大线程数。下面的例子中,客户端可以消费30个连接数最多可利用30个线程。
列表10、隔离壁在系统终端控制资源应用
// ... 2. CloseableHttpClient httpClient = HttpClientBuilder.create() 3. .setMaxConnTotal(30) 4. .setMaxConnPerRoute(30) 5. .setDefaultRequestConfig(reqConfig) 6. .build(); 7. Client addrScoreClient = new ResteasyClientBuilder(). httpEngine(new ApacheHttpClient4Engine(httpClient, true)).build();// RESTEasy specific 8. CloseableHttpClient httpClient2 = HttpClientBuilder.create() 9. .setMaxConnTotal(30) 10. .setMaxConnPerRoute(30) 11. .setDefaultRequestConfig(reqConfig) 12. .build(); 13. Client exchangeRateClient = new ResteasyClientBuilder(). httpEngine(new ApacheHttpClient4Engine(httpClient2, true)).build();// RESTEasy specific
|
另外一种实现隔离壁模式的方式可以利用不同的maxConnPerRoute和maxConnTotal值,maxConnPerRoute可以限制特定主机的连接数。与两个客户端实例不同,单个客户端实例会限制每个目标主机的连接数。在本例中,你需要仔细观察线程池,比如服务器容器利用300个工作线程,配置内部已用客户端需要考虑最大空闲线程数。
Java8中的稳定模式:非阻塞异步调用
至今在多种模式和日常案例中,对线程的应用都是至关重要的一环,系统没有响应大都是线程引起的。由一个枯竭线程池引发的系统严重失败非常常见,在这个线程池中所有线程都被阻塞调用挂起,等待缓慢的响应。
Java8为大家提供了另一种支持lambda表达式的线程编程方式。lambda表达式通过更好的分布式计算响应方式,让Java非阻塞异步编程更容易。
响应式编程的核心原则就是事件驱动,即程序流由事件决定。与调用阻塞方法并且等到响应结果不同的是,事件驱动方式所定义的代码响应诸如响应接受等事件。挂起等待响应的线程就不再需要,程序中的handler代码会对事件做出响应。
列表11中,thenCompose()、exceptionally()、thenApply()和whenComplete() 方法都是响应式的。方法参数都是Java8函数,只要诸如处理完成或者有错误等特定事件发生,这些参数就会被异步处理。
列表11展示了列表1中一个彻底的异步、非阻塞的原始支付方法调用实现。本例中一旦请求被接收,数据库就会以匿名的方式被调用,这就意味着 getPaymentMethodsAsync() 方法调用迅速返回,无需等待数据库查询响应。一旦数据库响应请求,函数 thenCompose() 就会被处理,这个函数要么异步调用信用评级服务,要么返回基于用户先前支付记录的评分,接着分数会映射到所支持的支付方法上。
列表11、获得异步支付方法
@Singleton 2.@Path("/") 3.public class AsyncPaymentService { 4. // ... 5. private final PaymentDao paymentDao; 6. private final URI creditScoreURI; 7. public AsyncPaymentService() { 8. ClientConfig clientConfig = new ClientConfig(); // jersey specific 9. clientConfig.connectorProvider(new GrizzlyConnectorProvider()); // jersey specific 10. // ... 11. // use extended client (JAX-RS 2.0 client does not support CompletableFuture) 12. restClient = Java8Client.newClient(ClientBuilder.newClient(clientConfig)); 13. // ... 14. restClient.register(new ClientCircutBreakerFilter()); 15. } 16. @Path("paymentmethods") 17. @GET 18. @Produces(MediaType.APPLICATION_JSON) 19. public void getPaymentMethodsAsync(@QueryParam("addr") String address, @Suspended AsyncResponse resp) { 20. paymentDao.getPaymentsAsync(address, 50) // returns a CompletableFuture<ImmutableList<Payment>> 21. .thenCompose(pmts -> pmts.isEmpty() // function will be processed if paymentDao result is received 22. ? restClient.target(addrScoreURI).queryParam("addr", address). request().async().get(Score.class) // internal async http call 23. : CompletableFuture.completedFuture((pmts.stream(). filter(pmt -> pmt.isDelayed()).count() > 1) ? Score.NEGATIVE : Score.POSITIVE)) 24. .exceptionally(error -> Score.NEUTRAL) // function will be processed if any error occurs 25. .thenApply(SCORE_TO_PAYMENTMETHOD) // function will be processed if score is determined and maps it to payment methods 26. .whenComplete(ResultConsumer.write(resp)); // writes result/error into async response 27. } 28. // ... 29.}
|
注意,本实现中请求处理无需绑定在那个等待响应的线程上,是否意味着稳定模式无需这种响应模式?当然不是,我们依旧要实现这些稳定模式。
非阻塞模式需要非阻塞代码运行在调用路径中,比如,PaymentDao的某个漏洞引起某些特定情形下的阻塞行为,非阻塞协议就被打破,调用路径因此变成阻塞式。而且,一个工作池线程隐式地绑定在某个调用路径上,即使线程这会不是 连接/响应 管理等其他资源的瓶颈,也有可能成为下一个瓶颈。
最后结语
本文我所介绍的稳定模式描述了应对分布式系统级联失败的最佳实践。即便某个组件失败,在这种降级的模式下,系统依旧做既定的运算。
本文例子用于RESTful终端的应用架构,同样可以应用于其它通信终端。比如,很多系统包含数据库客户端,就不得不考虑这些。需要声明的是,本文没有阐述所有稳定相关模式。在一个产出很高的环境中,诸如Servlet容器这样的服务器处理需要管理者们监控,管理者追踪容器是否健康,一旦处理临近崩溃需要重启;很多例证表明,重启服务比让它处于活跃状态更有益,毕竟一个错误几乎没有响应的服务节点比一个移除的死节点更要命。
|