spring eureka源码学习(上)

Springcloud eureka源码学习

了解过程,弄清含义,明白原理是学习源码的三个心法。了解执行过程,弄清具体含义、明白其中原理。

1. Eureka-client初始化

Eureka-client初始化

结合上图,Eureka-client初始化过程包括5个步骤

  1. 创建 EurekaInstanceConfig对象
  2. 使用 EurekaInstanceConfig对象创建InstanceInfo对象
  3. 使用 EurekaInstanceConfig对象 + InstanceInfo对象 创建 ApplicationInfoManager对象
  4. 创建 EurekaClientConfig对象
  5. 使用 ApplicationInfoManager对象 + EurekaClientConfig对象 创建 EurekaClient对象

上述类含义解释

  • EurekaInstanceConfig:记录一些应用实例的配置信息,如应用名、端口、IP、应用组名、租约续约频率等信息。

  • InstanceInfo:InstanceInfo是根据EurekaInstanceConfig创建的,所以InstanceInfo基本上同后者的某些属性是一致的,Eureka-client向Eureka-server注册的就是这个实例信息。

  • ApplicationInfoManager:管理上面的两个实例。

  • EurekaClientConfig:配置客户端所在的region和zone。 springcloud eureka中的region和zone是指什么

  • EurekaClient:这是最为重要的一个对象,通过这个对象可以1.获取其他应用实例信息。2.注册本地客户端的健康检查和事件监听。3.注册、续约、取消、查询服务都和它有关。

原理
每一个eureka客户端在启动后,都会读取环境变量然后构建配置信息,称为eureka-client的初始化。准备好自身的初始化信息后,下一步才能够注册到注册中心服务器。


InstanceRegistry

本节简单介绍下InstanceRegistry,即注册表类,他继承了lookupService接口和LeaseManager两个接口,前者提供了多种应用实例的查询方法,包括通过serviceId查询、查询全部、轮询。后者提供了租用的所有方法,包括租约的注册、续租、取消( 主动下线 )、过期( 过期下线 )。而InstanceRegistry丰富了租约的功能,并提供了更多个性化的定制,包括应用实例状态变更监控、缓存、自我保护、调用和监控。

2. Eureka-server初始化

同Eureka-client一样,Eureka-server在启动的时候同样是读取一些属性并做一些基本的配置。这里的配置会写到EurekaServerConfig这个对象中,它们包括认证相关、限流相关、注册信息相关、自我保护机制、集群相关。然后EurekaBootStrap这个对象会初始化应用的上下文信息。

3. 应用实例注册

三个问题

  1. 什么时候注册?
  2. 注册的时候客户端发生了什么?
  3. 注册的时候服务端发生了什么?

1. 什么时候注册?

结论:实例状态变更的时候会注册 ,应用信息从starting变为up的时候会触发一个监听器,然后会调用instanceInfoReplicator的方法完成注册。不止第一次会注册,在状态变更的时候也会注册

2. 注册的时候客户端发生了什么?

结论:调用registry方法向server端发送自身实例信息在上面已经讲过EurekaClient这个对象是是应用客户端实例和服务器交互的关键类,所以在Eureka-client发起注册的时候也是这个类的子类来调用一个Registry方法,这个方法做了一件事情就是向server端发送post请求将自身的实例信息类InstanceInfo发送过去。

3. 注册的时候服务端发生了什么?

结论:调用父类方法注册和同步到集群中的其他节点,然后主动让response缓存失效,会调用父类注册方法注册,如果注册中心是集群模式,会将自身的增量信息加入到一个修改队列里面,以便集群中的其他Eureka-server实例同步应用实例的信息。eureka-server发生了什么

对于第三点的详细过程:

  • 最近修改队列:这个队列是一个默认长度为180s的队列,保存最近180s内应用实例的修改情况。

  • Response缓存:Eureka-client的查询请求会首先经过Response缓存,在注册和取消的时候由于应用实例的修改会使缓存失效,避免读到脏数据。

这里对最近修改队列的线程安全使用了读写锁,但是和通常使用读写锁的方式不太一样,。这里多个实例在写最近修改队列的时候使用读锁,读应用实例信息的时候使用写锁,每个服务实例注册,取消的时候,都会修改这个队列,这个队列是多线程修改的。但是读取,只有loadingcache的ALL_APPS_DELTAkey初始化线程会读取,而且在缓存失效前都不会再有线程读取。所以可以归纳为,多线程频繁修改,但是单线程不频繁读取。所以这里是一个写多读少的场景。

EurekaServer缓存

4. 应用实例续租

两个问题

  1. 为什么要续租?
  2. 续租的过程是怎样的?

1. 为什么要续租?

