interceptors = getInterceptors();
if (!CollectionUtils.isEmpty(interceptors)) {
ClientHttpRequestFactory factory = this.interceptingRequestFactory;
if (factory == null) {
factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
this.interceptingRequestFactory = factory;
}
return factory;
}
else {
return super.getRequestFactory();
}
}
/**
* Return the request interceptors that this accessor uses.
* The returned {@link List} is active and may get appended to.
*/
public List<ClientHttpRequestInterceptor> getInterceptors() {
return this.interceptors;
}
```
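As you can see, getInterceptors() is called here to fetch the ClientHttpRequestInterceptor instances (HTTP client request interceptors), and those interceptors are then used to create an InterceptingClientHttpRequestFactory. Why does it need request interceptors here? (Think about it for a moment.)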
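The wrap-once-and-cache behaviour of getRequestFactory() can be sketched in plain Java. This is only an illustration under simplifying assumptions: LazyFactorySketch, RequestFactory, wrappersCreated and the string-based interceptors are hypothetical stand-ins, not Spring types.

```java
import java.util.ArrayList;
import java.util.List;

class LazyFactorySketch {
    // Hypothetical stand-in for ClientHttpRequestFactory.
    interface RequestFactory { String create(String url); }

    private final RequestFactory base = url -> "plain(" + url + ")";
    private final List<String> interceptors = new ArrayList<>();
    private RequestFactory interceptingFactory; // cached wrapper, created on first use
    int wrappersCreated = 0;                    // only here to observe the caching

    // The returned list is live and may be appended to, as in the original.
    List<String> getInterceptors() { return interceptors; }

    RequestFactory getRequestFactory() {
        if (interceptors.isEmpty()) {
            return base; // no interceptors: use the plain factory
        }
        RequestFactory factory = interceptingFactory;
        if (factory == null) {
            wrappersCreated++;
            // Wrap the plain factory so every request passes through the interceptors.
            factory = url -> "intercepting" + interceptors + "(" + base.create(url) + ")";
            interceptingFactory = factory;
        }
        return factory;
    }
}
```

The point of the design is that the wrapping factory is only built when at least one interceptor is registered, and it is built once, not per request.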
Once getRequestFactory() has returned, we step into the createRequest method called in getRequestFactory().createRequest(url, method);
```
@Override
protected ClientHttpRequest createRequest(
URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
}
```
No surprises here: stepping through with the debugger shows that InterceptingClientHttpRequestFactory#createRequest creates an InterceptingClientHttpRequest object, and InterceptingClientHttpRequest is of course an implementation of ClientHttpRequest.
```
/**
* Wrapper for a {@link ClientHttpRequest} that has support for {@link ClientHttpRequestInterceptor}s.
*
* @author Arjen Poutsma
* @since 3.1
*/
class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest {
private final ClientHttpRequestFactory requestFactory;
private final List<ClientHttpRequestInterceptor> interceptors;
private HttpMethod method;
private URI uri;
protected InterceptingClientHttpRequest(ClientHttpRequestFactory requestFactory,
List<ClientHttpRequestInterceptor> interceptors, URI uri, HttpMethod method) {
this.requestFactory = requestFactory;
this.interceptors = interceptors;
this.method = method;
this.uri = uri;
}
```
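The comment "Wrapper for a {@link ClientHttpRequest} that has support for {@link ClientHttpRequestInterceptor}s." tells us that InterceptingClientHttpRequest wraps a ClientHttpRequest and adds support for ClientHttpRequestInterceptor. It also carries the HttpMethod (request method) and the URI (service address). As for its inheritance hierarchy, the class declaration above shows that it extends AbstractBufferingClientHttpRequest.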

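OK, so far we know that under the hood RestTemplate uses a ClientHttpRequestFactory to create an InterceptingClientHttpRequest, an HTTP client request object that supports interception. So how is it invoked, and how does the interception actually work?
Let's go back to the line response = request.execute(); in org.springframework.web.client.RestTemplate#doExecute and keep tracing. When request.execute() runs, execution jumps into the interceptor method org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor#intercept. As the name suggests, this interceptor has to do with load balancing (LoadBalancer), and it is an implementation of ClientHttpRequestInterceptor: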
```
/**
* @author Spencer Gibb
* @author Dave Syer
* @author Ryan Baxter
* @author William Tran
*/
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
private LoadBalancerClient loadBalancer;
private LoadBalancerRequestFactory requestFactory;
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
this.loadBalancer = loadBalancer;
this.requestFactory = requestFactory;
}
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
// for backwards compatibility
this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
}
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
}
}
```
Next, let's look at the ClientHttpRequestInterceptor interface:
```
/**
* Intercepts client-side HTTP requests. Implementations of this interface can be
* {@linkplain org.springframework.web.client.RestTemplate#setInterceptors registered}
* with the {@link org.springframework.web.client.RestTemplate RestTemplate},
* as to modify the outgoing {@link ClientHttpRequest} and/or the incoming
* {@link ClientHttpResponse}.
*
* The main entry point for interceptors is
* {@link #intercept(HttpRequest, byte[], ClientHttpRequestExecution)}.
*
* @author Arjen Poutsma
* @since 3.1
*/
@FunctionalInterface
public interface ClientHttpRequestInterceptor {
/**
* Intercept the given request, and return a response. The given
* {@link ClientHttpRequestExecution} allows the interceptor to pass on the
* request and response to the next entity in the chain.
*
* A typical implementation of this method would follow the following pattern:
*
* - Examine the {@linkplain HttpRequest request} and body
* - Optionally {@linkplain org.springframework.http.client.support.HttpRequestWrapper
* wrap} the request to filter HTTP attributes.
* - Optionally modify the body of the request.
* - Either
*
* - execute the request using
* {@link ClientHttpRequestExecution#execute(org.springframework.http.HttpRequest, byte[])},
* or
* - do not execute the request to block the execution altogether.
*
* - Optionally wrap the response to filter HTTP attributes.
*
* @param request the request, containing method, URI, and headers
* @param body the body of the request
* @param execution the request execution
* @return the response
* @throws IOException in case of I/O errors
*/
ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution)
throws IOException;
}
```
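The comment "Intercepts client-side HTTP requests. Implementations of this interface can be ..." tells us that ClientHttpRequestInterceptor intercepts HTTP requests. At this point our request has been intercepted, and we seem to be getting closer to the truth. Let's take a careful look at the intercept method of LoadBalancerInterceptor: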
```
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
}
```
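Here the URI (the address of the service being called) is read from the request object (the InterceptingClientHttpRequest created earlier), and the name of the called service, serviceName, is taken from the host part of that URI. Then org.springframework.cloud.client.loadbalancer.LoadBalancerClient.execute is called to carry out the request. What is LoadBalancerClient? It is the load balancer client we analyzed at the very beginning, and the implementation used here is RibbonLoadBalancerClient.
Let's keep tracing into RibbonLoadBalancerClient.execute: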
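To make the "host is the service name" idea concrete, here is a small sketch using only java.net.URI. ServiceNameSketch, serviceNameOf and the example URLs are hypothetical. The sketch also shows one way the assertion in intercept can fail: java.net.URI returns a null host for names containing an underscore.

```java
import java.net.URI;

class ServiceNameSketch {
    // Mirrors the first lines of LoadBalancerInterceptor#intercept:
    // the host part of the URI is treated as the logical service name.
    static String serviceNameOf(String url) {
        URI uri = URI.create(url);
        String serviceName = uri.getHost();
        if (serviceName == null) {
            throw new IllegalStateException(
                    "Request URI does not contain a valid hostname: " + uri);
        }
        return serviceName;
    }

    public static void main(String[] args) {
        // "user-server" is an assumed service name, not one taken from the article.
        System.out.println(serviceNameOf("http://user-server/user/1"));
    }
}
```

A service name such as user_server would be rejected by this check, which is a reason to prefer hyphens over underscores in service names.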
```
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
serviceId), serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request);
}
```
The line ILoadBalancer loadBalancer = getLoadBalancer(serviceId); obtains the load balancer for the given service id. Let's look at the ILoadBalancer source:
```
/**
* Interface that defines the operations for a software loadbalancer. A typical
* loadbalancer minimally need a set of servers to loadbalance for, a method to
* mark a particular server to be out of rotation and a call that will choose a
* server from the existing list of server.
*
* @author stonse
*
*/
public interface ILoadBalancer {
/**
* Initial list of servers.
* This API also serves to add additional ones at a later time
* The same logical server (host:port) could essentially be added multiple times
* (helpful in cases where you want to give more "weightage" perhaps ..)
*
* @param newServers new servers to add
*/
public void addServers(List<Server> newServers);
/**
* Choose a server from load balancer.
*
* @param key An object that the load balancer may use to determine which server to return. null if
* the load balancer does not use this parameter.
* @return server chosen
*/
public Server chooseServer(Object key);
/**
* To be called by the clients of the load balancer to notify that a Server is down
* else, the LB will think its still Alive until the next Ping cycle - potentially
* (assuming that the LB Impl does a ping)
*
* @param server Server to mark as down
*/
public void markServerDown(Server server);
/**
* @deprecated 2016-01-20 This method is deprecated in favor of the
* cleaner {@link #getReachableServers} (equivalent to availableOnly=true)
* and {@link #getAllServers} API (equivalent to availableOnly=false).
*
* Get the current list of servers.
*
* @param availableOnly if true, only live and available servers should be returned
*/
@Deprecated
public List<Server> getServerList(boolean availableOnly);
/**
* @return Only the servers that are up and reachable.
*/
public List<Server> getReachableServers();
/**
* @return All known servers, both reachable and unreachable.
*/
public List<Server> getAllServers();
}
```
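The comment "Interface that defines the operations for a software loadbalancer." tells us that this interface defines the operations of a software load balancer:
addServers: sets the initial server list (and can add more servers later)
chooseServer: chooses a server
markServerDown: marks a server as down, taking it out of rotation
getReachableServers: returns the servers that are up and reachable
getAllServers: returns all known servers, reachable or not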
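The contract of these methods can be illustrated with a toy in-memory model. This is a deliberately simplified sketch, not Ribbon's implementation: ToyLoadBalancer is hypothetical, servers are plain strings, and duplicates are collapsed, whereas the real interface allows the same server to be added more than once.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ToyLoadBalancer {
    // address -> alive flag; insertion order is preserved
    private final Map<String, Boolean> servers = new LinkedHashMap<>();

    // Register servers; newly added servers are assumed to be alive.
    void addServers(List<String> newServers) {
        for (String server : newServers) {
            servers.put(server, true);
        }
    }

    // Mark a known server as down; it stays in the list of all servers.
    void markServerDown(String server) {
        if (servers.containsKey(server)) {
            servers.put(server, false);
        }
    }

    // Only the servers currently flagged as alive.
    List<String> getReachableServers() {
        List<String> reachable = new ArrayList<>();
        for (Map.Entry<String, Boolean> entry : servers.entrySet()) {
            if (entry.getValue()) {
                reachable.add(entry.getKey());
            }
        }
        return reachable;
    }

    // Every known server, reachable or not.
    List<String> getAllServers() {
        return new ArrayList<>(servers.keySet());
    }
}
```

In this model a server that is marked down disappears from getReachableServers() but is still reported by getAllServers().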
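By default the implementation used here is ZoneAwareLoadBalancer, which you can confirm by stepping through with the debugger or by reading the ribbonLoadBalancer method of the RibbonClientConfiguration configuration class: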
```
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
```
Now let's continue with the line getServer(loadBalancer); in RibbonLoadBalancerClient.execute. It uses the load balancer to choose the server that will handle the call. Tracing further leads to
com.netflix.loadbalancer.ZoneAwareLoadBalancer#chooseServer
```
@Override
public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
Server server = null;
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscov
```
The key here receives the default value "default". Because we have only one available zone, super.chooseServer is called, that is, com.netflix.loadbalancer.BaseLoadBalancer#chooseServer
```
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
```
Continuing into rule.choose(key);, the server selection method that gets called is com.netflix.loadbalancer.PredicateBasedRule#choose:
```
/**
* A rule which delegates the server filtering logic to an instance of {@link AbstractServerPredicate}.
* After filtering, a server is returned from filtered list in a round robin fashion.
*
*
* @author awang
*
*/
public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
/**
* Method that provides an instance of {@link AbstractServerPredicate} to be used by this class.
*
*/
public abstract AbstractServerPredicate getPredicate();
/**
* Get a server by calling {@link AbstractServerPredicate#chooseRandomlyAfterFiltering(java.util.List, Object)}.
* The performance for this method is O(n) where n is number of servers to be filtered.
*/
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
}
```
PredicateBasedRule is itself an abstract rule that extends ClientConfigEnabledRoundRobinRule, and ClientConfigEnabledRoundRobinRule is a client-configurable rule that implements the round-robin strategy:
```
/**
* RoundRobinRule: round robin
* This class essentially contains the RoundRobinRule class defined in the
* loadbalancer package
*
* @author stonse
*
*/
public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {
// round-robin strategy
RoundRobinRule roundRobinRule = new RoundRobinRule();
```
Next, let's trace the chooseRoundRobinAfterFiltering method:
```
/**
* Choose a server in a round robin fashion after the predicate filters a given list of servers and load balancer key.
*/
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
```
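The method comment, "Choose a server in a round robin fashion after the predicate filters a given list of servers and load balancer key.", tells us that the servers are filtered first and then chosen in round-robin order. In the code, eligible holds the servers that passed the filter, and incrementAndGetModulo yields the index of the next server in round-robin fashion.
Moving on, we return to the RibbonLoadBalancerClient#execute method: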
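The index arithmetic behind round-robin selection can be sketched as follows. This is an illustration of the technique (an atomic counter advanced modulo the number of eligible servers), not a copy of Ribbon's code: RoundRobinSketch is hypothetical and servers are plain strings.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch {
    private final AtomicInteger nextIndex = new AtomicInteger(0);

    // Returns the current index and advances the counter, wrapping at modulo.
    // The compare-and-set loop keeps this safe under concurrent callers.
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            if (nextIndex.compareAndSet(current, next) && current < modulo) {
                return current;
            }
        }
    }

    // Picks the next eligible server, or null when nothing is eligible.
    String choose(List<String> eligible) {
        if (eligible.isEmpty()) {
            return null;
        }
        return eligible.get(incrementAndGetModulo(eligible.size()));
    }
}
```

With three eligible servers, successive calls to choose walk through the list in order and then wrap back to the first entry.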
```
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
serviceId), serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request);
}
```
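Once the server instance to call has been obtained, the service name, the server instance and related information are wrapped in a RibbonServer object: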
```
public static class RibbonServer implements ServiceInstance {
private final String serviceId;
private final Server server;
private final boolean secure;
private Map<String, String> metadata;
public RibbonServer(String serviceId, Server server) {
this(serviceId, server, false, Collections.<String, String> emptyMap());
}
public RibbonServer(String serviceId, Server server, boolean secure,
Map<String, String> metadata) {
this.serviceId = serviceId;
this.server = server;
this.secure = secure;
this.metadata = metadata;
}
.....
public interface ServiceInstance {
/**
* @return the service id as registered.
*/
String getServiceId();
/**
* @return the hostname of the registered ServiceInstance
*/
String getHost();
/**
* @return the port of the registered ServiceInstance
*/
int getPort();
/**
* @return if the port of the registered ServiceInstance is https or not
*/
boolean isSecure();
/**
* @return the service uri address
*/
URI getUri();
/**
* @return the key value pair metadata associated with the service instance
*/
Map<String, String> getMetadata();
/**
* @return the scheme of the instance
*/
default String getScheme() {
return null;
}
}
```
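RibbonServer implements the ServiceInstance interface. ServiceInstance is the contract for a service instance and exposes methods for getting the service id, host, port and so on.
Then the overloaded execute method is called to perform the request: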
```
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
Server server = null;
if(serviceInstance instanceof RibbonServer) {
server = ((RibbonServer)serviceInstance).getServer();
}
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
try {
T returnVal = request.apply(serviceInstance);
statsRecorder.recordStats(returnVal);
return returnVal;
}
// catch IOException and rethrow so RestTemplate behaves correctly
......
```
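Look at the line T returnVal = request.apply(serviceInstance);
This eventually calls the execute method of InterceptingClientHttpRequest.InterceptingRequestExecution, passing along the service instance, executing the request and obtaining the response.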
```
private class InterceptingRequestExecution implements ClientHttpRequestExecution {
private final Iterator<ClientHttpRequestInterceptor> iterator;
public InterceptingRequestExecution() {
this.iterator = interceptors.iterator();
}
@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
return nextInterceptor.intercept(request, body, this);
}
else {
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
}
else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
}
```
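The iterator-driven chain above can be reduced to a few lines of plain Java. In this sketch, InterceptorChainSketch, Execution, Interceptor and the string-based request and response are hypothetical simplifications; only the control flow (hand the same execution object to each interceptor, and perform the real call once the iterator is exhausted) follows the code shown above.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class InterceptorChainSketch {
    interface Execution { String execute(String request); }
    interface Interceptor { String intercept(String request, Execution execution); }

    static class ChainExecution implements Execution {
        private final Iterator<Interceptor> iterator;

        ChainExecution(List<Interceptor> interceptors) {
            this.iterator = interceptors.iterator();
        }

        @Override
        public String execute(String request) {
            if (iterator.hasNext()) {
                // Pass "this" along so the interceptor can continue the chain.
                return iterator.next().intercept(request, this);
            }
            // No interceptors left: stand-in for the real HTTP call.
            return "response to " + request;
        }
    }

    static String run(String request) {
        // Rewrites the logical service name to an assumed concrete address.
        Interceptor rewriteHost =
                (req, next) -> next.execute(req.replace("user-server", "127.0.0.1:8080"));
        // Wraps the response on the way back.
        Interceptor tagResponse = (req, next) -> "[" + next.execute(req) + "]";
        return new ChainExecution(Arrays.asList(tagResponse, rewriteHost)).execute(request);
    }
}
```

An interceptor that never calls next.execute short-circuits the chain, which matches the "do not execute the request to block the execution altogether" option described in the ClientHttpRequestInterceptor Javadoc.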
Next, trace the requestFactory.createRequest method:
```
@Override
public ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) throws IOException {
HttpURLConnection connection = openConnection(uri.toURL(), this.proxy);
prepareConnection(connection, httpMethod.name());
if (this.bufferRequestBody) {
return new SimpleBufferingClientHttpRequest(connection, this.outputStreaming);
}
else {
return new SimpleStreamingClientHttpRequest(connection, this.chunkSize, this.outputStreaming);
}
}
.....
protected HttpURLConnection openConnection(URL url, @Nullable Proxy proxy) throws IOException {
URLConnection urlConnection = (proxy != null ? url.openConnection(proxy) : url.openConnection());
if (!HttpURLConnection.class.isInstance(urlConnection)) {
throw new IllegalStateException("HttpURLConnection required for [" + url + "] but got: " + urlConnection);
}
return (HttpURLConnection) urlConnection;
}
```
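Here we can see that the remote service is ultimately called through a URLConnection (more precisely, an HttpURLConnection).
To sum up, what actually happens behind this remote service call?
1. @LoadBalanced enables load balancing support through RibbonLoadBalancerClient.
2. RestTemplate captures the service address (URI), host, port and so on, and then creates an InterceptingClientHttpRequest, the HTTP client request object used to execute the request.
3. When a request is sent through RestTemplate, it is intercepted by the LoadBalancerInterceptor request interceptor.
4. The interceptor uses RibbonLoadBalancerClient to execute the request, which obtains a load balancer by service id (ZoneAwareLoadBalancer by default).
5. The load balancer then chooses a server, using a round-robin strategy by default.
6. Once the server instance is obtained, InterceptingClientHttpRequest completes the service call and returns the result.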
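One detail the steps above imply but the walkthrough does not show is how the logical service name in the URL becomes a concrete address once an instance has been chosen. In Spring Cloud this is the job of LoadBalancerClient#reconstructURI. The sketch below only illustrates the idea with java.net.URI: UriRewriteSketch, reconstruct and the sample host and port are hypothetical.

```java
import java.net.URI;
import java.net.URISyntaxException;

class UriRewriteSketch {
    // Keep scheme, path, query and fragment; swap in the chosen instance's host and port.
    static URI reconstruct(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rewrite " + original, e);
        }
    }

    public static void main(String[] args) {
        URI original = URI.create("http://user-server/user/1?verbose=true");
        System.out.println(reconstruct(original, "192.168.0.10", 8081));
    }
}
```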