WhatAKitty Daily

A Programmer's Daily Record

Eureka内核逻辑解析

WhatAKitty   阅读次数loading...

概览

Eureka高可用架构

本文分以下几点:

  1. Eureka的高可用实现
    a. eureka客户端与服务端通讯机制
    b. eureka服务端与其他服务节点的通讯
    c. eureka服务端的自我保护机制
  2. Eureka与ZK的比较
    a. CAP模型
    b. 使用场景

Eureka高可用实现

Eureka的高可用通过以下三点实现:
a. eureka客户端与服务端通讯机制
b. eureka服务端与其他服务节点的通讯
c. eureka服务端的自我保护机制

eureka客户端与服务端通讯机制

可以看下客户端与Eureka服务端之间的通讯的概述流程:

通讯

同时为了更好的说明高可用的实现,在这里大概描述一下Eureka服务端的存储结构:

存储结构

服务注册
  1. 客户端请求POST /eureka/v2/apps接口,将客户端的服务信息上传
    PeerAwareInstanceRegistryImpl.java 399行:

    1
    2
    3
    4
    5
    6
    7
    8
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
    leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // 本地注册
    super.register(info, leaseDuration, isReplication);
    // 协同其他eureka服务节点同步注册
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
  2. Eureka服务端将客户端信息保存至Registry注册信息载体内

  3. 将此次添加变更增加到变更队列
    AbstractInstanceRegistry.java 194行:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
    // 读锁
    read.lock();
    // 存储本次注册信息(appName -> instanceId -> instanceInfo)
    Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
    REGISTER.increment(isReplication);
    if (gMap == null) {
    final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
    gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
    if (gMap == null) {
    gMap = gNewMap;
    }
    }
    // ...其他逻辑
    // 将本次添加变更加入最近变更队列内
    recentlyChangedQueue.add(new RecentlyChangedItem(lease));
    // 设置租约的上一次更新时间
    registrant.setLastUpdatedTimestamp();
    // 执行缓存清空
    invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
    } finally {
    // 读锁解锁
    read.unlock();
    }
    }
  4. 清空读写缓存(先清理APP、再清理ALL_APPS、最后清理ALL_APPS_DELTA)
    ResponseCacheImpl.java 251行:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
    for (Key.KeyType type : Key.KeyType.values()) {
    for (Version v : Version.values()) {
    // 删除appName本身的缓存
    // 删除ALL_APPS的缓存
    // 删除ALL_APPS_DELTA的缓存
    invalidate(
    new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
    new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
    new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
    new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
    new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
    new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
    );
    // 存在虚拟地址名称的,删除虚拟地址为key的缓存
    if (null != vipAddress) {
    invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
    }
    // 存在安全虚拟地址名称的,删除安全虚拟地址为key的缓存
    if (null != secureVipAddress) {
    invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));
    }
    }
    }
    }

ResponseCacheImpl.java 277行:

1
2
3
4
5
6
7
8
9
public void invalidate(Key... keys) {
for (Key key : keys) {
// Guava缓存对key设置无效
readWriteCacheMap.invalidate(key);
Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
// key关联区域的缓存无效设置
// ...
}
}

  1. 判断不存存在其他eureka服务节点,或者本次请求为同步请求则完成注册
    PeerAwareInstanceRegistryImpl.java 620行:

    1
    2
    3
    if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
    return;
    }
  2. 否则,获取所有其他的服务节点,排除本身服务,将本次注册同步至其他eureka服务节点
    PeerAwareInstanceRegistryImpl.java 624行:

    1
    2
    3
    4
    5
    6
    7
    8
    for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
    // 排除自身
    if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
    continue;
    }
    // 同步其他服务节点
    replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
    }
服务续约
  1. 客户端请求PUT /eureka/v2/apps/{appId}
  2. 根据appName获取租约
  3. 更新租约
  4. 同步至其他eureka服务节点

续约类似心跳机制,客户端会按照默认时间的30秒定时做一次续约,如果超过3次没有成功,则服务端会将该客户端剔除。

服务取消注册
  1. 客户端请求DELETE /eureka/v2/apps/{appId}
  2. 根据appName获取租约
  3. 设置租约过期时间evictionTimestamp为当前时间
  4. 将此次删除增加到变更队列中
  5. 清空读写缓存(先清理APP、再清理ALL_APPS、最后清理ALL_APPS_DELTA)
