📌 文档版本: 2.0 (深度版) | 🔧 Aeron 版本: 1.48.x | 📅 更新时间: 2025-12-08 | 🎯 源码路径: aeron-cluster/src/main/java/io/aeron/cluster/

一、模块概述与架构

Aeron Cluster 是 Raft 共识算法的工业级实现,核心设计目标是:

核心类关系

ConsensusModule (入口容器)
    │
    └──> ConsensusModuleAgent (核心引擎) implements Agent
            │
            ├──> Election (选举控制器)
            │       │
            │       ├── ElectionState (17个状态)
            │       ├── canvass() / nominate() / candidateBallot()
            │       └── leaderLogReplication() / leaderReady()
            │
            ├──> RecordingLog (日志持久化管理)
            │       └── Entry (Term / Snapshot)
            │
            ├──> LogPublisher (Leader 日志发布)
            │       └── appendMessage() / appendSessionOpen() / appendTimer()
            │
            ├──> LogAdapter (Follower 日志消费)
            │       └── poll() -> onFragment()
            │
            ├──> IngressAdapter (接收客户端请求)
            ├──> EgressPublisher (发送响应给客户端)
            ├──> ServiceProxy (与服务容器通信)
            └──> TimerService (分布式定时器)

二、Election.doWork() - 选举状态机核心

📂 源码位置: Election.java:171-250

完整源码与注释

/**
 * 选举状态机的主工作循环
 * 这是 Raft 选举算法的核心调度器,根据当前状态执行不同的选举逻辑
 *
 * @param nowNs 当前纳秒时间戳(用于超时判断)
 * @return 本次执行的工作量(用于 Agent 调度)
 */
int doWork(final long nowNs)
{
    int workCount = 0;

    // ===== 状态机核心 switch =====
    // 根据当前选举状态,分发到不同的处理方法
    switch (state)
    {
        case INIT:
            // 初始化选举,清理旧状态,准备新选举
            workCount += init(nowNs);
            break;

        case CANVASS:
            // 拉票阶段:广播自己的日志位置,收集其他节点状态
            // 评估自己是否有资格成为候选人
            workCount += canvass(nowNs);
            break;

        case NOMINATE:
            // 提名阶段:增加 candidateTermId,准备发起投票
            workCount += nominate(nowNs);
            break;

        case CANDIDATE_BALLOT:
            // 候选人投票阶段:向所有节点请求投票,统计票数
            workCount += candidateBallot(nowNs);
            break;

        case FOLLOWER_BALLOT:
            // 跟随者投票阶段:已投票给其他候选人,等待选举结果
            workCount += followerBallot(nowNs);
            break;

        case LEADER_LOG_REPLICATION:
            // Leader 日志复制阶段:等待所有 Follower 确认日志位置
            // 确保所有节点的 appendPosition 达到一致
            workCount += leaderLogReplication(nowNs);
            break;

        case LEADER_REPLAY:
            // Leader 回放阶段:回放本地未提交的日志
            workCount += leaderReplay(nowNs);
            break;

        case LEADER_INIT:
            // Leader 初始化阶段:准备日志 Publication,初始化 Leader 状态
            workCount += leaderInit(nowNs);
            break;

        case LEADER_READY:
            // Leader 就绪阶段:等待法定数量 Follower 就绪,追加 NewLeadershipTermEvent
            workCount += leaderReady(nowNs);
            break;

        case FOLLOWER_LOG_REPLICATION:
            // Follower 日志复制阶段:从 Leader 拉取缺失的日志
            workCount += followerLogReplication(nowNs);
            break;

        case FOLLOWER_REPLAY:
            // Follower 回放阶段:回放本地日志
            workCount += followerReplay(nowNs);
            break;

        case FOLLOWER_CATCHUP_INIT:
            // Follower 追赶初始化:准备从 Leader 追赶日志
            workCount += followerCatchupInit(nowNs);
            break;

        case FOLLOWER_CATCHUP_AWAIT:
            // Follower 等待追赶:等待 Leader 的 Replay 流
            workCount += followerCatchupAwait(nowNs);
            break;

        case FOLLOWER_CATCHUP:
            // Follower 追赶中:接收 Leader 的 Replay 数据
            workCount += followerCatchup(nowNs);
            break;

        case FOLLOWER_LOG_INIT:
            // Follower 日志初始化:准备连接到 Leader 的 live log
            workCount += followerLogInit(nowNs);
            break;

        case FOLLOWER_LOG_AWAIT:
            // Follower 等待日志:等待连接到 Leader 的日志流
            workCount += followerLogAwait(nowNs);
            break;

        case FOLLOWER_READY:
            // Follower 就绪阶段:发送 AppendPosition 给 Leader
            workCount += followerReady(nowNs);
            break;

        case CLOSED:
            // 选举结束,退出
            break;
    }

    return workCount;
}
🔍 设计要点:
  • 状态驱动: 通过 state 字段驱动整个选举流程,每个状态对应一个独立的方法
  • 非阻塞: 每个方法返回 workCount,表示本次执行的工作量。若无工作则返回0,Agent 调度器会执行 idle 策略
  • 超时机制: 每个阶段都有超时检查(通过 nowNs 判断),超时后状态回退或转换
  • 异常处理: 若任何阶段抛出异常,handleError() 会重置状态到 INIT,重新开始选举

三、Election.canvass() - 拉票阶段详解

📂 源码位置: Election.java:656-686

完整源码与逐行注释

/**
 * CANVASS 阶段:拉票/评估阶段
 *
 * 目的:
 * 1. 定期广播自己的日志位置 (CanvassPosition 消息)
 * 2. 收集其他节点的 CanvassPosition,比较日志完整性
 * 3. 评估自己是否有资格成为候选人
 * 4. 如果一致同意或超时后达到法定人数,进入 NOMINATE 阶段
 *
 * @param nowNs 当前纳秒时间戳
 * @return 工作量计数
 */
private int canvass(final long nowNs)
{
    int workCount = 0;

    // ===== 计算拉票截止时间 =====
    // 如果是扩展拉票(节点启动时),使用更长的超时时间
    final long deadlineNs = isExtendedCanvass ?
        timeOfLastStateChangeNs + ctx.startupCanvassTimeoutNs() :  // 启动时 ~10秒
        consensusModuleAgent.timeOfLastLeaderUpdateNs() + ctx.leaderHeartbeatTimeoutNs(); // 心跳超时 ~1秒

    // ===== 定期发送 CanvassPosition 消息 =====
    // 判断是否达到发送间隔(默认 ~100ms)
    if (hasUpdateIntervalExpired(nowNs, ctx.electionStatusIntervalNs()))
    {
        timeOfLastUpdateNs = nowNs; // 更新最后发送时间
        publishCanvassPosition();  // 向所有成员广播: logLeadershipTermId, logPosition, leadershipTermId

        workCount++;
    }

    // ===== 检查是否有指定的 Leader (手动指定) =====
    // 如果配置了 appointedLeaderId,且不是自己,则不参与竞选
    if (ctx.appointedLeaderId() != NULL_VALUE && ctx.appointedLeaderId() != thisMember.id())
    {
        return workCount; // 直接返回,不竞选
    }

    // ===== 评估是否可以成为候选人 =====
    // 两种情况可以进入 NOMINATE:
    // 1. 一致同意 (Unanimous): 所有成员都认为我的日志最新
    // 2. 超时后达到法定人数 (Quorum): 大多数成员认为我的日志足够新
    if (ClusterMember.isUnanimousCandidate(clusterMembers, thisMember, gracefulClosedLeaderId) ||
        (nowNs >= deadlineNs && ClusterMember.isQuorumCandidate(clusterMembers, thisMember)))
    {
        // ===== 计算随机延迟 =====
        // 避免多个候选人同时提名,引入随机延迟 (0 ~ electionTimeout/2)
        final long delayNs = (long)(ctx.random().nextDouble() * (ctx.electionTimeoutNs() >> 1));
        nominationDeadlineNs = nowNs + delayNs; // 设置提名截止时间

        // ===== 状态转换: CANVASS -> NOMINATE =====
        state(NOMINATE, nowNs, "");
        workCount++;
    }

    return workCount;
}
🔍 Raft 原理对应:
  • 日志完整性比较: ClusterMember.isUnanimousCandidate() 内部会调用 compareLog(),比较 logLeadershipTermId 和 logPosition,确保只有日志最新的节点才能成为候选人
  • 一致同意优化: 如果所有节点都同意,立即进入 NOMINATE,无需等待超时。这是对 Raft 的优化,减少选举延迟
  • 法定人数机制: 超时后只需大多数节点(N/2+1)认可即可,这是 Raft 的核心容错机制
  • 随机延迟: 防止多个候选人同时提名导致选举分裂(Split Vote),这是 Raft 论文中的关键设计

