Hadoop 2.10.0 Cluster Setup

一、Preparation

1、Three virtual machines

Hostname   IP              Role assignment (see section 5 below)
hadoop1    10.12.210.28
hadoop2    10.12.210.29
hadoop3    10.12.210.30

2、Linux environment setup

//Rename the host
hostnamectl set-hostname hadoop1

//Create the hadoop group and user
groupadd hadoop
useradd -g hadoop hadoop

//Set the password
passwd hadoop
dmall123

//Give the new user the same sudo privileges as root
[root@hadoop1 ~]# vim /etc/sudoers
## Next comes the main part: which users can run what software on 
## which machines (the sudoers file can be shared between multiple
## systems).
## Syntax:
##
##      user    MACHINE=COMMANDS
##
## The COMMANDS section may have other options added to it.
##
## Allow root to run any commands anywhere 
root    ALL=(ALL)       ALL
hadoop  ALL=(ALL)       ALL

## Allows members of the 'sys' group to run networking, software, 
## service management apps and more.
# %sys ALL = NETWORKING, SOFTWARE, SERVICES, STORAGE, DELEGATING, PROCESSES, LOCATE, DRIVERS

## Allows people in group wheel to run all commands
%wheel  ALL=(ALL)       ALL

## Same thing without a password
# %wheel        ALL=(ALL)       NOPASSWD: ALL

## Allows members of the users group to mount and unmount the 
## cdrom as root

Save with :wq! (the file is read-only, so the write has to be forced).
With the hadoop line added, the hadoop user now has root-equivalent sudo privileges.

//Install the JDK
[root@hadoop1 ~]# yum install -y java-1.8.0-openjdk.x86_64 java-1.8.0-openjdk-devel.x86_64
//Install rsync
[root@hadoop1 ~]# yum install -y rsync.x86_64

//Download the Hadoop tarball
[root@hadoop1 soft]# wget https://mirrors.bfsu.edu.cn/apache/hadoop/common/hadoop-2.10.0/hadoop-2.10.0.tar.gz

[root@hadoop1 ~]# mkdir /hadoop
[root@hadoop1 ~]# cd /hadoop
[root@hadoop1 hadoop]# tar zxvf /soft/hadoop-2.10.0.tar.gz -C ./
//Give the hadoop user ownership of this directory; later steps run as that user
[root@hadoop1 hadoop]# chown -R hadoop:hadoop ./

3、Edit /etc/profile

//Append the JDK and Hadoop path configuration at the end of the file
export JAVA_HOME=/usr/lib/jvm/java
export HADOOP_HOME=/hadoop/hadoop-2.10.0
#export HADOOP_OPTS="-Djava.library.path=$HADOOP_PREFIX/lib:$HADOOP_PREFIX/lib/native"
export LD_LIBRARY_PATH=$HADOOP_HOME/lib/native
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"
#export HADOOP_ROOT_LOGGER=DEBUG,console
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/sbin:$HADOOP_HOME/bin

After editing the profile, run the following so the changes take effect in the current shell:

[root@hadoop1 ~]# source /etc/profile
[root@hadoop1 ~]# echo $JAVA_HOME
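
As a quick sanity check (assuming the paths above are correct), both tools should now resolve from any shell:

[root@hadoop1 ~]# java -version
[root@hadoop1 ~]# hadoop version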

4、Edit the hosts file

//Add the following three entries to /etc/hosts
10.12.210.28 hadoop1
10.12.210.29 hadoop2
10.12.210.30 hadoop3
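
An optional, hedged check (using the host names assumed above) that every entry resolves and is reachable:

[root@hadoop1 ~]# for h in hadoop1 hadoop2 hadoop3; do ping -c 1 $h; done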

5、Cluster role assignment

        hadoop1              hadoop2                        hadoop3
HDFS    NameNode, DataNode   DataNode                       SecondaryNameNode, DataNode
YARN    NodeManager          ResourceManager, NodeManager   NodeManager

6、Write a distribution (sync) script

[root@hadoop1 bin]# cd /usr/local/bin/
[root@hadoop1 bin]# touch xsync
[root@hadoop1 bin]# vim xsync
#!/bin/bash
#1 Get the number of arguments; exit immediately if none were given
pcount=$#
if((pcount==0)); then
echo no args;
exit;
fi

#2 Get the file name
p1=$1
fname=`basename $p1`
echo fname=$fname

#3 Resolve the parent directory to an absolute path
pdir=`cd -P $(dirname $p1); pwd`
echo pdir=$pdir

#4 Get the current user name
user=`whoami`

#5 Loop over the cluster hosts
for((host=1; host<4; host++)); do
        echo ------------------- hadoop$host --------------
        rsync -rvl $pdir/$fname $user@hadoop$host:$pdir
done

//Make the script executable
[root@hadoop1 bin]# chmod 777 xsync
//Distribute the script itself to check that it works
[root@hadoop1 bin]# xsync xsync 
fname=xsync
pdir=/usr/local/bin
------------------- hadoop1 --------------
root@hadoop1's password: 
sending incremental file list

sent 42 bytes  received 12 bytes  9.82 bytes/sec
total size is 495  speedup is 9.17
------------------- hadoop2 --------------
root@hadoop2's password: 
sending incremental file list
xsync

sent 584 bytes  received 35 bytes  176.86 bytes/sec
total size is 495  speedup is 0.80
------------------- hadoop3 --------------
root@hadoop3's password: 
sending incremental file list
xsync

sent 584 bytes  received 35 bytes  137.56 bytes/sec
total size is 495  speedup is 0.80

二、Configuring the Cluster

1、Core configuration file

Configure core-site.xml

[hadoop@hadoop1 hadoop]$ vi core-site.xml

Add the following configuration to the file:

<!-- Address of the HDFS NameNode -->
<property>
	<name>fs.defaultFS</name>
    <value>hdfs://hadoop1:9000</value>
</property>
<!-- Directory where files generated at Hadoop runtime are stored -->
<property>
	<name>hadoop.tmp.dir</name>
	<value>/hadoop/hadoop-2.10.0/data/tmp</value>
</property>
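
Hadoop normally creates hadoop.tmp.dir itself when the NameNode is formatted, but it does no harm to create it up front (a hedged precaution, using the path configured above):

[hadoop@hadoop1 hadoop-2.10.0]$ mkdir -p /hadoop/hadoop-2.10.0/data/tmp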

2、HDFS configuration files

Configure hadoop-env.sh

[hadoop@hadoop1 hadoop]$ vim hadoop-env.sh
export JAVA_HOME=/usr/lib/jvm/java

Configure hdfs-site.xml

[hadoop@hadoop1 hadoop]$ vim hdfs-site.xml

Add the following configuration to the file:

<property>
		<name>dfs.replication</name>
		<value>3</value>
</property>
<!-- Host running the SecondaryNameNode -->
<property>
      <name>dfs.namenode.secondary.http-address</name>
      <value>hadoop3:50090</value>
</property>

3、Using HA mode

https://hadoop.apache.org/docs/r2.10.0/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html

### Configuration details
The order of these settings does not matter, but some of them depend on each other. For example, dfs.ha.namenodes.[nameservice ID] depends on dfs.nameservices: if dfs.nameservices is set to a, the corresponding key must be written as dfs.ha.namenodes.a.
#### hdfs-site.xml

dfs.nameservices
The logical name of the nameservice. It can be anything, but pick something sensible; the value is referenced by other settings and also becomes part of absolute HDFS paths.
If you are also using HDFS Federation, this setting should additionally list the other nameservices, HA or otherwise, as a comma-separated list.

<property>
  <name>dfs.nameservices</name>
  <value>mycluster</value>
</property>

dfs.ha.namenodes.[nameservice ID]
Lets DataNodes determine all the NameNodes in the cluster. There must be at least two NameNodes; three are recommended, and more than five are not advised.

<property>
  <name>dfs.ha.namenodes.mycluster</name>
  <value>nn1,nn2,nn3</value>
</property>

dfs.namenode.rpc-address.[nameservice ID].[name node ID]
The fully-qualified RPC address each NameNode listens on.

<property>
  <name>dfs.namenode.rpc-address.mycluster.nn1</name>
  <value>machine1.example.com:8020</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.mycluster.nn2</name>
  <value>machine2.example.com:8020</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.mycluster.nn3</name>
  <value>machine3.example.com:8020</value>
</property>

dfs.namenode.http-address.[nameservice ID].[name node ID]
The fully-qualified HTTP address each NameNode listens on.
Note: if Hadoop's security features are enabled, you should also set an https-address for each NameNode.

<property>
  <name>dfs.namenode.http-address.mycluster.nn1</name>
  <value>machine1.example.com:9870</value>
</property>
<property>
  <name>dfs.namenode.http-address.mycluster.nn2</name>
  <value>machine2.example.com:9870</value>
</property>
<property>
  <name>dfs.namenode.http-address.mycluster.nn3</name>
  <value>machine3.example.com:9870</value>
</property>

dfs.namenode.shared.edits.dir
Configures the addresses of the JournalNodes (jn). The management scripts use these addresses to start the JournalNodes, the active NameNode ships namespace edits through them, and the standby NameNode pulls those edits from them. The trailing /mycluster in the value is the storage root path: several HA clusters can share the same JournalNode servers to save machines, so each HA service must use a distinct root path to keep their data apart.

<property>
  <name>dfs.namenode.shared.edits.dir</name>
  <value>qjournal://node1.example.com:8485;node2.example.com:8485;node3.example.com:8485/mycluster</value>
</property>

dfs.journalnode.edits.dir
The absolute path where the JournalNode stores edit data on local disk.

<property>
  <name>dfs.journalnode.edits.dir</name>
  <value>/path/to/journal/node/local/data</value>
</property>

dfs.client.failover.proxy.provider.[nameservice ID]
Lets clients determine which NameNode is currently active. On the first call it contacts all NameNodes to find the active one; after that it talks to the active NameNode directly.
Use either ConfiguredFailoverProxyProvider or RequestHedgingProxyProvider.

<property>
  <name>dfs.client.failover.proxy.provider.mycluster</name>
 <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

dfs.ha.fencing.methods
During a failover, the previously active NameNode may still serve (possibly stale) read requests to clients until it shuts down while trying to write to the JournalNodes. For this reason a fencing method should still be configured even though the Quorum Journal Manager is used. To keep the system available if the fencing mechanism itself fails, it is advisable to also configure a fencing method that always reports success, for example shell(/bin/true). Note that if you choose not to use a real fencing method, you must still configure something for this setting. The fencing methods used during failover are configured as a carriage-return-separated list and are tried in order until one of them indicates that fencing has succeeded. Hadoop ships with two methods, shell and sshfence; see the org.apache.hadoop.ha.NodeFencer class for information on implementing a custom fencing method.
1) sshfence
The sshfence option SSHes to the target node and uses fuser to kill the process listening on the service's TCP port. For this fencing option to work, it must be able to SSH to the target node without a password, so dfs.ha.fencing.ssh.private-key-files must also be configured with a comma-separated list of SSH private key files.
If passwordless SSH is used, the two NameNodes must be able to log in to each other, because the ZKFC on one node has to log in to the other NameNode in order to demote it.

<property>
   <name>dfs.ha.fencing.methods</name>
   <value>sshfence</value>
</property>
<property>
  <name>dfs.ha.fencing.ssh.private-key-files</name>
  <value>/home/exampleuser/.ssh/id_rsa</value>
</property>

Optionally, a non-standard username or port can be configured for the SSH connection, and an SSH timeout (in milliseconds) can be set, after which the fencing method is considered to have failed.

<property>
  <name>dfs.ha.fencing.methods</name>
  <value>sshfence([[username][:port]])</value>
</property>
<property>
  <name>dfs.ha.fencing.ssh.connect-timeout</name>
  <value>30000</value>
</property>

2) shell

Terminate the active NameNode with an arbitrary shell script:

   <property>
      <name>dfs.ha.fencing.methods</name>
      <value>shell(/path/to/my/script.sh arg1 arg2 ...)</value>
    </property>

core-site.xml
fs.defaultFS
Set this to the nameservice ID configured above. Through mycluster, combined with dfs.nameservices and dfs.ha.namenodes.mycluster from hdfs-site.xml, clients find all NameNodes of the service and determine which one is active.

<property>
  <name>fs.defaultFS</name>
  <value>hdfs://mycluster</value>
</property>
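
For reference, a minimal sketch of how a QJM-based HA setup is usually brought up once these properties are in place (host names and NameNode IDs are the example values above, not this tutorial's actual cluster; automatic failover via ZKFC is not shown):

# On every JournalNode host
hadoop-daemon.sh start journalnode

# On the first NameNode
hdfs namenode -format
hadoop-daemon.sh start namenode

# On each additional NameNode: copy over the metadata, then start it
hdfs namenode -bootstrapStandby
hadoop-daemon.sh start namenode

# With manual failover, promote one NameNode to active
hdfs haadmin -transitionToActive nn1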

4、YARN configuration files

Configure yarn-env.sh

[hadoop@hadoop1 hadoop]$ vim yarn-env.sh
export JAVA_HOME=/usr/lib/jvm/java

Configure yarn-site.xml

[hadoop@hadoop1 hadoop]$ vim yarn-site.xml

Add the following configuration to the file:

<!-- How reducers obtain data -->
<property>
		<name>yarn.nodemanager.aux-services</name>
		<value>mapreduce_shuffle</value>
</property>
<!-- Host of the YARN ResourceManager -->
<property>
		<name>yarn.resourcemanager.hostname</name>
		<value>hadoop2</value>
</property>

5、MapReduce configuration files

Configure mapred-env.sh

[hadoop@hadoop1 hadoop]$ vim mapred-env.sh
export JAVA_HOME=/usr/lib/jvm/java

Configure mapred-site.xml

[hadoop@hadoop1 hadoop]$ cp mapred-site.xml.template mapred-site.xml
[hadoop@hadoop1 hadoop]$ vim mapred-site.xml

Add the following configuration to the file:

<!-- Run MapReduce on YARN -->
<property>
		<name>mapreduce.framework.name</name>
		<value>yarn</value>
</property>

6、Distribute the configured Hadoop directory to the whole cluster

[hadoop@hadoop1 hadoop]$ xsync /hadoop/hadoop-2.10.0/

7、Verify the distribution

[hadoop@hadoop2 hadoop]$ cat /hadoop/hadoop-2.10.0/etc/hadoop/core-site.xml

8、A note on the configuration files

If the JDK is configured in /etc/profile in the form

export JAVA_HOME=/usr/lib/jvm/java

then the JDK path in the various *-env.sh files should in principle not need further changes. (But see the group-start section below: the daemons launched over SSH do not read /etc/profile, so JAVA_HOME must still be set explicitly in the env files, as was done above.)

三、Starting the Cluster Node by Node

1、Initialize the NameNode

If this is the first time the cluster is started, the NameNode must be formatted:

[hadoop@hadoop1 hadoop-2.10.0]$ hadoop namenode -format

2、Start the NameNode on hadoop1

[hadoop@hadoop1 hadoop-2.10.0]$ hadoop-daemon.sh start namenode
starting namenode, logging to /hadoop/hadoop-2.10.0/logs/hadoop-hadoop-namenode-hadoop1.out
[hadoop@hadoop1 hadoop-2.10.0]$ jps
11192 Jps
11117 NameNode

3、Start the DataNodes

Run the following on hadoop1, hadoop2 and hadoop3 respectively:

[hadoop@hadoop1 hadoop-2.10.0]$ hadoop-daemon.sh start datanode
starting datanode, logging to /hadoop/hadoop-2.10.0/logs/hadoop-hadoop-datanode-hadoop1.out
[hadoop@hadoop1 hadoop-2.10.0]$ jps
11312 Jps
11117 NameNode
11215 DataNode
[hadoop@hadoop2 hadoop-2.10.0]$ hadoop-daemon.sh start datanode
starting datanode, logging to /hadoop/hadoop-2.10.0/logs/hadoop-hadoop-datanode-hadoop2.out
[hadoop@hadoop2 hadoop-2.10.0]$ jps
11090 DataNode
11142 Jps
[hadoop@hadoop3 hadoop]$ hadoop-daemon.sh start datanode
starting datanode, logging to /hadoop/hadoop-2.10.0/logs/hadoop-hadoop-datanode-hadoop3.out
[hadoop@hadoop3 hadoop]$ jps
11079 DataNode
11161 Jps

4、Stopping individual nodes

[hadoop@hadoop1 hadoop-2.10.0]$ hadoop-daemon.sh stop datanode
stopping datanode
[hadoop@hadoop1 hadoop-2.10.0]$ hadoop-daemon.sh stop namenode
stopping namenode
[hadoop@hadoop1 hadoop-2.10.0]$ jps
11403 Jps
[hadoop@hadoop2 bin]$ hadoop-daemon.sh stop datanode
stopping datanode
[hadoop@hadoop2 bin]$ jps
11244 Jps
[hadoop@hadoop3 hadoop-2.10.0]$ hadoop-daemon.sh stop datanode
stopping datanode
[hadoop@hadoop3 hadoop-2.10.0]$ jps
11220 Jps

5、Food for thought:

Starting every node one by one works here, but what if the cluster grows to 1,000 nodes?

You would start nodes all morning and just about finish by quitting time. Clearly a way to start the whole cluster at once is needed.

四、Passwordless SSH Configuration

1、Configure SSH

(1) Basic syntax

ssh <IP address of the other machine>

(2) What a first connection looks like (the "Host key verification failed" case):

[hadoop@hadoop1 hadoop] $ ssh 192.168.1.103
The authenticity of host '192.168.1.103 (192.168.1.103)' can't be established.
RSA key fingerprint is cf:1e:de:d7:d0:4c:2d:98:60:b4:fd:ae:b1:2d:ad:06.
Are you sure you want to continue connecting (yes/no)? 
Host key verification failed.

(3) Solution: simply type yes at the prompt.

2、Passwordless key setup

(1) How passwordless login works, shown in Figure 2-40.

(figure omitted: Figure 2-40, principle of passwordless SSH login)

(2) Generate the key pair:

[hadoop@hadoop1 .ssh]$ ssh-keygen -t rsa

Press Enter three times to accept the defaults; two files are generated: id_rsa (private key) and id_rsa.pub (public key).

(3) Copy the public key to every machine that should accept passwordless logins:

[hadoop@hadoop1 .ssh]$ ssh-copy-id hadoop1
[hadoop@hadoop1 .ssh]$ ssh-copy-id hadoop2
[hadoop@hadoop1 .ssh]$ ssh-copy-id hadoop3
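
A quick hedged check that passwordless login now works from hadoop1 to all three nodes:

[hadoop@hadoop1 .ssh]$ for h in hadoop1 hadoop2 hadoop3; do ssh $h hostname; done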

Note:

3、Files under the .ssh directory (~/.ssh)

Table 2-4

File              Purpose
known_hosts       Public keys of hosts this machine has connected to over ssh
id_rsa            The generated private key
id_rsa.pub        The generated public key
authorized_keys   Public keys authorized for passwordless login to this server
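
If a password prompt still appears after copying the keys, overly loose permissions on ~/.ssh are a common cause; the usual (hedged) fix is:

[hadoop@hadoop1 ~]$ chmod 700 ~/.ssh
[hadoop@hadoop1 ~]$ chmod 600 ~/.ssh/authorized_keys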

五、Starting the Whole Cluster at Once

1、Configure slaves

File location: /hadoop/hadoop-2.10.0/etc/hadoop/slaves

[hadoop@hadoop1 hadoop]$ vi slaves

Add the following content to the file:

hadoop1
hadoop2
hadoop3

Note: the lines must not have trailing spaces, and the file must not contain blank lines.

Be very careful here: JAVA_HOME must point to an explicit JDK directory in the env files; otherwise the group-start scripts cannot find it and fail with the error below:

[hadoop@hadoop1 hadoop-2.10.0]$ sbin/start-dfs.sh
20/07/16 18:49:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Starting namenodes on [hadoop1]
hadoop1: Error: JAVA_HOME is not set and could not be found.
hadoop2: Error: JAVA_HOME is not set and could not be found.
hadoop3: Error: JAVA_HOME is not set and could not be found.
hadoop1: Error: JAVA_HOME is not set and could not be found.
Starting secondary namenodes [hadoop3]
hadoop3: Error: JAVA_HOME is not set and could not be found.
20/07/16 18:49:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
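
The usual fix (as already done in the HDFS configuration section; repeated here as a hedged reminder) is to hard-code JAVA_HOME in the *-env.sh files and resync them:

[hadoop@hadoop1 hadoop]$ vim hadoop-env.sh
export JAVA_HOME=/usr/lib/jvm/java
[hadoop@hadoop1 hadoop]$ xsync hadoop-env.sh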

Sync the configuration file to all nodes:

[hadoop@hadoop1 hadoop]$ xsync slaves
fname=slaves
pdir=/hadoop/hadoop-2.10.0/etc/hadoop
------------------- hadoop1 --------------
sending incremental file list

sent 43 bytes  received 12 bytes  110.00 bytes/sec
total size is 24  speedup is 0.44
------------------- hadoop2 --------------
sending incremental file list
slaves

sent 114 bytes  received 41 bytes  310.00 bytes/sec
total size is 24  speedup is 0.15
------------------- hadoop3 --------------
sending incremental file list
slaves

sent 114 bytes  received 41 bytes  310.00 bytes/sec
total size is 24  speedup is 0.15

2、Start the cluster

(1) If this is the first start of the cluster, the NameNode must be formatted. (Before formatting, be sure to stop any NameNode and DataNode processes left over from earlier runs, then delete the data and logs directories.)

[hadoop@hadoop1 hadoop-2.10.0]$ bin/hdfs namenode -format

(2) Start HDFS

[hadoop@hadoop1 hadoop-2.10.0]$ sbin/start-dfs.sh
20/07/16 18:53:18 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Starting namenodes on [hadoop1]
hadoop1: starting namenode, logging to /hadoop/hadoop-2.10.0/logs/hadoop-hadoop-namenode-hadoop1.out
hadoop3: starting datanode, logging to /hadoop/hadoop-2.10.0/logs/hadoop-hadoop-datanode-hadoop3.out
hadoop2: starting datanode, logging to /hadoop/hadoop-2.10.0/logs/hadoop-hadoop-datanode-hadoop2.out
hadoop1: starting datanode, logging to /hadoop/hadoop-2.10.0/logs/hadoop-hadoop-datanode-hadoop1.out
Starting secondary namenodes [hadoop3]
hadoop3: starting secondarynamenode, logging to /hadoop/hadoop-2.10.0/logs/hadoop-hadoop-secondarynamenode-hadoop3.out
20/07/16 18:53:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[hadoop@hadoop1 hadoop-2.10.0]$ jps
12728 DataNode
12587 NameNode
12939 Jps
[hadoop@hadoop2 hadoop-2.10.0]$ jps
11595 DataNode
11678 Jps
[hadoop@hadoop3 hadoop-2.10.0]$ jps
11603 SecondaryNameNode
11652 Jps
11494 DataNode

(3) Start YARN

[hadoop@hadoop2 hadoop-2.10.0]$ sbin/start-yarn.sh
starting yarn daemons
starting resourcemanager, logging to /hadoop/hadoop-2.10.0/logs/yarn-hadoop-resourcemanager-hadoop2.out
hadoop3: starting nodemanager, logging to /hadoop/hadoop-2.10.0/logs/yarn-hadoop-nodemanager-hadoop3.out
hadoop1: nodemanager running as process 12981. Stop it first.
hadoop2: starting nodemanager, logging to /hadoop/hadoop-2.10.0/logs/yarn-hadoop-nodemanager-hadoop2.out
[hadoop@hadoop2 hadoop-2.10.0]$ jps
12129 Jps
11595 DataNode
11836 NodeManager
11727 ResourceManager
[hadoop@hadoop3 hadoop-2.10.0]$ jps
11603 SecondaryNameNode
11494 DataNode
11831 NodeManager
11945 Jps
[hadoop@hadoop1 hadoop-2.10.0]$ jps
12981 NodeManager
12728 DataNode
13128 Jps
12587 NameNode

Note: if the NameNode and ResourceManager are not on the same machine, do not start YARN on the NameNode host; start it on the machine where the ResourceManager runs.

(4) Check the web UIs

(a) Open http://hadoop3:50090/status.html in a browser to view the SecondaryNameNode.

(b) On this version the SecondaryNameNode status page no longer renders properly, so little useful information is shown there.

(c) Browse the HDFS directory tree at http://hadoop1:50070/explorer.html#/.
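
As an end-to-end smoke test (a hedged example: the /input and /output paths are arbitrary, and the examples jar name assumes the stock 2.10.0 binary distribution), a small wordcount job exercises HDFS and YARN together:

[hadoop@hadoop1 hadoop-2.10.0]$ bin/hdfs dfs -mkdir -p /input
[hadoop@hadoop1 hadoop-2.10.0]$ bin/hdfs dfs -put etc/hadoop/core-site.xml /input
[hadoop@hadoop1 hadoop-2.10.0]$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.10.0.jar wordcount /input /output
[hadoop@hadoop1 hadoop-2.10.0]$ bin/hdfs dfs -cat /output/part-r-00000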

六、ZooKeeper Cluster Setup

1、Server environment (same machines as the Hadoop cluster)

Download the package: https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz

[root@hadoop1 soft]# wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
[root@hadoop1 soft]# cd /hadoop
[root@hadoop1 hadoop]# tar zxvf /soft/apache-zookeeper-3.6.1-bin.tar.gz
[root@hadoop1 hadoop]# su hadoop
[hadoop@hadoop1 hadoop]$ ln -s apache-zookeeper-3.6.1-bin zookeeper
[hadoop@hadoop1 hadoop]$ cd zookeeper
[hadoop@hadoop1 zookeeper]$ cd conf
[hadoop@hadoop1 conf]$ mv zoo_sample.cfg zoo.cfg
[hadoop@hadoop1 conf]$ vim zoo.cfg

2、Edit the configuration

Edit zoo.cfg:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/hadoop/zookeeper/zkData
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
server.1=hadoop1:2888:3888
server.2=hadoop2:2888:3888
server.3=hadoop3:2888:3888

Set the data directory to /hadoop/zookeeper/zkData, and add the server IDs for the ensemble:

server.1=hadoop1:2888:3888
server.2=hadoop2:2888:3888
server.3=hadoop3:2888:3888

Note the format: server.[serverId]=[hostname or IP]:2888:3888

3、Data directory and server IDs

Create the data directory and distribute ZooKeeper to the other nodes:

[hadoop@hadoop1 conf]$ cd ../
[hadoop@hadoop1 zookeeper]$ mkdir zkData
[hadoop@hadoop1 zookeeper]$ cd /hadoop
[hadoop@hadoop1 hadoop]$ xsync zookeeper
[hadoop@hadoop1 hadoop]$ xsync apache-zookeeper-3.6.1-bin

After the data has been synced, create the server ID (the myid file) on each of the three machines:

[hadoop@hadoop1 hadoop]$ cd zookeeper/zkData
[hadoop@hadoop1 zkData]$ touch myid
[hadoop@hadoop1 zkData]$ vim myid
1
[hadoop@hadoop1 zkData]$ xsync myid
[hadoop@hadoop2 zkData]$ vim myid
2
[hadoop@hadoop3 zkData]$ vim myid
3
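
Equivalently (a hedged shortcut that relies on the passwordless SSH configured earlier), the IDs on the other two nodes can be written remotely from hadoop1 instead of editing them by hand:

[hadoop@hadoop1 zkData]$ ssh hadoop2 'echo 2 > /hadoop/zookeeper/zkData/myid'
[hadoop@hadoop1 zkData]$ ssh hadoop3 'echo 3 > /hadoop/zookeeper/zkData/myid'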

With the IDs saved, start the ensemble.

4、Start the ensemble

[hadoop@hadoop1 zookeeper]$ bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /hadoop/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[hadoop@hadoop2 zookeeper]$ bin/zkServer.sh start
[hadoop@hadoop2 zookeeper]$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /hadoop/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: Leader
[hadoop@hadoop3 zookeeper]$ bin/zkServer.sh start
[hadoop@hadoop3 zookeeper]$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /hadoop/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
[hadoop@hadoop1 zookeeper]$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /hadoop/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower

The ZooKeeper ensemble is now up and ready to use.
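
As a hedged connectivity check, the CLI shipped with ZooKeeper can be pointed at any member of the ensemble:

[hadoop@hadoop1 zookeeper]$ bin/zkCli.sh -server hadoop1:2181
[zk: hadoop1:2181(CONNECTED) 0] ls /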

七、Installing HBase

1、Prepare the environment

Package: https://mirror.bit.edu.cn/apache/hbase/hbase-1.3.6/hbase-1.3.6-bin.tar.gz

[root@hadoop1 soft]# wget https://mirror.bit.edu.cn/apache/hbase/hbase-1.3.6/hbase-1.3.6-bin.tar.gz
[root@hadoop1 soft]# cd /hadoop
[root@hadoop1 hadoop]# su hadoop
[hadoop@hadoop1 hadoop]$ tar zxvf /soft/hbase-1.3.6-bin.tar.gz
[hadoop@hadoop1 hadoop]$ cd hbase-1.3.6/conf

2、Edit the configuration

a. Edit hbase-env.sh

[hadoop@hadoop1 conf]$ vim hbase-env.sh 

Change the following two settings:

# The java implementation to use.  Java 1.7+ required.
# export JAVA_HOME=/usr/java/jdk1.6.0/
export JAVA_HOME=/usr/lib/jvm/java
Also comment out the two PermSize lines below (not needed on JDK 8+):
# Configure PermSize. Only needed in JDK7. You can safely remove it for JDK8+
#export HBASE_MASTER_OPTS="$HBASE_MASTER_OPTS -XX:PermSize=128m -XX:MaxPermSize=128m -XX:ReservedCodeCacheSize=256m"
#export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS -XX:PermSize=128m -XX:MaxPermSize=128m -XX:ReservedCodeCacheSize=256m"

# Tell HBase whether it should manage it's own instance of Zookeeper or not.
export HBASE_MANAGES_ZK=false

b. Edit hbase-site.xml

[hadoop@hadoop1 conf]$ vim hbase-site.xml

Add the following content (the <configuration> element and its properties):

<configuration>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://hadoop1:9000/hbase</value>
    </property>
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <property>
        <name>hbase.master.port</name>
        <value>16000</value>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>hadoop1:2181,hadoop2:2181,hadoop3:2181</value>
    </property>
    <property>
        <name>hbase.zookeeper.property.dataDir</name>
        <value>/hadoop/zookeeper/zkData</value>
    </property>
</configuration>

c. Edit regionservers

[hadoop@hadoop1 conf]$ vim regionservers

Set the file content to the three host names:

hadoop1
hadoop2
hadoop3

There must be no stray spaces or blank lines in this file, the same rule as for the ZooKeeper and slaves configuration.

d. Link in the Hadoop configuration

[hadoop@hadoop1 conf]$ ln -s /hadoop/hadoop-2.10.0/etc/hadoop/hdfs-site.xml hdfs-site.xml
[hadoop@hadoop1 conf]$ ln -s /hadoop/hadoop-2.10.0/etc/hadoop/core-site.xml core-site.xml

e. Distribute HBase to the other nodes

[hadoop@hadoop1 hadoop]$ xsync hbase-1.3.6

3、Start HBase
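
A minimal hedged sketch using the scripts shipped in the HBase 1.3.6 distribution (HDFS and the ZooKeeper ensemble must already be running):

[hadoop@hadoop1 hbase-1.3.6]$ bin/start-hbase.sh
[hadoop@hadoop1 hbase-1.3.6]$ jps        # expect HMaster here and HRegionServer on every host listed in regionservers
[hadoop@hadoop1 hbase-1.3.6]$ bin/hbase shell
hbase(main):001:0> status

By default the master web UI should then be reachable on port 16010 of the node running HMaster.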

八、Installing Hive

1、Prepare the environment

Package:

2、Edit the configuration

3、Start

九、Installing a Docker + Flink Cluster

1、Prepare the environment

Install Docker:

[root@docker-flink111 opt]# curl -sSL https://get.daocloud.io/docker > docker.sh
[root@docker-flink111 opt]# sh docker.sh --mirror Aliyun
[root@docker-flink111 opt]# systemctl start docker

Configure a registry mirror:

[root@docker-flink111 opt]# sudo mkdir -p /etc/docker
[root@docker-flink111 opt]# sudo tee /etc/docker/daemon.json <<-'EOF'
{
  "registry-mirrors": ["https://hub-mirror.c.163.com"]
}
EOF
[root@docker-flink111 opt]# sudo systemctl daemon-reload
[root@docker-flink111 opt]# sudo systemctl restart docker
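
A hedged check that the daemon is running and the mirror configuration was picked up:

[root@docker-flink111 opt]# docker info | grep -A 1 'Registry Mirrors'
[root@docker-flink111 opt]# docker run --rm hello-world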

2、Install Docker Compose


Docker Compose releases are hosted on GitHub, which can be slow or unreliable to reach.
You can install Docker Compose quickly by running the commands below instead.

[root@docker-flink111 opt]# curl -L https://get.daocloud.io/docker/compose/releases/download/1.26.2/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose
[root@docker-flink111 opt]# chmod +x /usr/local/bin/docker-compose

You can change the version number in the URL to install whichever version you need.
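
A quick check that the binary is installed and executable:

[root@docker-flink111 opt]# docker-compose --version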

3、Configuration

Create a Compose file for Flink, docker-compose.yml.

Official example 1: Session Cluster with Docker Compose
docker-compose.yml:

version: "2.2"
services:
  jobmanager:
    image: flink:1.11.0-scala_2.11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: flink:1.11.0-scala_2.11
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

Official example 2 (a Job Cluster with Docker Compose):

version: "2.2"
services:
  jobmanager:
    image: flink:1.11.0-scala_2.11
    ports:
      - "8081:8081"
    command: standalone-job --job-classname com.job.ClassName [--job-id <job id>] [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] [job arguments]
    volumes:
      - /host/path/to/job/artifacts:/opt/flink/usrlib
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        parallelism.default: 2

  taskmanager:
    image: flink:1.11.0-scala_2.11
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    volumes:
      - /host/path/to/job/artifacts:/opt/flink/usrlib
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        parallelism.default: 2

For our own tests we used the simplest possible configuration:

version: "2.1"
services:
  jobmanager:
    image: flink:1.11.0-scala_2.11
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
     - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        parallelism.default: 2
        web.submit.enable: false

  taskmanager:
    image: flink:1.11.0-scala_2.11
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

Pay close attention to the format of the docker-compose file: its line-break and indentation rules are similar to Python syntax.

In production, be sure to set web.submit.enable: false to disable job submission through the web UI; otherwise the cluster can be compromised through it.

4、Starting and stopping

Start the flink-docker cluster:

[root@docker-flink111 opt]# docker-compose up -d

View the logs:

[root@docker-flink111 opt]# docker-compose logs

Scale up the number of TaskManager instances:

[root@docker-flink111 opt]# docker-compose scale taskmanager=<N>
Here taskmanager is the service name given to the TaskManager in docker-compose.yml.

Shut the cluster down:

[root@docker-flink111 opt]# docker-compose kill
  • Launch a cluster in the foreground

    docker-compose up
    
  • Launch a cluster in the background (detached mode)

    docker-compose up -d
    
  • Scale the cluster up or down to N TaskManagers

    docker-compose scale taskmanager=<N>
    
  • Access the JobManager container

    docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) /bin/sh
    
  • Kill the cluster

    docker-compose kill
    

5、Submitting jobs

To submit a job to a Session cluster via the command line, you can either

  • use the Flink CLI on the host if it is installed:

  [root@docker-flink111 opt]# flink run -d -c ${JOB_CLASS_NAME} /job.jar
  • or copy the JAR to the JobManager container and submit the job using the CLI from there, for example:

  [root@docker-flink111 opt]#  JOB_CLASS_NAME="com.job.ClassName"
  [root@docker-flink111 opt]# JM_CONTAINER=$(docker ps --filter name=jobmanager --format={{.ID}})
  [root@docker-flink111 opt]# docker cp path/to/jar "${JM_CONTAINER}":/job.jar
  [root@docker-flink111 opt]# docker exec -t -i "${JM_CONTAINER}" flink run -d -c ${JOB_CLASS_NAME} /job.jar
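
Afterwards, as a hedged check, the running jobs can be listed from the same container with the standard flink CLI:

  [root@docker-flink111 opt]# docker exec -t -i "${JM_CONTAINER}" flink list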

6、Further reading

For more recent material, see the official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/docker.html#collapse-189

7、Kubernetes + Flink setup

This page describes how to deploy a Flink Job and Session cluster on Kubernetes.

Info This page describes deploying a standalone Flink cluster on top of Kubernetes. You can find more information on native Kubernetes deployments here.

Setup Kubernetes

Please follow Kubernetes’ setup guide in order to deploy a Kubernetes cluster. If you want to run Kubernetes locally, we recommend using MiniKube.

Note: If using MiniKube please make sure to execute minikube ssh 'sudo ip link set docker0 promisc on' before deploying a Flink cluster. Otherwise Flink components are not able to self reference themselves through a Kubernetes service.

Flink Docker image

Before deploying the Flink Kubernetes components, please read the Flink Docker image documentation, its tags, how to customize the Flink Docker image and enable plugins to use the image in the Kubernetes definition files.

Deploy Flink cluster on Kubernetes

Using the common resource definitions, launch the common cluster components with the kubectl command:

    kubectl create -f flink-configuration-configmap.yaml
    kubectl create -f jobmanager-service.yaml

Note that you could define your own customized options of flink-conf.yaml within flink-configuration-configmap.yaml.

Then launch the specific components depending on whether you want to deploy a Session or Job cluster.

You can then access the Flink UI via different ways:

NodePort service on the rest service of jobmanager:

  1. Run kubectl create -f jobmanager-rest-service.yaml to create the NodePort service on jobmanager. The example of jobmanager-rest-service.yaml can be found in appendix.
  2. Run kubectl get svc flink-jobmanager-rest to know the node-port of this service and navigate to http://<node-ip>:<node-port> in your browser.
  3. If you use minikube, you can get its public ip by running minikube ip.
  4. Similarly to the port-forward solution, you could also use the following command below to submit jobs to the cluster:

    ./bin/flink run -m <public-node-ip>:<node-port> ./examples/streaming/WordCount.jar
    

You can also access the queryable state of TaskManager if you create a NodePort service for it:

  1. Run kubectl create -f taskmanager-query-state-service.yaml to create the NodePort service on taskmanager. The example of taskmanager-query-state-service.yaml can be found in appendix.
  2. Run kubectl get svc flink-taskmanager-query-state to know the node-port of this service. Then you can create the QueryableStateClient(<node-ip>, <node-port>) to submit the state queries.

In order to terminate the Flink cluster, delete the specific Session or Job cluster components and use kubectl to terminate the common components:

    kubectl delete -f jobmanager-service.yaml
    kubectl delete -f flink-configuration-configmap.yaml
    # if created then also the rest service
    kubectl delete -f jobmanager-rest-service.yaml
    # if created then also the queryable state service
    kubectl delete -f taskmanager-query-state-service.yaml

Deploy Session Cluster

A Flink Session cluster is executed as a long-running Kubernetes Deployment. Note that you can run multiple Flink jobs on a Session cluster. Each job needs to be submitted to the cluster after the cluster has been deployed.

A Flink Session cluster deployment in Kubernetes has at least three components:

  • a Deployment which runs a JobManager
  • a Deployment for a pool of TaskManagers
  • a Service exposing the JobManager’s REST and UI ports

After creating the common cluster components, use the Session specific resource definitions to launch the Session cluster with the kubectl command:

    kubectl create -f jobmanager-session-deployment.yaml
    kubectl create -f taskmanager-session-deployment.yaml

To terminate the Session cluster, these components can be deleted along with the common ones with the kubectl command:

    kubectl delete -f taskmanager-session-deployment.yaml
    kubectl delete -f jobmanager-session-deployment.yaml

Deploy Job Cluster

A Flink Job cluster is a dedicated cluster which runs a single job. You can find more details here.

A basic Flink Job cluster deployment in Kubernetes has three components:

  • a Job which runs a JobManager
  • a Deployment for a pool of TaskManagers
  • a Service exposing the JobManager’s REST and UI ports

Check the Job cluster specific resource definitions and adjust them accordingly.

The args attribute in the jobmanager-job.yaml has to specify the main class of the user job. See also how to specify the JobManager arguments to understand how to pass other args to the Flink image in the jobmanager-job.yaml.

The job artifacts should be available from the job-artifacts-volume in the resource definition examples. The definition examples mount the volume as a local directory of the host assuming that you create the components in a minikube cluster. If you do not use a minikube cluster, you can use any other type of volume, available in your Kubernetes cluster, to supply the job artifacts. Alternatively, you can build a custom image which already contains the artifacts instead.

After creating the common cluster components, use the Job cluster specific resource definitions to launch the cluster with the kubectl command:

    kubectl create -f jobmanager-job.yaml
    kubectl create -f taskmanager-job-deployment.yaml

To terminate the single job cluster, these components can be deleted along with the common ones with the kubectl command:

    kubectl delete -f taskmanager-job-deployment.yaml
    kubectl delete -f jobmanager-job.yaml

Appendix

Common cluster resource definitions

flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF

jobmanager-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager

jobmanager-rest-service.yaml. Optional service, that exposes the jobmanager rest port as public Kubernetes node’s port.

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager

taskmanager-query-state-service.yaml. Optional service, that exposes the TaskManager port to access the queryable state as a public Kubernetes node’s port.

apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager

Session cluster resource definitions

jobmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.11.0-scala_2.11
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

taskmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.11.0-scala_2.11
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

Job cluster resource definitions

jobmanager-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: flink:1.11.0-scala_2.11
          env:
          args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: job-artifacts-volume
              mountPath: /opt/flink/usrlib
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: job-artifacts-volume
          hostPath:
            path: /host/path/to/job/artifacts

taskmanager-job-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.11.0-scala_2.11
        env:
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - name: job-artifacts-volume
          mountPath: /opt/flink/usrlib
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /host/path/to/job/artifacts

Native Kubernetes Setup (Beta)

This section describes how to deploy a Flink session cluster natively on Kubernetes.

Flink's native Kubernetes integration is still experimental. Configuration options and CLI flags may change in later versions.

Requirements

  • Kubernetes 1.9 or above.
  • A KubeConfig that can view, create and delete pods and services, configurable via ~/.kube/config. You can verify the permissions by running kubectl auth can-i <list|create|edit|delete> pods.
  • Kubernetes DNS enabled.
  • A Service Account with RBAC permissions to create and delete pods.

Flink Kubernetes Session

Starting a Flink Session

Follow the instructions below to start a Flink session in your Kubernetes cluster.

A session cluster starts all required Flink services (JobManager and TaskManagers) so that you can submit programs to it. Note that you can run multiple programs per session.

$ ./bin/kubernetes-session.sh

All Kubernetes configuration options can be found in the configuration guide.

Example: run the following command to start a session cluster with 4 GB of memory, 2 CPUs and 4 slots per TaskManager.

In this example we also override resourcemanager.taskmanager-timeout so that idle TaskManager pods stay around longer than the default 30 seconds. Although this may increase cost in a cloud environment, in some scenarios it lets new jobs start faster, and during development it gives you more time to inspect the job's log files.

$ ./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=<ClusterId> \
  -Dtaskmanager.memory.process.size=4096m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dresourcemanager.taskmanager-timeout=3600000

The system uses the configuration in conf/flink-conf.yaml. If you change any settings, please follow the configuration guide.

If you do not specify a particular name for your session via kubernetes.cluster-id, the Flink client generates a UUID name.

Custom Flink Docker image

If you want to deploy the Flink containers from a custom Docker image, see the Flink Docker image documentation, its tags, how to customize the Flink Docker image, and how to enable plugins. If you have built a custom Docker image, you can specify it via the kubernetes.container.image option:

$ ./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=<ClusterId> \
  -Dtaskmanager.memory.process.size=4096m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dresourcemanager.taskmanager-timeout=3600000 \
  -Dkubernetes.container.image=<CustomImageName>

Submitting a job to an existing Session

Use the following command to submit a Flink job to the Kubernetes cluster.

$ ./bin/flink run -d -t kubernetes-session -Dkubernetes.cluster-id=<ClusterId> examples/streaming/WindowJoin.jar

Accessing the JobManager UI

There are several ways to expose a service to an external (outside-cluster) IP address; this is configured via kubernetes.rest-service.exposed.type.

  • ClusterIP: exposes the service on a cluster-internal IP, so it is only reachable from inside the cluster. To access the JobManager UI or submit a job to the existing session you need to start a local proxy; you can then use localhost:8081 to submit Flink jobs to the session or view the dashboard.

    $ kubectl port-forward service/<ServiceName> 8081
    
  • NodePort: exposes the service on each node's IP at a static port (the NodePort). <NodeIP>:<NodePort> can be used to reach the JobManager service; NodeIP can simply be replaced with the Kubernetes ApiServer address, which you can find in your kube config file.

  • LoadBalancer: exposes the service externally using the cloud provider's load balancer. Since the cloud provider and Kubernetes need some time to provision the load balancer, you may initially see a NodePort JobManager web URL in the client log. You can use kubectl get services/<ClusterId> to get the EXTERNAL-IP and construct the JobManager web URL manually: http://<EXTERNAL-IP>:8081

Warning! The JobManager may be exposed to the public internet without authentication, and jobs can be submitted to it.

  • ExternalName: maps the service to a DNS name; not supported in the current version.

For more information, see the official documentation on publishing services in Kubernetes.

Attaching to an existing Session

By default the Kubernetes session starts in detached mode, meaning the Flink client exits after submitting all resources to the Kubernetes cluster. Use the following command to attach to an existing session.

$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true

Stopping a Flink Session

To stop a Flink Kubernetes session, attach the Flink client to the cluster and type stop.

$ echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true

Manual resource cleanup

Flink uses Kubernetes OwnerReferences to clean up all cluster components. All resources created by Flink, including ConfigMaps, Services and Pods, have their OwnerReference set to deployment/<ClusterId>. When that deployment is deleted, all other resources are deleted automatically.

$ kubectl delete deployment/<ClusterID>

Log files

By default, the JobManager and TaskManager only store logs under /opt/flink/log inside each pod. If you want to view the logs with kubectl logs <PodName>, you must do the following:

  1. Add a new appender to log4j.properties on the Flink client.
  2. Add the following 'appenderRef' to the rootLogger in log4j.properties: rootLogger.appenderRef.console.ref = ConsoleAppender
  3. Remove the redirect arguments by adding the config option -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%".
  4. Stop and restart your session. You can now use kubectl logs to view the logs.

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    

If a pod is running, you can use kubectl exec -it <PodName> bash to enter the pod and inspect the logs or debug the process.

Flink Kubernetes Application

Starting a Flink Application

Application mode allows users to create a single image containing their job and the Flink runtime; the cluster components are then created and destroyed automatically on demand. The Flink community provides base images that can be used to build multi-purpose custom images.

FROM flink
RUN mkdir -p $FLINK_HOME/usrlib
COPY /path/of/my-flink-job-*.jar $FLINK_HOME/usrlib/my-flink-job.jar

Use the following command to start a Flink Application.

$ ./bin/flink run-application -p 8 -t kubernetes-application \
  -Dkubernetes.cluster-id=<ClusterId> \
  -Dtaskmanager.memory.process.size=4096m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dkubernetes.container.image=<CustomImageName> \
  local:///opt/flink/usrlib/my-flink-job.jar

Note: Application mode only supports "local" as the scheme; by default the jar is located in the image, not on the Flink client.

Note: all jars under "$FLINK_HOME/usrlib" in the image are added to the user classpath.

Stopping a Flink Application

When the application stops, all Flink cluster resources are automatically destroyed. As always, the job may stop because it was manually cancelled or because it finished running.

$ ./bin/flink cancel -t kubernetes-application -Dkubernetes.cluster-id=<ClusterID> <JobID>

Kubernetes concepts

Namespaces

Namespaces in Kubernetes are a way to divide cluster resources between multiple users (via resource quotas). They are similar to the queue concept in a YARN cluster. Flink on Kubernetes can launch a Flink cluster inside a namespace; specify it with the -Dkubernetes.namespace=default argument when starting the cluster.

Resource quotas provide constraints that limit aggregate resource consumption per namespace. They can limit the number of objects that can be created in a namespace by type, as well as the total amount of compute resources that may be consumed by resources in that namespace.

Role-based access control

Role-based access control (RBAC) is a method of regulating access to compute or network resources based on the roles of individual users within an enterprise. Users can configure RBAC roles and service accounts that the JobManager uses to access the Kubernetes API server in the Kubernetes cluster.

Every namespace has a default service account; however, it may not have permission to create or delete pods in the Kubernetes cluster. Users may need to update the permissions of the default service account or specify another service account bound to the right role.

$ kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default

If you do not want to use the default service account, use the following commands to create a new flink service account and set up the role binding. Then use the config option -Dkubernetes.jobmanager.service-account=flink so that the JobManager pod uses the flink service account to create and delete TaskManager pods.

$ kubectl create serviceaccount flink
$ kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink

For more information, see the official Kubernetes documentation on RBAC authorization.

Background / Internals

This section briefly explains how Flink and Kubernetes interact.

(architecture figure omitted)

When a Flink Kubernetes session cluster is created, the Flink client first connects to the Kubernetes ApiServer and submits the cluster description, including the ConfigMap spec, the JobManager Service spec, the JobManager Deployment spec and the Owner Reference. Kubernetes then creates the JobManager deployment; during this process the Kubelet pulls the image, prepares and mounts the volumes, and executes the start command. Once the JobManager pod is up, the Dispatcher and KubernetesResourceManager services start in turn, and the cluster is ready and waiting for jobs to be submitted.

When a user submits a job through the Flink client, the client generates the JobGraph and uploads it, together with the user jar, to the Dispatcher. The Dispatcher then starts a separate JobMaster for each job.

The JobManager requests resources, known as slots, from the KubernetesResourceManager. If no slots are available, the KubernetesResourceManager brings up TaskManager pods and registers them with the cluster.