服务剔除
  1. Eureka服务端会在内部会初始化一个Timer定时器用于定时调度处理剔除任务;剔除时间间隔为evictionIntervalTimerInMs
    AbstractInstanceRegistry.java 1212行:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    protected void postInit() {
    // ....
    // 设置新的过期任务
    evictionTaskRef.set(new EvictionTask());
    // 任务调度
    evictionTimer.schedule(evictionTaskRef.get(),
    serverConfig.getEvictionIntervalTimerInMs(),
    serverConfig.getEvictionIntervalTimerInMs());
    }
  2. 判断是否启用自我保护,如果禁用,则不进行服务剔除

  3. 判断 上一分钟实际的续约次数 <= numberOfRenewsPerMinThreshold,则会触发自我保护机制,停止服务剔除
    PeerAwareInstanceRegistryImpl.java 474行:

    1
    2
    3
    4
    5
    6
    // 判断自我保护是否禁用
    if (!isSelfPreservationModeEnabled()) {
    return true;
    }
    // 自我保护机制阈值判断
    return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
  4. 遍历Registry内所有的租约信息,判断当前租约是否过期

  5. 将这些过期的租约放置到一个过期列表内
    AbstractInstanceRegistry.java 597行:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    // 遍历租约
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
    Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
    if (leaseMap != null) {
    for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
    Lease<InstanceInfo> lease = leaseEntry.getValue();
    // 判断租约是否过期
    // additionalLeaseMs是补偿时间,防止由于GC或者本地时间造成的一个时间误差,确保能够按照预期时间执行
    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
    // 将过期租约加入过期列表
    expiredLeases.add(lease);
    }
    }
    }
    }
  6. 计算可被剔除的过期实例数(过期数 = Math.min(过期列表大小, (本地租约数 - 本地租约数 * 续约百分比)))
    AbstractInstanceRegistry.java 612行:

    1
    2
    3
    4
    5
    6
    // 获取本地租约数
    int registrySize = (int) getLocalRegistrySize();
    // 计算租约
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    // 实际需要剔除的租约个数
    int evictionLimit = registrySize - registrySizeThreshold;
  7. 使用洗牌算法找出需要剔除的n个实例
    AbstractInstanceRegistry.java 620行:

    1
    2
    3
    4
    5
    6
    7
    8
    // 按照剔除数遍历,每次的交换对象都是基于上次的随机结果
    Random random = new Random(System.currentTimeMillis());
    for (int i = 0; i < toEvict; i++) {
    // 筛选出需要交换位置的索引next并与i交换位置
    int next = i + random.nextInt(expiredLeases.size() - i);
    Collections.swap(expiredLeases, i, next);
    // 剔除逻辑
    }
  8. 设置租约过期时间evictionTimestamp为当前时间

  9. 将此次删除增加到变更队列中
  10. 清空读写缓存

eureka服务端与其他服务节点的通讯

eureka服务端与其他服务节点的通讯主要包含两部分:

  1. eureka服务端启动时候自动拉取其他服务节点的注册信息并落入本地Registry
  2. 一旦有诸如register,renew,cancel请求,则会将这些请求通过线程池自动同步至其他服务节点
启动初始化
  1. Servlet容器初始化,调用eureka环境初始化以及eureka上下文初始化
    EurekaBootstrap.java 111行:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public void contextInitialized(ServletContextEvent event) {
    try {
    // 初始化环境
    initEurekaEnvironment();
    // 初始化上下文
    initEurekaServerContext();

    ServletContext sc = event.getServletContext();
    sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
    } catch (Throwable e) {
    logger.error("Cannot bootstrap eureka server :", e);
    throw new RuntimeException("Cannot bootstrap eureka server :", e);
    }
    }
  2. 初始化上下文的过程中会开始同步其他eureka服务节点的信息
    EurekaBootstrap.java 147行:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    protected void initEurekaServerContext() throws Exception {
    // ...其他逻辑
    ApplicationInfoManager applicationInfoManager = null;

    // 初始化eureka客户端,用于做为获取其他eureka服务信息的client
    if (eurekaClient == null) {
    EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
    ? new CloudInstanceConfig()
    : new MyDataCenterInstanceConfig();

    applicationInfoManager = new ApplicationInfoManager(
    instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());

    EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
    eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
    } else {
    applicationInfoManager = eurekaClient.getApplicationInfoManager();
    }

    // 初始化其他服务实例感知的注册器
    PeerAwareInstanceRegistry registry;
    if (isAws(applicationInfoManager.getInfo())) {
    // aws 服务
    // ...
    } else {
    registry = new PeerAwareInstanceRegistryImpl(
    eurekaServerConfig,
    eurekaClient.getEurekaClientConfig(),
    serverCodecs,
    eurekaClient
    );
    }

    // 为其他eureka服务注册eurekaNode实例
    PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
    registry,
    eurekaServerConfig,
    eurekaClient.getEurekaClientConfig(),
    serverCodecs,
    applicationInfoManager
    );

    // servlet容器初始化
    // ...

    // 从其他服务拷贝注册信息
    int registryCount = registry.syncUp();
    // 等待接收请求
    registry.openForTraffic(applicationInfoManager, registryCount);

    // 其他操作
    // ...
    }