ClusterMember.isUnanimousCandidate() 内部逻辑

// 判断是否所有成员都认为 candidate 的日志最新
static boolean isUnanimousCandidate(
    final ClusterMember[] members,
    final ClusterMember candidate,
    final int gracefulClosedLeaderId)
{
    for (final ClusterMember member : members)
    {
        // 跳过已优雅关闭的 Leader
        if (member.id() == gracefulClosedLeaderId)
        {
            continue;
        }

        // 比较日志: candidate 的日志必须 >= 其他所有成员
        if (compareLog(
            candidate.logLeadershipTermId(), candidate.logPosition(),
            member.logLeadershipTermId(), member.logPosition()) < 0)
        {
            return false; // candidate 日志落后于某个成员
        }
    }

    return true; // candidate 的日志 >= 所有成员
}

四、Election.nominate() - 提名阶段

📂 源码位置: Election.java:688-708
/**
 * NOMINATE 阶段:提名阶段
 *
 * 目的:
 * 1. 增加 candidateTermId (候选任期ID)
 * 2. 持久化 candidateTermId 到磁盘 (NodeStateFile)
 * 3. 等待随机延迟后进入 CANDIDATE_BALLOT
 *
 * 关键:candidateTermId 必须持久化,防止崩溃后重复投票给不同候选人
 *
 * @param nowNs 当前纳秒时间戳
 * @return 工作量计数
 */
private int nominate(final long nowNs)
{
    // ===== 检查是否达到提名截止时间 =====
    if (nowNs >= nominationDeadlineNs)
    {
        // ===== 增加候选任期ID =====
        // proposeMaxCandidateTermId() 确保 candidateTermId 单调递增
        // 同时与磁盘上的值比较,取最大值(防止并发或崩溃恢复问题)
        candidateTermId = ctx.nodeStateFile().proposeMaxCandidateTermId(
            candidateTermId + 1,    // 新的候选任期ID
            logPosition,         // 当前日志位置
            ctx.epochClock().time() // 时间戳
        );

        // ===== 标记自己为候选人 =====
        // 更新 clusterMembers 数组中本节点的 candidateTermId 和投票状态
        ClusterMember.becomeCandidate(clusterMembers, candidateTermId, thisMember.id());

        // ===== 状态转换: NOMINATE -> CANDIDATE_BALLOT =====
        state(CANDIDATE_BALLOT, nowNs, "");

        return 1;
    }
    else if (hasUpdateIntervalExpired(nowNs, ctx.electionStatusIntervalNs()))
    {
        // ===== 等待期间继续发送 CanvassPosition =====
        // 持续告知其他节点自己的状态
        timeOfLastUpdateNs = nowNs;
        publishCanvassPosition();

        return 1;
    }

    return 0; // 无工作,等待超时
}
⚠️ 关键设计 - NodeStateFile 持久化:

proposeMaxCandidateTermId() 会将 candidateTermId 写入磁盘文件 node-state.dat。 这是 Raft 算法的关键要求:节点在一个任期内只能投票给一个候选人

如果不持久化,节点崩溃重启后可能忘记自己已经投过票,导致一个任期内投票给多个候选人,破坏 Raft 的安全性。


五、Election.candidateBallot() - 候选人投票阶段

📂 源码位置: Election.java:710-750
/**
 * CANDIDATE_BALLOT 阶段:候选人请求投票并统计结果
 *
 * 目的:
 * 1. 向所有成员发送 RequestVote 消息
 * 2. 统计收到的投票结果 (通过 onVote() 回调更新)
 * 3. 判断是否获得多数票:是 -> 成为 Leader, 否 -> 回退到 CANVASS
 *
 * @param nowNs 当前纳秒时间戳
 * @return 工作量计数
 */
private int candidateBallot(final long nowNs)
{
    int workCount = 0;

    // ===== 情况1: 一致同意 (所有节点都投票给我) =====
    // 快速路径:无需等待超时,立即成为 Leader
    if (ClusterMember.isUnanimousLeader(clusterMembers, candidateTermId, gracefulClosedLeaderId))
    {
        leaderMember = thisMember;           // 设置自己为 Leader
        leadershipTermId = candidateTermId;  // 确认领导任期ID
        state(LEADER_LOG_REPLICATION, nowNs, ""); // 进入日志复制阶段
        workCount++;
    }
    // ===== 情况2: 超时后检查是否获得法定人数 =====
    else if (nowNs >= (timeOfLastStateChangeNs + ctx.electionTimeoutNs()))
    {
        // 投票超时,检查是否获得多数票(N/2+1)
        if (ClusterMember.isQuorumLeader(clusterMembers, candidateTermId))
        {
            // ✅ 获得多数票,成为 Leader
            leaderMember = thisMember;
            leadershipTermId = candidateTermId;
            state(LEADER_LOG_REPLICATION, nowNs, "");
        }
        else
        {
            // ❌ 未获得多数票,回退到 CANVASS,重新拉票
            state(CANVASS, nowNs, "");
        }

        workCount++;
    }
    // ===== 情况3: 超时前,持续发送 RequestVote 消息 =====
    else
    {
        // 遍历所有成员,向未发送投票请求的成员发送 RequestVote
        for (final ClusterMember member : clusterMembers)
        {
            // 检查是否已发送过 RequestVote (避免重复发送)
            if (!member.isBallotSent())
            {
                workCount++;
                // ===== 发送 RequestVote 消息 =====
                // 参数: logLeadershipTermId, appendPosition, candidateTermId, candidateId
                member.isBallotSent(consensusPublisher.requestVote(
                    member.publication(),        // 目标成员的 Publication
                    logLeadershipTermId,         // 我的日志领导任期ID
                    appendPosition,              // 我的日志追加位置
                    candidateTermId,             // 我的候选任期ID
                    thisMember.id()              // 我的成员ID
                ));
            }
        }
    }

    return workCount;
}
🔍 Raft 投票规则:
  • 多数派原则: 必须获得 N/2+1 票才能成为 Leader (例如3节点需要2票,5节点需要3票)
  • 任期单调性: candidateTermId 必须严格递增,防止旧 Leader 重新上任
  • 日志完整性: Follower 只投票给日志至少和自己一样新的候选人
  • 一次投票: 每个节点在一个任期内只能投票给一个候选人(通过 NodeStateFile 持久化保证)

ClusterMember.isQuorumLeader() 实现

/**
 * 判断候选人是否获得法定数量的投票
 *
 * @param members 集群成员数组
 * @param candidateTermId 候选任期ID
 * @return true 如果获得多数票
 */
static boolean isQuorumLeader(
    final ClusterMember[] members,
    final long candidateTermId)
{
    int voteCount = 0;
    final int threshold = quorumThreshold(members.length); // N/2+1

    // 统计投票数
    for (final ClusterMember member : members)
    {
        // 检查成员的 candidateTermId 和投票状态
        if (member.candidateTermId() == candidateTermId &&
            Boolean.TRUE.equals(member.vote())) // 投票为 true
        {
            voteCount++;
        }
    }

    // 判断是否达到多数派
    return voteCount >= threshold;
}

