首页 > 技术文章 > Spring生态系列文章 >

Eureka-server源码分析

更新时间:2018-10-31 | 阅读量(1,611)

我们在分析eureka-client的时候发现他是通过Http Rest 的方式做请求的,那么eureka-server一定是基于Rest风格 类似SpringMvc一样的模式接受处理请求。在 EurekaServerAutoConfiguration 自动配置类中有一个 jerseyFilterRegistration方法,这个方法表明eureka-server使用了Jersey实现Rest ``` /** * Register the Jersey filter */ @Bean public FilterRegistrationBean jerseyFilterRegistration( javax.ws.rs.core.Application eurekaJerseyApp) { FilterRegistrationBean bean = new FilterRegistrationBean(); bean.setFilter(new ServletContainer(eurekaJerseyApp)); bean.setOrder(Ordered.LOWEST_PRECEDENCE); bean.setUrlPatterns( Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*")); return bean; } ``` 这里在注册一个 Jersey 的 filter ,配置好相应的Filter 和 url映射等 #### 处理服务注册 Eureka Server对于Eureka client的REST请求的定义都位于com.netflix.eureka.resources包下,我们先来看 com.netflix.eureka.resources.ApplicationResource下面的addInstance方法,看名字就知道是添加服务实例 ``` @POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); // validate that the instanceinfo contains all the necessary required fields if (isBlank(info.getId())) { return Response.status(400).entity("Missing instanceId").build(); } else if (isBlank(info.getHostName())) { return Response.status(400).entity("Missing hostname").build(); } else if (isBlank(info.getIPAddr())) { return Response.status(400).entity("Missing ip address").build(); } else if (isBlank(info.getAppName())) { return Response.status(400).entity("Missing appName").build(); } else if (!appName.equals(info.getAppName())) { return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build(); } else if (info.getDataCenterInfo() == null) { return Response.status(400).entity("Missing dataCenterInfo").build(); } else if (info.getDataCenterInfo().getName() == null) { return Response.status(400).entity("Missing dataCenterInfo Name").build(); } // handle cases where clients may be registering with bad DataCenterInfo with missing data DataCenterInfo dataCenterInfo = info.getDataCenterInfo(); if (dataCenterInfo instanceof UniqueIdentifier) { String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId(); if (isBlank(dataCenterInfoId)) { boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId")); if (experimental) { String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id"; return Response.status(400).entity(entity).build(); } else if (dataCenterInfo instanceof AmazonInfo) { AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo; String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId); if (effectiveId == null) { amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId()); } } else { logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass()); } } } registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible ``` InstanceInfo 参数就是客户端提交的服务元配置信息, 做了一大堆判断后调用了com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl.register: 进行注册,我们跟踪进去 ``` @Override public void register(final InstanceInfo info, final boolean isReplication) { int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } super.register(info, leaseDuration, isReplication); replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); } ``` 继续跟踪 super.register :调用了父类的注册方法。 ``` /** * Registers a new instance with a given duration. * * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean) */ public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { read.lock(); Map> gMap = registry.get(registrant.getAppName()); REGISTER.increment(isReplication); if (gMap == null) { final ConcurrentHashMap> gNewMap = new ConcurrentHashMap>(); gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } } ``` 这里我们看到 ConcurrentHashMap> 这样的一个 线程安全的map去存放客户端注册的服务, 它是一个两层Map结构,第一层的key存储服务名:InstanceInfo中的appName属性,第二层的key存储实例名:InstanceInfo中的instanceId属性。 #### 服务的续约(心跳) 服务续约在 com.netflix.eureka.resources.InstanceResource#renewLease方法中 ``` @PUT public Response renewLease( @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication, @QueryParam("overriddenstatus") String overriddenStatus, @QueryParam("status") String status, @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) { boolean isFromReplicaNode = "true".equals(isReplication); boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode); ``` 获取到客户端服务名字 调用registry.renew实现续约 ,我们跟踪进去最终会调用 com.netflix.eureka.registry.AbstractInstanceRegistry#renew方法 ``` public boolean renew(String appName, String id, boolean isReplication) { RENEW.increment(isReplication); Map> gMap = registry.get(appName); Lease leaseToRenew = null; if (gMap != null) { leaseToRenew = gMap.get(id); } if (leaseToRenew == null) { RENEW_NOT_FOUND.increment(isReplication); logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id); return false; } else { InstanceInfo instanceInfo = leaseToRenew.getHolder(); if (instanceInfo != null) { // touchASGCache(instanceInfo.getASGName()); InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus( instanceInfo, leaseToRenew, isReplication); if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) { logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}" + "; re-register required", instanceInfo.getId()); RENEW_NOT_FOUND.increment(isReplication); return false; } if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) { logger.info( "The instance status {} is different from overridden instance status {} for instance {}. " + "Hence setting the status to overridden status", instanceInfo.getStatus().name(), instanceInfo.getOverriddenStatus().name(), instanceInfo.getId()); instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus); } } renewsLastMin.increment(); leaseToRenew.renew(); return true; } ``` Map> gMap = registry.get(appName); leaseToRenew = gMap.get(id); 在获取注册的服务实例 instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus); 在修改服务实例的状态 leaseToRenew.renew(); 在增加租约时间 ``` public void renew() { lastUpdateTimestamp = System.currentTimeMillis() + duration; } ``` #### 取消租约:客户端线下 追踪 InstanceResource#cancelLease() 方法,最终会调用 com.netflix.eureka.registry.AbstractInstanceRegistry#internalCancel ``` protected boolean internalCancel(String appName, String id, boolean isReplication) { boolean var7; try { this.read.lock(); EurekaMonitors.CANCEL.increment(isReplication); Map> gMap = (Map)this.registry.get(appName); Lease leaseToCancel = null; if (gMap != null) { leaseToCancel = (Lease)gMap.remove(id); } ...... ``` gMap.remove(id); 就是获取当前要下线的服务id,然后从注册的map中移除掉 服务端的请求接收差不多,对于其他的服务端处理这里就不在多说,可执行断点跟踪。
叩丁狼学员采访 叩丁狼学员采访
叩丁狼头条 叩丁狼头条
叩丁狼在线课程 叩丁狼在线课程