同步逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public int syncUp() {
int count = 0;

// 尝试同步直到达到最大尝试次数
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
// 尝试间隔睡眠时间
// ...
}
// 获取所有的应用
Applications apps = eurekaClient.getApplications();
// 获取所有注册的应用
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
// 判断应用是否在当前区域注册
// 该配置只对于AWS有效,其他非AWS服务全部会拉取
if (isRegisterable(instance)) {
// 是,则落入本地内存
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
  1. 完成后开始接收请求
过程中同步

以注册同步为例子。

  1. 根据请求类型判断执行动作
    PeerAwareInstanceRegistry.java 648行:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    case Cancel:
    node.cancel(appName, id);
    break;
    case Heartbeat:
    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
    break;
    case Register:
    // 调用PeerEurekaNode的register方法执行注册动作的同步
    node.register(info);
    break;
    case StatusUpdate:
    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
    break;
    case DeleteStatusOverride:
    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
    node.deleteStatusOverride(appName, id, infoFromRegistry);
    break;
  2. 将任务提交至任务处理器TaskDispatcher进行
    PeerEurekaNode.java 135行:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    batchingDispatcher.process(
    taskId("register", info),
    // 同步任务信息
    new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
    public EurekaHttpResponse<Void> execute() {
    return replicationClient.register(info);
    }
    },
    expiryTime
    );
  3. 调用Jersey2执行REST请求
    AbstractJersey2EurekaHttpClient.java 85行:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public EurekaHttpResponse<Void> register(InstanceInfo info) {
    String urlPath = "apps/" + info.getAppName();
    Response response = null;
    try {
    // 封装请求
    Builder resourceBuilder = jerseyClient.target(serviceUrl).path(urlPath).request();
    addExtraProperties(resourceBuilder);
    // 设置x-netflix-discovery-replication头信息为true,其他eureka服务节点接收后会知晓这是个同步请求
    addExtraHeaders(resourceBuilder);
    // 执行请求并获取结果
    response = resourceBuilder
    .accept(MediaType.APPLICATION_JSON)
    .acceptEncoding("gzip")
    .post(Entity.json(info));
    return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
    if (logger.isDebugEnabled()) {
    logger.debug("Jersey2 HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
    response == null ? "N/A" : response.getStatus());
    }
    if (response != null) {
    response.close();
    }
    }
    }

Eureka自我保护机制

Eureka是Netflix为了解决现有AWS的注册服务无法解决的一些场景而专门研发的。在设计之初,就考虑到高可用特性,防止AWS突然性的大规模断点造成服务不可用的情况设计了Eureka的自我保护机制。

Eureka的自我保护机制设定为:服务总数 每分钟续约数(60s / 客户端续约间隔) 自我保护阈值因子

举个例子说明:
如果某个应用A有100个服务实例,那么按照公式计算,它在一分钟内续约次数必须 >= 170。

如果在上一个分钟内,续约数 > 170,那么服务正常,某个实例就算失败也只会认为是客户端存在问题,会被剔除;
如果在上一个分钟内,续约数 < 170,那么Eureka就会认为是Eureka服务存在问题,会停止剔除流程,保护现有的注册信息,防止服务大规模下线。

Eureka与ZK的比较

CAP模型

CAP模型

Eureka是AP模型,ZK是CP模型。前者强调高可用,即使在某些情况下不同region看到的视图可能不一致;而后者强调的是强一致性,且在超过一定阈值后会造成ZK集群整体不可用

应用场景

场景EurekaZK
数据分发和订阅×
异地大规模集群需求的注册中心×
小规模单机房注册中心