Springcloud eureka源码学习(上)
了解过程,弄清含义,明白原理是学习源码的三个心法。了解执行过程,弄清具体含义、明白其中原理。
1. Eureka-client初始化

结合上图,Eureka-client初始化过程包括5个步骤
- 创建 EurekaInstanceConfig对象
- 使用 EurekaInstanceConfig对象创建InstanceInfo对象
- 使用 EurekaInstanceConfig对象 + InstanceInfo对象 创建 ApplicationInfoManager对象
- 创建 EurekaClientConfig对象
- 使用 ApplicationInfoManager对象 + EurekaClientConfig对象 创建 EurekaClient对象
上述类含义解释
EurekaInstanceConfig:记录一些应用实例的配置信息,如应用名、端口、IP、应用组名、租约续约频率等信息。
InstanceInfo:InstanceInfo是根据EurekaInstanceConfig创建的,所以InstanceInfo基本上同后者的某些属性是一致的,Eureka-client向Eureka-server注册的就是这个实例信息。
ApplicationInfoManager:管理上面的两个实例。
EurekaClientConfig:配置客户端所在的region和zone。 springcloud eureka中的region和zone是指什么
EurekaClient:这是最为重要的一个对象,通过这个对象可以1.获取其他应用实例信息。2.注册本地客户端的健康检查和事件监听。3.注册、续约、取消、查询服务都和它有关。
原理
每一个eureka客户端在启动后,都会读取环境变量然后构建配置信息,称为eureka-client的初始化。准备好自身的初始化信息后,下一步才能够注册到注册中心服务器。
InstanceRegistry
本节简单介绍下InstanceRegistry,即注册表类,他继承了lookupService接口和LeaseManager两个接口,前者提供了多种应用实例的查询方法,包括通过serviceId查询、查询全部、轮询。后者提供了租用的所有方法,包括租约的注册、续租、取消( 主动下线 )、过期( 过期下线 )。而InstanceRegistry丰富了租约的功能,并提供了更多个性化的定制,包括应用实例状态变更监控、缓存、自我保护、调用和监控。
2. Eureka-server初始化
同Eureka-client一样,Eureka-server在启动的时候同样是读取一些属性并做一些基本的配置。这里的配置会写到EurekaServerConfig这个对象中,它们包括认证相关、限流相关、注册信息相关、自我保护机制、集群相关。然后EurekaBootStrap这个对象会初始化应用的上下文信息。
3. 应用实例注册
三个问题
- 什么时候注册?
- 注册的时候客户端发生了什么?
- 注册的时候服务端发生了什么?
1. 什么时候注册?
结论:实例状态变更的时候会注册 ,应用信息从starting变为up的时候会触发一个监听器,然后会调用instanceInfoReplicator的方法完成注册。不止第一次会注册,在状态变更的时候也会注册。
2. 注册的时候客户端发生了什么?
结论:调用registry方法向server端发送自身实例信息在上面已经讲过EurekaClient这个对象是是应用客户端实例和服务器交互的关键类,所以在Eureka-client发起注册的时候也是这个类的子类来调用一个Registry方法,这个方法做了一件事情就是向server端发送post请求将自身的实例信息类InstanceInfo发送过去。
3. 注册的时候服务端发生了什么?
结论:调用父类方法注册和同步到集群中的其他节点,然后主动让response缓存失效,会调用父类注册方法注册,如果注册中心是集群模式,会将自身的增量信息加入到一个修改队列里面,以便集群中的其他Eureka-server实例同步应用实例的信息。eureka-server发生了什么
对于第三点的详细过程:
最近修改队列:这个队列是一个默认长度为180s的队列,保存最近180s内应用实例的修改情况。
Response缓存:Eureka-client的查询请求会首先经过Response缓存,在注册和取消的时候由于应用实例的修改会使缓存失效,避免读到脏数据。
这里对最近修改队列的线程安全使用了读写锁,但是和通常使用读写锁的方式不太一样,。这里多个实例在写最近修改队列的时候使用读锁,读应用实例信息的时候使用写锁,每个服务实例注册,取消的时候,都会修改这个队列,这个队列是多线程修改的。但是读取,只有loadingcache的ALL_APPS_DELTAkey初始化线程会读取,而且在缓存失效前都不会再有线程读取。所以可以归纳为,多线程频繁修改,但是单线程不频繁读取。所以这里是一个写多读少的场景。

