Flink HA模式下JobManager切换时发送告警

资源&版本信息

Flink版本1.14.6

运行平台:K8s

HA使用ZK(使用K8s的ETC应该是一个道理)

详解Flink HA原理

Flink启动时会创建HighAvailabilityServices提供HA和相关基础服务,其中包括leaderRetrievalService和LeaderElectionService服务;

  • leaderRetrievalService用于高可用组件的调用方获得leader节点,例如在JobManager中通过ResourceManagerLeaderRetriever服务获取ResourceManager的Leader节点;

  • LeaderElectionService用于主节点竞选,一旦当前组件被选为Leader节点,就可以对外提供服务,leaderRetrievalService就能够获取已注册且有效的Leader节点;

LeaderRetrievalService讲解

DefaultLeaderRetrievalService类结构如下

在这里插入图片描述

  1. LeaderRetrievalService默认实现类DefaultLeaderRetrievalService;DefaultLeaderRetrievalService还实现了LeaderRetrievalEventHandler接口(该接口只有notifyLeaderAddress方法,用于状态变化时被回调);

  2. DefaultLeaderRetrievalService中notifyLeaderAddress方法会判断当前是否处于运行状态,然后调用leaderListener.notifyLeaderAddress方法通知监听器leader变更!

  3. DefaultLeaderRetrievalService.leaderListener是LeaderRetrievalListener一种实现JobManagerLeaderListener,用于TaskManager监听Jobmanager变更的实现类,实现在jobmanager变更时及时修改连接信息。

如何实现HA变更时发送告警信息了?

JobManager宕机重启或ZK不可用后恢复,此时肯定会发生HA切换,其次根据代码观察每次HA切换必会导致leaderId变化(每次连接),根据上述背景知识逐个情况分析。

JobManager宕机:

根据日志观察leaderListener.notifyLeaderAddress方法会被调用两次,第一次是将leaderId地址设置为空,在JobManager启动并选举为leader后,notifyLeaderAddress会被再次调用将leaderId设置为最新的leaderId;

ZK不可用:

根据日志观察leaderListener.notifyLeaderAddress方法会被调用三次,假设leaderId原先是A,先被设置为null,然后被设置为A,再被设置为B;

结论:

根据上述情况,我们可以在leaderListener.notifyLeaderAddress方法中记录每次的leaderId的值,当该值发生变化时,变为null或者由A变成B时发送HA变更告警即可;为了更加精准,选择leaderId在A变成B时,或者leaderId在null变成B时发送告警

如何在leaderListener.notifyLeaderAddress方法中将告警发出了?

使用java agent,在flink启动时设置agent即可(-javaagent:agent.jar=123其中123是入参);具体代码插入点可选择方法进入时或同步块中

@Override
public void notifyLeaderAddress(
        @Nullable final String leaderAddress, @Nullable final UUID leaderId) {
    Optional<JobMasterId> jobManagerLostLeadership = Optional.empty();

    synchronized (lock) {
        if (stopped) {
            LOG.debug(
                    "{}'s leader retrieval listener reported a new leader for job {}. "
                            + "However, the service is no longer running.",
                    DefaultJobLeaderService.class.getSimpleName(),
                    jobId);
        } else {
            final JobMasterId jobMasterId = JobMasterId.fromUuidOrNull(leaderId);

            LOG.debug(
                    "New leader information for job {}. Address: {}, leader id: {}.",
                    jobId,
                    leaderAddress,
                    jobMasterId);

            if (leaderAddress == null || leaderAddress.isEmpty()) {
                // the leader lost leadership but there is no other leader yet.
                jobManagerLostLeadership = Optional.ofNullable(currentJobMasterId);
                closeRpcConnection();
            } else {
                // check whether we are already connecting to this leader
                if (Objects.equals(jobMasterId, currentJobMasterId)) {
                    LOG.debug(
                            "Ongoing attempt to connect to leader of job {}. Ignoring duplicate leader information.",
                            jobId);
                } else {
                    closeRpcConnection();
                    openRpcConnectionTo(leaderAddress, jobMasterId);
                }
            }
        }
    }

    // send callbacks outside of the lock scope
    jobManagerLostLeadership.ifPresent(
            oldJobMasterId ->
                    jobLeaderListener.jobManagerLostLeadership(jobId, oldJobMasterId));
}

最终实现待更新~

相关推荐

  1. PostgreSQL数据库切换到另一个模式

    2024-05-14 08:10:04       21 阅读
  2. iOS app切换后台添加模糊遮罩层

    2024-05-14 08:10:04       27 阅读

最近更新

  1. .Net Core WebAPI参数的传递方式

    2024-05-14 08:10:04       0 阅读
  2. QT--气泡框的实现

    2024-05-14 08:10:04       0 阅读
  3. LeetCode 968.监控二叉树 (hard)

    2024-05-14 08:10:04       0 阅读
  4. leetcode热题100.完全平方数(动态规划进阶)

    2024-05-14 08:10:04       0 阅读
  5. leetcode328-Odd Even Linked List

    2024-05-14 08:10:04       0 阅读
  6. C 语言设计模式(结构型)

    2024-05-14 08:10:04       0 阅读
  7. v-if 与 v-show(vue3条件渲染)

    2024-05-14 08:10:04       0 阅读
  8. kafka防止消息丢失配置

    2024-05-14 08:10:04       0 阅读

热门阅读

  1. 量子计算入门:原理与编程

    2024-05-14 08:10:04       2 阅读
  2. MySQL和MongoDB区别

    2024-05-14 08:10:04       3 阅读
  3. k8s 配置管理

    2024-05-14 08:10:04       2 阅读
  4. Redis 5.0 Stream数据结构深入分析

    2024-05-14 08:10:04       3 阅读
  5. 力扣:93. 复原 IP 地址

    2024-05-14 08:10:04       4 阅读
  6. 数据库和Redis数据不一致的问题

    2024-05-14 08:10:04       3 阅读
  7. Rust 语言不支持 goto 语句

    2024-05-14 08:10:04       4 阅读
  8. ubuntu 24.04 devilspie 报错解决

    2024-05-14 08:10:04       3 阅读
  9. CircleCI的原理及应用详解(二)

    2024-05-14 08:10:04       3 阅读
  10. 10、Go Gin 连接Redis以及Cookie&Session

    2024-05-14 08:10:04       4 阅读
  11. 使用frp通过http访问内网web服务

    2024-05-14 08:10:04       3 阅读
  12. Nginx-01-Nginx 是什么? 能做什么?

    2024-05-14 08:10:04       7 阅读
  13. hdfs中的小知识(hadoop hdfs hive)

    2024-05-14 08:10:04       3 阅读