Java中的责任链模式
1.责任链模式又叫做职责链模式,赋予对象行为的设计模式,使多个参与者都有机会执行相应的逻辑,降低了耦合,平时开发过程中;我们其实已经接触到了责任链模式。只是是以另一种形态呈现:像switch、if-else,重构项目时对于这些首先想到的是责任链模式使结构更清晰,同样带来了一个问题。如果逻辑过于复杂,调用嵌套过深对性能是有一定的损失的。设计模式的多样化也表明了没有十全十美的设计模式。而我们需要掌握是思想。
2.责任链模式的UML图(网上爬的):
3.通过类图中可以明确的是,责任链模式包含了抽象的处理类Handler、客户端调用Client、具体的处理实现类ConcreteHandler。
4.以生活中职员请假为例子,在公司中通常请假是根据天数来确定审批的流程,三天、五天或者一个星期需要最终批准的上层是不一样的。
抽象的处理类Handler
1 | public abstract class Handler { |
TeamLeader处理类
1 | public class TeamLeader extends Handler { |
ProjectManager
1 | public class ProjectManager extends Handler { |
DepartmentManager
1 | public class DepartmentManager extends Handler { |
Client调用
1 | public class Client { |
打印信息:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22第1次请假请假天数 = 3
TeamLeader agree the approve and the leave days is 3
approve success!
第2次请假请假天数 = 5
Approve days is above 3 days, ask the ProjectManager to handler!
ProjectManager agree the approve and the leave days is 5
approve success!
第3次请假请假天数 = 7
Approve days is above 3 days, ask the ProjectManager to handler!
Approve days is above 3 days, ask the DepartmentManager to handler!
DepartmentManager agree the approve and the leave days is 7
approve success!
第4次请假请假天数 = 10
Approve days is above 3 days, ask the ProjectManager to handler!
Approve days is above 3 days, ask the DepartmentManager to handler!
Approve too many days? Please work hard!
approve failed!
Process finished with exit code 0
OkHttp
1.对Java中的责任链首先有个意识,在看OkHttp中,开发请求网络框架,基本上都会接触OkHttp,其中就用到了责任链模式。具体看源码,就是想理解这种思想的运用,对比优秀的实现,在实际开发过程中或多或少会起到启发的作用。
2.OkHttpClient
相当于我们网络的配置控制中心,包括一些基础的配置,连接池、重试、桥接、协议等,主要配置:Dispatcher(线程调度)
设定了最大请求数,单个Host的最大请求数。Protocols
支持的协议,HTTP/1.1、HTTP/2
。ConnectionSpec
对于Socket的设置信息,明确是明文传输的HTTP,还是TLS的HTTPS。Interceptor
,核心链,包含了几个重要的拦截器。当然还包括其他的一些基础参数,不在赘述。
newCall
1.从实际运用的例子入手:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18private static void connectNet() {
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url("https://www.baidu.com")
.build();
//异步请求
client.newCall(request).enqueue(new Callback() {
public void onFailure(Call call, IOException e) {
System.out.println("Failed----->" + e.getMessage());
}
public void onResponse(Call call, Response response) throws IOException {
System.out.println("Success----->" + response.toString());
}
});
}
newCall(request)
方法返回的是一个RealCall
对象,实现了Call
的接口,当调用RealCall.execute()
时:
RealCall.getResponseWithInterceptorChain()会被调用,发起网络请求并拿到返回的响应值Response,同样的异步请求RealCall.enqueue()的调用也是大同小异的,主要区别在于Dispatcher的介入,通过线程池的调用将请求加入后台,实际上也是对getResponseWithInterceptorChain()的调用。另外不同的是,对于请求队列的维护是不同的(Dispatcher)中。
getResponseWithInterceptorChain(),计数核心方法,也是OkHttp责任链模式的核心方法,主要的工作就是将多个Interceptorr组装起来(List),创建一个RealInterceptorChain对象,而chain.proceed(request)一步步推荐链的执行,一步步分析。
Dispatcher
1.几个重要的参数:1
2
3
4
5
6
7
8
9
10private int maxRequests = 64;
private int maxRequestsPerHost = 5;
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
2.定义了最大异步请求数为64,而单个Host最大的请求数为5,同时对于异步请求实现了两个双端队列来保存请求runningAsyncCalls、readyAsyncCalls
;这个很好理解,当我们的请求已经达到最大值64(或者Host为5),那么此时要是有新的请求过来当然是要将请求先保存起来。对于早期的处理逻辑,当有请求过来时,先判断是否达到请求阀值。决定将请求放入哪个队列当中,在新版本中这个逻辑已经被修改了。同样的对于同步请求是直接入队列runningSyncCalls
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51//早期的异步请求入队列操作
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
getExecutorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
//后期版本中是直接先入到readyAsyncCalls中,当然主要逻辑还是一样的
void enqueue(AsyncCall call) {
synchronized (this) {
readyAsyncCalls.add(call);
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.get().forWebSocket) {
AsyncCall existingCall = findExistingCallWithHost(call.host());
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
}
}
//在此方法中处理判断是否达到阀值决定是否要加入到runningAsyncCalls。
promoteAndExecute();
}
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
asyncCall.callsPerHost().incrementAndGet();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}
return isRunning;
}
3.同步方法的调用,getResponseWithInterceptorChain,直接请求返回Response。1
2
3
4
5
6
7
8
9
10
11
12
13
14public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
transmitter.timeoutEnter();
transmitter.callStart();
try {
client.dispatcher().executed(this);
return getResponseWithInterceptorChain();
} finally {
client.dispatcher().finished(this);
}
}
异步请求关键类-AsyncCall
1.当我们发起异步请求时:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40client.newCall(request).enqueue();
public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
transmitter.callStart();
//AsyncCall继承了抽象类NamedRunnable(实现了Runnable接口),其实就是线程的实现,对于run()方法中的具体逻辑,增加了抽象方法execute(),看看具体实现。
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
AsyncCall#execute()
protected void execute() {
boolean signalledCallback = false;
transmitter.timeoutEnter();
try {
//无论异步请求还是同步请求,本质都是对getResponseWithInterceptorChain()调用,只是异步请求增加了线程的管理与调度。
Response response = getResponseWithInterceptorChain();
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} catch (Throwable t) {
cancel();
if (!signalledCallback) {
IOException canceledException = new IOException("canceled due to " + t);
canceledException.addSuppressed(t);
responseCallback.onFailure(RealCall.this, canceledException);
}
throw t;
} finally {
client.dispatcher().finished(this);
}
}
关键方法-getResponseWithInterceptorChain()
1.RealCall#getResponseWithInterceptorChain()1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
//拦截器集合
List<Interceptor> interceptors = new ArrayList<>();
//用户可以自定拦截器,这里保存了用户自定义的拦截器
interceptors.addAll(client.interceptors());
//重试拦截器
interceptors.add(new RetryAndFollowUpInterceptor(client));
//桥接拦截器,包括gzip的压缩,host信息的设置等
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//缓存请求
interceptors.add(new CacheInterceptor(client.internalCache()));
//这个拦截器的代码量很少,主要就是与服务器建立链接TCP链接或TCP-TLS链接
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
//责任链尾,实质的请求与I/O操作,将请求数据写入Socket中,并从Socket读取响应数据(TCP/TCP-TLS对应的端口)。
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
boolean calledNoMoreExchanges = false;
try {
Response response = chain.proceed(originalRequest);
if (transmitter.isCanceled()) {
closeQuietly(response);
throw new IOException("Canceled");
}
return response;
} catch (IOException e) {
calledNoMoreExchanges = true;
throw transmitter.noMoreExchanges(e);
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null);
}
}
}
2.我们的Response经过核心方法getResponseWithInterceptorChain()的包装,最终拿到了想要的结果,这里是OkHttp责任链模式的核心,设计的很巧妙。看看具体做了哪些操作。
RetryAndFollowUpInterceptor
1.拦截器都实现了统一的接口Interceptor,看其中的关键方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80//定义了默认最大重试次数20次
private static final int MAX_FOLLOW_UPS = 20;
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Transmitter transmitter = realChain.transmitter();
int followUpCount = 0;
Response priorResponse = null;
while (true) {
transmitter.prepareToConnect(request);
if (transmitter.isCanceled()) {
throw new IOException("Canceled");
}
Response response;
boolean success = false;
try {
//分界点,包括其他的拦截器,在责任链传递之前所做的工作都是前序工作,之后将request继续下发
response = realChain.proceed(request, transmitter, null);
//此后便是拦截器的后序工作,需要注意的是,并不是每次都会走完所有的拦截器,如cacheInterceptor,当有缓存存在(开启缓存),那么之后的拦截就不在继续传递。
success = true;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), transmitter, false, request)) {
throw e.getFirstConnectException();
}
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, transmitter, requestSendStarted, request)) throw e;
continue;
} finally {
// The network call threw an exception. Release any resources.
if (!success) {
transmitter.exchangeDoneDueToException();
}
}
// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}
Exchange exchange = Internal.instance.exchange(response);
Route route = exchange != null ? exchange.connection().route() : null;
Request followUp = followUpRequest(response, route);
if (followUp == null) {
if (exchange != null && exchange.isDuplex()) {
transmitter.timeoutEarlyExit();
}
return response;
}
RequestBody followUpBody = followUp.body();
if (followUpBody != null && followUpBody.isOneShot()) {
return response;
}
closeQuietly(response.body());
if (transmitter.hasExchange()) {
exchange.detachWithViolence();
}
if (++followUpCount > MAX_FOLLOW_UPS) {
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
request = followUp;
priorResponse = response;
}
}
2.(RealInterceptorChain)关键方法proceed
,之前已经了解,在RealCall是整个链开始传递的起点:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43//RealCall,可以看到index为0,我们所有的拦截都被保存在list集合之中,可以发现后序的取interceptor都是基于这个index自增来获取。
//这也是精妙之处
Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
//RealInterceptorChain#proceed()
public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
// If we already have a stream, confirm that the incoming request will use it.
if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.exchange != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
// Call the next interceptor in the chain.
//这里对index做了自增操作,因为每次实例化RealInterceptorChain,传入的都是初始的interceptor集合,当每次调用proceed时都对index操作,这样
//我们的request就被一步步传递下去直到链尾。
RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
if (response.body() == null) {
throw new IllegalStateException(
"interceptor " + interceptor + " returned a response with no body");
}
return response;
}
ConnectInterceptor
1.主要就是与服务器建立链接TCP链接或TCP-TLS链接,这个比较特殊,之前提到拦截的前序操作基于调用方法realChain.proceed();
之前,但是这个是没有后序
操作的:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20/** Opens a connection to the target server and proceeds to the next interceptor. */
//代码量很少,建立链接
public final class ConnectInterceptor implements Interceptor {
public final OkHttpClient client;
public ConnectInterceptor(OkHttpClient client) {
this.client = client;
}
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
Transmitter transmitter = realChain.transmitter();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
//下发传递
return realChain.proceed(request, transmitter, exchange);
}
}
CallServerInterceptor
1 | /** This is the last interceptor in the chain. It makes a network call to the server. */ |
1.责任链尾,实质的请求与I/O操作,将请求数据写入Socket中,并从Socket读取响应数据(TCP/TCP-TLS对应的端口)。对于I/O
的操作是基于Okio
,OkHttp
的高效请求同样离不开Okio
的支持,这里先不细说。整理整个流程图就是:
Tip
1.OkHttp
作为优秀的Android
网络请求框架,是很值得深入研究的,这里主要对整个流程做一下梳理,具体的实现细节同样需要花大量的时间整理。对全局的把握有助于整个实现思路做到心中有数。