Flink 1.18.1 Standalone集群搭建保姆级教程:从SSH免密到Web UI验证,一次搞定

张开发
2026/4/3 18:32:31 15 分钟阅读
Flink 1.18.1 Standalone集群搭建保姆级教程:从SSH免密到Web UI验证,一次搞定
Flink 1.18.1 Standalone集群搭建实战指南从零开始构建高可靠数据处理平台引言在当今数据驱动的时代实时数据处理能力已成为企业技术栈中的关键组成部分。Apache Flink作为一款开源的流处理框架以其高吞吐、低延迟的特性在金融风控、实时推荐、物联网数据分析等领域大放异彩。对于刚接触Flink的技术人员来说如何快速搭建一个稳定可靠的Standalone集群是掌握这项技术的第一步。本文将带领你从零开始一步步完成Flink 1.18.1 Standalone集群的搭建过程。不同于简单的步骤罗列我们会深入每个环节背后的原理解释为什么需要这些配置以及在遇到常见问题时该如何排查和解决。无论你是开发人员还是运维工程师都能通过本指南获得可立即应用于生产环境的实用知识。1. 环境准备与基础配置1.1 硬件与软件需求在开始搭建Flink集群前我们需要确保基础设施满足基本要求。以下是推荐的配置服务器数量至少3台1个JobManager 2个TaskManager操作系统LinuxCentOS 7或Ubuntu 18.04Java环境JDK 8或11推荐OpenJDK 11内存每台服务器至少4GB磁盘空间每台服务器至少20GB可用空间网络节点间千兆网络连接提示生产环境建议使用专用服务器而非虚拟机以获得更好的性能表现。1.2 系统基础配置在所有节点上执行以下基础配置# 关闭防火墙仅用于测试环境 systemctl stop firewalld systemctl disable firewalld # 禁用SELinux setenforce 0 sed -i s/SELINUXenforcing/SELINUXdisabled/g /etc/selinux/config # 配置主机名解析在所有节点上执行 cat /etc/hosts EOF 192.168.0.168 node1 192.168.0.73 node2 192.168.0.34 node3 EOF # 安装必要工具 yum install -y wget vim openssh-clients # CentOS # 或 apt-get install -y wget vim openssh-client # Ubuntu1.3 Java环境安装与配置Flink运行依赖Java环境以下是安装OpenJDK 11的步骤# CentOS yum install -y java-11-openjdk-devel # Ubuntu apt-get install -y openjdk-11-jdk # 验证安装 java -version配置JAVA_HOME环境变量# 查找Java安装路径 readlink -f $(which java) | sed s:/bin/java:: # 将输出结果添加到/etc/profile中 echo export JAVA_HOME/usr/lib/jvm/java-11-openjdk-amd64 /etc/profile echo export PATH$JAVA_HOME/bin:$PATH /etc/profile source /etc/profile2. Flink安装与集群配置2.1 下载与安装Flink在主节点(node1)上执行以下操作# 创建安装目录 mkdir -p /opt/flink cd /opt/flink # 下载Flink 1.18.1 wget https://archive.apache.org/dist/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz # 解压安装包 tar -xzf flink-1.18.1-bin-scala_2.12.tgz ln -s flink-1.18.1 current # 配置环境变量 echo export FLINK_HOME/opt/flink/current /etc/profile echo export PATH$PATH:$FLINK_HOME/bin /etc/profile source /etc/profile2.2 配置SSH免密登录集群节点间的SSH免密登录是Flink集群正常工作的基础。以下是详细配置步骤在主节点(node1)上生成SSH密钥对ssh-keygen -t rsa -b 4096 -N -f ~/.ssh/id_rsa将公钥分发到所有节点包括自身ssh-copy-id node1 ssh-copy-id node2 ssh-copy-id node3验证免密登录ssh node1 date ssh node2 date ssh node3 date注意如果遇到Permission denied错误检查以下几点目标节点的sshd服务是否运行systemctl status sshd/etc/ssh/sshd_config中是否允许公钥认证PubkeyAuthentication yes目标节点上的~/.ssh/authorized_keys文件权限是否为6002.3 核心配置文件详解2.3.1 flink-conf.yaml配置编辑$FLINK_HOME/conf/flink-conf.yaml文件以下是关键配置项# JobManager配置 jobmanager.rpc.address: node1 jobmanager.bind-host: 0.0.0.0 jobmanager.rpc.port: 6123 jobmanager.memory.process.size: 1600m # TaskManager配置 taskmanager.numberOfTaskSlots: 4 taskmanager.memory.process.size: 4096m # 网络与IO配置 taskmanager.network.memory.fraction: 0.1 taskmanager.network.memory.max: 1024mb io.tmp.dirs: /tmp # 检查点配置根据需求调整 state.backend: filesystem state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints state.savepoints.dir: hdfs://namenode:8020/flink/savepoints # Web UI配置 rest.address: node1 rest.bind-address: 0.0.0.0 rest.port: 80812.3.2 masters文件配置指定JobManager节点echo node1:8081 $FLINK_HOME/conf/masters2.3.3 workers文件配置指定所有TaskManager节点cat $FLINK_HOME/conf/workers EOF node1 node2 node3 EOF2.4 分发Flink安装包将配置好的Flink分发到所有工作节点scp -r /opt/flink node2:/opt/ scp -r /opt/flink node3:/opt/ # 在工作节点上创建符号链接 ssh node2 ln -s /opt/flink/flink-1.18.1 /opt/flink/current ssh node3 ln -s /opt/flink/flink-1.18.1 /opt/flink/current # 同步环境变量 scp /etc/profile node2:/etc/ scp /etc/profile node3:/etc/ ssh node2 source /etc/profile ssh node3 source /etc/profile3. 集群启动与验证3.1 启动Flink集群在主节点(node1)上执行$FLINK_HOME/bin/start-cluster.sh启动日志通常会输出到$FLINK_HOME/log目录下。检查日志确认无错误tail -f $FLINK_HOME/log/flink-*-standalonesession-*.log3.2 集群状态验证3.2.1 命令行验证# 查看集群状态 $FLINK_HOME/bin/flink list # 查看TaskManager状态 $FLINK_HOME/bin/flink run -m node1:8081 --class org.apache.flink.examples.java.wordcount.WordCount $FLINK_HOME/examples/batch/WordCount.jar3.2.2 Web UI验证访问http://node1:8081你应该能看到类似如下的界面![Flink Web UI示意图]在Web UI中重点关注以下几个指标Task Managers应该显示3个node1, node2, node3Slots Available总共应该有12个每个TaskManager 4个slot × 3个节点JobManager Status显示为Healthy3.3 常见问题排查以下是搭建过程中可能遇到的典型问题及解决方案问题现象可能原因解决方案TaskManager未出现在Web UI网络连接问题/配置错误检查workers文件配置查看TaskManager日志Job提交失败资源不足/配置错误检查slot数量调整taskmanager.memory.process.size节点间通信失败防火墙/SSH配置问题禁用防火墙验证SSH免密登录Web UI无法访问绑定地址/端口冲突检查rest.address和rest.bind-address配置对于更详细的日志分析可以使用以下命令# 查看JobManager日志 less $FLINK_HOME/log/flink-*-standalonesession-*.log # 查看TaskManager日志在各工作节点上 less $FLINK_HOME/log/flink-*-taskexecutor-*.log4. 集群优化与生产建议4.1 性能调优参数对于生产环境建议调整以下参数以获得更好性能# 在flink-conf.yaml中添加或修改 # 网络与缓冲 taskmanager.network.memory.max: 2048mb taskmanager.network.memory.fraction: 0.2 # 检查点优化 execution.checkpointing.interval: 30s execution.checkpointing.timeout: 10min execution.checkpointing.min-pause: 500ms # 状态后端推荐使用RocksDB state.backend: rocksdb state.backend.rocksdb.localdir: /data/flink/rocksdb # 高可用配置可选 high-availability: zookeeper high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181 high-availability.storageDir: hdfs://namenode:8020/flink/ha/4.2 资源隔离与调度对于多租户环境考虑以下策略资源隔离通过YARN或Kubernetes部署模式实现Slot共享组限制哪些任务可以共享slotCPU隔离使用cgroups限制每个TaskManager的CPU使用4.3 监控与告警建议配置以下监控项关键指标监控检查点成功率与耗时背压指标各算子的延迟资源使用率CPU、内存、网络集成方案Prometheus GrafanaELK收集和分析日志自定义告警规则示例Prometheus配置# 在flink-conf.yaml中 metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter metrics.reporter.prom.port: 9250-92604.4 安全加固建议对于生产环境务必考虑以下安全措施网络层限制Web UI和RPC端口的访问IP启用TLS加密通信认证授权配置Kerberos认证启用基于角色的访问控制(RBAC)数据安全启用传输加密配置敏感信息加密5. 实际应用案例演示5.1 运行WordCount示例让我们通过经典案例验证集群功能# 提交WordCount批处理作业 $FLINK_HOME/bin/flink run -m node1:8081 \ $FLINK_HOME/examples/batch/WordCount.jar \ --input hdfs://namenode:8020/input/text.txt \ --output hdfs://namenode:8020/output/wordcount # 提交流处理Socket示例需要先启动Socket服务 $FLINK_HOME/bin/flink run -m node1:8081 \ $FLINK_HOME/examples/streaming/SocketWindowWordCount.jar \ --hostname localhost \ --port 90005.2 自定义作业提交开发一个简单的Flink作业并提交到集群// SimpleStreamingJob.java public class SimpleStreamingJob { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); env.addSource(new FlinkKafkaConsumer(input-topic, new SimpleStringSchema(), properties)) .flatMap((String value, CollectorTuple2String, Integer out) - { for (String word : value.split(\\s)) { out.collect(new Tuple2(word, 1)); } }) .keyBy(0) .sum(1) .addSink(new FlinkKafkaProducer(output-topic, new SimpleStringSchema(), properties)); env.execute(Kafka WordCount); } }打包并提交作业mvn clean package $FLINK_HOME/bin/flink run -m node1:8081 target/your-job.jar5.3 集群管理常用命令掌握以下命令对日常运维很有帮助# 停止集群 $FLINK_HOME/bin/stop-cluster.sh # 单独启动/停止TaskManager $FLINK_HOME/bin/taskmanager.sh start|stop # 查看运行中的作业 $FLINK_HOME/bin/flink list -m node1:8081 # 取消作业 $FLINK_HOME/bin/flink cancel -m node1:8081 jobID # 保存点操作 $FLINK_HOME/bin/flink savepoint -m node1:8081 jobID [targetDirectory] $FLINK_HOME/bin/flink run -s :savepointPath [:runArgs]6. 升级与维护策略6.1 版本升级流程准备阶段备份配置文件和重要数据阅读新版本发布说明和迁移指南在测试环境验证升级过程执行升级# 停止当前集群 $FLINK_HOME/bin/stop-cluster.sh # 安装新版本 wget https://archive.apache.org/dist/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz tar -xzf flink-1.18.1-bin-scala_2.12.tgz ln -sfn flink-1.18.1 current # 应用原有配置 cp flink-1.17.0/conf/* flink-1.18.1/conf/ # 启动新集群 $FLINK_HOME/bin/start-cluster.sh验证阶段运行健康检查作业监控关键指标逐步迁移生产作业6.2 日常维护建议日志管理配置日志轮转log4j.properties集中收集和分析日志备份策略定期备份配置文件和检查点实现配置版本控制容量规划监控资源使用趋势提前规划扩容6.3 故障恢复流程JobManager故障检查日志定位原因从最近检查点恢复考虑配置高可用模式TaskManager故障自动重启策略检查资源使用情况验证节点健康状况数据一致性检查验证检查点完整性检查端到端精确一次语义# 从保存点恢复作业示例 $FLINK_HOME/bin/flink run -m node1:8081 \ -s hdfs://namenode:8020/flink/savepoints/savepoint-123456 \ -n /path/to/your-job.jar

更多文章