/**
 * 计算法定人数阈值: N/2+1
 */
static int quorumThreshold(final int memberCount)
{
    return (memberCount / 2) + 1;
}

六、Election.leaderLogReplication() - Leader 日志复制

📂 源码位置: Election.java:765-782
/**
 * LEADER_LOG_REPLICATION 阶段:Leader 等待 Follower 日志同步
 *
 * 目的:
 * 1. 定期发送 NewLeadershipTerm 消息给所有 Followers
 * 2. 收集 Followers 的 AppendPosition 确认
 * 3. 计算法定位置 (quorumPosition)
 * 4. 当 quorumPosition >= appendPosition 时,所有节点日志一致,进入 LEADER_REPLAY
 *
 * 关键:确保新 Leader 上任前,所有节点的日志已经达到一致
 *
 * @param nowNs 当前纳秒时间戳
 * @return 工作量计数
 */
private int leaderLogReplication(final long nowNs)
{
    int workCount = 0;

    // ===== 更新自己的日志位置 =====
    // Leader 也是集群成员之一,更新自己的 logPosition 和时间戳
    thisMember
        .logPosition(appendPosition)  // 设置自己的日志位置为 appendPosition
        .timeOfLastAppendPositionNs(nowNs);

    // ===== 计算法定位置 =====
    // 收集所有成员的 logPosition,取中位数 (第 N/2+1 个位置)
    final long quorumPosition = consensusModuleAgent.quorumPosition();

    // ===== 定期发送 NewLeadershipTerm 消息 =====
    // 通知 Followers: 我是新 Leader,这是我的任期和日志位置
    workCount += publishNewLeadershipTermOnInterval(nowNs);

    // ===== 定期发送 CommitPosition 消息 =====
    // 告知 Followers 当前的提交位置
    workCount += publishCommitPositionOnInterval(quorumPosition, nowNs);

    // ===== 判断是否所有 Follower 都已同步 =====
    // 当法定位置 >= 追加位置时,表示多数派已确认
    if (quorumPosition >= appendPosition)
    {
        workCount++;
        // ===== 状态转换: LEADER_LOG_REPLICATION -> LEADER_REPLAY =====
        state(LEADER_REPLAY, nowNs, "");
    }

    return workCount;
}
🔍 关键逻辑解析:
  • quorumPosition 计算: 收集所有成员的 logPosition,排序后取中位数。这是 Raft 多数派机制的核心实现
  • NewLeadershipTerm 消息: 包含 leadershipTermId, termBaseLogPosition, logSessionId 等信息,Follower 收到后会更新自己的状态
  • CommitPosition 消息: 告知 Followers 哪些日志已经提交,Followers 可以安全地应用这些日志到状态机
  • 日志一致性保证: 只有当多数派 Followers 的 logPosition 都达到 appendPosition 时,Leader 才认为日志已同步

ConsensusModuleAgent.quorumPosition() 实现

/**
 * 计算法定位置 (多数派位置)
 *
 * 算法:
 * 1. 收集所有成员的 logPosition
 * 2. 排序(从小到大)
 * 3. 取第 N/2+1 个位置(中位数)
 *
 * 示例(5节点):
 * positions = [100, 150, 200, 250, 300]
 * quorumIndex = 5/2 = 2
 * quorumPosition = positions[2] = 200
 *
 * 含义:至少有3个节点的 logPosition >= 200
 *
 * @return 法定位置
 */
long quorumPosition()
{
    // ===== 收集所有成员的 logPosition =====
    for (int i = 0; i < activeMembers.length; i++)
    {
        rankedPositions[i] = activeMembers[i].logPosition();
    }

    // ===== 排序(从小到大)=====
    Arrays.sort(rankedPositions);

    // ===== 计算法定索引: N/2 (向下取整) =====
    // 对于5节点: quorumIndex = 2 (第3个位置)
    // 对于3节点: quorumIndex = 1 (第2个位置)
    final int quorumIndex = (activeMembers.length / 2);

    // ===== 返回中位数 =====
    return rankedPositions[quorumIndex];
}
⚠️ 为什么是 N/2 而不是 (N/2)+1?

因为数组索引从0开始!

对于5节点集群:
- 法定人数 = 3 (需要3个节点确认)
- 排序后数组索引: [0, 1, 2, 3, 4]
- quorumIndex = 5/2 = 2 (Java整数除法)
- rankedPositions[2] 就是第3个位置 (0-based index)

这个位置保证了:至少有 N/2+1 个节点的 position >= rankedPositions[quorumIndex]


七、投票接收与处理逻辑

📂 源码位置: Election.java:325-366

Election.onRequestVote() - Follower 收到投票请求

/**
 * 当收到候选人的 RequestVote 消息时调用
 *
 * 投票规则(严格按照 Raft 论文):
 * 1. candidateTermId > this.candidateTermId (候选任期必须更新)
 * 2. 候选人的日志至少和自己一样新(通过 compareLog 判断)
 * 3. 本任期内未投票给其他候选人
 *
 * @param logLeadershipTermId 候选人的日志领导任期ID
 * @param logPosition 候选人的日志位置
 * @param candidateTermId 候选任期ID
 * @param candidateId 候选人成员ID
 * @param protocolVersion 协议版本
 */
void onRequestVote(
    final long logLeadershipTermId,
    final long logPosition,
    final long candidateTermId,
    final int candidateId,
    final int protocolVersion)
{
    // ===== 选举未开始,忽略 =====
    if (INIT == state)
    {
        return;
    }

    // ===== 自己给自己投票,忽略 =====
    if (candidateId == thisMember.id())
    {
        return;
    }

    // ===== 情况1: 候选任期过旧,拒绝投票 =====
    if (candidateTermId <= this.candidateTermId)
    {
        placeVote(candidateTermId, candidateId, false); // 投反对票
    }
    // ===== 情况2: 我的日志比候选人更新,拒绝投票 =====
    // compareLog 返回值:
    //   > 0: 我的日志更新
    //   = 0: 日志相同
    //   < 0: 候选人日志更新
    else if (compareLog(
        this.logLeadershipTermId, appendPosition,  // 我的日志
        logLeadershipTermId, logPosition)          // 候选人的日志
        > 0)
    {
        // 我的日志更新,更新 candidateTermId 但拒绝投票
        this.candidateTermId = ctx.nodeStateFile().proposeMaxCandidateTermId(
            candidateTermId, logPosition, ctx.epochClock().time());

        placeVote(candidateTermId, candidateId, false); // 投反对票

        // 如果候选人是旧 Leader,通知其新的领导任期
        final ClusterMember candidateMember = clusterMemberByIdMap.get(candidateId);
        if (null != candidateMember && Cluster.Role.LEADER == consensusModuleAgent.role())
        {
            publishNewLeadershipTerm(candidateMember, logLeadershipTermId, ctx.clusterClock().time());
        }
    }
    // ===== 情况3: 候选人日志足够新,同意投票 =====
    else if (CANVASS == state || NOMINATE == state || CANDIDATE_BALLOT == state || FOLLOWER_BALLOT == state)
    {
        // ✅ 更新并持久化 candidateTermId
        this.candidateTermId = ctx.nodeStateFile().proposeMaxCandidateTermId(
            candidateTermId, logPosition, ctx.epochClock().time());

        // ✅ 投赞成票
        placeVote(candidateTermId, candidateId, true); // 投赞成票

        // ===== 状态转换: 进入 FOLLOWER_BALLOT =====
        // 我已经投票给候选人,等待选举结果
        state(FOLLOWER_BALLOT, ctx.clusterClock().timeNanos(), "");
    }
}

ClusterMember.compareLog() - 日志完整性比较

/**
 * 比较两个日志的新旧程度
 *
 * Raft 日志比较规则:
 * 1. 先比较 leadershipTermId (任期ID): 更大的更新
 * 2. 如果任期ID相同,比较 logPosition (日志位置): 更大的更新
 *
 * @return > 0: lhs 更新, = 0: 相同, < 0: rhs 更新
 */