在注册完服务后,服务提供者会维护一个心跳用来持告诉Eureka Server自己还“活着”,以防止被Eureka Server的剔除任务将该服务实例下线,所以要续租(renew)。

  1. 续租的过程是怎样的?
    同样,续租的过程分为client发起续租和server接收续租。client的续租逻辑依然写在EurekaClient子类中,但是整个续租的过程是值得研究的,在学习后能收获一些定时任务的知识。

Eureka-client发起续租

同样,还是在DiscoveryClient执行续租逻辑,开启一个heartbeat任务定时向server发送续租请求,这是一个put请求,会更新租约信息(时长等),如果调用远程server成功,则开启下一次的定时任务,如果不存在租约,就发起一次注册任务。那么这里的问题是,作者在写这段代码的时候是如何写这个定时任务的,也就是如何完成每隔一段时间向server发送一次续租请求,如果请求失败又是如何完成处理下一个续租请求的。

续租逻辑
// DiscoveryClient.java

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
               Provider<BackupRegistry> backupRegistryProvider) {
    // ... 省略无关代码
    scheduler = Executors.newScheduledThreadPool(2,
               new ThreadFactoryBuilder()
                       .setNameFormat("DiscoveryClient-%d")
                       .setDaemon(true)
                       .build());

    heartbeatExecutor = new ThreadPoolExecutor(
              1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(),
              new ThreadFactoryBuilder()
                      .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                      .setDaemon(true)
                      .build()
    );  // use direct handoff

    // ... 省略无关代码

    // 【3.2.14】初始化定时任务
    initScheduledTasks();
}

private void initScheduledTasks() {

    // 向 Eureka-Server 心跳(续租)执行器
    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); // 续租频率
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); //
        logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

        // Heartbeat timer
        scheduler.schedule(
               new TimedSupervisorTask(
                       "heartbeat",
                       scheduler,
                       heartbeatExecutor,
                       renewalIntervalInSecs,
                       TimeUnit.SECONDS,
                       expBackOffBound,
                       new HeartbeatThread()
               ),
               renewalIntervalInSecs, TimeUnit.SECONDS);

          // ... 省略无关代码
     }
     // ... 省略无关代码
}

执行逻辑:

  1. 创建了两个线程池,一个定时调度线程和一个自定义线程池。但是作者没有直接用定时调度线程池执行定时续租逻辑,而是使用了scheduler.schedule这个方法,它的特定是在规定的延迟后只执行一次任务,具体用法如下:
    // 执行一个一次性任务
    // command:command中的run方法代表需要执行的任务
    // delay:执行任务的延迟
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        // 将任务包装成ScheduledThreadPoolExecutor.ScheduledFutureTask对象
        // decorateTask方法直接返回这个ScheduledThreadPoolExecutor.ScheduledFutureTask对象
        // 子类中可以重写decorateTask方法
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }
    
    那么可以看到传入的这个实现了Runnable这个接口的类TimerSuperVisorTask,它是一个实现了TimerTask的类,而TimerTask一般与Timer配合使用,在这里被当做Runnable传入了newScheduledThreadPool线程池中执行,作者有什么意图呢?猜不透~~

再来看看TimeSuperVisorTask里面到底实现了什么,
public class TimedSupervisorTask extends TimerTask {

    private final Counter timeoutCounter;
    private final Counter rejectedCounter;
    private final Counter throwableCounter;
    private final LongGauge threadPoolLevelGauge;

    /**
     * 定时任务服务
     */
    private final ScheduledExecutorService scheduler;
    /**
     * 执行子任务线程池
     */
    private final ThreadPoolExecutor executor;
    /**
     * 子任务执行超时时间
     */
    private final long timeoutMillis;
    /**
     * 子任务
     */
    private final Runnable task;
    /**
     * 当前任子务执行频率
     */
    private final AtomicLong delay;
    /**
     * 最大子任务执行频率
     *
     * 子任务执行超时情况下使用
     */
    private final long maxDelay;

    public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                               int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
        this.scheduler = scheduler;
        this.executor = executor;
        this.timeoutMillis = timeUnit.toMillis(timeout);
        this.task = task;
        this.delay = new AtomicLong(timeoutMillis);
        this.maxDelay = timeoutMillis * expBackOffBound;

        // Initialize the counters and register.
        timeoutCounter = Monitors.newCounter("timeouts");
        rejectedCounter = Monitors.newCounter("rejectedExecutions");
        throwableCounter = Monitors.newCounter("throwables");
        threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
        Monitors.registerObject(name, this);
    }

}

以上是这个类的成员变量和构造函数,接下来是关键的run方法:

