Eureka源码3-Client启动入口(注册,续约,定时任务)

0. 环境

eureka版本:1.10.11 Spring Cloud : 2020.0.2 Spring Boot :2.4.4测试代码:github.com/hsfxuebao/s…

1. 前言

本文主要是解析下Spring Cloud整合Eureka Client的源码,这块代码比较多,而且都是些简单代码,我们稍微看下就行,这就是介绍下Eureka Client初始化过程,不管你Spring Cloud 怎样封装,底层还是Eureka Client的内容,初始化过程包括下面:

  • 去Eureka Server 拉取全量注册表,
  • 创建定时任务,包括定时去Eureka Server 上增量拉取注册表信息,定时renew (服务续约)。
  • 服务注册
  • 2. Spring Cloud整合Eureka Client 启动入口

    要看Spring Cloud 怎样整合 Eureka Client ,就需要找到它们的自动装配配置类 在spring-cloud-starter-netflix-eureka-client 依赖的pom文件中,在依赖pom文件中有spring-cloud-netflix-eureka-client, 在这个里面能够找到spring.factories 文件,这个文件是spring spi文件。

    核心就是EurekaClientAutoConfiguration 这个自动装配类:

    @Configuration(proxyBeanMethods = false)@EnableConfigurationProperties@ConditionalOnClass(EurekaClientConfig.class)@ConditionalOnProperty(value = “eureka.client.enabled”, matchIfMissing = true)@ConditionalOnDiscoveryEnabled@AutoConfigureBefore({ CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })@AutoConfigureAfter(name = { “org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration”,”org.springframework.cloud.autoconfigure.RefreshAutoConfiguration”,”org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration”,”org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration” })public class EurekaClientAutoConfiguration {}2.1 封装配置文件的类2.1.1 EurekaClientConfigBean@Bean@ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) { return new EurekaClientConfigBean();}

    其读取的是eureka.client前辍的配置信息。这个类已经被@ConfigurationProperties注解了,所以这些 配置信息可以被自动封装并注册到容器。

    2.1.2 EurekaInstanceConfigBean@Bean@ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,ManagementMetadataProvider managementMetadataProvider) {}

    其读取的是eureka.instance的属性值。这个类也已经被@ConfigurationProperties注解了,所以这些配 置信息可以被自动封装并注册到容器。

    2.2 EurekaClient

    接下来,看看核心类EurekaClient是怎么注入进去的? 在EurekaClientAutoConfiguration文件中,我们发现有两个地方都可以注入EurekaClient,分别为:

    @Configuration(proxyBeanMethods = false)@ConditionalOnMissingRefreshScopeprotected static class EurekaClientConfiguration { @Bean(destroyMethod = “shutdown”) @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT) public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {return new CloudEurekaClient(manager, config, this.optionalArgs, this.context); }}// 另一个是:@Configuration(proxyBeanMethods = false)@ConditionalOnRefreshScopeprotected static class RefreshableEurekaClientConfiguration { @Bean(destroyMethod = “shutdown”) @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT) @org.springframework.cloud.context.config.annotation.RefreshScope @Lazy public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance, @Autowired(required = false) HealthCheckHandler healthCheckHandler) {}}

    这就需要分析到底哪一个注解生效了?

    • @ConditionalOnMissingRefreshScope

    @Target({ ElementType.TYPE, ElementType.METHOD })

    @Retention(RetentionPolicy.RUNTIME)

    @Documented

    @Conditional(OnMissingRefreshScopeCondition.class)

    @interface ConditionalOnMissingRefreshScope {

    }

    private static class OnMissingRefreshScopeCondition extends AnyNestedCondition {

    OnMissingRefreshScopeCondition() {

    super(ConfigurationPhase.REGISTER_BEAN);

    }

    @ConditionalOnMissingClass(“org.springframework.cloud.context.scope.refresh.RefreshScope”)

    static class MissingClass {

    }

    @ConditionalOnMissingBean(RefreshAutoConfiguration.class)

    static class MissingScope {

    }

    @ConditionalOnProperty(value = “eureka.client.refresh.enable”, havingValue = “false”)

    static class OnPropertyDisabled {

    }

    }

    大家 可以看看 AnyNestedCondition这个注解,意思就是 只要满足任意一个条件就符合。通过分析,我们知道这三个条件都是满足的,所以这个注解不生效,这个类不生效。

    • @ConditionalOnRefreshScope

    @Target({ ElementType.TYPE, ElementType.METHOD })

    @Retention(RetentionPolicy.RUNTIME)

    @Documented

    @ConditionalOnClass(RefreshScope.class)

    @ConditionalOnBean(RefreshAutoConfiguration.class)

    @ConditionalOnProperty(value = “eureka.client.refresh.enable”, havingValue = “true”, matchIfMissing = true)

    @interface ConditionalOnRefreshScope {

    }

    通过这个注解EurekaClientAutoConfiguration上的注解@AutoConfigureAfter,我们知道当前类注入是在RefreshAutoConfiguration之后注入到容器中。而RefreshScope就是在RefreshAutoConfiguration之后中注入的。所以我们需要分析这个类就可以了。

    @AutoConfigureAfter(name = { “org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration”,

    “org.springframework.cloud.autoconfigure.RefreshAutoConfiguration”,

    “org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration”,

    “org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration” })

    public class EurekaClientAutoConfiguration {

    }

    2.2.1 ApplicationInfoManager@Bean@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);return new ApplicationInfoManager(config, instanceInfo);}

    创建ApplicationInfoManager 对象,这个对象主要就是管着当前实例信息,也就是instanceInfo , 可以看到,在这个方法中先是创建的instanceInfo,然后将instanceInfo 作为构造参数传入了ApplicationInfoManager 中。

    这个实例信息instanceInfo 里面维护了你当前实例的ip ,端口,appName等信息,注册的时候就是拿这些信息到Eureka Server 上注册。

    2.2.2 EurekaClient@Bean(destroyMethod = “shutdown”)@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {return new CloudEurekaClient(manager, config, this.optionalArgs,this.context);}

    创建Eureka Client 对象,这个CloudEurekaClient 类是Spring Cloud 搞得,然后继承Eureka 原生的DiscoveryClient 类。

    public class CloudEurekaClient extends DiscoveryClient

    我们可以看看它的构造

    最重要的是,它调用了父类的DiscoveryClient 的构造,下面重点介绍。

    2.3 小结

    总结以上的信息,从EurekaClientAutoConfiguration等方面可罗列出如下几个比较重要的类,如下:

    类名

    介绍与作用

    EurekaClientConfig

    封装了Eureka Client 与 Eureka Server 交互时所需要的配置信息,Spring Cloud 为其提供了默认配置类: EurekaClientConfigBean。

    ApplicationInfoManager

    作为应用信息管理器,管理服务实例类 Instancenfo 和服务实例配置信息类EurekaInstanceConfig。

    InstanceInfo

    封装了将被发送到 Eureka Server 进行服务注册的服务实例元数据,它在Eureka 注册表中代表着一个服务实例,其他服务可通过 InstanceInfo来了解该服务实例的相关信息,从而进行相关操作。

    EurekaInstanceConfig

    封装了 Eureka Client 自身服务实例的配置信息,主要用于构建 InstanceInfo,通常这些信息在配置文件的 eureka.instance 前缀下进行设置,Spring Cloud 通过 EurekaInstanceBean 配置类提供默认配置。

    DiscoveryClient

    Spring Cloud中定义用来做服务发现的客户端接口。

    3. DiscoveryClient类的解析3.1 DiscoveryClient 作用

    DiscoveryClient 是Eureka Client 的核心类,其作用与下:

    • 注册实例到 Eureka Server 中
    • 发送心跳更新与 Eureka Server 的续约
    • 在服务关闭时取消与 Eureka Server 的续约,完成服务下限
    • 获取在 Eureka Server 中的服务实例列表

    3.2 DiscoveryClient 的类结构

    可以先看下 DiscoveryClient 的类结构图:

    从类结构图上可以看出 DiscoveryClient 类实现了 EurekaCient,EurekaCient 又继承了LookupService,这里看看 LookupService 类:

    public interface LookupService {// 根据服务实例名称获取 ApplicationApplication getApplication(String appName);// 获取当前注册表中所有的服务实例信息Applications getApplications();// 根据服务实例 Id 获取服务实例信息List getInstancesById(String id); InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure);}

    Application 是持有服务实例信息列表,它表示同一个服务的集群信息,这些服务实例乃是挂载在同一个服务名 appName 之下,而 InstanceInfo 则是代表着一个服务实例的信息,Application 类代码如下:

    public class Application {private static Random shuffleRandom = new Random();// 服务名private String name;// 标识服务状态@XStreamOmitFieldprivate volatile boolean isDirty = false; @XStreamImplicitprivate final Set instances; private final AtomicReference shuffledInstances; private final Map instancesMap;// ……..}

    在 Application 中对 InstanceInfo 的操作都是同步的,为的是保证其原子性。Applications 则是注册表中所有服务实例的集合,其间的操作也都是同步的。EurekaClient 继承了 LookupService 接口,为 DiscoveryClient 提供一个上层接口,其目的是为了Eureka1.0x 到 Eureka2.x 的升级做过渡。

    EurekaCient 接口在 LookupService 的基础上提供了更丰富的方法,譬如:

    • 提供做种方式获取 InstanceInfo,例如根据区域、Eureka Server 地址获取等。
    • 提供本地客户端(区域、可用区)的数据,这部分与 AWS 相关
    • 提供了为客户端注册和获取健康检查处理器的功能

    除了相关查询接口外,EurekaClient 提供以下的两个方法,需颇多关注:

    public interface EurekaClient extends LookupService {// …….// 为 Eureka Client 注册健康处理器public void registerHealthCheck(HealthCheckHandler healthCheckHandler);// 监听 Client 服务实例信息的更新public void registerEventListener(EurekaEventListener eventListener);}

    在 Eureka Server 中一般是通过心跳来识别一个实例的状态,而在 Eureka Client 中则存在一个定时任务定时通过 HealthCheckHandler 检测当前 Client 的状态,当 其状态发生变化的时候,将会触发新的注册事件,更新 Eureka Server 的注册表中的相关实例信息。

    3.3 DiscoveryClient 构造函数

    在 DiscoveryClient 的构造函数中,会有如下操作,如:服注册表信息、服务注册、初始化发送心跳、缓存刷新、注册定时任务等。因此 DiscoveryClient 的构造函数贯穿了 Eureka Client 启动阶段的各项任务。

    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,Provider backupRegistryProvider, EndpointRandomizer endpointRandomizer) {// 省略相关信息}

    在DiscoveryClient 的构造函数中有如下几个参数:ApplicationInfoManager、EurekaClientConfig、AbstractDiscoveryClientOptionalArgs、Provider、EndpointRandomizer。前两个参数前面已做介绍,AbstractDiscoveryClientOptionalArgs 用于注入一些可选参数,BackupRegistry则充当备份注册中心的职责,EndpointRandomizer 则是作为端点随机器。对DiscoveryClient 的构造函数的职责做一个简单概括:

    • 相关配置赋值,如ApplicationInfoManager、EurekaClientConfig等
    • 备份注册中心初始化,默认没有实现
    • 拉去 Eureka Server 注册表信息
    • 注册前预处理
    • 向 Eureka Server 注册自身
    • 初始化定时任务、缓存刷新、按需注册定时任务

    后面将会对这些步骤中对重要点进行相关分析。

    4. Eureka Client 初始化

    接下来我们看下DiscoveryClient 是怎样初始化的(构造方法中)。代码如下:

    @InjectDiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,Provider backupRegistryProvider, EndpointRandomizer endpointRandomizer) {…// 如果开启拉取注册表的话if (clientConfig.shouldFetchRegistry()) {try {// todo 拉取注册表信息boolean primaryFetchRegistryResult = fetchRegistry(false);if (!primaryFetchRegistryResult) {logger.info(“Initial registry fetch from primary servers failed”);}…}}…// 如果进行服务注册的话 clientConfig.shouldEnforceRegistrationAtInit() 默认falseif (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {try {// todo 进行服务注册if (!register()) {throw new IllegalStateException(“Registration error at startup. Invalid server response.”);}} …}// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch// todo 定时任务initScheduledTasks(); …}4.1 拉取注册表信息 // 如果开启拉取注册表的话if (clientConfig.shouldFetchRegistry()) {// 拉取注册表信息boolean primaryFetchRegistryResult = fetchRegistry(false);}

    如果开启拉取注册信息,就会调用fetchRegistry 方法去Eureka Server上面拉取注册表信息。

    private boolean fetchRegistry(boolean forceFullRegistryFetch) {// If the delta is disabled or if it is the first time, get all// applications Applications applications = getApplications();if (clientConfig.shouldDisableDelta()// 关闭增量,默认false|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))|| forceFullRegistryFetch|| (applications == null)|| (applications.getRegisteredApplications().size() == 0)|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta{// todo 全量拉取注册表信息getAndStoreFullRegistry();} else {// todo 增量更新getAndUpdateDelta(applications);}// 设置hashCodeapplications.setAppsHashCode(applications.getReconcileHashCode());logTotalInstances();}

    可以看下最上面的注释,不启用增量 或者是第一次,就拉取全量注册表信息。

    不启用增量|| 强制全量|| 本地注册表是空的, 这个时候就会调用getAndStoreFullRegistry 方法去Eureka Server 拉取全量注册表。 否则的话调用 getAndUpdateDelta 方法获取增量注册表信息。

    4.1.1 全量拉取注册表信息

    接下来我们看下getAndStoreFullRegistry 方法,看看是怎样拉取全量注册表的。

    // 获取所有注册表信息private void getAndStoreFullRegistry() throws Throwable {long currentUpdateGeneration = fetchRegistryGeneration.get();Applications apps = null;// 交给网络传输组件,发起网络请求,获得响应EurekaHttpResponse httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null// todo apps请求url? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()): eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {apps = httpResponse.getEntity();}if (apps == null) {logger.error(“The application is null for some reason. Not storing this information”);} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {//localRegionApps.set(this.filterAndShuffle(apps));logger.debug(“Got full registry with apps hashcode {}”, apps.getAppsHashCode());} else {logger.warn(“Not updating applications as another thread is updating it already”);}}

    这里其实就是调用网络组件来发起请求,得到响应了,然后拿到所有得实例信息后,将实例信息设置到本地注册表中。 我们这里再深入一点,看看eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) 是请求得哪个url:

    @Overridepublic EurekaHttpResponse getApplications(String… regions) {return getApplicationsInternal(“apps/”, regions);}private EurekaHttpResponse getApplicationsInternal(String urlPath, String[] regions) {ClientResponse response = null;String regionsParamValue = null;try {WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);// 拼接regionif (regions != null && regions.length > 0) {regionsParamValue = StringUtil.join(regions);webResource = webResource.queryParam(“regions”, regionsParamValue);}Builder requestBuilder = webResource.getRequestBuilder();addExtraHeaders(requestBuilder);// 提交get请求response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);Applications applications = null;if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {applications = response.getEntity(Applications.class);}return anEurekaHttpResponse(response.getStatus(), Applications.class).headers(headersOf(response)).entity(applications).build();} }

    拉取全量注册表的请求为:GET请求,path为:apps/

    4.1.2 增量拉取注册表信息

    getAndUpdateDelta(applications);代码如下:

    private void getAndUpdateDelta(Applications applications) throws Throwable {long currentUpdateGeneration = fetchRegistryGeneration.get();Applications delta = null;// 提交请求EurekaHttpResponse httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {delta = httpResponse.getEntity();}if (delta == null) {getAndStoreFullRegistry();} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {String reconcileHashCode = “”;if (fetchRegistryUpdateLock.tryLock()) {try {/** * 这里要将从Server获取到的所有变更信息更新到本地缓存。这些变 * 更信来自于两类Region:本地Region与远程Region。而本地缓存也 * 分为两类:缓存本地Region的applications与缓存所有远程Region * 的注册信息的map(key为远程Region,value为该远程Region的注册 * 表) */// todoupdateDelta(delta);reconcileHashCode = getReconcileHashCode(applications);} finally {fetchRegistryUpdateLock.unlock();}} …}

    增量拉取注册表的请求: GET请求 path为: apps/delta

    然后,我们重点看一下updateDelta(delta);方法:

    private void updateDelta(Applications delta) {int deltaCount = 0;for (Application app : delta.getRegisteredApplications()) {for (InstanceInfo instance : app.getInstances()) {Applications applications = getApplications();String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);// 不是本地region,远程regionif (!instanceRegionChecker.isLocalRegion(instanceRegion)) {Applications remoteApps = remoteRegionVsApps.get(instanceRegion);if (null == remoteApps) {remoteApps = new Applications();remoteRegionVsApps.put(instanceRegion, remoteApps);}applications = remoteApps;}++deltaCount;// 有新增加的实例信息if (ActionType.ADDED.equals(instance.getActionType())) {Application existingApp = applications.getRegisteredApplications(instance.getAppName());if (existingApp == null) {applications.addApplication(app);}logger.debug(“Added instance {} to the existing apps in region {}”, instance.getId(), instanceRegion);applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);// 有修改的} else if (ActionType.MODIFIED.equals(instance.getActionType())) {Application existingApp = applications.getRegisteredApplications(instance.getAppName());if (existingApp == null) {applications.addApplication(app);}logger.debug(“Modified instance {} to the existing apps “, instance.getId());applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);// 有删除的} else if (ActionType.DELETED.equals(instance.getActionType())) {Application existingApp = applications.getRegisteredApplications(instance.getAppName());if (existingApp != null) {logger.debug(“Deleted instance {} to the existing apps “, instance.getId());existingApp.removeInstance(instance);/* * We find all instance list from application(The status of instance status is not only the status is UP but also other status) * if instance list is empty, we remove the application. */if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {applications.removeApplication(existingApp);}}}}} …}

    这个方法就是更新客户端本地的注册表信息。

    4.2 服务注册// 如果进行服务注册的话 clientConfig.shouldEnforceRegistrationAtInit() 默认falseif (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {try {// todo 进行服务注册if (!register()) {throw new IllegalStateException(“Registration error at startup. Invalid server response.”);}} catch (Throwable th) {logger.error(“Registration error at startup: {}”, th.getMessage());throw new IllegalStateException(th);}}

    如果在这里进行服务注册的话,需要配置文件中增加下面配置(默认是false):

    eureka.client.should-enforce-registration-at-init: true

    所以在这里是没有服务注册的,那么服务注册是在哪里呢?在会面分析续约定时任务时完成了服务注册,不过,我们在这里也看一下服务注册的代码:

    boolean register() throws Throwable {EurekaHttpResponse httpResponse;try {// todo 进行服务注册httpResponse = eurekaTransport.registrationClient.register(instanceInfo);} …}return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();}

    接下来看:

    @Overridepublic EurekaHttpResponse register(InstanceInfo info) {String urlPath = “apps/” + info.getAppName();Response response = null;try {Builder resourceBuilder = jerseyClient.target(serviceUrl).path(urlPath).request();addExtraProperties(resourceBuilder);addExtraHeaders(resourceBuilder);response = resourceBuilder.accept(MediaType.APPLICATION_JSON).acceptEncoding(“gzip”).post(Entity.json(info));return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();} finally {if (logger.isDebugEnabled()) {logger.debug(“Jersey2 HTTP POST {}/{} with instance {}; statusCode={}”, serviceUrl, urlPath, info.getId(),response == null ? “N/A” : response.getStatus());}if (response != null) {response.close();}}}

    服务注册:POST请求,path为:“apps/” + appName

    4.3 定时任务initScheduledTasks();

    初始化定时任务。我们分别看一下:

    4.3.1 定时更新客户端注册表任务private void initScheduledTasks() {// todo 拉取注册表 增量拉取定时任务if (clientConfig.shouldFetchRegistry()) {// registry cache refresh timer// 拉取间隔 默认是30sint registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();cacheRefreshTask = new TimedSupervisorTask(“cacheRefresh”,scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,new CacheRefreshThread());// todo 放入定时任务,默认30s执行一次// 在这里看只有一个任务,在任务完成的时候会重新开启一个新的任务,可以点进去看看scheduler.schedule(cacheRefreshTask,registryFetchIntervalSeconds, TimeUnit.SECONDS);}}

    默认每隔30s 增量拉取注册表信息。拉取注册表信息,最终还是走我们上面介绍的fetchRegistry 方法。

    我们看一下com.netflix.discovery.TimedSupervisorTask#run:

    @Overridepublic void run() {Future future = null;try {// 使用Future,可以设定子线程的超时时间,这样当前线程就不用无限等待了future = executor.submit(task);threadPoolLevelGauge.set((long) executor.getActiveCount());// 阻塞 获取任务的执行结果future.get(timeoutMillis, TimeUnit.MILLISECONDS);// block until done or timeout// delay是个很有用的变量,后面会用到,这里记得每次执行任务成功都会将delay重置delay.set(timeoutMillis);threadPoolLevelGauge.set((long) executor.getActiveCount());successCounter.increment();} catch (TimeoutException e) {logger.warn(“task supervisor timed out”, e);timeoutCounter.increment();long currentDelay = delay.get();// 任务线程超时的时候,就把delay变量翻倍,但不会超过外部调用时设定的最大延时时间long newDelay = Math.min(maxDelay, currentDelay * 2);// 设置为最新的值,考虑到多线程,所以用了CASdelay.compareAndSet(currentDelay, newDelay);} catch (RejectedExecutionException e) {// 一旦线程池的阻塞队列中放满了待处理任务,触发了拒绝策略,就会将调度器停掉if (executor.isShutdown() || scheduler.isShutdown()) {logger.warn(“task supervisor shutting down, reject the task”, e);} else {logger.warn(“task supervisor rejected the task”, e);}rejectedCounter.increment();} catch (Throwable e) {// 一旦出现未知的异常,就停掉调度器if (executor.isShutdown() || scheduler.isShutdown()) {logger.warn(“task supervisor shutting down, can’t accept the task”);} else {logger.warn(“task supervisor threw an exception”, e);}throwableCounter.increment();} finally {// 这里任务要么执行完毕,要么发生异常,都用cancel方法来清理任务;if (future != null) {future.cancel(true);}// 只要调度器没有停止,就再指定等待时间之后在执行一次同样的任务if (!scheduler.isShutdown()) {// todo 下一次时间 再次执行这个任务//这里就是周期性任务的原因:只要没有停止调度器,就再创建一次性任务,执行时间时delay的值,//假设外部调用时传入的超时时间为30秒(构造方法的入参timeout),最大间隔时间为50秒(构造方法的入参expBackOffBound)//如果最近一次任务没有超时,那么就在30秒后开始新任务,//如果最近一次任务超时了,那么就在50秒后开始新任务(异常处理中有个乘以二的操作,乘以二后的60秒超过了最大间隔50秒)scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);}}}

    在这个Task中时机执行的还是入参的方法new CacheRefreshThread():

    new TimedSupervisorTask(“cacheRefresh”,scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,new CacheRefreshThread());class CacheRefreshThread implements Runnable {public void run() {refreshRegistry();}}@VisibleForTestingvoid refreshRegistry() {try {…// todo 拉取注册表boolean success = fetchRegistry(remoteRegionsModified);if (success) {registrySize = localRegionApps.get().size();lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();} …} …}

    在执行完这个任务之后,会调用TimedSupervisorTask#run中finally代码,在这里又重新开启了新的定时任务:

    finally {// 这里任务要么执行完毕,要么发生异常,都用cancel方法来清理任务;if (future != null) {future.cancel(true);}// 只要调度器没有停止,就再指定等待时间之后在执行一次同样的任务if (!scheduler.isShutdown()) {// todo 下一次时间 再次执行这个任务//这里就是周期性任务的原因:只要没有停止调度器,就再创建一次性任务,执行时间时delay的值,//假设外部调用时传入的超时时间为30秒(构造方法的入参timeout),最大间隔时间为50秒(构造方法的入参expBackOffBound)//如果最近一次任务没有超时,那么就在30秒后开始新任务,//如果最近一次任务超时了,那么就在50秒后开始新任务(异常处理中有个乘以二的操作,乘以二后的60秒超过了最大间隔50秒)scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);}}

    这样就实现了每隔30s调用一个拉取注册表的任务。

    4.3.2 定时服务续约任务private void initScheduledTasks() {…// 开启注册if (clientConfig.shouldRegisterWithEureka()) {// todo 服务续约定时任务// 续约间隔时间 30sint renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();// 指定client从server更新注册表的最大时间间隔指数(倍数),默认为10int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();logger.info(“Starting heartbeat executor: ” + “renew interval is: {}”, renewalIntervalInSecs);// Heartbeat timer// todo 续约,心跳定时任务heartbeatTask = new TimedSupervisorTask(“heartbeat”,scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread());// 续约定时任务scheduler.schedule(heartbeatTask,renewalIntervalInSecs, TimeUnit.SECONDS);

    每30s 执行一次服务续约。直接看下HeartbeatThread 类。

    private class HeartbeatThread implements Runnable {public void run() {if (renew()) {lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();}}}

    走的是renew 方法请求服务续约,成功后会更新lastSuccessfulHeartbeatTimestamp 字段。

    boolean renew() {EurekaHttpResponse httpResponse;try {httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);logger.debug(PREFIX + “{} – Heartbeat status: {}”, appPathIdentifier, httpResponse.getStatusCode());// 如果是没有发现该实例信息的话if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {REREGISTER_COUNTER.increment();logger.info(PREFIX + “{} – Re-registering apps/{}”, appPathIdentifier, instanceInfo.getAppName());long timestamp = instanceInfo.setIsDirtyWithTime();// todo 进行服务注册,如果我们不在配置文件中指定服务初始化就注册该服务,那么服务的注册实际是在这里注册的boolean success = register();if (success) {instanceInfo.unsetIsDirty(timestamp);}return success;}return httpResponse.getStatusCode() == Status.OK.getStatusCode();} catch (Throwable e) {logger.error(PREFIX + “{} – was unable to send heartbeat!”, appPathIdentifier, e);return false;}}

    很简单,就是调用 eurekaTransport.registrationClient.sendHeartBeat 方法发送服务续约的请求,如果你实例信息在Eureka Server中不存在的话,就进行服务注册,我们再稍微看下sendHeartBeat 方法,里面请求uri就是 String urlPath = “apps/” + appName + ‘/’ + id;

    服务续约请求:PUT请求, path为:apps/{appName}/{instanceId}

    4.3.3 定时更新Client信息给Server任务private void initScheduledTasks() {…// 开启注册if (clientConfig.shouldRegisterWithEureka()) {…// todo 定时更新Client信息给服务端// InstanceInfo replicatorinstanceInfoReplicator = new InstanceInfoReplicator(this,instanceInfo,clientConfig.getInstanceInfoReplicationIntervalSeconds(),2); // burstSizestatusChangeListener = new ApplicationInfoManager.StatusChangeListener() {@Overridepublic String getId() {return “statusChangeListener”;}// 监听到StatusChangeEvent 事件,调用notify方法@Overridepublic void notify(StatusChangeEvent statusChangeEvent) {logger.info(“Saw local status change event {}”, statusChangeEvent);// todo 通知执行方法,这个方法就是立即向 服务端发起注册请求instanceInfoReplicator.onDemandUpdate();}};// 向applicationInfoManager 中注册 状态变化事件监听器if (clientConfig.shouldOnDemandUpdateStatusChange()) {applicationInfoManager.registerStatusChangeListener(statusChangeListener);}// todo参数默认40sinstanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());} …}

    我们看下这个start启动 方法:

    public void start(int initialDelayMs) { if (started.compareAndSet(false, true)) { instanceInfo.setIsDirty();// for initial register Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); }}

    这里有个非常重要的点,调用了实例信息的setIsDirty 方法,后面的注释说是为了初始化服务注册。

    创建一个延时任务,默认是40s。看看40s执行啥东西。com.netflix.discovery.InstanceInfoReplicator#run:

    public void run() {try {// 刷新实例信息discoveryClient.refreshInstanceInfo();// 获取脏的时间戳Long dirtyTimestamp = instanceInfo.isDirtyWithTime();if (dirtyTimestamp != null) {// todo 客户端重新发起注册请求discoveryClient.register();instanceInfo.unsetIsDirty(dirtyTimestamp);}} catch (Throwable t) {logger.warn(“There was a problem with the instance info replicator”, t);} finally {Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);scheduledPeriodicRef.set(next);}}

    如果这个时间戳不是null的话,调用register 方法进行服务注册,这个时间戳肯定不是null的, instanceInfo.setIsDirty(); // for initial register 我们上面这个方法就是设置了这个时间戳。最后又将这个任务放入延时调度中。

    其实这个定时任务是为了检测服务信息有没有变动,如果有变动重新注册到Eureka Server上去。

    下面我们来看一下状态改变监听器statusChangeListener:

    statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {@Overridepublic String getId() {return “statusChangeListener”;}// 监听到StatusChangeEvent 事件,调用notify方法@Overridepublic void notify(StatusChangeEvent statusChangeEvent) {logger.info(“Saw local status change event {}”, statusChangeEvent);// todo 通知执行方法,这个方法就是立即向 服务端发起注册请求instanceInfoReplicator.onDemandUpdate();}};// 向applicationInfoManager 中注册 状态变化事件监听器if (clientConfig.shouldOnDemandUpdateStatusChange()) {applicationInfoManager.registerStatusChangeListener(statusChangeListener);}

    如果 Eureka Client 状态发生变化(在Spring Boot 通过 Actuator 对服务状态进行监控,具体实现为 EurekaHealthCheckHandler),注册在 ApplicationInfoManager 的状态改变监控器将会被触发,从而调用InstanceInfoReplicator#onDemandUpdate方法,检查服务实例信息和服务状态的变化,可能会引起按需注册任务,代码如下:

    public boolean onDemandUpdate() {if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {if (!scheduler.isShutdown()) {// 提交scheduler.submit(new Runnable() {@Overridepublic void run() {logger.debug(“Executing on-demand update of local InstanceInfo”);Future latestPeriodic = scheduledPeriodicRef.get();if (latestPeriodic != null && !latestPeriodic.isDone()) {logger.debug(“Canceling the latest scheduled update, it will be rescheduled at the end of on demand update”);// 取消定时任务latestPeriodic.cancel(false);}// todo 执行 向 Server端重新 注册的请求InstanceInfoReplicator.this.run();}});return true;} else {logger.warn(“Ignoring onDemand update due to stopped scheduler”);return false;}} else {logger.warn(“Ignoring onDemand update due to rate limiter”);return false;}}

    InstanceInfoReplicator#onDemandUpdate 方法中调用 InstanceInfoReplicator#run 方法检查服务实例信息和服务状态的变化,并在服务实例信息和服务状态发生变化的情况下向 Eureka Server 发起重新注册的请求,为了防止重新执行 run 方法,onDemandUpdate 方法还会取消执行上次已经提交且未完成的 run方法,执行最新的按需注册任务。

    4.4 总结服务注册的时机

    Client提交register()请求的情况有三种:

    • 在应用启动时就可以直接进行register(),不过,需要提前在配置文件中配置
    • 在renew时,如果server端返回的是NOT_FOUND,则提交register()
    • 当Client的配置信息发生了变更,则Client提交register()

    Client实例化

    Eureka Client 实例化的时候有几个重要步骤,分别如下:

    • 全量拉取注册表信息,放入自己本地注册表中。
    • 创建定时任务, 定时服务续约任务,默认是30s, 定时更新 客户端注册表信息,默认是30s, 定时更新Client信息给Server端,重新服务注册,默认是40s。

    参考文章

    eureka-0.10.11源码(注释)springcloud-source-study学习github地址Eureka源码解析SpringCloud技术栈系列文章

    本文内容及图片均整理自互联网,不代表本站立场,版权归原作者所有,如有侵权请联系admin#jikehao.com删除。
    (0)
    极客号的头像极客号
    上一篇 2022年 10月 14日
    下一篇 2022年 10月 14日

    相关推荐

    • 开幼儿园失败的惨痛经历

      1 最近刚好分享了一个开猫舍的朋友的失败故事,被一个开幼儿园的朋友看到了。 昨晚上死活抓着我要分享一下她的糟心经历。 她刚好来杭州和人谈生意,打算把自己开了九年的幼儿园卖掉。 准确…

      2022年 9月 28日
    • 美媒:美国消费者饱受通胀困扰 自由支配金额所剩无几

      综合美国金融媒体“Advisor Perspective”、彭博社15日消息,美国9月通胀率增幅不减,核心物价涨幅冲上40年来最高位。美媒认为,美国消费者正在承受“猖獗上涨”的价格…

      2022年 10月 17日
    • 50岁的刘强东退休,29岁的章泽天身价暴露,她的第二春终于来了

      这大概才是奶茶妹妹章泽天最大的反击吧! 1993年出生的奶茶妹妹,在最美好的年华,因为爱情和自己的老公刘强东在一起,婚后立马生了女儿稳坐京东老板娘的位置。可惜婚姻中的男人总是容易出…

      2022年 9月 27日
    • 大蒜能杀菌抗肿瘤?每周生吃两次可降低44%肺癌风险?靠谱吗

      “生吃大蒜能抗癌!” 吴伯最近迷上了生吃大蒜,一开口就熏得别人马上“逃走”。这不,今天他又开始拿着一瓣蒜开始就着酒来吃,不过因为之前买得太多,蒜都放发芽了,他也丝毫不介意。 “你这…

      2022年 10月 2日
    • 腐剧开车

        最近很少安利腐剧的剧姐感到很惭愧!毕竟自从看了《一年生》之后,剧姐的口味越来越挑剔了~   不过剧姐听说,台湾最近有部腐剧《HIStory》很火,于是鸡冻的剧姐就屁颠屁颠的去看…

      2023年 8月 17日
    • RNG晋级八强

      #RNG晋级八强# 在今天的加赛中,RNG 0:1 GEN 不敌对手。在经历今日的4场比赛后,最终以D组第二晋级八强!RNG加油!5分半对方拿下一血;前期我方控下小龙和先锋资源,但…

      2022年 10月 17日
    • 2021城市新封面 -11区县地标建筑盘点

      1 桃城区 衡水摩天轮日景 拍摄:S眼看衡水 衡水摩天轮夜景 拍摄:宗悦新 桃城区2021年新建成的“建筑地标“,最具城市符号象征的应属“衡水摩天轮”。 “衡水摩天轮”荣登城市封面…

      2022年 10月 2日
    • 致“马化腾先生”的一封公开信

      致“马化腾先生”的一封公开信 敬佩的马化腾先生你好!今冒昧来信,也不知你是否能收到,但我如鲠在喉的几句话还是想对你说: 我们的共同点就是同年代的人,不同点就是你牢牢抓住了十一届三中…

      2022年 10月 16日
    • 一九四九我来自未来下部(一九四九)

      大家好,今天给各位分享一九四九的一些知识,其中也会对一九四九我来自未来下部进行解释,文章篇幅可能偏长,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在就马上开始吧! 一九四九年…

      2023年 5月 2日
    • 东安子鸡(东安子鸡的历史典故)

      本文主要讲的是东安子鸡,以及和东安子鸡的历史典故相关的知识,如果觉得本文对您有所帮助,不要忘了将本文分享给朋友。 湖南名菜东安仔鸡有何历史典故,做法有什么窍门吗?    东安鸡的典…

      2022年 11月 25日

    合作联系:1152QQ041010