首页 > 技术文章 > Spring生态系列文章 >

Eureka Client源码分析

更新时间:2018-10-31 | 阅读量(1,314)

对于一个优秀的程序员而言,一个技术不仅要会用,还要知道他的实现原理和思想,即不仅要知其然还要知其所以然,这样我们写代码才会特别自信,出现bug才能很快定位到问题所在。接下来我们就来简单探讨一下SpringCloud的实现原理,即:源码分析 #### 一.服务注册 服务注册与发现是SpringCloud最基础的部分,我们就从这部分开始着手分析,我们来看一下我们第一章写的服务注册案例,图: ![image.png](https://upload-images.jianshu.io/upload_images/11046204-8666aed7b2c8aab6.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 我们知道当服务(Eureka Client)启动会主动向 EurekaServer注册自己,而消费者服务需要调用提供者服务实现消费时是需要向EurekaServer获取目标服务的服务地址等信息,那么我们就先来分析一下EurekaClient是如何实现服务注册的。 回想一下我们构造一个EurekaClient服务实例的时候需要做哪些事情呢? 1.在主程序配置类打开 @EnableDiscoveryClient(或EnableEurekaClient)标签开启EurekaClient功能 ``` @SpringBootApplication @EnableDiscoveryClient public class EurekaClientApplication { public static void main(String[] args) { SpringApplication.run(EurekaClientApplication.class, args); } } ``` 2.在applicatiton.properties中做一些服务的基础配置和注册中心地址配置 ``` eureka: client: serviceUrl: defaultZone: http://localhost:1111/eureka/ #服务注册地址 instance: prefer-ip-address: true server: port: 3333 spring: application: name: consumer1 ... ``` #### 那么我们就沿着这个2个点来切入源码: 点开 @ EnableDiscoveryClient 源码如下: ``` /** * Annotation to enable a DiscoveryClient implementation. * @author Spencer Gibb */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Import(EnableDiscoveryClientImportSelector.class) public @interface EnableDiscoveryClient { /** * If true, the ServiceRegistry will automatically register the local server. */ boolean autoRegister() default true; } ``` EnableDiscoveryClient的文档注释: Annotation to enable a DiscoveryClient implementation 告诉我们:这个EnableDiscoveryClient这个标签是用来开启 DiscoveryClient的,那么我们来看一下 org.springframework.cloud.client.discovery.DiscoveryClient 如下: ``` /** * DiscoveryClient represents read operations commonly available to Discovery service such as * Netflix Eureka or consul.io * @author Spencer Gibb */ public interface DiscoveryClient { /** * A human readable description of the implementation, used in HealthIndicator * @return the description */ String description(); /** * Get all ServiceInstances associated with a particular serviceId * @param serviceId the serviceId to query * @return a List of ServiceInstance */ List getInstances(String serviceId); /** * @return all known service ids */ List getServices(); } ``` DiscoveryClient 的注释: DiscoveryClient represents read operations commonly available to Discovery service such as Netflix Eureka or consul.io 告诉我们这个接口定义了服务通常的读取操作的抽象方法,分析这个接口下的三个方法,作用分别是:获取备注 ,根据服务id获取服务列表,获取所有的服务的id 这个接口貌似到顶了,我们看一下他的实现类你可以看到一个让你很亲切的名字 EurekaDiscoveryClient(Eureka的服务客户端) 如下: ``` public class EurekaDiscoveryClient implements DiscoveryClient { public static final String DESCRIPTION = "Spring Cloud Eureka Discovery Client"; private final EurekaInstanceConfig config; private final EurekaClient eurekaClient; public EurekaDiscoveryClient(EurekaInstanceConfig config, EurekaClient eurekaClient) { this.config = config; this.eurekaClient = eurekaClient; } @Override public String description() { return DESCRIPTION; } @Override public List getInstances(String serviceId) { List infos = this.eurekaClient.getInstancesByVipAddress(serviceId, false); List instances = new ArrayList<>(); for (InstanceInfo info : infos) { instances.add(new EurekaServiceInstance(info)); } return instances; } ...省略代码... @Override public List getServices() { Applications applications = this.eurekaClient.getApplications(); if (applications == null) { return Collections.emptyList(); } List registered = applications.getRegisteredApplications(); List names = new ArrayList<>(); for (Application app : registered) { if (app.getInstances().isEmpty()) { continue; } names.add(app.getName().toLowerCase()); } return names; } ... ``` 可以看到 EurekaDiscoveryClient 依赖了 EurekaClient 接口 ,而对于getServices和 getInstances方法的实现代码中都是调用了EurekaClient的方法,即:“this.eurekaClient...” 继续跟踪 EurekaClient 的源码: ``` @ImplementedBy(DiscoveryClient.class) public interface EurekaClient extends LookupService { ``` EurekaClient 实现了 LookupService接口,继续跟踪 LookupService ``` /** * Lookup service for finding active instances. * * @author Karthik Ranganathan, Greg Kim. * @param for backward compatibility */ public interface LookupService { /** * Returns the corresponding {@link Application} object which is basically a * container of all registered appName {@link InstanceInfo}s. * * @param appName * @return a {@link Application} or null if we couldn't locate any app of * the requested appName */ Application getApplication(String appName); /** * Returns the {@link Applications} object which is basically a container of * all currently registered {@link Application}s. * * @return {@link Applications} */ Applications getApplications(); /** * Returns the {@link List} of {@link InstanceInfo}s matching the the passed * in id. A single {@link InstanceInfo} can possibly be registered w/ more * than one {@link Application}s * * @param id * @return {@link List} of {@link InstanceInfo}s or * {@link java.util.Collections#emptyList()} */ List getInstancesById(String id); ....省略代码.... ``` 从注释 :Lookup service for finding active instances. 可以知道,这个lookupService 的作用是用于查找活动实例的服务。并提供了一些查找方法 然而在 EurekaDiscoveryClient 中使用的到底是 EurekaClient 哪个实现类的实例呢?我们继续跟踪一下 EurekaClient的实现类 com.netflix.discovery.DiscoveryClient ,从包名可以知道这个类是netflix提供的: ``` /** * The class that is instrumental for interactions with Eureka Server. * *