// TimedSupervisorTask.java
  1: @Override
  2: public void run() {
  3:     Future<?> future = null;
  4:     try {
  5:         // 提交 任务
  6:         future = executor.submit(task);
  7:         //
  8:         threadPoolLevelGauge.set((long) executor.getActiveCount());
  9:         // 等待任务 执行完成 或 超时
 10:         future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
 11:         // 设置 下一次任务执行频率
 12:         delay.set(timeoutMillis);
 13:         //
 14:         threadPoolLevelGauge.set((long) executor.getActiveCount());
 15:     } catch (TimeoutException e) {
 16:         logger.error("task supervisor timed out", e);
 17:         timeoutCounter.increment(); //
 18: 
 19:         // 设置 下一次任务执行频率
 20:         long currentDelay = delay.get();
 21:         long newDelay = Math.min(maxDelay, currentDelay * 2);
 22:         delay.compareAndSet(currentDelay, newDelay);
 23: 
 24:     } catch (RejectedExecutionException e) {
 25:         if (executor.isShutdown() || scheduler.isShutdown()) {
 26:             logger.warn("task supervisor shutting down, reject the task", e);
 27:         } else {
 28:             logger.error("task supervisor rejected the task", e);
 29:         }
 30: 
 31:         rejectedCounter.increment(); //
 32:     } catch (Throwable e) {
 33:         if (executor.isShutdown() || scheduler.isShutdown()) {
 34:             logger.warn("task supervisor shutting down, can't accept the task");
 35:         } else {
 36:             logger.error("task supervisor threw an exception", e);
 37:         }
 38: 
 39:         throwableCounter.increment(); //
 40:     } finally {
 41:         // 取消 未完成的任务
 42:         if (future != null) {
 43:             future.cancel(true);
 44:         }
 45: 
 46:         // 调度 下次任务
 47:         if (!scheduler.isShutdown()) {
 48:             scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
 49:         }
 50:     }
 51: }

执行逻辑:提交子任务(发送续租请求),如果成功就提交自己到scheduler中并设置同样的时延执行下一次的续租,这里是一个鸡生蛋蛋生鸡的问题,scheduler中提交TimedSupervisorTask任务,然后TimedSupervisorTask中又会传入scheduler并调用它;那如果提交失败,就会捕获超时任务,然后将时延增加为上次的两倍再提交。

Eureka-server接收续租

主要是一些失败的处理方法,新的注册数据和旧的注册数据之间的一致性对比,只有在发生变化的时候才会改变应用实例信息。

5. 应用实例下线

Eureka-client部分没有很多有价值的部分,正常的下线逻辑。。

Eureka-server下线某个实例的时候同时也会对注册的时候做的操作做一遍反操作:

6: protected boolean internalCancel(String appName, String id, boolean isReplication) {
 7:     try {
 8:         // 获得读锁
 9:         read.lock();
10:         // 增加 取消注册次数 到 监控
11:         CANCEL.increment(isReplication);
12:         // 移除 租约映射
13:         Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
14:         Lease<InstanceInfo> leaseToCancel = null;
15:         if (gMap != null) {
16:             leaseToCancel = gMap.remove(id);
17:         }
18:         // 添加到 最近取消注册的调试队列
19:         synchronized (recentCanceledQueue) {
20:             recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
21:         }
22:         // 移除 应用实例覆盖状态映射
23:         InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
24:         if (instanceStatus != null) {
25:             logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
26:         }
27:         // 租约不存在
28:         if (leaseToCancel == null) {
29:             CANCEL_NOT_FOUND.increment(isReplication); // 添加 取消注册不存在 到 监控
30:             logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
31:             return false; // 失败
32:         } else {
33:             // 设置 租约的取消注册时间戳
34:             leaseToCancel.cancel();
35:             // 添加到 最近租约变更记录队列
36:             InstanceInfo instanceInfo = leaseToCancel.getHolder();
37:             String vip = null;
38:             String svip = null;
39:             if (instanceInfo != null) {
40:                 instanceInfo.setActionType(ActionType.DELETED);
41:                 recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
42:                 instanceInfo.setLastUpdatedTimestamp();
43:                 vip = instanceInfo.getVIPAddress();
44:                 svip = instanceInfo.getSecureVipAddress();
45:             }
46:             // 设置 响应缓存 过期
47:             invalidateCache(appName, vip, svip);
48:             logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
49:             return true; // 成功
50:         }
51:     } finally {
52:         // 释放锁
53:         read.unlock();
54:     }
55: }
  1. 集群下线状态同步。
  2. 移除租约映射,Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);取得目前所有维护的租约信息,然后根据ID删除租约信息。
  3. 添加到最近取消注册实例队列
  4. 使缓存失效