static int compareLog(
    final long lhsLeadershipTermId, final long lhsLogPosition,
    final long rhsLeadershipTermId, final long rhsLogPosition)
{
    // ===== 先比较 leadershipTermId =====
    if (lhsLeadershipTermId > rhsLeadershipTermId)
    {
        return 1;  // lhs 任期更新
    }

    if (lhsLeadershipTermId < rhsLeadershipTermId)
    {
        return -1; // rhs 任期更新
    }

    // ===== 任期相同,比较 logPosition =====
    return Long.compare(lhsLogPosition, rhsLogPosition);
}
🔍 Raft 投票安全性保证:
  • 任期单调性: candidateTermId 必须严格递增,防止旧候选人重新上位
  • 日志完整性: 只投票给日志至少和自己一样新的候选人,防止数据丢失
  • 一次投票: 通过 NodeStateFile 持久化 candidateTermId,防止崩溃后重复投票
  • 拒绝旧任期: 如果收到 candidateTermId <= this.candidateTermId 的请求,直接拒绝

八、LogPublisher.appendMessage() - Leader 日志发布

📂 源码位置: LogPublisher.java:115-144
/**
 * Leader 追加会话消息到日志
 *
 * 流程:
 * 1. 使用 tryClaim() 在 Publication 中预留空间(Zero-Copy)
 * 2. 编码 SessionMessageHeader (leadershipTermId, clusterSessionId, timestamp)
 * 3. 复制消息体到 BufferClaim
 * 4. commit() 提交,触发:
 *    - Archive 自动记录到磁盘
 *    - Followers 通过 Subscription 接收日志流
 *
 * @param leadershipTermId 领导任期ID
 * @param clusterSessionId 集群会话ID
 * @param timestamp 时间戳
 * @param buffer 消息缓冲区
 * @param offset 消息偏移量
 * @param length 消息长度
 * @return 日志位置(成功)或错误码(失败)
 */
long appendMessage(
    final long leadershipTermId,
    final long clusterSessionId,
    final long timestamp,
    final DirectBuffer buffer,
    final int offset,
    final int length)
{
    // ===== 编码 Session Header =====
    // SessionMessageHeader 包含: leadershipTermId, clusterSessionId, timestamp
    sessionHeaderEncoder
        .leadershipTermId(leadershipTermId)
        .clusterSessionId(clusterSessionId)
        .timestamp(timestamp);

    // ===== 计算总消息长度 =====
    final int messageLength = SESSION_HEADER_LENGTH + length;

    // ===== 重试机制 =====
    // 最多重试3次,处理 Publication.BACK_PRESSURED 情况
    int attempts = SEND_ATTEMPTS;
    long position;
    do
    {
        // ===== 尝试 Claim 日志空间 (Zero-Copy) =====
        // tryClaim() 返回日志位置(成功)或错误码(失败)
        position = publication.offer(
            sessionHeaderBuffer, 0, SESSION_HEADER_LENGTH, // Header
            buffer, offset, length,                     // Payload
            null                                        // Reserved value
        );

        // ===== 成功,退出循环 =====
        if (position > 0)
        {
            break; // 返回日志位置
        }

        // ===== 失败,检查错误码 =====
        // 可能的错误: BACK_PRESSURED, NOT_CONNECTED, CLOSED 等
        checkResult(position, publication);
    }
    while (--attempts > 0);

    // ===== 返回结果 =====
    // position > 0: 日志位置(成功)
    // position < 0: 错误码(失败)
    return position;
}
🔍 Zero-Copy 原理:
  • BufferClaim 机制: offer() 内部使用 tryClaim() 直接在共享内存中预留空间
  • 无拷贝写入: 消息直接写入 Publication 的共享内存缓冲区,无需中间拷贝
  • Commit 触发: offer() 内部调用 commit(),触发:
    • Archive Recording: Archive 自动记录到磁盘
    • Network Replication: Followers 通过 Subscription 接收日志流(UDP/IPC)
  • 背压处理: 如果 Publication 缓冲区满,返回 BACK_PRESSURED,重试或通知上层

Archive Recording 自动化

// Leader 创建 LogPublication 时,关联 Archive Recording

// 1. 创建 Publication (带 Archive Recording)
final ExclusivePublication logPublication = aeron.addExclusivePublication(
    logChannel,   // 例如: "aeron:udp?term-length=64k"
    logStreamId
);

// 2. Archive 开始记录
final long recordingId = archive.startRecording(
    logChannel,
    logStreamId,
    SourceLocation.LOCAL,  // 本地录制
    true                   // 自动停止
);

// ===== 此后,所有通过 logPublication.offer() 的消息都会自动记录到 Archive =====

// 3. Followers 订阅日志流
final Subscription logSubscription = aeron.addSubscription(
    leaderLogEndpoint,  // 例如: "aeron:udp?endpoint=leader:9002"
    logStreamId
);

// ===== Followers 通过 poll() 接收日志流,Archive 同时记录到本地 =====

九、ConsensusModuleAgent.doWork() - 核心工作循环

📂 源码位置: ConsensusModuleAgent.java:425-479
/**
 * Agent 主工作循环
 *
 * 职责:
 * 1. 处理共识消息 (CanvassPosition, RequestVote, Vote 等)
 * 2. 执行选举状态机 (election.doWork())
 * 3. 处理客户端请求和日志复制 (consensusWork())
 * 4. 更新集群时钟
 *
 * @return 本次执行的工作量
 */
public int doWork()
{
    // ===== 获取当前集群时间 =====
    final long timestamp = clusterClock.time();          // 集群时间(毫秒或纳秒)
    final long nowNs = clusterClock.convertToNanos(timestamp);

    int workCount = 0;

    // ===== 更新 Duty Cycle Tracker (性能监控) =====
    dutyCycleTracker.measureAndUpdate(nowNs);

    try
    {
        // ===== Slow Tick Work (低频任务) =====
        // 每 10ms 执行一次:心跳检查、超时检查、标记文件更新等
        if (nowNs >= slowTickDeadlineNs)
        {
            final int slowTickWorkCount = slowTickWork(nowNs);

            workCount += slowTickWorkCount;
            slowTickDeadlineNs = slowTickWorkCount > 0 ? nowNs + 1 : nowNs + SLOW_TICK_INTERVAL_NS;
        }

        // ===== 处理共识消息 =====
        // Poll ConsensusAdapter: 接收并处理 CanvassPosition, RequestVote, Vote 等消息
        workCount += consensusAdapter.poll();

        // ===== 选举 vs 正常共识工作 =====
        if (null != election)
        {
            // 正在选举中,执行选举状态机
            workCount += election.doWork(nowNs);
        }
        else
        {
            // 选举已完成,执行正常共识工作
            // - Leader: 接收客户端请求,追加日志,更新 commitPosition
            // - Follower: 消费 Leader 的日志流,更新 commitPosition
            workCount += consensusWork(timestamp, nowNs);
        }

        // ===== 扩展模块工作 (可选) =====
        if (null != consensusModuleExtension)
        {
            workCount += consensusModuleExtension.doWork(nowNs);
        }
    }
    catch (final AgentTerminationException ex)
    {
        // Agent 终止异常,执行清理钩子
        runTerminationHook();
        throw ex;
    }
    catch (final Exception ex)
    {
        // 其他异常
        if (null != election)
        {
            // 选举中的异常交给 Election 处理(重置选举)
            election.handleError(nowNs, ex);
        }
        else
        {
            // 正常共识工作中的异常,重新抛出
            throw ex;
        }
    }

    // ===== 更新集群时间消费者 =====
    // 通知外部模块当前集群时间
    clusterTimeConsumer.accept(timestamp);

    return workCount;
}
💡 Agent 调度机制:

Aeron 使用 Agent Runner 模式

  • doWork() 返回 workCount: 表示本次循环的工作量
  • workCount > 0: 有工作,立即再次调用 doWork()
  • workCount = 0: 无工作,执行 IdleStrategy (例如: yield, park, busySpin)
  • 非阻塞设计: 所有操作都是非阻塞的,通过轮询(polling)实现

十、多数派位置计算 - Raft 核心机制

算法原理

Raft 算法要求:日志必须复制到多数派节点后才能提交。Aeron Cluster 通过 quorumPosition() 实现这一机制。

// 示例:5节点集群

// 1. 收集所有成员的 logPosition
activeMembers[0].logPosition() = 100  // Node 0
activeMembers[1].logPosition() = 150  // Node 1
activeMembers[2].logPosition() = 200  // Node 2 (Leader)
activeMembers[3].logPosition() = 250  // Node 3
activeMembers[4].logPosition() = 300  // Node 4

// 2. 复制到 rankedPositions 数组
rankedPositions = [100, 150, 200, 250, 300]

// 3. 排序(已排序)
rankedPositions = [100, 150, 200, 250, 300]
                    ↑     ↑     ↑     ↑     ↑
                   [0]   [1]   [2]   [3]   [4]

// 4. 计算法定索引
quorumIndex = activeMembers.length / 2 = 5 / 2 = 2

// 5. 取中位数
quorumPosition = rankedPositions[2] = 200

// ===== 含义 =====
// 至少有 3 个节点 (Node 2, 3, 4) 的 logPosition >= 200
// 满足多数派要求(5/2+1 = 3)

// ===== Leader 可以安全提交 position <= 200 的日志 =====
commitPosition.set(200);

极端情况分析

节点数 法定人数 示例 Positions quorumIndex quorumPosition 说明
3 2 [100, 200, 300] 3/2 = 1 200 至少2个节点 >= 200
3 2 [100, 100, 300] 1 100 只有1个节点达到300,未达到多数派
5 3 [100, 100, 100, 200, 300] 2 100 多数派(3个节点)仅达到100
7 4 [50, 100, 150, 200, 250, 300, 350] 3 200 至少4个节点 >= 200
⚠️ 常见误区:
  • 误区1: 以为 quorumPosition 是"最慢节点的位置" - 错误! 实际是中位数,允许少数慢节点
  • 误区2: 以为需要所有节点确认 - 错误! 只需多数派(N/2+1)
  • 误区3: 以为 quorumIndex 应该是 (N/2)+1 - 错误! 数组索引从0开始,N/2 就是第 (N/2+1) 个元素

十一、启动与恢复流程

📂 源码位置: ConsensusModuleAgent.java:353-420

ConsensusModuleAgent.onStart() - 启动入口

/**
 * Agent 启动时调用
 *
 * 流程:
 * 1. 连接 Archive
 * 2. 停止上一次的 Recording (如果存在)
 * 3. 创建恢复计划 (RecoveryPlan)
 * 4. 加载快照和回放日志
 * 5. 创建 Election,开始选举
 * 6. 状态设置为 ACTIVE
 */
public void onStart()
{
    // ===== 1. 连接 Archive =====
    archive = AeronArchive.connect(ctx.archiveContext().clone());
    recordingSignalPoller = new RecordingSignalPoller(
        archive.controlSessionId(),
        archive.controlResponsePoller().subscription()
    );

    // ===== 2. 停止上一次的 Recording =====
    // 查找最后一个 Term Recording,如果仍在录制,停止它
    final long lastTermRecordingId = recordingLog.findLastTermRecordingId();
    if (NULL_VALUE != lastTermRecordingId)
    {
        archive.tryStopRecordingByIdentity(lastTermRecordingId);
    }

    // ===== 3. 恢复或引导启动 =====
    if (null == ctx.bootstrapState())
    {
        // ===== 正常恢复 =====
        // 3.1. 复制备用快照 (如果配置了备用节点)
        replicateStandbySnapshotsForStartup();

        // 3.2. 从快照和日志恢复
        recoveryPlan = recoverFromSnapshotAndLog();
    }
    else
    {
        // ===== 引导启动 (Bootstrap) =====
        // 从外部提供的状态启动(用于动态添加节点)
        recoveryPlan = recoverFromBootstrapState();
    }

    // ===== 4. 创建共识 Publications =====
    // 为每个集群成员创建 Publication (用于发送共识消息)
    ClusterMember.addConsensusPublications(
        activeMembers,
        thisMember,
        ctx.consensusChannel(),
        ctx.consensusStreamId(),
        ctx.enableControlOnConsensusChannel(),
        aeron,
        ctx.countedErrorHandler()
    );

    // ===== 5. 创建 Election,开始选举 =====
    final long lastLeadershipTermId = recoveryPlan.lastLeadershipTermId;
    final long commitPosition = this.commitPosition.getWeak();
    final long appendedPosition = recoveryPlan.appendedLogPosition;

    // 记录选举日志
    logNewElection(memberId, lastLeadershipTermId, commitPosition, appendedPosition, "node started");

    // ===== 创建 Election 对象 =====
    election = new Election(
        true,                              // isNodeStartup = true
        NULL_VALUE,                        // gracefulClosedLeaderId
        lastLeadershipTermId,              // 恢复的领导任期ID
        recoveryPlan.lastTermBaseLogPosition,
        commitPosition,                    // 已提交位置
        appendedPosition,                  // 已追加位置
        activeMembers,
        clusterMemberByIdMap,
        thisMember,
        consensusPublisher,
        ctx,
        this
    );

    // ===== 6. 立即执行一次选举工作 =====
    election.doWork(clusterClock.timeNanos());

    // ===== 7. 状态设置为 ACTIVE =====
    state(ConsensusModule.State.ACTIVE);

    // ===== 8. 初始化扩展模块 (可选) =====
    if (null != consensusModuleExtension)
    {
        // 为扩展模块创建独立的 Archive 连接
        final AeronArchive.Context extensionArchiveCtx = ctx.archiveContext().clone();

        // ... 配置扩展 Archive 通道 ...

        extensionArchive = AeronArchive.connect(extensionArchiveCtx);
    }

    // ===== 9. 注册计数器不可用回调 =====
    unavailableCounterHandlerRegistrationId = aeron.addUnavailableCounterHandler(this::onUnavailableCounter);

    // ===== 10. 更新性能追踪器 =====
    dutyCycleTracker.update(clusterClock.timeNanos());
}

RecoveryPlan 结构

/**
 * 恢复计划:指导节点如何从磁盘恢复状态
 */
class RecoveryPlan
{
    long lastLeadershipTermId;      // 最后的领导任期ID
    long lastTermBaseLogPosition;   // 最后任期的基准日志位置
    long appendedLogPosition;       // 已追加的日志位置
    long committedLogPosition;      // 已提交的日志位置

    // 快照列表(ConsensusModule + 各个 Service)
    List<RecordingLog.Snapshot> snapshots;

    // 日志信息
    RecordingLog.Entry log;

    // ===== 恢复策略 =====
    // 1. 加载最新快照(如果存在)
    // 2. 从快照位置开始回放日志到 appendedLogPosition
    // 3. commitPosition 设置为 committedLogPosition
}
🔍 恢复流程详解:
  1. RecordingLog.createRecoveryPlan(): 扫描 recording.log 文件,找到最新快照和日志条目
  2. 加载快照:
    • ConsensusModule 快照:恢复 sessions, timers, 内部状态
    • Service 快照:通过 Archive Replay 发送给 ClusteredServiceAgent
  3. 回放日志: 从快照位置到 appendedLogPosition 的所有日志消息重新处理
  4. 提交位置: 恢复 commitPosition,确保不会重复提交
  5. 选举启动: 恢复完成后,创建 Election,开始选举或加入现有集群

十二、ClusteredService.onSessionMessage() 完整调用链分析

这一章节将详细追踪客户端消息从发送到最终被应用服务处理的完整路径, 揭示 Aeron Cluster 框架如何调用用户实现的 ClusteredService.onSessionMessage() 方法。