* Eureka Client is responsible for a) Registering the * instance with Eureka Server b) Renewalof the lease with * Eureka Server c) Cancellation of the lease from * Eureka Server during shutdown *

* d) Querying the list of services/instances registered with * Eureka Server *

* *

* Eureka Client needs a configured list of Eureka Server * {@link java.net.URL}s to talk to.These {@link java.net.URL}s are typically amazon elastic eips * which do not change. All of the functions defined above fail-over to other * {@link java.net.URL}s specified in the list in the case of failure. *

* * @author Karthik Ranganathan, Greg Kim * @author Spencer Gibb * */ @Singleton public class DiscoveryClient implements EurekaClient { ``` 翻译一下这个类的文档注释,大致意思为: DiscoveryClient 和 Eureka Server 类进行交互 向 Eureka Server 注册服务 向 Eureka Server租约续期 服务关闭,取消租约续期 获取服务列表 Eureka client需要配置 Eureka Server的url列表 看到这里我们大概知道,其实真正实现服务发现的Netflix包中的com.netflix.discovery.DiscoveryClient类,我们整理一下这几个类/接口的关系图如下: ![image.png](https://upload-images.jianshu.io/upload_images/11046204-842e8c484e9ec9fd.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 接下来我们详细看一下DiscoveryClient是如何实现服务注册和发现等功能的,找到DiscoveryClient中的代码: ``` /** * Initializes all scheduled tasks. */ private void initScheduledTasks() { ...省略代码... // InstanceInfo replicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); ...省略代码... } ``` 从方法名字就可以看出方法里面初始化了很多的定时任务, instanceInfo:是根据服务配置创建出来的服务实例相关信息对象,是在EurekaClientAutoConfiguration类中的 eurekaApplicationInfoManager方法中被创建的 ``` public ApplicationInfoManager eurekaApplicationInfoManager( EurekaInstanceConfig config) { InstanceInfo instanceInfo = new InstanceInfoFactory().create(config); return new ApplicationInfoManager(config, instanceInfo); } ``` 而 EurekaInstanceConfig 就是application.properties配置的绑定 而instanceInfoReplicator 是一个线程对象,他的代码如下: ``` /** * A task for updating and replicating the local instanceinfo to the remote server. Properties of this task are: * - configured with a single update thread to guarantee sequential update to the remote server * - update tasks can be scheduled on-demand via onDemandUpdate() * - task processing is rate limited by burstSize * - a new update task is always scheduled automatically after an earlier update task. However if an on-demand task * is started, the scheduled automatic update task is discarded (and a new one will be scheduled after the new * on-demand update). * * @author dliu */ class InstanceInfoReplicator implements Runnable { ``` 翻译注释知道 InstanceInfoReplicator的作用是用于更新本地instanceinfo并将其复制到远程服务器的任务 ,其实就是把本地服务实例的相关配置信息(地址,端口,服务名等)发送到注册中心完成注册 我们来跟踪一下他的 run方法 : ``` public void run() { try { discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } } ``` 定位关键代码: discoveryClient.register(); 这里就是在实现服务注册,继续跟踪进去: ``` boolean register() throws Throwable { logger.info(PREFIX + "{}: registering service...", appPathIdentifier); EurekaHttpResponse httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == 204; } ``` 这里通过调用:eurekaTransport.registrationClient.register(instanceInfo);实现注册,而instanceInfo其实就是当前服务实例的元数据(配置信息),继续跟踪 ``` /** * Low level Eureka HTTP client API. * * @author Tomasz Bak */ public interface EurekaHttpClient { EurekaHttpResponse register(InstanceInfo info); ``` 翻译 Low level Eureka HTTP client API. 得知这里是一个HTTP客户端的API,那么我们可以大胆猜测,register方法的实现其实就是通过 rest 请求的方式。继续往下追踪该方法 ``` public abstract class EurekaHttpClientDecorator implements EurekaHttpClient { @Override public EurekaHttpResponse register(final InstanceInfo info) { return execute(new RequestExecutor() { @Override public EurekaHttpResponse execute(EurekaHttpClient delegate) { return delegate.register(info); } @Override public RequestType getRequestType() { return RequestType.Register; } }); } ``` 看到这里我们应该就明白了,在register方法中获取到了 serviceUrl 即配置文件中的注册服务地址,,把InstanceInfo作为参数,底层通过EurekaHttpClient(Rest方式)来发请求请求,实现服务注册。 #### 服务获取 继续看com.netflix.discovery.DiscoveryClient的initScheduledTasks()方法,里面还有两个定时任务 ``` if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); ....省略代码.... ``` 我们先看第一个任务:服务获取 , clientConfig.getRegistryFetchIntervalSeconds()是从配置中获取服务清单获取时间间隔,他是执行线程是new HeartbeatThread(),我们跟踪进去 ``` class CacheRefreshThread implements Runnable { public void run() { refreshRegistry(); } } @VisibleForTesting void refreshRegistry() { try { ...省略代码... boolean success = fetchRegistry(remoteRegionsModified); ...省略代码... ``` fetchRegistry:就是获取注册表(注册服务清单)的方法 ``` private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // If the delta is disabled or if it is the first time, get all // applications Applications applications = getApplications(); if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications == null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1: {}", (applications.getVersion() == -1)); getAndStoreFullRegistry(); } else { getAndUpdateDelta(applications); } applications.setAppsHashCode(applicat ...省略代码... ``` etAndStoreFullRegistry(); 获得并存储完整的注册表,跟踪进去 ``` private void getAndStoreFullRegistry() throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); logger.info("Getting all instance registry info from the eureka server"); Applications apps = null; EurekaHttpResponse httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } } ``` 我们可以看到 eurekaTransport.queryClient.这样的代码其实就是通过Rest方式去获取服务清单 最后通过 localRegionApps.set把服务存储到本地区域 #### 服务续约 继续看 com.netflix.discovery.DiscoveryClient 中的 initScheduledTasks() 方法中的另一个定时任务 ``` // Heartbeat timer scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); ``` heartbeat :使用心跳机制实现服务续约,即每间隔多少秒去请求一下注册中心证明服务还在线,防止被剔除。 renewalIntervalInSecs :就是心跳时间间隔 对应的配置: eureka.instance.lease-renewal-interval-in-seconds=30 eureka.instance.lease-expiration-duration-in-seconds=90 我们来看一下他的执线程:HeartbeatThread ``` private class HeartbeatThread implements Runnable { public void run() { if (renew()) { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } } boolean renew() { EurekaHttpResponse httpResponse; try { httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode()); if (httpResponse.getStatusCode() == 404) { REREGISTER_COUNTER.increment(); logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == 200; } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e); return false; } } ``` 不难看出他是通过 eurekaTransport.registrationClient.sendHeartBeat:去发送心跳 ,依然是Rest方式
叩丁狼学员采访 叩丁狼学员采访
叩丁狼头条 叩丁狼头条
叩丁狼在线课程 叩丁狼在线课程