Host name | IP | Role assignment |
---|---|---|
hadoop1 | 10.12.210.28 | |
hadoop2 | 10.12.210.29 | |
hadoop3 | 10.12.210.30 | |
// Rename the host
hostnamectl set-hostname hadoop1
// Create the hadoop user
groupadd hadoop
useradd -g hadoop hadoop
// Set the password
passwd hadoop
dmall123
// Grant the new user the same sudo privileges as root
[root@hadoop1 ~]# vim /etc/sudoers
## Next comes the main part: which users can run what software on
## which machines (the sudoers file can be shared between multiple
## systems).
## Syntax:
##
## user MACHINE=COMMANDS
##
## The COMMANDS section may have other options added to it.
##
## Allow root to run any commands anywhere
root ALL=(ALL) ALL
hadoop ALL=(ALL) ALL
## Allows members of the 'sys' group to run networking, software,
## service management apps and more.
# %sys ALL = NETWORKING, SOFTWARE, SERVICES, STORAGE, DELEGATING, PROCESSES, LOCATE, DRIVERS
## Allows people in group wheel to run all commands
%wheel ALL=(ALL) ALL
## Same thing without a password
# %wheel ALL=(ALL) NOPASSWD: ALL
## Allows members of the users group to mount and unmount the
## cdrom as root
Save and exit with :wq!
After adding the hadoop line, the hadoop user has the same sudo privileges as root.
// Install the JDK
[root@hadoop1 ~]# yum install -y java-1.8.0-openjdk.x86_64 java-1.8.0-openjdk-devel.x86_64
// Install rsync
[root@hadoop1 ~]# yum install -y rsync.x86_64
// Download the Hadoop package
[root@hadoop1 soft]# wget https://mirrors.bfsu.edu.cn/apache/hadoop/common/hadoop-2.10.0/hadoop-2.10.0.tar.gz
[root@hadoop1 ~]# mkdir /hadoop
[root@hadoop1 ~]# cd /hadoop
[root@hadoop1 hadoop]# tar zxvf /soft/hadoop-2.10.0.tar.gz -C ./
// Give the hadoop user ownership of this directory; later steps run as that user
[root@hadoop1 hadoop]# chown -R hadoop:hadoop ./
// Append the JDK and Hadoop path settings to the end of /etc/profile
export JAVA_HOME=/usr/lib/jvm/java
export HADOOP_HOME=/hadoop/hadoop-2.10.0
#export HADOOP_OPTS="-Djava.library.path=$HADOOP_PREFIX/lib:$HADOOP_PREFIX/lib/native"
export LD_LIBRARY_PATH=$HADOOP_HOME/lib/native
export HADOOP_COMMON_LIB_NATIVE_DIR=/hadoop/hadoop-2.10.0/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"
#export HADOOP_ROOT_LOGGER=DEBUG,console
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/sbin:$HADOOP_HOME/bin
After editing /etc/profile, run the following so the changes take effect in the current shell:
[root@hadoop1 ~]# source /etc/profile
[root@hadoop1 ~]# echo $JAVA_HOME
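As an optional sanity check (a minimal sketch; the exact version strings printed depend on your installation), confirm that both Java and Hadoop resolve from the new PATH:
[root@hadoop1 ~]# java -version
[root@hadoop1 ~]# echo $HADOOP_HOME
[root@hadoop1 ~]# hadoop version
If both commands print version information, the profile changes are in effect.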
// Add the following three host entries to /etc/hosts
10.12.210.28 hadoop1
10.12.210.29 hadoop2
10.12.210.30 hadoop3
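These entries belong in /etc/hosts on every node. A minimal sketch of adding and verifying them on one node (assuming the same layout on all three machines):
[root@hadoop1 ~]# cat >> /etc/hosts <<'EOF'
10.12.210.28 hadoop1
10.12.210.29 hadoop2
10.12.210.30 hadoop3
EOF
[root@hadoop1 ~]# ping -c 1 hadoop2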
 | hadoop1 | hadoop2 | hadoop3 |
---|---|---|---|
HDFS | NameNode, DataNode | DataNode | SecondaryNameNode, DataNode |
YARN | NodeManager | ResourceManager, NodeManager | NodeManager |
[root@hadoop1 bin]# cd /usr/local/bin/
[root@hadoop1 bin]# touch xsync
[root@hadoop1 bin]# vim xsync
#!/bin/bash
#1 Get the number of arguments; exit immediately if none were given
pcount=$#
if((pcount==0)); then
echo no args;
exit;
fi
#2 Get the file name
p1=$1
fname=`basename $p1`
echo fname=$fname
#3 Resolve the parent directory to an absolute path
pdir=`cd -P $(dirname $p1); pwd`
echo pdir=$pdir
#4 Get the current user name
user=`whoami`
#5 Loop over all hosts and sync
for((host=1; host<4; host++)); do
echo ------------------- hadoop$host --------------
rsync -rvl $pdir/$fname $user@hadoop$host:$pdir
done
// Make the script executable
[root@hadoop1 bin]# chmod 777 xsync
// Distribute the script itself to check that it works
[root@hadoop1 bin]# xsync xsync
fname=xsync
pdir=/usr/local/bin
------------------- hadoop1 --------------
root@hadoop1's password:
sending incremental file list
sent 42 bytes received 12 bytes 9.82 bytes/sec
total size is 495 speedup is 9.17
------------------- hadoop2 --------------
root@hadoop2's password:
sending incremental file list
xsync
sent 584 bytes received 35 bytes 176.86 bytes/sec
total size is 495 speedup is 0.80
------------------- hadoop3 --------------
root@hadoop3's password:
sending incremental file list
xsync
sent 584 bytes received 35 bytes 137.56 bytes/sec
total size is 495 speedup is 0.80
Configure core-site.xml
[hadoop@hadoop1 hadoop]$ vi core-site.xml
Add the following configuration to the file:
<!-- Address of the NameNode in HDFS -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop1:9000</value>
</property>
<!-- Directory where Hadoop stores files generated at runtime -->
<property>
<name>hadoop.tmp.dir</name>
<value>/hadoop/hadoop-2.10.0/data/tmp</value>
</property>
Configure hadoop-env.sh
[hadoop@hadoop1 hadoop]$ vim hadoop-env.sh
export JAVA_HOME=/usr/lib/jvm/java
Configure hdfs-site.xml
[hadoop@hadoop1 hadoop]$ vim hdfs-site.xml
Add the following configuration to the file:
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<!-- Host of the secondary NameNode -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>hadoop3:50090</value>
</property>
### Configuration details
The order of the settings does not matter, but some of them depend on each other. For example:
dfs.ha.namenodes.[nameservice ID] depends on dfs.nameservices: if dfs.nameservices is set to a, then the key must be written as dfs.ha.namenodes.a.
#### hdfs-site.xml
dfs.nameservices
The nameservice name can be chosen freely; pick a sensible logical name. This value is referenced by other settings and also appears in the absolute paths stored in the HDFS filesystem.
If you are also using HDFS Federation, this setting should additionally include the other nameservices, HA or otherwise, as a comma-separated list.
<property>
<name>dfs.nameservices</name>
<value>mycluster</value>
</property>
dfs.ha.namenodes.[nameservice ID]
This lets DataNodes determine all the NameNodes in the cluster. There must be at least two NameNodes; three are recommended, and more than five are not advised.
<property>
<name>dfs.ha.namenodes.mycluster</name>
<value>nn1,nn2, nn3</value>
</property>
dfs.namenode.rpc-address.[nameservice ID].[name node ID]
The fully-qualified RPC address on which each NameNode listens.
<property>
<name>dfs.namenode.rpc-address.mycluster.nn1</name>
<value>machine1.example.com:8020</value>
</property>
<property>
<name>dfs.namenode.rpc-address.mycluster.nn2</name>
<value>machine2.example.com:8020</value>
</property>
<property>
<name>dfs.namenode.rpc-address.mycluster.nn3</name>
<value>machine3.example.com:8020</value>
</property>
dfs.namenode.http-address.[nameservice ID].[name node ID]
The fully-qualified HTTP address on which each NameNode listens.
Note: if Hadoop's security features are enabled, you should also set the https-address for each NameNode.
<property>
<name>dfs.namenode.http-address.mycluster.nn1</name>
<value>machine1.example.com:9870</value>
</property>
<property>
<name>dfs.namenode.http-address.mycluster.nn2</name>
<value>machine2.example.com:9870</value>
</property>
<property>
<name>dfs.namenode.http-address.mycluster.nn3</name>
<value>machine3.example.com:9870</value>
</property>
dfs.namenode.shared.edits.dir
Addresses of the JournalNodes (JNs). The management scripts use these addresses to start the JNs, the active NameNode uses them to write namespace changes, and the standby NameNodes use them to pull those changes. The trailing /mycluster in the value is the storage root path: multiple HA clusters can share the same JournalNode servers to save hardware, so each HA service must use a distinct root path to keep them apart.
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://node1.example.com:8485;node2.example.com:8485;node3.example.com:8485/mycluster</value>
</property>
dfs.journalnode.edits.dir
The absolute path on the JournalNode machines where the edits are stored locally.
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/path/to/journal/node/local/data</value>
</property>
dfs.client.failover.proxy.provider.[nameservice ID]
Lets clients determine which NameNode is currently active. On the first call it contacts all NameNodes to find the active one; afterwards it talks to the active NameNode directly.
Choose either ConfiguredFailoverProxyProvider or RequestHedgingProxyProvider.
<property>
<name>dfs.client.failover.proxy.provider.mycluster</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
dfs.ha.fencing.methods
When a failover occurs, the previously active NameNode may still serve read requests to clients, which may be stale, until that NameNode shuts itself down when it tries to write to the JournalNodes. For this reason some fencing method must still be configured even when using the Quorum Journal Manager. However, to improve the availability of the system in the event the fencing mechanisms fail, it is advisable to configure a fencing method that is guaranteed to succeed as the last method in the list. Note that if you choose not to use an actual fencing method, you must still configure something for this setting, for example shell(/bin/true). The fencing methods used during failover are configured as a newline-separated list and are attempted in order until one indicates that fencing has succeeded. Hadoop ships with two methods: shell and sshfence. For information on implementing a custom fencing method, see the org.apache.hadoop.ha.NodeFencer class.
1) sshfence
The sshfence option SSHes to the target node and uses fuser to kill the process listening on the service's TCP port. For this fencing option to work, it must be able to SSH to the target node without providing a password, so the dfs.ha.fencing.ssh.private-key-files option must also be configured; it is a comma-separated list of SSH private key files.
If passwordless SSH is used, the two NameNodes must be able to log in to each other without a password, because the ZKFC needs to log in to the other NameNode in order to demote it.
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/home/exampleuser/.ssh/id_rsa</value>
</property>
Optionally, a non-standard username or port can be configured for the SSH connection. An SSH timeout in milliseconds can also be configured, after which this fencing method is considered to have failed.
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence([[username][:port]])</value>
</property>
<property>
<name>dfs.ha.fencing.ssh.connect-timeout</name>
<value>30000</value>
</property>
2) shell
Kills the active NameNode by running an arbitrary shell script.
<property>
<name>dfs.ha.fencing.methods</name>
<value>shell(/path/to/my/script.sh arg1 arg2 ...)</value>
</property>
#### core-site.xml
fs.defaultFS
Set this to the same value as the nameservice ID. Through mycluster, combined with dfs.nameservices and dfs.ha.namenodes.mycluster from the hdfs configuration, the client finds all NameNodes of this service and determines which one is active.
<property>
<name>fs.defaultFS</name>
<value>hdfs://mycluster</value>
</property>
Configure yarn-env.sh
[hadoop@hadoop1 hadoop]$ vim yarn-env.sh
export JAVA_HOME=/usr/lib/jvm/java
Configure yarn-site.xml
[hadoop@hadoop1 hadoop]$ vim yarn-site.xml
Add the following configuration to the file:
<!-- How reducers fetch data -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!-- Address of the YARN ResourceManager -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hadoop2</value>
</property>
Configure mapred-env.sh
[hadoop@hadoop1 hadoop]$ vim mapred-env.sh
export JAVA_HOME=/usr/lib/jvm/java
Configure mapred-site.xml
[hadoop@hadoop1 hadoop]$ cp mapred-site.xml.template mapred-site.xml
[hadoop@hadoop1 hadoop]$ vim mapred-site.xml
Add the following configuration to the file:
<!-- Run MapReduce on YARN -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
[hadoop@hadoop1 hadoop]$ xsync /hadoop/hadoop-2.10.0/
[hadoop@hadoop2 hadoop]$ cat /hadoop/hadoop-2.10.0/etc/hadoop/core-site.xml
If JAVA_HOME is already configured in /etc/profile as
export JAVA_HOME=/usr/lib/jvm/java
then the JDK path in these *-env.sh files does not need to be changed.
If this is the first time the cluster is started, the NameNode must be formatted:
[hadoop@hadoop1 hadoop-2.10.0]$ hadoop namenode -format
[hadoop@hadoop1 hadoop-2.10.0]$ hadoop-daemon.sh start namenode
starting namenode, logging to /hadoop/hadoop-2.10.0/logs/hadoop-hadoop-namenode-hadoop1.out
[hadoop@hadoop1 hadoop-2.10.0]$ jps
11192 Jps
11117 NameNode
Start a DataNode on each of hadoop1, hadoop2, and hadoop3:
[hadoop@hadoop1 hadoop-2.10.0]$ hadoop-daemon.sh start datanode
starting datanode, logging to /hadoop/hadoop-2.10.0/logs/hadoop-hadoop-datanode-hadoop1.out
[hadoop@hadoop1 hadoop-2.10.0]$ jps
11312 Jps
11117 NameNode
11215 DataNode
[hadoop@hadoop2 hadoop-2.10.0]$ hadoop-daemon.sh start datanode
starting datanode, logging to /hadoop/hadoop-2.10.0/logs/hadoop-hadoop-datanode-hadoop2.out
[hadoop@hadoop2 hadoop-2.10.0]$ jps
11090 DataNode
11142 Jps
[hadoop@hadoop3 hadoop]$ hadoop-daemon.sh start datanode
starting datanode, logging to /hadoop/hadoop-2.10.0/logs/hadoop-hadoop-datanode-hadoop3.out
[hadoop@hadoop3 hadoop]$ jps
11079 DataNode
11161 Jps
[hadoop@hadoop1 hadoop-2.10.0]$ hadoop-daemon.sh stop datanode
stopping datanode
[hadoop@hadoop1 hadoop-2.10.0]$ hadoop-daemon.sh stop namenode
stopping namenode
[hadoop@hadoop1 hadoop-2.10.0]$ jps
11403 Jps
[hadoop@hadoop2 bin]$ hadoop-daemon.sh stop datanode
stopping datanode
[hadoop@hadoop2 bin]$ jps
11244 Jps
[hadoop@hadoop3 hadoop-2.10.0]$ hadoop-daemon.sh stop datanode
stopping datanode
[hadoop@hadoop3 hadoop-2.10.0]$ jps
11220 Jps
Starting daemons node by node does not scale: what if the cluster grows to 1,000 nodes?
You would start at it in the morning and only finish by the end of the working day.
(1) Basic syntax
ssh <IP address of the other machine>
(2) Resolving "Host key verification failed" when connecting with ssh
[hadoop@hadoop1 hadoop] $ ssh 192.168.1.103
The authenticity of host '192.168.1.103 (192.168.1.103)' can't be established.
RSA key fingerprint is cf:1e:de:d7:d0:4c:2d:98:60:b4:fd:ae:b1:2d:ad:06.
Are you sure you want to continue connecting (yes/no)?
Host key verification failed.
(3) Solution: simply type yes at the prompt.
(1) How passwordless login works is illustrated in Figure 2-40.
Figure 2-40: How passwordless SSH login works
(2) Generate the public/private key pair:
[hadoop@hadoop1 .ssh]$ ssh-keygen -t rsa
Press Enter three times at the prompts; two files are generated: id_rsa (the private key) and id_rsa.pub (the public key).
(3) Copy the public key to every machine you want to log in to without a password:
[hadoop@hadoop1 .ssh]$ ssh-copy-id hadoop1
[hadoop@hadoop1 .ssh]$ ssh-copy-id hadoop2
[hadoop@hadoop1 .ssh]$ ssh-copy-id hadoop3
Note:
Table 2-4
File | Purpose |
---|---|
known_hosts | Records the public keys of hosts that have been accessed via ssh |
id_rsa | The generated private key |
id_rsa.pub | The generated public key |
authorized_keys | Stores the public keys authorized for passwordless login to this server |
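Note that ssh-copy-id above only enables passwordless login from hadoop1. If hadoop2 and hadoop3 also need to reach the other nodes (for example, the ResourceManager on hadoop2 starting NodeManagers), repeat the key generation and ssh-copy-id steps on those hosts as well. A quick, hedged way to verify from hadoop1:
[hadoop@hadoop1 ~]$ for host in hadoop1 hadoop2 hadoop3; do ssh $host hostname; done
Each hostname should be printed without any password prompt.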
File path: /hadoop/hadoop-2.10.0/etc/hadoop/slaves
[hadoop@hadoop1 hadoop]$ vi slaves
Add the following content to the file:
hadoop1
hadoop2
hadoop3
Note: lines added to this file must not have trailing spaces, and the file must not contain blank lines.
Also be careful here: the JDK setting must point to an explicit JDK directory, otherwise the cluster-wide start scripts cannot find Java and fail with the error below:
[hadoop@hadoop1 hadoop-2.10.0]$ sbin/start-dfs.sh
20/07/16 18:49:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Starting namenodes on [hadoop1]
hadoop1: Error: JAVA_HOME is not set and could not be found.
hadoop2: Error: JAVA_HOME is not set and could not be found.
hadoop3: Error: JAVA_HOME is not set and could not be found.
hadoop1: Error: JAVA_HOME is not set and could not be found.
Starting secondary namenodes [hadoop3]
hadoop3: Error: JAVA_HOME is not set and could not be found.
20/07/16 18:49:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Synchronize the configuration file to all nodes:
[hadoop@hadoop1 hadoop]$ xsync slaves
fname=slaves
pdir=/hadoop/hadoop-2.10.0/etc/hadoop
------------------- hadoop1 --------------
sending incremental file list
sent 43 bytes received 12 bytes 110.00 bytes/sec
total size is 24 speedup is 0.44
------------------- hadoop2 --------------
sending incremental file list
slaves
sent 114 bytes received 41 bytes 310.00 bytes/sec
total size is 24 speedup is 0.15
------------------- hadoop3 --------------
sending incremental file list
slaves
sent 114 bytes received 41 bytes 310.00 bytes/sec
total size is 24 speedup is 0.15
(1) If the cluster is being started for the first time, format the NameNode (before formatting, make sure all previously started NameNode and DataNode processes have been stopped, then delete the data and logs directories).
[hadoop@hadoop1 hadoop-2.10.0]$ bin/hdfs namenode -format
(2) Start HDFS
[hadoop@hadoop1 hadoop-2.10.0]$ sbin/start-dfs.sh
20/07/16 18:53:18 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Starting namenodes on [hadoop1]
hadoop1: starting namenode, logging to /hadoop/hadoop-2.10.0/logs/hadoop-hadoop-namenode-hadoop1.out
hadoop3: starting datanode, logging to /hadoop/hadoop-2.10.0/logs/hadoop-hadoop-datanode-hadoop3.out
hadoop2: starting datanode, logging to /hadoop/hadoop-2.10.0/logs/hadoop-hadoop-datanode-hadoop2.out
hadoop1: starting datanode, logging to /hadoop/hadoop-2.10.0/logs/hadoop-hadoop-datanode-hadoop1.out
Starting secondary namenodes [hadoop3]
hadoop3: starting secondarynamenode, logging to /hadoop/hadoop-2.10.0/logs/hadoop-hadoop-secondarynamenode-hadoop3.out
20/07/16 18:53:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[hadoop@hadoop1 hadoop-2.10.0]$ jps
12728 DataNode
12587 NameNode
12939 Jps
[hadoop@hadoop2 hadoop-2.10.0]$ jps
11595 DataNode
11678 Jps
[hadoop@hadoop3 hadoop-2.10.0]$ jps
11603 SecondaryNameNode
11652 Jps
11494 DataNode
(3) Start YARN
[hadoop@hadoop2 hadoop-2.10.0]$ sbin/start-yarn.sh
starting yarn daemons
starting resourcemanager, logging to /hadoop/hadoop-2.10.0/logs/yarn-hadoop-resourcemanager-hadoop2.out
hadoop3: starting nodemanager, logging to /hadoop/hadoop-2.10.0/logs/yarn-hadoop-nodemanager-hadoop3.out
hadoop1: nodemanager running as process 12981. Stop it first.
hadoop2: starting nodemanager, logging to /hadoop/hadoop-2.10.0/logs/yarn-hadoop-nodemanager-hadoop2.out
[hadoop@hadoop2 hadoop-2.10.0]$ jps
12129 Jps
11595 DataNode
11836 NodeManager
11727 ResourceManager
[hadoop@hadoop3 hadoop-2.10.0]$ jps
11603 SecondaryNameNode
11494 DataNode
11831 NodeManager
11945 Jps
[hadoop@hadoop1 hadoop-2.10.0]$ jps
12981 NodeManager
12728 DataNode
13128 Jps
12587 NameNode
Note: if the NameNode and the ResourceManager are not on the same machine, do not start YARN on the NameNode; start it on the machine where the ResourceManager runs.
(4) View the SecondaryNameNode from the web
(a) Open http://hadoop3:50090/status.html in a browser.
(b) The SecondaryNameNode status page no longer renders its information and is effectively unusable.
(c) Browse the HDFS directory tree at http://hadoop1:50070/explorer.html#/
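As an optional smoke test (a hedged sketch; the /input and /output paths are arbitrary examples, not part of the original setup), upload a file to HDFS and run the bundled wordcount job on YARN to confirm that HDFS and YARN work together:
[hadoop@hadoop1 hadoop-2.10.0]$ bin/hdfs dfs -mkdir -p /input
[hadoop@hadoop1 hadoop-2.10.0]$ bin/hdfs dfs -put etc/hadoop/core-site.xml /input
[hadoop@hadoop1 hadoop-2.10.0]$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.10.0.jar wordcount /input /output
[hadoop@hadoop1 hadoop-2.10.0]$ bin/hdfs dfs -cat /output/part-r-00000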
Download the package: https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
[root@hadoop1 soft]# wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
[root@hadoop1 soft]# cd /hadoop
[root@hadoop1 hadoop]# tar zxvf /soft/apache-zookeeper-3.6.1-bin.tar.gz
[root@hadoop1 hadoop]# su hadoop
[hadoop@hadoop1 hadoop]$ ln -s apache-zookeeper-3.6.1-bin zookeeper
[hadoop@hadoop1 hadoop]$ cd zookeeper
[hadoop@hadoop1 zookeeper]$ cd conf
[hadoop@hadoop1 conf]$ mv zoo_sample.cfg zoo.cfg
[hadoop@hadoop1 conf]$ vim zoo.cfg
Edit the zoo.cfg configuration file:
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/hadoop/zookeeper/zkData
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
server.1=hadoop1:2888:3888
server.2=hadoop2:2888:3888
server.3=hadoop3:2888:3888
Set the data directory to /hadoop/zookeeper/zkData
and add the server IDs for the cluster:
server.1=hadoop1:2888:3888
server.2=hadoop2:2888:3888
server.3=hadoop3:2888:3888
Note the format here: server.[serverId]=[hostname or IP]:2888:3888
Create the data directory:
[hadoop@hadoop1 conf]$ cd ../
[hadoop@hadoop1 zookeeper]$ mkdir zkData
[hadoop@hadoop1 zookeeper]$ cd /hadoop
[hadoop@hadoop1 hadoop]$ xsync zookeeper
[hadoop@hadoop1 hadoop]$ xsync apache-zookeeper-3.6.1-bin
After syncing, create the server ID file (myid) on each of the three machines:
[hadoop@hadoop1 hadoop]$ cd zookeeper/zkData
[hadoop@hadoop1 zkData]$ touch myid
[hadoop@hadoop1 zkData]$ vim myid
1
[hadoop@hadoop1 zkData]$ xsync myid
[hadoop@hadoop2 zkData]$ vim myid
2
[hadoop@hadoop3 zkData]$ vim myid
3
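Instead of editing myid by hand on each node, a minimal sketch of the same step over ssh (relying on the passwordless login configured earlier) is:
[hadoop@hadoop1 ~]$ for i in 1 2 3; do ssh hadoop$i "echo $i > /hadoop/zookeeper/zkData/myid"; done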
Once the myid files are in place, start the cluster:
[hadoop@hadoop1 zookeeper]$ bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /hadoop/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[hadoop@hadoop2 zookeeper]$ bin/zkServer.sh start
[hadoop@hadoop2 zookeeper]$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /hadoop/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: Leader
[hadoop@hadoop3 zookeeper]$ bin/zkServer.sh start
[hadoop@hadoop3 zookeeper]$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /hadoop/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
[hadoop@hadoop1 zookeeper]$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /hadoop/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
The ZooKeeper cluster is now fully set up and ready to use.
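Optionally, a quick smoke test of the quorum (a hedged sketch using the standard zkCli.sh client shipped with ZooKeeper) is to connect to any node and create and read a test znode:
[hadoop@hadoop1 zookeeper]$ bin/zkCli.sh -server hadoop1:2181
[zk: hadoop1:2181(CONNECTED) 0] create /test "hello"
[zk: hadoop1:2181(CONNECTED) 1] get /test
[zk: hadoop1:2181(CONNECTED) 2] quit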
Package: https://mirror.bit.edu.cn/apache/hbase/hbase-1.3.6/hbase-1.3.6-bin.tar.gz
[root@hadoop1 soft]# wget https://mirror.bit.edu.cn/apache/hbase/hbase-1.3.6/hbase-1.3.6-bin.tar.gz
[root@hadoop1 soft]# cd /hadoop
[root@hadoop1 soft]# su hadoop
[hadoop@hadoop1 hadoop]$ tar zxvf /soft/hbase-1.3.6-bin.tar.gz
[hadoop@hadoop1 hadoop]$ cd hbase-1.3.6/conf
a. Edit hbase-env.sh
[hadoop@hadoop1 conf]$ vim hbase-env.sh
Change the following two entries:
# The java implementation to use. Java 1.7+ required.
# export JAVA_HOME=/usr/java/jdk1.6.0/
export JAVA_HOME=/usr/lib/jvm/java
Comment out the following two lines:
# Configure PermSize. Only needed in JDK7. You can safely remove it for JDK8+
#export HBASE_MASTER_OPTS="$HBASE_MASTER_OPTS -XX:PermSize=128m -XX:MaxPermSize=128m -XX:ReservedCodeCacheSize=256m"
#export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS -XX:PermSize=128m -XX:MaxPermSize=128m -XX:ReservedCodeCacheSize=256m"
# Tell HBase whether it should manage it's own instance of Zookeeper or not.
export HBASE_MANAGES_ZK=false
b. Edit hbase-site.xml
[hadoop@hadoop1 conf]$ vim hbase-site.xml
Add the following content:
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://hadoop1:9000/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.master.port</name>
<value>16000</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop1:2181,hadoop2:2181,hadoop3:2181</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/hadoop/zookeeper/zkData</value>
</property>
</configuration>
c. Edit regionservers
[hadoop@hadoop1 conf]$ vim regionservers
Set the contents of this file to the three hostnames:
hadoop1
hadoop2
hadoop3
As with the ZooKeeper configuration, there must be no extra spaces in this file.
d. Link the Hadoop configuration files:
[hadoop@hadoop1 conf]$ ln -s /hadoop/hadoop-2.10.0/etc/hadoop/hdfs-site.xml hdfs-site.xml
[hadoop@hadoop1 conf]$ ln -s /hadoop/hadoop-2.10.0/etc/hadoop/core-site.xml core-site.xml
e. Distribute HBase to the other nodes:
[hadoop@hadoop1 hadoop]$ xsync hbase-1.3.6
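With HDFS and ZooKeeper already running, HBase can then be started from hadoop1. A hedged sketch using the standard scripts shipped with HBase:
[hadoop@hadoop1 hadoop]$ cd hbase-1.3.6
[hadoop@hadoop1 hbase-1.3.6]$ bin/start-hbase.sh
[hadoop@hadoop1 hbase-1.3.6]$ jps
HMaster should appear on hadoop1 and HRegionServer on all three nodes; a final check is to run status in the HBase shell:
[hadoop@hadoop1 hbase-1.3.6]$ bin/hbase shell
hbase(main):001:0> status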
Package:
Installing the Docker environment:
[root@docker-flink111 opt]# curl -sSL https://get.daocloud.io/docker > docker.sh
[root@docker-flink111 opt]# sh docker.sh --mirror Aliyun
[root@docker-flink111 opt]# systemctl start docker
Configure a registry mirror:
[root@docker-flink111 opt]# sudo mkdir -p /etc/docker
[root@docker-flink111 opt]# sudo tee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": ["https://hub-mirror.c.163.com"]
}
EOF
[root@docker-flink111 opt]# sudo systemctl daemon-reload
[root@docker-flink111 opt]# sudo systemctl restart docker
Docker Compose is hosted on GitHub, which can be slow or unreliable to reach.
You can also install Docker Compose quickly from a mirror by running the following command:
[root@docker-flink111 opt]# curl -L https://get.daocloud.io/docker/compose/releases/download/1.26.2/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose
[root@docker-flink111 opt]# chmod +x /usr/local/bin/docker-compose
You can change the version number in the URL to install the version you need.
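A quick check that both tools installed correctly (a hedged sketch; the versions printed will vary):
[root@docker-flink111 opt]# docker version
[root@docker-flink111 opt]# docker-compose --version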
Create the Flink Compose configuration file docker-compose.yml.
Official configuration 1: Session Cluster with Docker Compose
docker-compose.yml:
version: "2.2"
services:
jobmanager:
image: flink:1.11.0-scala_2.11
ports:
- "8081:8081"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager:
image: flink:1.11.0-scala_2.11
depends_on:
- jobmanager
command: taskmanager
scale: 1
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
Official configuration 2 is shown below:
version: "2.2"
services:
jobmanager:
image: flink:1.11.0-scala_2.11
ports:
- "8081:8081"
command: standalone-job --job-classname com.job.ClassName [--job-id <job id>] [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] [job arguments]
volumes:
- /host/path/to/job/artifacts:/opt/flink/usrlib
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
parallelism.default: 2
taskmanager:
image: flink:1.11.0-scala_2.11
depends_on:
- jobmanager
command: taskmanager
scale: 1
volumes:
- /host/path/to/job/artifacts:/opt/flink/usrlib
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
parallelism.default: 2
For our tests we used the simplest possible configuration:
version: "2.1"
services:
jobmanager:
image: flink:1.11.0-scala_2.11
expose:
- "6123"
ports:
- "8081:8081"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
parallelism.default: 2
web.submit.enable: false
taskmanager:
image: flink:1.11.0-scala_2.11
expose:
- "6121"
- "6122"
depends_on:
- jobmanager
command: taskmanager
links:
- "jobmanager:jobmanager"
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
Pay close attention to the format of the docker-compose file: line breaks and indentation matter, similar to Python syntax.
In production, be sure to set web.submit.enable: false to disable submitting job jars through the web UI; otherwise the cluster can be compromised.
Start the flink-docker cluster
[root@docker-flink111 opt]# docker-compose up -d
View the logs
[root@docker-flink111 opt]# docker-compose logs
Scale out the number of TaskManager nodes
[root@docker-flink111 opt]# docker-compose scale taskmanager=<N>
Here taskmanager is the service name given to the TaskManager in docker-compose.yml.
Shut down the cluster
[root@docker-flink111 opt]# docker-compose kill
Launch a cluster in the foreground
docker-compose up
Launch a cluster in the background
docker-compose up -d
Scale the cluster up or down to N TaskManagers
docker-compose scale taskmanager=<N>
Access the JobManager container
docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) /bin/sh
Kill the cluster
docker-compose kill
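Beyond docker ps, a hedged way to confirm the session cluster is healthy before submitting anything is to query the JobManager REST API published on port 8081 (the /overview endpoint is part of Flink's standard REST API):
[root@docker-flink111 opt]# docker-compose ps
[root@docker-flink111 opt]# curl http://localhost:8081/overview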
To submit a job to a Session cluster via the command line, you can either:
If the Flink CLI is installed locally, submit the job with the following command:
[root@docker-flink111 opt]# flink run -d -c ${JOB_CLASS_NAME} /job.jar
Or copy a JAR into the container and run it there with the CLI, as follows:
[root@docker-flink111 opt]# JOB_CLASS_NAME="com.job.ClassName"
[root@docker-flink111 opt]# JM_CONTAINER=$(docker ps --filter name=jobmanager --format={{.ID}})
[root@docker-flink111 opt]# docker cp path/to/jar "${JM_CONTAINER}":/job.jar
[root@docker-flink111 opt]# docker exec -t -i "${JM_CONTAINER}" flink run -d -c ${JOB_CLASS_NAME} /job.jar
This page describes how to deploy a Flink Job and Session cluster on Kubernetes.
Info This page describes deploying a standalone Flink cluster on top of Kubernetes. You can find more information on native Kubernetes deployments here.
Please follow Kubernetes’ setup guide in order to deploy a Kubernetes cluster. If you want to run Kubernetes locally, we recommend using MiniKube.
Note: If using MiniKube please make sure to execute minikube ssh 'sudo ip link set docker0 promisc on'
before deploying a Flink cluster. Otherwise Flink components are not able to self reference themselves through a Kubernetes service.
Before deploying the Flink Kubernetes components, please read the Flink Docker image documentation, its tags, how to customize the Flink Docker image and enable plugins to use the image in the Kubernetes definition files.
Using the common resource definitions, launch the common cluster components with the kubectl command:
kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
Note that you could define your own customized options of flink-conf.yaml within flink-configuration-configmap.yaml.
Then launch the specific components depending on whether you want to deploy a Session or Job cluster.
You can then access the Flink UI in different ways:
kubectl proxy: run kubectl proxy in a terminal.
kubectl port-forward: run kubectl port-forward ${flink-jobmanager-pod} 8081:8081 to forward your jobmanager's web UI port to local 8081. Moreover, you could use the following command to submit jobs to the cluster:
./bin/flink run -m localhost:8081 ./examples/streaming/WordCount.jar
Create a NodePort service on the rest service of jobmanager: run kubectl create -f jobmanager-rest-service.yaml to create the NodePort service on jobmanager (the example jobmanager-rest-service.yaml can be found in the appendix). Run kubectl get svc flink-jobmanager-rest to find the node-port of this service and navigate to http://<public-node-ip>:<node-port> in your browser (with minikube, the public node IP is given by minikube ip). Similarly to the port-forward solution, you could also use the following command to submit jobs to the cluster:
./bin/flink run -m <public-node-ip>:<node-port> ./examples/streaming/WordCount.jar
You can also access the queryable state of TaskManager if you create a NodePort service for it: run kubectl create -f taskmanager-query-state-service.yaml to create the NodePort service on taskmanager (the example taskmanager-query-state-service.yaml can be found in the appendix). Run kubectl get svc flink-taskmanager-query-state to find the node-port of this service. Then you can create the QueryableStateClient(<public-node-ip>, <node-port>) to submit the state queries.
In order to terminate the Flink cluster, delete the specific Session or Job cluster components and use kubectl to terminate the common components:
kubectl delete -f jobmanager-service.yaml
kubectl delete -f flink-configuration-configmap.yaml
# if created then also the rest service
kubectl delete -f jobmanager-rest-service.yaml
# if created then also the queryable state service
kubectl delete -f taskmanager-query-state-service.yaml
A Flink Session cluster is executed as a long-running Kubernetes Deployment. Note that you can run multiple Flink jobs on a Session cluster. Each job needs to be submitted to the cluster after the cluster has been deployed.
A Flink Session cluster deployment in Kubernetes has at least three components:
After creating the common cluster components, use the Session specific resource definitions to launch the Session cluster with the kubectl command:
kubectl create -f jobmanager-session-deployment.yaml
kubectl create -f taskmanager-session-deployment.yaml
To terminate the Session cluster, these components can be deleted along with the common ones with the kubectl command:
kubectl delete -f taskmanager-session-deployment.yaml
kubectl delete -f jobmanager-session-deployment.yaml
A Flink Job cluster is a dedicated cluster which runs a single job. You can find more details here.
A basic Flink Job cluster deployment in Kubernetes has three components:
Check the Job cluster specific resource definitions and adjust them accordingly.
The args attribute in the jobmanager-job.yaml has to specify the main class of the user job. See also how to specify the JobManager arguments to understand how to pass other args to the Flink image in the jobmanager-job.yaml.
The job artifacts should be available from the job-artifacts-volume in the resource definition examples. The definition examples mount the volume as a local directory of the host, assuming that you create the components in a minikube cluster. If you do not use a minikube cluster, you can use any other type of volume available in your Kubernetes cluster to supply the job artifacts. Alternatively, you can build a custom image which already contains the artifacts instead.
After creating the common cluster components, use the Job cluster specific resource definitions to launch the cluster with the kubectl command:
kubectl create -f jobmanager-job.yaml
kubectl create -f taskmanager-job-deployment.yaml
To terminate the single job cluster, these components can be deleted along with the common ones with the kubectl command:
kubectl delete -f taskmanager-job-deployment.yaml
kubectl delete -f jobmanager-job.yaml
flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
labels:
app: flink
data:
flink-conf.yaml: |+
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 2
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
queryable-state.proxy.ports: 6125
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
parallelism.default: 2
log4j-console.properties: |+
# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender
# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
# Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Log all infos in the given rolling file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size=100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
jobmanager-service.yaml
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager
spec:
type: ClusterIP
ports:
- name: rpc
port: 6123
- name: blob-server
port: 6124
- name: webui
port: 8081
selector:
app: flink
component: jobmanager
jobmanager-rest-service.yaml
Optional service that exposes the jobmanager rest port as a public Kubernetes node's port.
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager-rest
spec:
type: NodePort
ports:
- name: rest
port: 8081
targetPort: 8081
nodePort: 30081
selector:
app: flink
component: jobmanager
taskmanager-query-state-service.yaml
Optional service that exposes the TaskManager port used to access the queryable state as a public Kubernetes node's port.
apiVersion: v1
kind: Service
metadata:
name: flink-taskmanager-query-state
spec:
type: NodePort
ports:
- name: query-state
port: 6125
targetPort: 6125
nodePort: 30025
selector:
app: flink
component: taskmanager
jobmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-jobmanager
spec:
replicas: 1
selector:
matchLabels:
app: flink
component: jobmanager
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
containers:
- name: jobmanager
image: flink:1.11.0-scala_2.11
args: ["jobmanager"]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob-server
- containerPort: 8081
name: webui
livenessProbe:
tcpSocket:
port: 6123
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
taskmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-taskmanager
spec:
replicas: 2
selector:
matchLabels:
app: flink
component: taskmanager
template:
metadata:
labels:
app: flink
component: taskmanager
spec:
containers:
- name: taskmanager
image: flink:1.11.0-scala_2.11
args: ["taskmanager"]
ports:
- containerPort: 6122
name: rpc
- containerPort: 6125
name: query-state
livenessProbe:
tcpSocket:
port: 6122
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf/
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
jobmanager-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
name: flink-jobmanager
spec:
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
restartPolicy: OnFailure
containers:
- name: jobmanager
image: flink:1.11.0-scala_2.11
env:
args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob-server
- containerPort: 8081
name: webui
livenessProbe:
tcpSocket:
port: 6123
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
- name: job-artifacts-volume
mountPath: /opt/flink/usrlib
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
- name: job-artifacts-volume
hostPath:
path: /host/path/to/job/artifacts
taskmanager-job-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-taskmanager
spec:
replicas: 2
selector:
matchLabels:
app: flink
component: taskmanager
template:
metadata:
labels:
app: flink
component: taskmanager
spec:
containers:
- name: taskmanager
image: flink:1.11.0-scala_2.11
env:
args: ["taskmanager"]
ports:
- containerPort: 6122
name: rpc
- containerPort: 6125
name: query-state
livenessProbe:
tcpSocket:
port: 6122
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf/
- name: job-artifacts-volume
mountPath: /opt/flink/usrlib
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
- name: job-artifacts-volume
hostPath:
path: /host/path/to/job/artifacts
This page describes how to deploy a Flink session cluster natively on Kubernetes.
Flink's native Kubernetes integration is still experimental. Configuration options and CLI flags may change in future releases.
The cluster configuration is taken from ~/.kube/config. You can verify your permissions by running kubectl auth can-i <list|create|edit|delete> pods. Follow the instructions below to start a Flink session in a Kubernetes cluster.
A session cluster starts all required Flink services (JobManager and TaskManagers) so that you can submit programs to the cluster. Note that you can run multiple programs per session.
$ ./bin/kubernetes-session.sh
All Kubernetes configuration options can be found in our configuration guide.
Example: run the following command to start a session cluster in which each TaskManager is allocated 4 GB of memory, 2 CPUs, and 4 slots:
In this example we override the resourcemanager.taskmanager-timeout setting so that the pods running TaskManagers stay around longer than the default of 30 seconds. Although this may increase cost in a cloud environment, in some scenarios it starts new jobs faster, and during development it gives you more time to inspect the job's log files.
$ ./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=<ClusterId> \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dresourcemanager.taskmanager-timeout=3600000
The system will use the configuration in conf/flink-conf.yaml. If you change any options, please follow our configuration guide.
If you do not give the session a specific name via kubernetes.cluster-id, the Flink client generates a UUID name.
To deploy the Flink containers with a custom Docker image, see the Flink Docker image documentation, its tags, how to customize the Flink Docker image, and how to enable plugins. If you have created a custom Docker image, specify it with the kubernetes.container.image option:
$ ./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=<ClusterId> \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dresourcemanager.taskmanager-timeout=3600000 \
-Dkubernetes.container.image=<CustomImageName>
Submit a Flink job to the Kubernetes cluster with the following command.
$ ./bin/flink run -d -t kubernetes-session -Dkubernetes.cluster-id=<ClusterId> examples/streaming/WindowJoin.jar
There are several ways to expose the service on an external (outside the cluster) IP address. This is configured with kubernetes.rest-service.exposed.type.
ClusterIP: exposes the service on a cluster-internal IP. The service is only reachable within the cluster. If you want to access the JobManager UI or submit jobs to the existing session, you need to start a local proxy. You can then use localhost:8081 to submit Flink jobs to the session or view the dashboard.
$ kubectl port-forward service/<ServiceName> 8081
NodePort: exposes the service on each Node's IP at a static port (the NodePort). <NodeIP>:<NodePort> can be used to contact the JobManager service. NodeIP can simply be replaced with the Kubernetes ApiServer address, which you can find in your kube config file.
LoadBalancer: exposes the service externally using a cloud provider's load balancer. Since the cloud provider and Kubernetes need some time to prepare the load balancer, you may initially see a NodePort JobManager web interface in the client log. You can use kubectl get services/<ClusterId> to get the EXTERNAL-IP and then build the load balancer JobManager web interface URL manually as http://<EXTERNAL-IP>:8081.
Warning! The JobManager may be exposed to the public internet without authentication, and jobs can be submitted to it.
ExternalName: maps the service to a DNS name; not supported in the current version. For more information, see the official Kubernetes documentation on publishing services.
By default, the Kubernetes session starts in detached mode, which means the Flink client exits after submitting all resources to the Kubernetes cluster. Use the following command to attach to an existing session.
$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
To stop the Flink Kubernetes session, attach the Flink client to the cluster and type stop.
$ echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
Flink uses Kubernetes OwnerReferences to clean up all cluster components. All resources created by Flink, including ConfigMap, Service, and Pod, have their OwnerReference set to deployment/<ClusterId>. Once the deployment is deleted, all other resources are deleted automatically.
$ kubectl delete deployment/<ClusterID>
By default, the JobManager and TaskManager only store their logs under /opt/flink/log inside each pod. If you want to view the logs with kubectl logs <PodName>, you must do the following:
1. Add the console appender reference to the rootLogger of the log4j configuration, rootLogger.appenderRef.console.ref = ConsoleAppender, together with the console appender shown below.
2. Add the option -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" to remove the redirection arguments.
3. Stop and restart your session. You can now view the logs with kubectl logs.
# Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
If the pod is running, you can use kubectl exec -it <PodName> bash to enter the pod and inspect the logs or debug the process.
Application mode lets the user build a single image that contains both their job and the Flink runtime; the cluster components are created and torn down automatically on demand. The Flink community provides base images that can be used to build multi-purpose custom images.
FROM flink
RUN mkdir -p $FLINK_HOME/usrlib
COPY /path/of/my-flink-job-*.jar $FLINK_HOME/usrlib/my-flink-job.jar
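Before launching in application mode, this image has to be built and made available to the Kubernetes nodes. A minimal sketch (the image name my-flink-job and the registry are placeholders, not names from this document):
$ docker build -t <registry>/my-flink-job:latest .
$ docker push <registry>/my-flink-job:latest
The pushed image name is then what you pass as kubernetes.container.image below.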
Start the Flink application with the following command.
$ ./bin/flink run-application -p 8 -t kubernetes-application \
-Dkubernetes.cluster-id=<ClusterId> \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.container.image=<CustomImageName> \
local:///opt/flink/usrlib/my-flink-job.jar
Note: Application mode only supports "local" as the schema. By default the jar is located inside the image, not in the Flink client.
Note: all jars under the image's "$FLINK_HOME/usrlib" directory are added to the user classpath.
When the application stops, all Flink cluster resources are destroyed automatically. As always, a job may stop because it was manually cancelled or because it finished running.
$ ./bin/flink cancel -t kubernetes-application -Dkubernetes.cluster-id=<ClusterID> <JobID>
Namespaces in Kubernetes are a way to divide cluster resources between multiple users (via resource quotas), similar to the queue concept in a YARN cluster. Flink on Kubernetes can start the Flink cluster inside a namespace; specify it with the -Dkubernetes.namespace=default parameter when starting the cluster.
Resource quotas constrain the aggregate resource consumption per namespace. They can limit the number of objects that can be created in a namespace by type, as well as the total amount of compute resources that may be consumed by resources in that namespace.
Role-based access control (RBAC) is a method of regulating access to compute or network resources based on the roles of individual users within an enterprise. Users can configure the RBAC roles and service accounts that the JobManager uses to access the Kubernetes API server in the Kubernetes cluster.
Every namespace has a default service account, but the default service account may not have permission to create or delete pods in the Kubernetes cluster. You may need to update the permissions of the default service account, or specify another service account that is bound to the correct role.
$ kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default
If you do not want to use the default service account, run the following commands to create a new flink service account and set up the role binding. Then use the option -Dkubernetes.jobmanager.service-account=flink so that the JobManager pod uses the flink service account to create and delete TaskManager pods.
$ kubectl create serviceaccount flink
$ kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink
For more information, see the official Kubernetes documentation on RBAC authorization.
This section briefly explains how Flink and Kubernetes interact.
When a Flink Kubernetes session cluster is created, the Flink client first connects to the Kubernetes ApiServer and submits the cluster description, including the ConfigMap spec, the JobManager Service spec, the JobManager Deployment spec, and the Owner Reference. Kubernetes then creates the JobManager deployment, during which the kubelet pulls the image, prepares and mounts the volumes, and runs the start command. Once the JobManager pod is up, the Dispatcher and the KubernetesResourceManager start in turn; the cluster is then ready and waits for jobs to be submitted.
When a user submits a job through the Flink client, the client generates the JobGraph and uploads it, together with the user jar, to the Dispatcher. The Dispatcher then starts a separate JobMaster for each job.
The JobManager requests resources, known as slots, from the KubernetesResourceManager. If no slots are available, the KubernetesResourceManager brings up TaskManager pods and registers them with the cluster.
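To watch this interaction in practice, a hedged sketch is to follow the pods and events that Flink creates for a given cluster-id (the app=flink label follows the resource definitions shown earlier and may differ in your deployment):
$ kubectl get pods -l app=flink -o wide
$ kubectl describe deployment/<ClusterId>
$ kubectl logs -f deployment/<ClusterId>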