调用链概览

// 完整调用链(8个步骤)

1️⃣ 客户端发送
   AeronCluster.offer(buffer, offset, length)
        │
        └──> Publication.offer() 发送到 Ingress Channel

2️⃣ Leader 接收请求
   ConsensusModuleAgent.doWork()
        │
        ├──> IngressAdapter.poll()
        │       └──> 从 Ingress Channel 接收消息
        │
        └──> ConsensusModuleAgent.onIngressMessage()
                └──> 验证会话并准备追加日志

3️⃣ Leader 追加到日志
   LogPublisher.appendMessage()
        │
        ├──> publication.offer(sessionHeader + payload)
        │       └──> Zero-Copy 写入共享内存
        │
        └──> Archive 自动记录 + 广播给 Followers

4️⃣ Followers 复制日志
   ConsensusModuleAgent.doWork() [Follower]
        │
        ├──> LogAdapter.poll()
        │       └──> 从 Leader 接收日志流
        │
        └──> Archive 本地记录

5️⃣ Leader 更新提交位置
   ConsensusModuleAgent.updateCommitPosition()
        │
        ├──> 收集所有节点的 appendPosition
        ├──> quorumPosition = calculateQuorumPosition()
        └──> commitPosition.setOrdered(quorumPosition)

6️⃣ Leader 转发给服务容器
   ConsensusModuleAgent.serviceSessionMessageSweeper()
        │
        └──> ServiceProxy.offer() via Publication
                └──> 发送到服务容器的 IPC Channel

7️⃣ 服务容器消费消息
   ClusteredServiceAgent.doWork()
        │
        ├──> BoundedLogAdapter.poll()
        │       │
        │       └──> onMessage(buffer, offset, length, header)
        │               │
        │               ├──> 解码 MessageHeader
        │               ├──> 识别 SessionMessageHeaderDecoder.TEMPLATE_ID
        │               │
        │               └──> agent.onSessionMessage(
        │                       header.position(),
        │                       sessionHeader.clusterSessionId(),
        │                       sessionHeader.timestamp(),
        │                       buffer,
        │                       offset + SESSION_HEADER_LENGTH,
        │                       length - SESSION_HEADER_LENGTH,
        │                       header
        │                   )

8️⃣ 调用用户服务
   ClusteredServiceAgent.onSessionMessage()
        │
        ├──> logPosition = header.position()
        ├──> clusterTime = timestamp
        ├──> clientSession = sessionByIdMap.get(clusterSessionId)
        │
        └──> service.onSessionMessage(
                 clientSession,
                 timestamp,
                 buffer,
                 offset,
                 length,
                 header
             )
                └──> 🎯 用户业务逻辑执行

详细源码分析

步骤 1: 客户端发送消息

📂 源码位置: aeron-cluster/src/main/java/io/aeron/cluster/client/AeronCluster.java
/**
 * 客户端发送消息到集群
 *
 * @param buffer 消息缓冲区
 * @param offset 消息偏移量
 * @param length 消息长度
 * @return 消息发送结果位置
 */
public long offer(
    final DirectBuffer buffer,
    final int offset,
    final int length)
{
    // 1. 检查会话状态
    if (!isConnected)
    {
        throw new ClusterException("not connected");
    }

    // 2. 编码消息头(包含 correlationId, clusterSessionId)
    ingressMessageHeaderEncoder
        .correlationId(correlationId)
        .clusterSessionId(clusterSessionId)
        .timestamp(clusterTime);

    // 3. 通过 Publication 发送到 Leader 的 Ingress Channel
    return publication.offer(
        ingressMessageHeaderBuffer, 0, INGRESS_MESSAGE_HEADER_LENGTH,
        buffer, offset, length,
        null
    );
}
🔍 关键点:
  • Ingress Channel: 客户端与 Leader 之间的专用通道(通常是 UDP)
  • correlationId: 用于关联请求和响应
  • clusterSessionId: 客户端会话的唯一标识

步骤 2: Leader 接收并验证

📂 源码位置: ConsensusModuleAgent.java:425-479 (doWork 方法)
// ConsensusModuleAgent 主工作循环

public int doWork()
{
    final long timestamp = clusterClock.time();
    final long nowNs = clusterClock.convertToNanos(timestamp);

    int workCount = 0;

    // ===== 处理客户端 Ingress 消息 =====
    if (Role.LEADER == role)
    {
        workCount += ingressAdapter.poll();  // 接收客户端消息
    }

    // ===== 选举 vs 正常共识工作 =====
    if (null != election)
    {
        workCount += election.doWork(nowNs);
    }
    else
    {
        workCount += consensusWork(timestamp, nowNs);  // 处理日志追加和提交
    }

    return workCount;
}
// IngressAdapter.poll() 回调到 ConsensusModuleAgent

void onIngressMessage(
    final long clusterSessionId,
    final DirectBuffer buffer,
    final int offset,
    final int length)
{
    // 1. 查找会话
    final ClusterSession session = sessionByIdMap.get(clusterSessionId);

    // 2. 验证会话状态
    if (null == session || session.state() != OPEN)
    {
        return; // 会话无效,忽略消息
    }

    // 3. 检查消息大小
    if (length > ctx.maxMessageLength())
    {
        session.closing(CloseReason.MSG_TOO_LARGE);
        return;
    }

    // 4. 追加到日志
    final long position = logPublisher.appendMessage(
        leadershipTermId,
        clusterSessionId,
        clusterClock.time(),
        buffer,
        offset,
        length
    );

    // 5. 检查发布结果
    if (position > 0)
    {
        session.timeOfLastActivityNs(clusterTimeNs);
    }
    else
    {
        // 处理背压或错误
    }
}

步骤 3: 追加到日志(已在第八章详解)

// LogPublisher.appendMessage() - 详见第八章

long appendMessage(...) {
    // 1. 编码 SessionHeader (leadershipTermId, clusterSessionId, timestamp)
    sessionHeaderEncoder
        .leadershipTermId(leadershipTermId)
        .clusterSessionId(clusterSessionId)
        .timestamp(timestamp);

    // 2. Zero-Copy 追加到 Publication
    return publication.offer(
        sessionHeaderBuffer, 0, SESSION_HEADER_LENGTH,
        buffer, offset, length,
        null
    );
}

步骤 4: Followers 复制日志

// Follower 节点的 ConsensusModuleAgent.doWork()

if (Role.FOLLOWER == role && null == election)
{
    // ===== 消费 Leader 的日志流 =====
    workCount += logAdapter.poll();  // LogAdapter 从 Leader 接收日志
}

// LogAdapter 内部调用 Subscription.controlledPoll()
// Archive 自动记录每个收到的日志片段到本地磁盘

步骤 5: Leader 更新提交位置

// ConsensusModuleAgent.updateCommitPosition() - 详见第九章

private void updateCommitPosition()
{
    // 1. 计算法定位置(多数派确认的位置)
    final long quorumPosition = quorumPosition();

    // 2. 更新 commitPosition
    if (quorumPosition > commitPosition.get())
    {
        commitPosition.setOrdered(quorumPosition);

        // 3. 通知服务容器处理已提交的日志
        // (通过 ServiceProxy 转发)
    }
}

步骤 6: Leader 转发给服务容器

📂 源码位置: ConsensusModuleAgent.java (consensusWork 方法中)
/**
 * Leader 将已提交的消息转发给服务容器
 * 通过 IPC Publication 发送到 ClusteredServiceAgent
 */
private int serviceSessionMessageSweeper(final long nowNs)
{
    int workCount = 0;

    // ===== 扫描所有会话,转发已提交的消息 =====
    for (int i = 0, size = sessions.size(); i < size; i++)
    {
        final ClusterSession session = sessions.get(i);

        // 检查会话是否有待处理的消息
        if (session.hasUnsentMessages())
        {
            // ===== 通过 ServiceProxy 转发消息 =====
            workCount += serviceProxy.offer(
                session.id(),
                session.timestamp(),
                session.buffer(),
                session.offset(),
                session.length()
            );
        }
    }

    return workCount;
}
🔍 关键点:
  • IPC Channel: ConsensusModule 与 ClusteredServiceContainer 之间的进程内通信通道
  • ServiceProxy: 封装了向服务容器发送消息的 Publication
  • 消息已提交: 只有 commitPosition 之前的消息才会转发给服务