4. 应用实例续租
两个问题
- 为什么要续租?
- 续租的过程是怎样的?
1. 为什么要续租?
在注册完服务后,服务提供者会维护一个心跳用来持告诉Eureka Server自己还“活着”,以防止被Eureka Server的剔除任务将该服务实例下线,所以要续租(renew)。
- 续租的过程是怎样的?
同样,续租的过程分为client发起续租和server接收续租。client的续租逻辑依然写在EurekaClient子类中,但是整个续租的过程是值得研究的,在学习后能收获一些定时任务的知识。
Eureka-client发起续租
同样,还是在DiscoveryClient执行续租逻辑,开启一个heartbeat任务定时向server发送续租请求,这是一个put请求,会更新租约信息(时长等),如果调用远程server成功,则开启下一次的定时任务,如果不存在租约,就发起一次注册任务。那么这里的问题是,作者在写这段代码的时候是如何写这个定时任务的,也就是如何完成每隔一段时间向server发送一次续租请求,如果请求失败又是如何完成处理下一个续租请求的。
续租逻辑
// DiscoveryClient.java
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
// ... 省略无关代码
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// ... 省略无关代码
// 【3.2.14】初始化定时任务
initScheduledTasks();
}
private void initScheduledTasks() {
// 向 Eureka-Server 心跳(续租)执行器
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); // 续租频率
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); //
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// ... 省略无关代码
}
// ... 省略无关代码
}
执行逻辑:
- 创建了两个线程池,一个定时调度线程和一个自定义线程池。但是作者没有直接用定时调度线程池执行定时续租逻辑,而是使用了scheduler.schedule这个方法,它的特定是在规定的延迟后只执行一次任务,具体用法如下:
// 执行一个一次性任务
那么可以看到传入的这个实现了Runnable这个接口的类TimerSuperVisorTask,它是一个实现了TimerTask的类,而TimerTask一般与Timer配合使用,在这里被当做Runnable传入了newScheduledThreadPool线程池中执行,作者有什么意图呢?猜不透~~// command:command中的run方法代表需要执行的任务 // delay:执行任务的延迟 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); // 将任务包装成ScheduledThreadPoolExecutor.ScheduledFutureTask对象 // decorateTask方法直接返回这个ScheduledThreadPoolExecutor.ScheduledFutureTask对象 // 子类中可以重写decorateTask方法 RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; }
再来看看TimeSuperVisorTask里面到底实现了什么,
public class TimedSupervisorTask extends TimerTask {
private final Counter timeoutCounter;
private final Counter rejectedCounter;
private final Counter throwableCounter;
private final LongGauge threadPoolLevelGauge;
/**
* 定时任务服务
*/
private final ScheduledExecutorService scheduler;
/**
* 执行子任务线程池
*/
private final ThreadPoolExecutor executor;
/**
* 子任务执行超时时间
*/
private final long timeoutMillis;
/**
* 子任务
*/
private final Runnable task;
/**
* 当前任子务执行频率
*/
private final AtomicLong delay;
/**
* 最大子任务执行频率
*
* 子任务执行超时情况下使用
*/
private final long maxDelay;
public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
this.scheduler = scheduler;
this.executor = executor;
this.timeoutMillis = timeUnit.toMillis(timeout);
this.task = task;
this.delay = new AtomicLong(timeoutMillis);
this.maxDelay = timeoutMillis * expBackOffBound;
// Initialize the counters and register.
timeoutCounter = Monitors.newCounter("timeouts");
rejectedCounter = Monitors.newCounter("rejectedExecutions");
throwableCounter = Monitors.newCounter("throwables");
threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
Monitors.registerObject(name, this);
}
}
以上是这个类的成员变量和构造函数,接下来是关键的run方法:
// TimedSupervisorTask.java
1: @Override
2: public void run() {
3: Future<?> future = null;
4: try {
5: // 提交 任务
6: future = executor.submit(task);
7: //
8: threadPoolLevelGauge.set((long) executor.getActiveCount());
9: // 等待任务 执行完成 或 超时
10: future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
11: // 设置 下一次任务执行频率
12: delay.set(timeoutMillis);
13: //
14: threadPoolLevelGauge.set((long) executor.getActiveCount());
15: } catch (TimeoutException e) {
16: logger.error("task supervisor timed out", e);
17: timeoutCounter.increment(); //
18:
19: // 设置 下一次任务执行频率
20: long currentDelay = delay.get();
21: long newDelay = Math.min(maxDelay, currentDelay * 2);
22: delay.compareAndSet(currentDelay, newDelay);
23:
24: } catch (RejectedExecutionException e) {
25: if (executor.isShutdown() || scheduler.isShutdown()) {
26: logger.warn("task supervisor shutting down, reject the task", e);
27: } else {
28: logger.error("task supervisor rejected the task", e);
29: }
30:
31: rejectedCounter.increment(); //
32: } catch (Throwable e) {
33: if (executor.isShutdown() || scheduler.isShutdown()) {
34: logger.warn("task supervisor shutting down, can't accept the task");
35: } else {
36: logger.error("task supervisor threw an exception", e);
37: }
38:
39: throwableCounter.increment(); //
40: } finally {
41: // 取消 未完成的任务
42: if (future != null) {
43: future.cancel(true);
44: }
45:
46: // 调度 下次任务
47: if (!scheduler.isShutdown()) {
48: scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
49: }
50: }
51: }
执行逻辑:提交子任务(发送续租请求),如果成功就提交自己到scheduler中并设置同样的时延执行下一次的续租,这里是一个鸡生蛋蛋生鸡的问题,scheduler中提交TimedSupervisorTask任务,然后TimedSupervisorTask中又会传入scheduler并调用它;那如果提交失败,就会捕获超时任务,然后将时延增加为上次的两倍再提交。
Eureka-server接收续租
主要是一些失败的处理方法,新的注册数据和旧的注册数据之间的一致性对比,只有在发生变化的时候才会改变应用实例信息。
5. 应用实例下线
Eureka-client部分没有很多有价值的部分,正常的下线逻辑。。
Eureka-server下线某个实例的时候同时也会对注册的时候做的操作做一遍反操作:
6: protected boolean internalCancel(String appName, String id, boolean isReplication) {
7: try {
8: // 获得读锁
9: read.lock();
10: // 增加 取消注册次数 到 监控
11: CANCEL.increment(isReplication);
12: // 移除 租约映射
13: Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
14: Lease<InstanceInfo> leaseToCancel = null;
15: if (gMap != null) {
16: leaseToCancel = gMap.remove(id);
17: }
18: // 添加到 最近取消注册的调试队列
19: synchronized (recentCanceledQueue) {
20: recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
21: }
22: // 移除 应用实例覆盖状态映射
23: InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
24: if (instanceStatus != null) {
25: logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
26: }
27: // 租约不存在
28: if (leaseToCancel == null) {
29: CANCEL_NOT_FOUND.increment(isReplication); // 添加 取消注册不存在 到 监控
30: logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
31: return false; // 失败
32: } else {
33: // 设置 租约的取消注册时间戳
34: leaseToCancel.cancel();
35: // 添加到 最近租约变更记录队列
36: InstanceInfo instanceInfo = leaseToCancel.getHolder();
37: String vip = null;
38: String svip = null;
39: if (instanceInfo != null) {
40: instanceInfo.setActionType(ActionType.DELETED);
41: recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
42: instanceInfo.setLastUpdatedTimestamp();
43: vip = instanceInfo.getVIPAddress();
44: svip = instanceInfo.getSecureVipAddress();
45: }
46: // 设置 响应缓存 过期
47: invalidateCache(appName, vip, svip);
48: logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
49: return true; // 成功
50: }
51: } finally {
52: // 释放锁
53: read.unlock();
54: }
55: }
- 集群下线状态同步。
- 移除租约映射,
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);取得目前所有维护的租约信息,然后根据ID删除租约信息。 - 添加到最近取消注册实例队列
- 使缓存失效