步骤 7: 服务容器消费消息

📂 源码位置: ClusteredServiceAgent.java:300-400 (doWork 方法)
// ClusteredServiceAgent 主工作循环

public int doWork()
{
    int workCount = 0;

    // ===== 消费来自 ConsensusModule 的日志 =====
    if (null != logAdapter && !logAdapter.isDone())
    {
        final int polled = logAdapter.poll(
            commitPosition.get() - logPosition
        );

        workCount += polled;
    }

    return workCount;
}
📂 源码位置: BoundedLogAdapter.java:130-165
/**
 * BoundedLogAdapter 消费日志流
 * 调用 Image.boundedControlledPoll() 接收消息片段
 */
int poll(final long limit)
{
    return image.boundedControlledPoll(this, limit, fragmentLimit);
    // this 实现了 ControlledFragmentHandler 接口
    // 每个片段会回调 onFragment() 方法
}

/**
 * 处理每个日志片段
 * 这是框架调用用户服务的关键入口
 */
private Action onMessage(
    final DirectBuffer buffer,
    final int offset,
    final int length,
    final Header header)
{
    // ===== 1. 解码消息头 =====
    messageHeaderDecoder.wrap(buffer, offset);

    final int schemaId = messageHeaderDecoder.schemaId();
    final int templateId = messageHeaderDecoder.templateId();

    // ===== 2. 判断消息类型 =====
    if (templateId == SessionMessageHeaderDecoder.TEMPLATE_ID)
    {
        // ===== 这是会话消息,解码 SessionHeader =====
        sessionHeaderDecoder.wrap(
            buffer,
            offset + MessageHeaderDecoder.ENCODED_LENGTH,
            messageHeaderDecoder.blockLength(),
            messageHeaderDecoder.version()
        );

        // ===== 3. 调用 Agent 的 onSessionMessage =====
        agent.onSessionMessage(
            header.position(),                             // 日志位置
            sessionHeaderDecoder.clusterSessionId(),       // 会话ID
            sessionHeaderDecoder.timestamp(),              // 时间戳
            buffer,
            offset + AeronCluster.SESSION_HEADER_LENGTH,   // 实际消息偏移
            length - AeronCluster.SESSION_HEADER_LENGTH,   // 实际消息长度
            header                                         // Aeron Header
        );

        return Action.CONTINUE;
    }

    // ===== 处理其他消息类型 =====
    switch (templateId)
    {
        case TimerEventDecoder.TEMPLATE_ID:
            // 定时器事件
            agent.onTimerEvent(...);
            break;

        case SessionOpenEventDecoder.TEMPLATE_ID:
            // 会话打开事件
            agent.onSessionOpen(...);
            break;

        case SessionCloseEventDecoder.TEMPLATE_ID:
            // 会话关闭事件
            agent.onSessionClose(...);
            break;

        // ... 其他事件类型
    }

    return Action.CONTINUE;
}

步骤 8: 调用用户服务

📂 源码位置: ClusteredServiceAgent.java:483-497
/**
 * ClusteredServiceAgent 收到会话消息后的处理
 * 这是框架层的最后一步,下一步就是调用用户服务
 */
void onSessionMessage(
    final long logPosition,
    final long clusterSessionId,
    final long timestamp,
    final DirectBuffer buffer,
    final int offset,
    final int length,
    final Header header)
{
    // ===== 1. 更新内部状态 =====
    this.logPosition = logPosition;      // 记录当前日志位置
    clusterTime = timestamp;                // 更新集群时间

    // ===== 2. 查找客户端会话 =====
    final ClientSession clientSession = sessionByIdMap.get(clusterSessionId);

    // ===== 3. 🎯 调用用户实现的服务方法 =====
    service.onSessionMessage(
        clientSession,  // 会话对象(用户可通过它响应消息)
        timestamp,      // 集群时间戳(确保时间一致性)
        buffer,         // 消息缓冲区
        offset,         // 消息偏移量
        length,         // 消息长度
        header          // Aeron Header(包含元数据)
    );
}
🎯 终点: 用户服务

至此,消息完成了从客户端到用户服务的完整旅程:

  • Client → Leader: 通过 UDP/IPC 发送到 Ingress Channel
  • Leader → Log: 追加到本地日志并 Archive 持久化
  • Leader → Followers: 通过 Aeron Publication 并行复制
  • Log → Service: 通过 IPC Channel 转发给服务容器
  • Framework → User: 调用用户实现的 onSessionMessage()

用户服务示例

/**
 * 用户实现的 ClusteredService
 */
public class MyClusteredService implements ClusteredService
{
    // ===== 业务状态 =====
    private final Map<Long, String> dataStore = new HashMap<>();

    /**
     * 🎯 框架调用此方法处理客户端消息
     *
     * @param session  客户端会话(可用于响应)
     * @param timestamp 集群时间戳
     * @param buffer   消息缓冲区
     * @param offset   消息偏移量
     * @param length   消息长度
     * @param header   Aeron Header
     */
    @Override
    public void onSessionMessage(
        final ClientSession session,
        final long timestamp,
        final DirectBuffer buffer,
        final int offset,
        final int length,
        final Header header)
    {
        // ===== 1. 解码业务消息 =====
        final String message = buffer.getStringWithoutLengthAscii(offset, length);

        // ===== 2. 执行业务逻辑 =====
        if (message.startsWith("PUT:"))
        {
            final String[] parts = message.split(":");
            final long key = Long.parseLong(parts[1]);
            final String value = parts[2];

            dataStore.put(key, value);  // 更新状态

            // ===== 3. 响应客户端(可选)=====
            final ExpandableArrayBuffer responseBuffer = new ExpandableArrayBuffer();
            responseBuffer.putStringWithoutLengthAscii(0, "OK");

            session.offer(responseBuffer, 0, 2);  // 发送响应
        }
        else if (message.startsWith("GET:"))
        {
            final long key = Long.parseLong(message.substring(4));
            final String value = dataStore.get(key);

            if (null != value)
            {
                final ExpandableArrayBuffer responseBuffer = new ExpandableArrayBuffer();
                responseBuffer.putStringWithoutLengthAscii(0, value);

                session.offer(responseBuffer, 0, value.length());
            }
        }

        // ===== 4. 所有节点都会执行相同的逻辑 =====
        // 确保状态一致性
    }

    // ===== 其他生命周期方法 =====

    @Override
    public void onStart(final Cluster cluster, final Image snapshotImage)
    {
        // 启动时加载快照(如果存在)
    }

    @Override
    public void onTakeSnapshot(final ExclusivePublication snapshotPublication)
    {
        // 保存 dataStore 状态到快照
    }
}

Mermaid 流程图

sequenceDiagram
    participant Client as 🖥️ AeronCluster (客户端)
    participant Ingress as 📨 IngressAdapter
    participant CMA as 🧠 ConsensusModuleAgent (Leader)
    participant LogPub as 📝 LogPublisher
    participant Archive as 💾 Archive
    participant Followers as 🔄 Follower Nodes
    participant ServiceProxy as 🚀 ServiceProxy
    participant CSA as 🎯 ClusteredServiceAgent
    participant LogAdapter as 📖 BoundedLogAdapter
    participant UserService as ⚙️ MyClusteredService

    %% Step 1: 客户端发送
    Client->>Ingress: 1. offer(buffer, offset, length)
    Note over Client,Ingress: UDP/IPC 到 Leader Ingress Channel

    %% Step 2: Leader 接收
    Ingress->>CMA: 2. poll() → onIngressMessage()
    Note over CMA: 验证会话状态

    %% Step 3: 追加到日志
    CMA->>LogPub: 3. appendMessage(leadershipTermId, sessionId, timestamp, buffer)
    LogPub->>Archive: 3a. publication.offer() → Archive 记录
    LogPub->>Followers: 3b. 广播日志流

    %% Step 4: Followers 复制
    Followers->>Followers: 4. LogAdapter.poll() → Archive 本地记录
    Followers-->>CMA: 4a. 发送 AppendPosition 确认

    %% Step 5: 更新提交位置
    CMA->>CMA: 5. updateCommitPosition() → quorumPosition()
    Note over CMA: 计算多数派位置
commitPosition = median(positions) %% Step 6: 转发给服务容器 CMA->>ServiceProxy: 6. serviceProxy.offer(sessionId, buffer, ...) Note over ServiceProxy: IPC Publication 到服务容器 %% Step 7: 服务容器消费 ServiceProxy->>CSA: 7a. Image 传输 CSA->>LogAdapter: 7b. poll(limit) LogAdapter->>LogAdapter: 7c. onMessage() 解码消息头 Note over LogAdapter: 识别 SessionMessageHeaderDecoder %% Step 8: 调用用户服务 LogAdapter->>CSA: 7d. agent.onSessionMessage(logPos, sessionId, timestamp, buffer, ...) CSA->>UserService: 8. 🎯 service.onSessionMessage(session, timestamp, buffer, offset, length, header) Note over UserService: 用户业务逻辑执行 %% 可选响应 UserService-->>CSA: 9a. session.offer(responseBuffer) [可选] CSA-->>Client: 9b. 通过 Egress Channel 响应 [可选]
💡 关键观察:
  • 完全确定性: 所有节点按相同顺序处理相同的日志,保证状态一致性
  • 多数派提交: 只有法定数量节点确认后,消息才会被转发给服务
  • Zero-Copy 传输: 从客户端到服务的整个链路都使用 Aeron 的零拷贝机制
  • IPC 隔离: ConsensusModule 和 Service 在不同容器中,通过 IPC 通信
  • Archive 持久化: 所有日志自动记录到磁盘,支持节点恢复

调试断点建议

文件 行号 / 方法 观察内容
AeronCluster.java offer() 方法 客户端发送消息,查看 correlationId 和 clusterSessionId
ConsensusModuleAgent.java onIngressMessage() Leader 接收消息,查看会话验证逻辑
LogPublisher.java appendMessage() 第 132 行 日志追加,查看 publication.offer() 返回值
ConsensusModuleAgent.java quorumPosition() 查看 rankedPositions 数组和中位数计算
BoundedLogAdapter.java onMessage() 第 147 行 查看消息类型识别(templateId)
BoundedLogAdapter.java 第 155 行 调用 agent.onSessionMessage(),查看参数
ClusteredServiceAgent.java onSessionMessage() 第 496 行 调用用户服务前,查看 clientSession 和 buffer 内容
MyClusteredService.java onSessionMessage() 用户实现 用户业务逻辑执行,查看状态变更

十三、总结与调试指南

核心断点位置

文件 行号 方法/位置 观察内容
Election.java 175 doWork() - switch(state) 选举状态转换,查看当前处于哪个阶段
Election.java 656 canvass() 拉票逻辑,日志比较,候选人评估
Election.java 710 candidateBallot() 投票统计,查看 vote 结果
Election.java 765 leaderLogReplication() quorumPosition 计算,日志同步进度
Election.java 325 onRequestVote() 投票请求处理,查看投票规则判断
ConsensusModuleAgent.java 425 doWork() 主工作循环,election vs consensusWork
ConsensusModuleAgent.java quorumPosition() 多数派位置计算 查看 rankedPositions 数组排序结果
LogPublisher.java 115 appendMessage() 日志追加过程,查看 offer() 结果

监控关键指标

// 通过 Aeron Counters 监控集群状态

public void monitorCluster(Aeron aeron) {
    CountersReader counters = aeron.countersReader();

    counters.forEach((counterId, typeId, keyBuffer, label) -> {
        long value = counters.getCounterValue(counterId);

        switch (typeId) {
            case ELECTION_STATE_TYPE_ID:
                System.out.println("Election State: " + ElectionState.get(value));
                break;

            case CLUSTER_NODE_ROLE_TYPE_ID:
                System.out.println("Node Role: " + Cluster.Role.get((int)value));
                break;

            case COMMIT_POSITION_TYPE_ID:
                System.out.println("Commit Position: " + value);
                break;

            case CLUSTER_LEADERSHIP_TERM_ID_TYPE_ID:
                System.out.println("Leadership Term ID: " + value);
                break;
        }
    });
}

常见问题排查

🐛 问题1: 选举卡在 CANVASS 阶段
  • 检查网络: 确认 memberEndpoint 可达,UDP端口未被防火墙阻止
  • 查看日志比较: 在 ClusterMember.isQuorumCandidate() 打断点,查看 compareLog() 结果
  • 时钟偏移: 确认各节点时钟差异 < 1秒
  • 超时配置: 检查 electionTimeoutNsleaderHeartbeatTimeoutNs
🐛 问题2: quorumPosition 不更新
  • 检查 Followers: 查看 Followers 是否正常消费日志(LogAdapter.poll() 返回值)
  • AppendPosition 消息: 确认 Followers 发送 AppendPosition 给 Leader
  • Archive Recording: 确认 Archive 正常工作,Recording 未停止
  • 网络延迟: 检查网络延迟和丢包率
🐛 问题3: 节点恢复时间过长
  • 快照频率: 降低 snapshotIntervalCounter(例如从1000改为100)
  • 快照大小: 优化 onTakeSnapshot(),减少序列化数据量
  • 磁盘IO: 使用SSD,配置 file-sync-level=0(异步刷盘)
  • 日志回放: 查看 LogReplay.doWork() 性能,优化消息处理逻辑

性能优化建议

优化项 配置参数 推荐值 说明
Term Length term-length 64KB - 1MB 更大的 Term 减少 Archive Rollover 开销
快照间隔 snapshotIntervalCounter 100 - 1000 根据消息吞吐量调整,平衡恢复时间和性能
心跳间隔 leaderHeartbeatIntervalNs 100ms - 500ms 更短的间隔提高故障检测速度,但增加网络负载
选举超时 electionTimeoutNs 1s - 5s 根据网络延迟调整,WAN环境使用更大值
Idle Strategy idleStrategy YieldingIdleStrategy 平衡CPU使用率和延迟,低延迟场景使用 BusySpinIdleStrategy

学习路线图

📚 深度学习建议:
  1. 阅读 Raft 论文: 《In Search of an Understandable Consensus Algorithm》- Diego Ongaro & John Ousterhout
  2. 调试 Election 状态机:
    • Election.doWork() 的 switch 语句打断点
    • 单步跟踪从 CANVASS 到 LEADER_READY 的完整流程
    • 观察 state, candidateTermId, leadershipTermId 变化
  3. 跟踪消息流转:
    • 客户端请求 -> IngressAdapter -> LogPublisher -> Archive -> LogAdapter -> ServiceProxy -> ClusteredService
    • 在每个环节打印日志位置,理解 Zero-Copy 机制
  4. 分析 quorumPosition 计算:
    • quorumPosition() 打断点
    • 查看 rankedPositions 数组内容
    • 验证中位数算法的正确性
  5. 模拟故障场景:
    • Kill Leader 节点,观察选举流程
    • 断开 Follower 网络,观察 quorumPosition 变化
    • 重启节点,观察恢复流程
  6. 阅读 Aeron 底层:
    • 理解 Publication / Subscription 机制
    • 学习 BufferClaim 和 Zero-Copy 原理
    • 研究 Archive Recording 实现
  7. 编写集成测试:
    • 参考 ClusterNodeTest.java
    • 编写自定义 ClusteredService
    • 测试快照、恢复、选举等场景