Flink
Preface
Development history
Official introduction
Component stack
Application scenarios
All kinds of streaming computation
Flink installation and deployment
Local mode (for awareness)
Principle
Steps
1. Download the installation package
https://archive.apache.org/dist/flink/
2. Upload flink-1.12.0-bin-scala_2.12.tgz to the target directory on node1
3. Extract it
tar -zxvf flink-1.12.0-bin-scala_2.12.tgz
4. If you run into permission problems, fix the ownership
chown -R root:root /export/server/flink-1.12.0
5. Rename the directory or create a symlink
mv flink-1.12.0 flink
ln -s /export/server/flink-1.12.0 /export/server/flink
Test
1. Prepare the file /root/words.txt
vim /root/words.txt
hello me you her
hello me you
hello me
hello
2. Start the Flink local "cluster"
/export/server/flink/bin/start-cluster.sh
3. jps should now show the following two processes
- TaskManagerRunner
- StandaloneSessionClusterEntrypoint
4. Open Flink's Web UI
http://node1:8081/#/overview
A slot in Flink can be thought of as a resource group: Flink parallelizes a program by splitting tasks into subtasks and assigning those subtasks to slots.
5. Run the official example
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar --input /root/words.txt --output /root/out
6. Stop Flink
/export/server/flink/bin/stop-cluster.sh
Start the interactive Scala shell (note: none of the Scala 2.12 packages currently support the Scala Shell)
/export/server/flink/bin/start-scala-shell.sh local
Run the following command
benv.readTextFile("/root/words.txt").flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1).print()
Exit the shell
:quit
Standalone cluster mode (for awareness)
Principle
Steps
1. Cluster plan:
- Server: node1 (Master + Slave): JobManager + TaskManager
- Server: node2 (Slave): TaskManager
- Server: node3 (Slave): TaskManager
2. Edit flink-conf.yaml
vim /export/server/flink/conf/flink-conf.yaml
jobmanager.rpc.address: node1
taskmanager.numberOfTaskSlots: 2
web.submit.enable: true
# history server
jobmanager.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/
historyserver.web.address: node1
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/
3. Edit masters
vim /export/server/flink/conf/masters
node1:8081
4. Edit workers (named slaves in older versions)
vim /export/server/flink/conf/workers
node1
node2
node3
5. Add the HADOOP_CONF_DIR environment variable
vim /etc/profile
export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
6. Distribute
scp -r /export/server/flink node2:/export/server/flink
scp -r /export/server/flink node3:/export/server/flink
scp /etc/profile node2:/etc/profile
scp /etc/profile node3:/etc/profile
or
for i in {2..3}; do scp -r flink node$i:$PWD; done
7. Source the profile
source /etc/profile
Test
1. Start the cluster; on node1 run:
/export/server/flink/bin/start-cluster.sh
or start the components individually
/export/server/flink/bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all
/export/server/flink/bin/taskmanager.sh start|start-foreground|stop|stop-all
2. Start the history server
/export/server/flink/bin/historyserver.sh start
3. Open the Flink UI, or check with jps
http://node1:8081/#/overview
http://node1:8082/#/overview
4. Run the official example
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
5. Stop the Flink cluster
/export/server/flink/bin/stop-cluster.sh
Standalone HA (high availability) cluster mode (for awareness)
Principle
Steps
1. Cluster plan
- Server: node1 (Master + Slave): JobManager + TaskManager
- Server: node2 (Master + Slave): JobManager + TaskManager
- Server: node3 (Slave): TaskManager
2. Start ZooKeeper
zkServer.sh status
zkServer.sh stop
zkServer.sh start
3. Start HDFS
/export/server/hadoop/sbin/start-dfs.sh
4. Stop the Flink cluster
/export/server/flink/bin/stop-cluster.sh
5. Edit flink-conf.yaml
vim /export/server/flink/conf/flink-conf.yaml
Add the following:
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints
high-availability: zookeeper
high-availability.storageDir: hdfs://node1:8020/flink/ha/
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
6. Edit masters
vim /export/server/flink/conf/masters
7. Sync the config files
scp -r /export/server/flink/conf/flink-conf.yaml node2:/export/server/flink/conf/
scp -r /export/server/flink/conf/flink-conf.yaml node3:/export/server/flink/conf/
scp -r /export/server/flink/conf/masters node2:/export/server/flink/conf/
scp -r /export/server/flink/conf/masters node3:/export/server/flink/conf/
8. Edit flink-conf.yaml on node2
vim /export/server/flink/conf/flink-conf.yaml
jobmanager.rpc.address: node2
9. Restart the Flink cluster; on node1 run:
/export/server/flink/bin/stop-cluster.sh
/export/server/flink/bin/start-cluster.sh
10. Check with jps
No Flink processes were started
11. Check the log
cat /export/server/flink/log/flink-root-standalonesession-0-node1.log
The following error appears,
because since Flink 1.8 the official distribution no longer bundles the HDFS integration jar
12. Download the jar, place it in Flink's lib directory, and distribute it so that Flink can work with Hadoop
Download from
https://flink.apache.org/downloads.html
13. Put it into the lib directory
cd /export/server/flink/lib
14. Distribute it
for i in {2..3}; do scp -r flink-shaded-hadoop-2-uber-2.7.5-10.0.jar node$i:$PWD; done
15. Restart the Flink cluster; on node1 run:
/export/server/flink/bin/stop-cluster.sh
/export/server/flink/bin/start-cluster.sh
16. jps now shows the expected processes on all three machines
Test
1. Open the WebUI
http://node1:8081/#/job-manager/config
http://node2:8081/#/job-manager/config
2. Run WordCount
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
3. Kill one of the masters
4. Re-run WordCount; it still works
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
5. Stop the cluster
/export/server/flink/bin/stop-cluster.sh
Flink on YARN (used for development)
Principle
Two modes
Session mode
Per-job mode
Steps
1. Disable YARN's memory checks
vim /export/server/hadoop/etc/hadoop/yarn-site.xml
<!-- disable YARN memory checks -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
2. Distribute
scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml node2:/export/server/hadoop/etc/hadoop/yarn-site.xml
scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml node3:/export/server/hadoop/etc/hadoop/yarn-site.xml
3. Restart YARN
/export/server/hadoop/sbin/stop-yarn.sh
/export/server/hadoop/sbin/start-yarn.sh
Test
Session mode
Starts a Flink cluster on YARN and reuses it: every subsequently submitted job goes to this cluster, and its resources stay occupied until the cluster is shut down manually. Suited to large numbers of small jobs.
1. Start a Flink cluster/session on YARN; on node1 run:
/export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
Notes:
This requests 2 CPUs and 1600 MB of memory in total
# -n number of containers to request, i.e. the number of TaskManagers
# -tm memory per TaskManager
# -s number of slots per TaskManager
# -d run detached, as a background process
Note:
The following warning can be ignored
WARN org.apache.hadoop.hdfs.DFSClient - Caught exception
java.lang.InterruptedException
2. Check the UI
http://node1:8088/cluster
3. Submit a job with flink run:
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
After it finishes, further small jobs can be submitted
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
4. The ApplicationMaster link above leads to Flink's web UI
5. Shut down the yarn-session:
yarn application -kill application_1609508087977_0005
Per-job mode (used more often)
For each Flink job, a dedicated Flink cluster is started on YARN to run it; when the job finishes, the cluster shuts down and releases its resources. Suited to large jobs.
1. Submit the job directly
/export/server/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /export/server/flink/examples/batch/WordCount.jar
# -m JobManager address
# -yjm 1024 JobManager memory
# -ytm 1024 TaskManager memory
2. Check the UI
http://node1:8088/cluster
Parameter reference
/export/server/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /export/server/flink/examples/batch/WordCount.jar
/export/server/flink/bin/flink --help
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/export/server/flink/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/export/server/hadoop-2.7.5/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
./flink <ACTION> [OPTIONS] [ARGUMENTS]
The following actions are available:
Action "run" compiles and runs a program.
Syntax: run [OPTIONS] <jar-file> <arguments>
"run" action options:
-c,--class <classname> Class with the program entry point
("main()" method). Only needed if the
JAR file does not specify the class in
its manifest.
-C,--classpath <url> Adds a URL to each user code
classloader on all nodes in the
cluster. The paths must specify a
protocol (e.g. file://) and be
accessible on all nodes (e.g. by means
of a NFS share). You can use this
option multiple times for specifying
more than one URL. The protocol must
be supported by the {@link
java.net.URLClassLoader}.
-d,--detached If present, runs the job in detached
mode
-n,--allowNonRestoredState Allow to skip savepoint state that
cannot be restored. You need to allow
this if you removed an operator from
your program that was part of the
program when the savepoint was
triggered.
-p,--parallelism <parallelism> The parallelism with which to run the
program. Optional flag to override the
default value specified in the
configuration.
-py,--python <pythonFile> Python script with the program entry
point. The dependent resources can be
configured with the `--pyFiles`
option.
-pyarch,--pyArchives <arg> Add python archive files for job. The
archive files will be extracted to the
working directory of python UDF
worker. Currently only zip-format is
supported. For each archive file, a
target directory be specified. If the
target directory name is specified,
the archive file will be extracted to
a name can directory with the
specified name. Otherwise, the archive
file will be extracted to a directory
with the same name of the archive
file. The files uploaded via this
option are accessible via relative
path. '#' could be used as the
separator of the archive file path and
the target directory name. Comma (',')
could be used as the separator to
specify multiple archive files. This
option can be used to upload the
virtual environment, the data files
used in Python UDF (e.g.: --pyArchives
file:///tmp/py37.zip,file:///tmp/data.
zip#data --pyExecutable
py37.zip/py37/bin/python). The data
files could be accessed in Python UDF,
e.g.: f = open('data/data.txt', 'r').
-pyexec,--pyExecutable <arg> Specify the path of the python
interpreter used to execute the python
UDF worker (e.g.: --pyExecutable
/usr/local/bin/python3). The python
UDF worker depends on Python 3.5+,
Apache Beam (version == 2.23.0), Pip
(version >= 7.1.0) and SetupTools
(version >= 37.0.0). Please ensure
that the specified environment meets
the above requirements.
-pyfs,--pyFiles <pythonFiles> Attach custom python files for job.
These files will be added to the
PYTHONPATH of both the local client
and the remote python UDF worker. The
standard python resource file suffixes
such as .py/.egg/.zip or directory are
all supported. Comma (',') could be
used as the separator to specify
multiple files (e.g.: --pyFiles
file:///tmp/myresource.zip,hdfs:///$na
menode_address/myresource2.zip).
-pym,--pyModule <pythonModule> Python module with the program entry
point. This option must be used in
conjunction with `--pyFiles`.
-pyreq,--pyRequirements <arg> Specify a requirements.txt file which
defines the third-party dependencies.
These dependencies will be installed
and added to the PYTHONPATH of the
python UDF worker. A directory which
contains the installation packages of
these dependencies could be specified
optionally. Use '#' as the separator
if the optional parameter exists
(e.g.: --pyRequirements
file:///tmp/requirements.txt#file:///t
mp/cached_dir).
-s,--fromSavepoint <savepointPath> Path to a savepoint to restore the job
from (for example
hdfs:///flink/savepoint-1537).
-sae,--shutdownOnAttachedExit If the job is submitted in attached
mode, perform a best-effort cluster
shutdown when the CLI is terminated
abruptly, e.g., in response to a user
interrupt, such as typing Ctrl + C.
Options for Generic CLI mode:
-D <property=value> Allows specifying multiple generic configuration
options. The available options can be found at
https://ci.apache.org/projects/flink/flink-docs-stabl
e/ops/config.html
-e,--executor <arg> DEPRECATED: Please use the -t option instead which is
also available with the "Application Mode".
The name of the executor to be used for executing the
given job, which is equivalent to the
"execution.target" config option. The currently
available executors are: "remote", "local",
"kubernetes-session", "yarn-per-job", "yarn-session".
-t,--target <arg> The deployment target for the given application,
which is equivalent to the "execution.target" config
option. For the "run" action the currently available
targets are: "remote", "local", "kubernetes-session",
"yarn-per-job", "yarn-session". For the
"run-application" action the currently available
targets are: "kubernetes-application",
"yarn-application".
Options for yarn-cluster mode:
-d,--detached If present, runs the job in detached
mode
-m,--jobmanager <arg> Set to yarn-cluster to use YARN
execution mode.
-yat,--yarnapplicationType <arg> Set a custom application type for the
application on YARN
-yD <property=value> use value for given property
-yd,--yarndetached If present, runs the job in detached
mode (deprecated; use non-YARN
specific option instead)
-yh,--yarnhelp Help for the Yarn session CLI.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-yj,--yarnjar <arg> Path to Flink jar file
-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container with
optional unit (default: MB)
-ynl,--yarnnodeLabel <arg> Specify YARN node label for the YARN
application
-ynm,--yarnname <arg> Set a custom name for the application
on YARN
-yq,--yarnquery Display available YARN resources
(memory, cores)
-yqu,--yarnqueue <arg> Specify YARN queue.
-ys,--yarnslots <arg> Number of slots per TaskManager
-yt,--yarnship <arg> Ship files in the specified directory
(t for transfer)
-ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container with
optional unit (default: MB)
-yz,--yarnzookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
Options for default mode:
-D <property=value> Allows specifying multiple generic
configuration options. The available
options can be found at
https://ci.apache.org/projects/flink/flink-
docs-stable/ops/config.html
-m,--jobmanager <arg> Address of the JobManager to which to
connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration. Attention: This
option is respected only if the
high-availability configuration is NONE.
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
for high availability mode
Action "run-application" runs an application in Application Mode.
Syntax: run-application [OPTIONS] <jar-file> <arguments>
Options for Generic CLI mode:
-D <property=value> Allows specifying multiple generic configuration
options. The available options can be found at
https://ci.apache.org/projects/flink/flink-docs-stabl
e/ops/config.html
-e,--executor <arg> DEPRECATED: Please use the -t option instead which is
also available with the "Application Mode".
The name of the executor to be used for executing the
given job, which is equivalent to the
"execution.target" config option. The currently
available executors are: "remote", "local",
"kubernetes-session", "yarn-per-job", "yarn-session".
-t,--target <arg> The deployment target for the given application,
which is equivalent to the "execution.target" config
option. For the "run" action the currently available
targets are: "remote", "local", "kubernetes-session",
"yarn-per-job", "yarn-session". For the
"run-application" action the currently available
targets are: "kubernetes-application",
"yarn-application".
Action "info" shows the optimized execution plan of the program (JSON).
Syntax: info [OPTIONS] <jar-file> <arguments>
"info" action options:
-c,--class <classname> Class with the program entry point
("main()" method). Only needed if the JAR
file does not specify the class in its
manifest.
-p,--parallelism <parallelism> The parallelism with which to run the
program. Optional flag to override the
default value specified in the
configuration.
Action "list" lists running and scheduled programs.
Syntax: list [OPTIONS]
"list" action options:
-a,--all Show all programs and their JobIDs
-r,--running Show only running programs and their JobIDs
-s,--scheduled Show only scheduled programs and their JobIDs
Options for Generic CLI mode:
-D <property=value> Allows specifying multiple generic configuration
options. The available options can be found at
https://ci.apache.org/projects/flink/flink-docs-stabl
e/ops/config.html
-e,--executor <arg> DEPRECATED: Please use the -t option instead which is
also available with the "Application Mode".
The name of the executor to be used for executing the
given job, which is equivalent to the
"execution.target" config option. The currently
available executors are: "remote", "local",
"kubernetes-session", "yarn-per-job", "yarn-session".
-t,--target <arg> The deployment target for the given application,
which is equivalent to the "execution.target" config
option. For the "run" action the currently available
targets are: "remote", "local", "kubernetes-session",
"yarn-per-job", "yarn-session". For the
"run-application" action the currently available
targets are: "kubernetes-application",
"yarn-application".
Options for yarn-cluster mode:
-m,--jobmanager <arg> Set to yarn-cluster to use YARN execution
mode.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
Options for default mode:
-D <property=value> Allows specifying multiple generic
configuration options. The available
options can be found at
https://ci.apache.org/projects/flink/flink-
docs-stable/ops/config.html
-m,--jobmanager <arg> Address of the JobManager to which to
connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration. Attention: This
option is respected only if the
high-availability configuration is NONE.
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
for high availability mode
Action "stop" stops a running program with a savepoint (streaming jobs only).
Syntax: stop [OPTIONS] <Job ID>
"stop" action options:
-d,--drain Send MAX_WATERMARK before taking the
savepoint and stopping the pipelne.
-p,--savepointPath <savepointPath> Path to the savepoint (for example
hdfs:///flink/savepoint-1537). If no
directory is specified, the configured
default will be used
("state.savepoints.dir").
Options for Generic CLI mode:
-D <property=value> Allows specifying multiple generic configuration
options. The available options can be found at
https://ci.apache.org/projects/flink/flink-docs-stabl
e/ops/config.html
-e,--executor <arg> DEPRECATED: Please use the -t option instead which is
also available with the "Application Mode".
The name of the executor to be used for executing the
given job, which is equivalent to the
"execution.target" config option. The currently
available executors are: "remote", "local",
"kubernetes-session", "yarn-per-job", "yarn-session".
-t,--target <arg> The deployment target for the given application,
which is equivalent to the "execution.target" config
option. For the "run" action the currently available
targets are: "remote", "local", "kubernetes-session",
"yarn-per-job", "yarn-session". For the
"run-application" action the currently available
targets are: "kubernetes-application",
"yarn-application".
Options for yarn-cluster mode:
-m,--jobmanager <arg> Set to yarn-cluster to use YARN execution
mode.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
Options for default mode:
-D <property=value> Allows specifying multiple generic
configuration options. The available
options can be found at
https://ci.apache.org/projects/flink/flink-
docs-stable/ops/config.html
-m,--jobmanager <arg> Address of the JobManager to which to
connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration. Attention: This
option is respected only if the
high-availability configuration is NONE.
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
for high availability mode
Action "cancel" cancels a running program.
Syntax: cancel [OPTIONS] <Job ID>
"cancel" action options:
-s,--withSavepoint <targetDirectory> **DEPRECATION WARNING**: Cancelling
a job with savepoint is deprecated.
Use "stop" instead.
Trigger savepoint and cancel job.
The target directory is optional. If
no directory is specified, the
configured default directory
(state.savepoints.dir) is used.
Options for Generic CLI mode:
-D <property=value> Allows specifying multiple generic configuration
options. The available options can be found at
https://ci.apache.org/projects/flink/flink-docs-stabl
e/ops/config.html
-e,--executor <arg> DEPRECATED: Please use the -t option instead which is
also available with the "Application Mode".
The name of the executor to be used for executing the
given job, which is equivalent to the
"execution.target" config option. The currently
available executors are: "remote", "local",
"kubernetes-session", "yarn-per-job", "yarn-session".
-t,--target <arg> The deployment target for the given application,
which is equivalent to the "execution.target" config
option. For the "run" action the currently available
targets are: "remote", "local", "kubernetes-session",
"yarn-per-job", "yarn-session". For the
"run-application" action the currently available
targets are: "kubernetes-application",
"yarn-application".
Options for yarn-cluster mode:
-m,--jobmanager <arg> Set to yarn-cluster to use YARN execution
mode.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
Options for default mode:
-D <property=value> Allows specifying multiple generic
configuration options. The available
options can be found at
https://ci.apache.org/projects/flink/flink-
docs-stable/ops/config.html
-m,--jobmanager <arg> Address of the JobManager to which to
connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration. Attention: This
option is respected only if the
high-availability configuration is NONE.
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
for high availability mode
Action "savepoint" triggers savepoints for a running job or disposes existing ones.
Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
"savepoint" action options:
-d,--dispose <arg> Path of savepoint to dispose.
-j,--jarfile <jarfile> Flink program JAR file.
Options for Generic CLI mode:
-D <property=value> Allows specifying multiple generic configuration
options. The available options can be found at
https://ci.apache.org/projects/flink/flink-docs-stabl
e/ops/config.html
-e,--executor <arg> DEPRECATED: Please use the -t option instead which is
also available with the "Application Mode".
The name of the executor to be used for executing the
given job, which is equivalent to the
"execution.target" config option. The currently
available executors are: "remote", "local",
"kubernetes-session", "yarn-per-job", "yarn-session".
-t,--target <arg> The deployment target for the given application,
which is equivalent to the "execution.target" config
option. For the "run" action the currently available
targets are: "remote", "local", "kubernetes-session",
"yarn-per-job", "yarn-session". For the
"run-application" action the currently available
targets are: "kubernetes-application",
"yarn-application".
Options for yarn-cluster mode:
-m,--jobmanager <arg> Set to yarn-cluster to use YARN execution
mode.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
Options for default mode:
-D <property=value> Allows specifying multiple generic
configuration options. The available
options can be found at
https://ci.apache.org/projects/flink/flink-
docs-stable/ops/config.html
-m,--jobmanager <arg> Address of the JobManager to which to
connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration. Attention: This
option is respected only if the
high-availability configuration is NONE.
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
for high availability mode
Getting started with Flink
Preliminary notes
Note: the getting-started example uses the DataSet API, which will not be used afterwards; later examples use the stream-batch-unified DataStream API
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/batch/
Environment setup (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.bigdata</groupId>
<artifactId>flink_study_47</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- repository locations: aliyun, apache, and cloudera, in that order -->
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>apache</id>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.12.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink table planner (the default before 1.9) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Blink table planner (the default since 1.11) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.12</artifactId>
<version>${flink.version}</version>
</dependency>-->
<!-- Flink connectors -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.12</artifactId>
<version>${flink.version}</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.12</artifactId>
<version>${flink.version}</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_2.12</artifactId>
<version>${flink.version}</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.10.0</version>
</dependency>-->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
<exclusions>
<exclusion>
<artifactId>flink-streaming-java_2.11</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-runtime_2.11</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-core</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-java</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.7.5-10.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
<!--<version>8.0.20</version>-->
</dependency>
<!-- high-performance async component: Vert.x -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-jdbc-client</artifactId>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-redis-client</artifactId>
<version>3.9.0</version>
</dependency>
<!-- logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<scope>provided</scope>
</dependency>
<!-- reference: https://blog.csdn.net/f641385712/article/details/84109098 -->
<!--<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libfb303</artifactId>
<version>0.9.3</version>
<type>pom</type>
<scope>provided</scope>
</dependency>-->
<!--<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.2-jre</version>
</dependency>-->
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<plugins>
<!-- compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<!--<encoding>${project.build.sourceEncoding}</encoding>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<!-- shade plugin (bundles all dependencies into the jar) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<!--
zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- set the jar's entry-point class (optional) -->
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Code: DataSet API (for awareness)
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* Author flink
* Desc: WordCount implemented with the Flink DataSet API
*/
public class WordCount {
public static void main(String[] args) throws Exception {
//TODO 0.env
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//TODO 1.source
DataSet<String> lines = env.fromElements("flink hadoop spark", "flink hadoop spark", "flink hadoop", "flink");
//TODO 2.transformation
//split each line into words
/*
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {
void flatMap(T value, Collector<O> out) throws Exception;
}
*/
DataSet<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
//value is one line of input
String[] arr = value.split(" ");
for (String word : arr) {
out.collect(word);
}
}
});
//map each word to (word, 1)
/*
@FunctionalInterface
public interface MapFunction<T, O> extends Function, Serializable {
O map(T value) throws Exception;
}
*/
DataSet<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
//value is a single word
return Tuple2.of(value, 1);
}
});
//group by the word
UnsortedGrouping<Tuple2<String, Integer>> grouped = wordAndOne.groupBy(0);
//aggregate the counts
AggregateOperator<Tuple2<String, Integer>> result = grouped.sum(1);
//TODO 3.sink
result.print();
}
}
Code: DataStream API, anonymous inner classes, batch processing
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* Author
* Desc: WordCount implemented with the Flink DataStream API
* Note: in Flink 1.12, DataStream supports both stream and batch processing; how do we choose?
*/
public class WordCount2 {
public static void main(String[] args) throws Exception {
//TODO 0.env
//ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setRuntimeMode(RuntimeExecutionMode.BATCH);//note: batch processing with DataStream
//env.setRuntimeMode(RuntimeExecutionMode.STREAMING);//note: stream processing with DataStream
//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//note: DataStream picks stream or batch automatically based on the sources
//TODO 1.source
//DataSet<String> lines = env.fromElements("flink hadoop spark", "flink hadoop spark", "flink hadoop", "flink");
DataStream<String> lines = env.fromElements("flink hadoop spark", "flink hadoop spark", "flink hadoop", "flink");
//TODO 2.transformation
//split each line into words
/*
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {
void flatMap(T value, Collector<O> out) throws Exception;
}
*/
DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
//value is one line of input
String[] arr = value.split(" ");
for (String word : arr) {
out.collect(word);
}
}
});
//map each word to (word, 1)
/*
@FunctionalInterface
public interface MapFunction<T, O> extends Function, Serializable {
O map(T value) throws Exception;
}
*/
DataStream<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
//value is a single word
return Tuple2.of(value, 1);
}
});
//grouping: note that DataSet uses groupBy while DataStream uses keyBy
//wordAndOne.keyBy(0);
/*
@FunctionalInterface
public interface KeySelector<IN, KEY> extends Function, Serializable {
KEY getKey(IN value) throws Exception;
}
*/
KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);
//aggregate the counts
SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.sum(1);
//TODO 3.sink
result.print();
//TODO 4.execute: start the job and wait for it to finish
env.execute();
}
}
Code: DataStream API, anonymous inner classes, stream processing
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
*
* Desc: WordCount implemented with the Flink DataStream API
* Note: in Flink 1.12, DataStream supports both stream and batch processing; how do we choose?
*/
public class WordCount3 {
public static void main(String[] args) throws Exception {
//TODO 0.env
//ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setRuntimeMode(RuntimeExecutionMode.BATCH);//note: batch processing with DataStream
//env.setRuntimeMode(RuntimeExecutionMode.STREAMING);//note: stream processing with DataStream
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//note: DataStream picks stream or batch automatically based on the sources
//TODO 1.source
//DataSet<String> lines = env.fromElements("flink hadoop spark", "flink hadoop spark", "flink hadoop", "flink");
//DataStream<String> lines = env.fromElements("flink hadoop spark", "flink hadoop spark", "flink hadoop", "flink");
DataStream<String> lines = env.socketTextStream("node1", 9999);
//TODO 2.transformation
//split each line into words
/*
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {
void flatMap(T value, Collector<O> out) throws Exception;
}
*/
DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
//value is one line of input
String[] arr = value.split(" ");
for (String word : arr) {
out.collect(word);
}
}
});
//map each word to (word, 1)
/*
@FunctionalInterface
public interface MapFunction<T, O> extends Function, Serializable {
O map(T value) throws Exception;
}
*/
DataStream<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
//value is a single word
return Tuple2.of(value, 1);
}
});
//grouping: note that DataSet uses groupBy while DataStream uses keyBy
//wordAndOne.keyBy(0);
/*
@FunctionalInterface
public interface KeySelector<IN, KEY> extends Function, Serializable {
KEY getKey(IN value) throws Exception;
}
*/
KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);
//aggregate the counts
SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.sum(1);
//TODO 3.sink
result.print();
//TODO 4.execute: start the job and wait for it to finish
env.execute();
}
}
Code: DataStream API with lambdas
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
/**
* Author flink
* Desc: WordCount implemented with the Flink DataStream API
* Note: in Flink 1.12, DataStream supports both stream and batch processing; how do we choose?
*/
public class WordCount4 {
public static void main(String[] args) throws Exception {
//TODO 0.env
//ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setRuntimeMode(RuntimeExecutionMode.BATCH);//note: batch processing with DataStream
//env.setRuntimeMode(RuntimeExecutionMode.STREAMING);//note: stream processing with DataStream
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//note: DataStream picks stream or batch automatically based on the sources
//TODO 1.source
//DataSet<String> lines = env.fromElements("flink hadoop spark", "flink hadoop spark", "flink hadoop", "flink");
DataStream<String> lines = env.fromElements("flink hadoop spark", "flink hadoop spark", "flink hadoop", "flink");
//TODO 2.transformation
//split each line into words
/*
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {
void flatMap(T value, Collector<O> out) throws Exception;
}
*/
/*DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
//value is one line of input
String[] arr = value.split(" ");
for (String word : arr) {
out.collect(word);
}
}
});*/
SingleOutputStreamOperator<String> words = lines.flatMap(
(String value, Collector<String> out) -> Arrays.stream(value.split(" ")).forEach(out::collect)
).returns(Types.STRING);
//map each word to (word, 1)
/*
@FunctionalInterface
public interface MapFunction<T, O> extends Function, Serializable {
O map(T value) throws Exception;
}
*/
/*DataStream<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
//value is a single word
return Tuple2.of(value, 1);
}
});*/
DataStream<Tuple2<String, Integer>> wordAndOne = words.map(
(String value) -> Tuple2.of(value, 1)
).returns(Types.TUPLE(Types.STRING,Types.INT));
//grouping: note that DataSet uses groupBy while DataStream uses keyBy
//wordAndOne.keyBy(0);
/*
@FunctionalInterface
public interface KeySelector<IN, KEY> extends Function, Serializable {
KEY getKey(IN value) throws Exception;
}
*/
KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);
//aggregate the counts
SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.sum(1);
//TODO 3.sink
result.print();
//TODO 4.execute: start the job and wait for it to finish
env.execute();
}
}
Code: running on YARN (master this)
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
/**
*
* Desc: WordCount implemented with the Flink DataStream API
* Note: in Flink 1.12, DataStream supports both stream and batch processing; how do we choose?
*/
public class WordCount5_Yarn {
public static void main(String[] args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String output = "";
if (parameterTool.has("output")) {
output = parameterTool.get("output");
System.out.println("指定了输出路径使用:" + output);
} else {
output = "hdfs://node1:8020/wordcount/output47_";
System.out.println("可以指定输出路径使用 --output ,没有指定使用默认的:" + output);
}
//TODO 0.env
//ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setRuntimeMode(RuntimeExecutionMode.BATCH);//note: batch processing with DataStream
//env.setRuntimeMode(RuntimeExecutionMode.STREAMING);//note: stream processing with DataStream
//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//note: DataStream picks stream or batch automatically based on the sources
//TODO 1.source
//DataSet<String> lines = env.fromElements("flink hadoop spark", "flink hadoop spark", "flink hadoop", "flink");
DataStream<String> lines = env.fromElements("flink hadoop spark", "flink hadoop spark", "flink hadoop", "flink");
//TODO 2.transformation
//split each line into words
/*
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {
void flatMap(T value, Collector<O> out) throws Exception;
}
*/
/*DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
//value is one line of input
String[] arr = value.split(" ");
for (String word : arr) {
out.collect(word);
}
}
});*/
SingleOutputStreamOperator<String> words = lines.flatMap(
(String value, Collector<String> out) -> Arrays.stream(value.split(" ")).forEach(out::collect)
).returns(Types.STRING);
//map each word to (word, 1)
/*
@FunctionalInterface
public interface MapFunction<T, O> extends Function, Serializable {
O map(T value) throws Exception;
}
*/
/*DataStream<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
//value is a single word
return Tuple2.of(value, 1);
}
});*/
DataStream<Tuple2<String, Integer>> wordAndOne = words.map(
(String value) -> Tuple2.of(value, 1)
).returns(Types.TUPLE(Types.STRING, Types.INT));
//grouping: note that DataSet uses groupBy while DataStream uses keyBy
//wordAndOne.keyBy(0);
/*
@FunctionalInterface
public interface KeySelector<IN, KEY> extends Function, Serializable {
KEY getKey(IN value) throws Exception;
}
*/
KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);
//aggregate the counts
SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.sum(1);
//TODO 3.sink
//if HDFS permission errors occur, you can run: hadoop fs -chmod -R 777 /
System.setProperty("HADOOP_USER_NAME", "root");//set the user name
//result.print();
//result.writeAsText("hdfs://node1:8020/wordcount/output47_"+System.currentTimeMillis()).setParallelism(1);
result.writeAsText(output + System.currentTimeMillis()).setParallelism(1);
//TODO 4.execute: start the job and wait for it to finish
env.execute();
}
}
Package the jar, rename it, and upload it
Submit
/export/server/flink/bin/flink run -Dexecution.runtime-mode=BATCH -m yarn-cluster -yjm 1024 -ytm 1024 -c cn.flink.hello.WordCount5_Yarn /root/wc.jar --output hdfs://node1:8020/wordcount/output_xx
Note
RuntimeExecutionMode.BATCH //batch processing with DataStream
RuntimeExecutionMode.STREAMING //stream processing with DataStream
RuntimeExecutionMode.AUTOMATIC //pick stream or batch automatically based on the sources
//If no mode is specified, the default is streaming. In later Flink development, simply treat every data source as a stream, or just use AUTOMATIC.
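As a minimal sketch (mirroring the WordCount examples above, using no API beyond what they already use), the mode is set once on the environment, before any source is declared:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// BATCH: for bounded sources only; results are emitted once, at the end
// STREAMING: results are updated incrementally, record by record
// AUTOMATIC: BATCH if every source is bounded, otherwise STREAMING
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);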
Flink 1.12's StreamingFileSink can fail with a "truncate file fail" error when writing to HDFS.
Workarounds:
1. Use a CDH build of Hadoop
2. Release the HDFS lease manually (or repair the files when Spark reads them); a sketch follows
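A minimal sketch of manual lease recovery, assuming a Hadoop 2.x client on the classpath; the fs.defaultFS address matches this document's cluster, and the stuck file path is purely hypothetical:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class RecoverLease {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://node1:8020"); // cluster address used throughout this document
        FileSystem fs = FileSystem.get(conf);
        if (fs instanceof DistributedFileSystem) {
            DistributedFileSystem dfs = (DistributedFileSystem) fs;
            // recoverLease returns true once the lease is released and the file is closed
            Path stuckFile = new Path("/flink/sink-output/part-0-0"); // hypothetical path
            System.out.println("lease recovered: " + dfs.recoverLease(stuckFile));
        }
    }
}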
Flink core concepts
Roles and responsibilities
Where each role sits in the job flow
DataFlow
https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/glossary.html
DataFlow, Operator, Partition, Parallelism, SubTask
OperatorChain and Task
TaskSlot and TaskSlotSharing
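Chaining and slot sharing can also be influenced from the DataStream API. A minimal sketch using documented DataStream methods; the operator bodies and the group name "red" are placeholders:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.fromElements("flink hadoop spark");
        lines.map(String::toUpperCase).startNewChain()     // force a chain break before this operator
             .filter(s -> !s.isEmpty()).disableChaining()  // keep the filter out of any operator chain
             .slotSharingGroup("red")                      // its subtasks may only use slots of group "red"
             .print();
        env.execute();
    }
}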
Execution graph generation
Notes on stream processing
Unbounded stream: truly streaming data
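A minimal contrast between an unbounded and a bounded source (host and port mirror the socket examples in this document; env is a StreamExecutionEnvironment as everywhere above):
DataStream<String> unbounded = env.socketTextStream("node1", 9999); // never ends: a true stream
DataStream<String> bounded = env.fromElements("flink", "hadoop");   // finite: can also run in BATCH mode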
Operator categories
Source
Collection-based
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
/**
* Author
* Desc: DataStream sources based on collections
*/
public class SourceDemo01_Collection {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<String> ds1 = env.fromElements("hadoop spark flink", "hadoop spark flink");
DataStream<String> ds2 = env.fromCollection(Arrays.asList("hadoop spark flink", "hadoop spark flink"));
DataStream<Long> ds3 = env.generateSequence(1, 100);//deprecated in 1.12 in favor of fromSequence
DataStream<Long> ds4 = env.fromSequence(1, 100);
//TODO 2.transformation
//TODO 3.sink
ds1.print();
ds2.print();
ds3.print();
ds4.print();
//TODO 4.execute
env.execute();
}
}
File-based
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* Author
* Desc: DataStream sources from local/HDFS files, directories, and compressed files
*/
public class SourceDemo02_File {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<String> ds1 = env.readTextFile("data/input/words.txt");
DataStream<String> ds2 = env.readTextFile("data/input/dir");
DataStream<String> ds3 = env.readTextFile("data/input/wordcount.txt.gz");
//TODO 2.transformation
//TODO 3.sink
ds1.print();
ds2.print();
ds3.print();
//TODO 4.execute
env.execute();
}
}
Socket-based
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* Author
* Desc: DataStream source based on a socket
*/
public class SourceDemo03_Socket {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<String> lines = env.socketTextStream("node1", 9999);
//TODO 2.transformation
/*SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] arr = value.split(" ");
for (String word : arr) {
out.collect(word);
}
}
});
words.map(new MapFunction<String, Tuple2<String,Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value,1);
}
});*/
//note: the operation below fuses the two steps above into one, splitting each line and emitting (word, 1) directly
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] arr = value.split(" ");
for (String word : arr) {
out.collect(Tuple2.of(word, 1));
}
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(t -> t.f0).sum(1);
//TODO 3.sink
result.print();
//TODO 4.execute
env.execute();
}
}
Custom source: random order data
Note: this example uses Lombok; see the sketch below of what its annotations expand to
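For readers new to Lombok, a rough sketch of what the three annotations on the Order class below expand to (the class name OrderExpanded is illustrative, trimmed to one field; the method list follows Lombok's documented behavior):
// @Data generates getters/setters, toString, equals and hashCode;
// @NoArgsConstructor / @AllArgsConstructor generate the two constructors.
public class OrderExpanded {
    private String id;
    public OrderExpanded() {}
    public OrderExpanded(String id) { this.id = id; }
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    // ...plus toString(), equals(), and hashCode()
}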
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.Random;
import java.util.UUID;
/**
* Author
* Desc: DataStream custom source
* Requirement: continuously generate random order data (one order per subtask per second)
*/
public class SourceDemo04_Customer {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<Order> orderDS = env.addSource(new MyOrderSource()).setParallelism(2);
//TODO 2.transformation
//TODO 3.sink
orderDS.print();
//TODO 4.execute
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Order{
private String id;
private Integer userId;
private Integer money;
private Long createTime;
}
public static class MyOrderSource extends RichParallelSourceFunction<Order>{
private Boolean flag = true;
//run and generate data
@Override
public void run(SourceContext<Order> ctx) throws Exception {
Random random = new Random();
while (flag) {
String oid = UUID.randomUUID().toString();
int userId = random.nextInt(3);
int money = random.nextInt(101);
long createTime = System.currentTimeMillis();
ctx.collect(new Order(oid,userId,money,createTime));
Thread.sleep(1000);
}
}
//called when the job is cancelled
@Override
public void cancel() {
flag = false;
}
}
}
Custom source: MySQL
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
/**
* Author
* Desc: DataStream custom source reading from MySQL
* Requirement: reload the t_student table every 5 seconds
*/
public class SourceDemo05_Customer_MySQL {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<Student> studentDS = env.addSource(new MySQLSource()).setParallelism(1);
//TODO 2.transformation
//TODO 3.sink
studentDS.print();
//TODO 4.execute
env.execute();
}
/*
CREATE TABLE `t_student` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;
INSERT INTO `t_student` VALUES ('1', 'jack', '18');
INSERT INTO `t_student` VALUES ('2', 'tom', '19');
INSERT INTO `t_student` VALUES ('3', 'rose', '20');
INSERT INTO `t_student` VALUES ('4', 'tom', '19');
INSERT INTO `t_student` VALUES ('5', 'jack', '18');
INSERT INTO `t_student` VALUES ('6', 'rose', '20');
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Student {
private Integer id;
private String name;
private Integer age;
}
public static class MySQLSource extends RichParallelSourceFunction<Student> {
private boolean flag = true;
private Connection conn = null;
private PreparedStatement ps =null;
private ResultSet rs = null;
//open() runs only once; the right place to acquire resources
@Override
public void open(Configuration parameters) throws Exception {
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
String sql = "select id,name,age from t_student";
ps = conn.prepareStatement(sql);
}
@Override
public void run(SourceContext<Student> ctx) throws Exception {
while (flag) {
rs = ps.executeQuery();
while (rs.next()) {
int id = rs.getInt("id");
String name = rs.getString("name");
int age = rs.getInt("age");
ctx.collect(new Student(id,name,age));
}
Thread.sleep(5000);
}
}
//stop generating data when the cancel command arrives
@Override
public void cancel() {
flag = false;
}
//release resources in close(); important, otherwise connections leak
@Override
public void close() throws Exception {
if (rs != null) rs.close();
if (ps != null) ps.close();
if (conn != null) conn.close();
}
}
}
Kafka Consumer/Source
Parameters
env.addSource(new FlinkKafkaConsumer<>(topic, deserializationSchema, properties))
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
/**
* Author
* Desc: Flink connectors: KafkaConsumer/Source
*/
public class KafkaConsumerDemo {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
//prepare the Kafka connection properties
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092");//集群地址
props.setProperty("group.id", "flink");//消费者组id
props.setProperty("auto.offset.reset","latest");//latest有offset记录从记录位置开始消费,没有记录从最新的/最后的消息开始消费 /earliest有offset记录从记录位置开始消费,没有记录从最早的/最开始的消息开始消费
props.setProperty("flink.partition-discovery.interval-millis","5000");//会开启一个后台线程每隔5s检测一下Kafka的分区情况,实现动态分区检测,要使用字符串格式,否则返回值是null,设置不生效,比如
// props.setProperty("flink.partition-discovery.interval-millis", 60000);
props.setProperty("enable.auto.commit", "true");//自动提交(提交到默认主题,后续学习了Checkpoint后随着Checkpoint存储在Checkpoint和默认主题中)
props.setProperty("auto.commit.interval.ms", "2000");//自动提交的时间间隔
//create the FlinkKafkaConsumer/kafkaSource from these properties
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("flink_kafka", new SimpleStringSchema(), props);
//use the kafkaSource
DataStream<String> kafkaDS = env.addSource(kafkaSource);
//TODO 2.transformation
//TODO 3.sink
kafkaDS.print();
//TODO 4.execute
env.execute();
}
}
//准备主题 /export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka
//启动控制台生产者发送数据 /export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka
//启动程序FlinkKafkaConsumer
//Observe the console output
Transformation
Basic operations
Common transformation operators: map/filter/rebalance/flatMap/keyBy/sum/reduce...
They mean the same as their counterparts in Scala/Spark.
Requirement
Count the words in a stream, filtering out the sensitive word TMD (Theater Missile Defense).
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* Author
* Desc 演示DataStream-Transformation-基本操作
*/
public class TransformationDemo01 {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<String> lines = env.socketTextStream("node1", 9999);
//TODO 2.transformation
DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] arr = value.split(" ");
for (String word : arr) {
out.collect(word);
}
}
});
DataStream<String> filted = words.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return !value.equals("TMD");//如果是TMD则返回false表示过滤掉
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = filted.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
});
KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);
//SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.sum(1);
SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
//Tuple2<String, Integer> value1 :进来的(单词,历史值)
//Tuple2<String, Integer> value2 :进来的(单词,1)
//需要返回(单词,数量)
return Tuple2.of(value1.f0, value1.f1 + value2.f1); //_+_
}
});
//TODO 3.sink
result.print();
//TODO 4.execute
env.execute();
}
}合并和连接
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
/**
* Author
* Desc 演示DataStream-Transformation-合并和连接操作
*/
public class TransformationDemo02 {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
DataStream<String> ds2 = env.fromElements("hadoop", "spark", "flink");
DataStream<Long> ds3 = env.fromElements(1L, 2L, 3L);
//TODO 2.transformation
DataStream<String> result1 = ds1.union(ds2);//注意union能合并同类型
//ds1.union(ds3);//注意union不可以合并不同类型
ConnectedStreams<String, String> result2 = ds1.connect(ds2);//注意:connect可以合并同类型
ConnectedStreams<String, Long> result3 = ds1.connect(ds3);//注意conncet可以合并不同类型
/*
public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
OUT map1(IN1 value) throws Exception;
OUT map2(IN2 value) throws Exception;
}
*/
SingleOutputStreamOperator<String> result = result3.map(new CoMapFunction<String, Long, String>() {
@Override
public String map1(String value) throws Exception {
return "String:" + value;
}
@Override
public String map2(Long value) throws Exception {
return "Long:" + value;
}
});
//TODO 3.sink
result1.print();
//result2.print();//note: a connected stream needs further processing (e.g. a CoMapFunction) and cannot be printed directly
//result3.print();//note: a connected stream needs further processing and cannot be printed directly
result.print();
//TODO 4.execute
env.execute();
}
}
Split and Select
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* Author
 * Desc Demo of DataStream-Transformation-split and select
 * Note: split and select were deprecated and removed in Flink 1.12,
 * so OutputTag plus process is used to achieve the same effect
 * Requirement: split the stream into odd and even numbers and select each side
*/
public class TransformationDemo03 {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStreamSource<Integer> ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
//TODO 2.transformation
//需求:对流中的数据按照奇数和偶数拆分并选择
OutputTag<Integer> oddTag = new OutputTag<>("奇数", TypeInformation.of(Integer.class));
OutputTag<Integer> evenTag = new OutputTag<>("偶数",TypeInformation.of(Integer.class));
/*
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
public abstract void processElement(I value, ProcessFunction.Context ctx, Collector<O> out) throws Exception;
}
*/
SingleOutputStreamOperator<Integer> result = ds.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
//out收集完的还是放在一起的,ctx可以将数据放到不同的OutputTag
if (value % 2 == 0) {
ctx.output(evenTag, value);
} else {
ctx.output(oddTag, value);
}
}
});
DataStream<Integer> oddResult = result.getSideOutput(oddTag);
DataStream<Integer> evenResult = result.getSideOutput(evenTag);
//TODO 3.sink
System.out.println(oddTag);//OutputTag(Integer, 奇数)
System.out.println(evenTag);//OutputTag(Integer, 偶数)
oddResult.print("奇数:");
evenResult.print("偶数:");
//TODO 4.execute
env.execute();
}
}
rebalance (partition rebalancing)
Used to mitigate data skew.
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* Author
* Desc 演示DataStream-Transformation-rebalance-重平衡分区
*/
public class TransformationDemo04 {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<Long> longDS = env.fromSequence(0, 100);
//the filter below drops elements unevenly across subtasks, which can leave the remaining data skewed
DataStream<Long> filterDS = longDS.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long num) throws Exception {
return num > 10;
}
});
//TODO 2.transformation
//没有经过rebalance有可能出现数据倾斜
SingleOutputStreamOperator<Tuple2<Integer, Integer>> result1 = filterDS
.map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> map(Long value) throws Exception {
int subTaskId = getRuntimeContext().getIndexOfThisSubtask();//子任务id/分区编号
return Tuple2.of(subTaskId, 1);
}
//按照子任务id/分区编号分组,并统计每个子任务/分区中有几个元素
}).keyBy(t -> t.f0).sum(1);
//调用了rebalance解决了数据倾斜
SingleOutputStreamOperator<Tuple2<Integer, Integer>> result2 = filterDS.rebalance()
.map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> map(Long value) throws Exception {
int subTaskId = getRuntimeContext().getIndexOfThisSubtask();//子任务id/分区编号
return Tuple2.of(subTaskId, 1);
}
//按照子任务id/分区编号分组,并统计每个子任务/分区中有几个元素
}).keyBy(t -> t.f0).sum(1);
//TODO 3.sink
result1.print("result1");
result2.print("result2");
//TODO 4.execute
env.execute();
}
}
Other partitioning operations
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* Author
* Desc 演示DataStream-Transformation-各种分区
*/
public class TransformationDemo05 {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<String> linesDS = env.readTextFile("data/input/words.txt");
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
});
//TODO 2.transformation
DataStream<Tuple2<String, Integer>> result1 = tupleDS.global();
DataStream<Tuple2<String, Integer>> result2 = tupleDS.broadcast();
DataStream<Tuple2<String, Integer>> result3 = tupleDS.forward();
DataStream<Tuple2<String, Integer>> result4 = tupleDS.shuffle();
DataStream<Tuple2<String, Integer>> result5 = tupleDS.rebalance();
DataStream<Tuple2<String, Integer>> result6 = tupleDS.rescale();
DataStream<Tuple2<String, Integer>> result7 = tupleDS.partitionCustom(new MyPartitioner(), t -> t.f0);
//TODO 3.sink
result1.print("result1");
result2.print("result2");
result3.print("result3");
result4.print("result4");
result5.print("result5");
result6.print("result6");
result7.print("result7");
//TODO 4.execute
env.execute();
}
public static class MyPartitioner implements Partitioner<String>{
@Override
public int partition(String key, int numPartitions) {
//put your own partitioning logic here, e.g. if(key.equals("北京")) return 0;
return 0;
}
}
}
Sink
Console- and file-based
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* Author
* Desc 演示DataStream-Sink-基于控制台和文件
*/
public class SinkDemo01 {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<String> ds = env.readTextFile("data/input/words.txt");
//TODO 2.transformation
//TODO 3.sink
ds.print();
ds.print("输出标识");
ds.printToErr();//会在控制台上以红色输出
ds.printToErr("输出标识");//会在控制台上以红色输出
ds.writeAsText("data/output/result1").setParallelism(1);
ds.writeAsText("data/output/result2").setParallelism(2);
//TODO 4.execute
env.execute();
}
}
Custom Sink
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
* Author
* Desc 演示DataStream-Sink-自定义Sink
*/
public class SinkDemo02 {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<Student> studentDS = env.fromElements(new Student(null, "tony", 18));
//TODO 2.transformation
//TODO 3.sink
studentDS.addSink(new MySQLSink());
//TODO 4.execute
env.execute();
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Student {
private Integer id;
private String name;
private Integer age;
}
//unlike the source side (RichSourceFunction vs RichParallelSourceFunction) there is no dedicated RichParallelSinkFunction; RichSinkFunction itself can already run in parallel
public static class MySQLSink extends RichSinkFunction<Student> {
private Connection conn = null;
private PreparedStatement ps =null;
@Override
public void open(Configuration parameters) throws Exception {
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
String sql = "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?);";
ps = conn.prepareStatement(sql);
}
@Override
public void invoke(Student value, Context context) throws Exception {
//设置?占位符参数值
ps.setString(1,value.getName());
ps.setInt(2,value.getAge());
//执行sql
ps.executeUpdate();
}
@Override
public void close() throws Exception {
if(ps != null) ps.close();
if(conn != null) conn.close();
}
}
}
JDBC
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* Author
* Desc 演示Flink官方提供的JdbcSink
*/
public class JDBCDemo {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<Student> studentDS = env.fromElements(new Student(null, "tony2", 18));
//TODO 2.transformation
//TODO 3.sink
studentDS.addSink(JdbcSink.sink(
"INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)",
(ps, value) -> {
ps.setString(1, value.getName());
ps.setInt(2, value.getAge());
}, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/bigdata")
.withUsername("root")
.withPassword("root")
.withDriverName("com.mysql.jdbc.Driver")
.build()));
//TODO 4.execute
env.execute();
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Student {
private Integer id;
private String name;
private Integer age;
}
}
Summary
The built-in JdbcSink is efficient and easy to use, but when you need to:
- run complex SQL (e.g. INSERT ... ON DUPLICATE KEY UPDATE)
- manage transactions manually or control batched writes
- route records dynamically to different databases or tables based on their content
its flexibility falls short. In those cases a custom RichSinkFunction (or a customized JdbcSink) is the better fit, as sketched below.
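For reference, here is a minimal sketch of such a custom sink: it upserts via INSERT ... ON DUPLICATE KEY UPDATE and commits in manually managed batches. The table/column names, the batch size, and the assumption that Student.id is non-null are illustrative choices, not part of the original example; the Student POJO is the one defined above.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
//a sketch only: upsert sink with manual batching
public class MySQLUpsertSink extends RichSinkFunction<Student> {
    private transient Connection conn;
    private transient PreparedStatement ps;
    private int pending = 0;//records buffered since the last flush
    private static final int BATCH_SIZE = 100;//illustrative batch size
    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
        conn.setAutoCommit(false);//manage the transaction manually
        ps = conn.prepareStatement(
            "INSERT INTO t_student (id, name, age) VALUES (?, ?, ?) " +
            "ON DUPLICATE KEY UPDATE name = VALUES(name), age = VALUES(age)");
    }
    @Override
    public void invoke(Student value, Context context) throws Exception {
        ps.setInt(1, value.getId());//assumes id is set; a null id would need a different statement
        ps.setString(2, value.getName());
        ps.setInt(3, value.getAge());
        ps.addBatch();
        if (++pending >= BATCH_SIZE) {//flush a full batch in one transaction
            ps.executeBatch();
            conn.commit();
            pending = 0;
        }
    }
    @Override
    public void close() throws Exception {
        if (ps != null) { ps.executeBatch(); conn.commit(); ps.close(); }//flush the remainder
        if (conn != null) conn.close();
    }
}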
Kafka Producer/Sink
console producer ---> topic flink_kafka --> Flink (etl) ---> topic flink_kafka2 ---> console consumer
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
/**
* Author
 * Desc Demo of Flink-Connectors-KafkaConsumer/Source + KafkaProducer/Sink
*/
public class KafkaSinkDemo {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
//准备kafka连接参数
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092");//集群地址
props.setProperty("group.id", "flink");//消费者组id
props.setProperty("auto.offset.reset","latest");//latest有offset记录从记录位置开始消费,没有记录从最新的/最后的消息开始消费 /earliest有offset记录从记录位置开始消费,没有记录从最早的/最开始的消息开始消费
props.setProperty("flink.partition-discovery.interval-millis","5000");//会开启一个后台线程每隔5s检测一下Kafka的分区情况,实现动态分区检测
props.setProperty("enable.auto.commit", "true");//自动提交(提交到默认主题,后续学习了Checkpoint后随着Checkpoint存储在Checkpoint和默认主题中)
props.setProperty("auto.commit.interval.ms", "2000");//自动提交的时间间隔
//使用连接参数创建FlinkKafkaConsumer/kafkaSource
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("flink_kafka", new SimpleStringSchema(), props);
//使用kafkaSource
DataStream<String> kafkaDS = env.addSource(kafkaSource);
//TODO 2.transformation
SingleOutputStreamOperator<String> etlDS = kafkaDS.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return value.contains("success");
}
});
//TODO 3.sink
etlDS.print();
Properties props2 = new Properties();
props2.setProperty("bootstrap.servers", "node1:9092");
FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka2", new SimpleStringSchema(), props2);
etlDS.addSink(kafkaSink);
//TODO 4.execute
env.execute();
}
}
//console producer ---> topic flink_kafka --> Flink (etl) ---> topic flink_kafka2 ---> console consumer
//准备主题 /export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka
//准备主题 /export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka2
//启动控制台生产者发送数据 /export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka
//log:2020-10-10 success xxx
//log:2020-10-10 success xxx
//log:2020-10-10 success xxx
//log:2020-10-10 fail xxx
//启动控制台消费者消费数据 /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka2 --from-beginning
//启动程序FlinkKafkaConsumer
//Observe the console output
Redis
https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
Requirement:
Receive a real-time stream from a socket, run WordCount, and write the result to Redis.
Data structure used:
- word:count as a plain key-value pair (key: String, value: String), or
- wcresult -> {word: count} as a hash (key: String, value: Hash) -- the form used below
Note: a Redis key is always a String; the value can be a String/Hash/List/Set/Sorted Set.
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
/**
* Author
* Desc 演示Flink-Connectors-三方提供的RedisSink
*/
public class RedisDemo {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<String> lines = env.socketTextStream("node1", 9999);
//TODO 2.transformation
SingleOutputStreamOperator<Tuple2<String, Integer>> result = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] arr = value.split(" ");
for (String word : arr) {
out.collect(Tuple2.of(word, 1));
}
}
}).keyBy(t -> t.f0).sum(1);
//TODO 3.sink
result.print();
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
RedisSink<Tuple2<String, Integer>> redisSink = new RedisSink<Tuple2<String, Integer>>(conf,new MyRedisMapper());
result.addSink(redisSink);
//TODO 4.execute
env.execute();
}
public static class MyRedisMapper implements RedisMapper<Tuple2<String, Integer>>{
@Override
public RedisCommandDescription getCommandDescription() {
//our chosen structure is key:String("wcresult"), value:Hash(word, count), so the command is HSET
return new RedisCommandDescription(RedisCommand.HSET,"wcresult");
}
@Override
public String getKeyFromData(Tuple2<String, Integer> t) {
return t.f0;
}
@Override
public String getValueFromData(Tuple2<String, Integer> t) {
return t.f1.toString();
}
}
}
The four cornerstones of Flink
Window
Window categories
- time-based sliding windows (must master)
- time-based tumbling windows (must master)
- count-based sliding windows (good to know)
- count-based tumbling windows (good to know)
API
Time-based tumbling and sliding windows (must master)
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* Author
* Desc 演示基于时间的滚动和滑动窗口
*/
public class WindowDemo_1_2 {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<String> lines = env.socketTextStream("node1", 9999);
//TODO 2.transformation
SingleOutputStreamOperator<CartInfo> carDS = lines.map(new MapFunction<String, CartInfo>() {
@Override
public CartInfo map(String value) throws Exception {
String[] arr = value.split(",");
return new CartInfo(arr[0], Integer.parseInt(arr[1]));
}
});
//注意: 需求中要求的是各个路口/红绿灯的结果,所以需要先分组
//carDS.keyBy(car->car.getSensorId())
KeyedStream<CartInfo, String> keyedDS = carDS.keyBy(CartInfo::getSensorId);
// * 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口
//keyedDS.timeWindow(Time.seconds(5))
SingleOutputStreamOperator<CartInfo> result1 = keyedDS
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum("count");
// * 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口
SingleOutputStreamOperator<CartInfo> result2 = keyedDS
//of(Time size, Time slide)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
.sum("count");
//TODO 3.sink
//result1.print();
result2.print();
/*
1,5
2,5
3,5
4,5
*/
//TODO 4.execute
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class CartInfo {
private String sensorId;//信号灯id
private Integer count;//通过该信号灯的车的数量
}
}
Count-based tumbling and sliding windows
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* Author
* Desc 演示基于数量的滚动和滑动窗口
*/
public class WindowDemo_3_4 {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<String> lines = env.socketTextStream("node1", 9999);
//TODO 2.transformation
SingleOutputStreamOperator<CartInfo> carDS = lines.map(new MapFunction<String, CartInfo>() {
@Override
public CartInfo map(String value) throws Exception {
String[] arr = value.split(",");
return new CartInfo(arr[0], Integer.parseInt(arr[1]));
}
});
//注意: 需求中要求的是各个路口/红绿灯的结果,所以需要先分组
//carDS.keyBy(car->car.getSensorId())
KeyedStream<CartInfo, String> keyedDS = carDS.keyBy(CartInfo::getSensorId);
// * 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
SingleOutputStreamOperator<CartInfo> result1 = keyedDS
.countWindow(5)
.sum("count");
// * 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
SingleOutputStreamOperator<CartInfo> result2 = keyedDS
.countWindow(5,3)
.sum("count");
//TODO 3.sink
//result1.print();
/*
1,1
1,1
1,1
1,1
2,1
1,1
*/
result2.print();
/*
1,1
1,1
2,1
1,1
2,1
3,1
4,1
*/
//TODO 4.execute
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class CartInfo {
private String sensorId;//信号灯id
private Integer count;//通过该信号灯的车的数量
}
}
Session windows
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* Author
* Desc 演示会话窗口
*/
public class WindowDemo_5 {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<String> lines = env.socketTextStream("node1", 9999);
//TODO 2.transformation
SingleOutputStreamOperator<CartInfo> carDS = lines.map(new MapFunction<String, CartInfo>() {
@Override
public CartInfo map(String value) throws Exception {
String[] arr = value.split(",");
return new CartInfo(arr[0], Integer.parseInt(arr[1]));
}
});
//注意: 需求中要求的是各个路口/红绿灯的结果,所以需要先分组
//carDS.keyBy(car->car.getSensorId())
KeyedStream<CartInfo, String> keyedDS = carDS.keyBy(CartInfo::getSensorId);
//需求:设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算(前提是上一个窗口得有数据!)
SingleOutputStreamOperator<CartInfo> result = keyedDS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.sum("count");
//TODO 3.sink
result.print();
/*
1,1
1,1
2,1
2,1
*/
//TODO 4.execute
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class CartInfo {
private String sensorId;//信号灯id
private Integer count;//通过该信号灯的车的数量
}
}
Session window: elements arriving within one continuous period of activity form a window; once the gap between elements exceeds the configured timeout, later elements start a new window. Think of logging in to a website: the session stays valid while you keep browsing, and once it expires you must log in again -- each such session corresponds to one window.
Time/Watermark
Time categories
Why EventTime matters and how Watermarks come in
Watermarks in detail
1. A Watermark is essentially a timestamp.
2. Watermark = the maximum event time seen so far - the maximum allowed lateness/out-of-orderness.
3. By shifting the moment a window fires, Watermarks tolerate a bounded amount of out-of-order or late data.
4. A window fires when Watermark >= window end time,
5. i.e. when (current maximum event time - maximum allowed lateness) >= window end time,
6. i.e. when current maximum event time >= window end time + maximum allowed lateness.
7. The window must contain data in [window_start_time, window_end_time); the interval is left-closed, right-open.
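A worked example of rules 2 and 6 (numbers invented for illustration): with a 3s allowed out-of-orderness and a tumbling window [10:00:00, 10:10:00), an event at 10:09:58 pushes the Watermark to 10:09:58 - 3s = 10:09:55, still before the window end, so nothing fires; only when an event with event time >= 10:10:03 arrives does the Watermark reach 10:10:00 and the window fire. Any record stamped before 10:10:00 that arrives after that point is late for this window.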
Code demo (verification version, good to know)
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
/**
* Author
* Desc
 * Simulates real-time order data in the format (orderId, userId, amount, timestamp/event time).
 * Every 5s, compute each user's total order amount over the last 5 seconds (time-based tumbling window),
 * adding a Watermark to tolerate a bounded amount of late and out-of-order data.
*/
public class WatermakerDemo02_Check {
public static void main(String[] args) throws Exception {
FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
//TODO 1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//TODO 2.Source
//模拟实时订单数据(数据有延迟和乱序)
DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
private boolean flag = true;
@Override
public void run(SourceContext<Order> ctx) throws Exception {
Random random = new Random();
while (flag) {
String orderId = UUID.randomUUID().toString();
int userId = random.nextInt(3);
int money = random.nextInt(100);
//模拟数据延迟和乱序!
long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
System.out.println("发送的数据为: "+userId + " : " + df.format(eventTime));
ctx.collect(new Order(orderId, userId, money, eventTime));
//TimeUnit.SECONDS.sleep(1);
Thread.sleep(1000);
}
}
@Override
public void cancel() {
flag = false;
}
});
//TODO 3.Transformation
/*DataStream<Order> watermakerDS = orderDS
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((event, timestamp) -> event.getEventTime())
);*/
//开发中直接使用上面的即可
//学习测试时可以自己实现
DataStream<Order> watermakerDS = orderDS
.assignTimestampsAndWatermarks(
new WatermarkStrategy<Order>() {
@Override
public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<Order>() {
private int userId = 0;
private long eventTime = 0L;
private final long outOfOrdernessMillis = 3000;
private long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
@Override
public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
userId = event.userId;
eventTime = event.eventTime;
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
//Watermark = current max event time - maximum allowed lateness/out-of-orderness
Watermark watermark = new Watermark(maxTimestamp - outOfOrdernessMillis - 1);
System.out.println("key:" + userId + ",系统时间:" + df.format(System.currentTimeMillis()) + ",事件时间:" + df.format(eventTime) + ",水印时间:" + df.format(watermark.getTimestamp()));
output.emitWatermark(watermark);
}
};
}
}.withTimestampAssigner((order, timestamp) -> order.getEventTime())
);
//At this point the stream carries Watermarks; window computations can follow
//要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
/* DataStream<Order> result = watermakerDS
.keyBy(Order::getUserId)
//.timeWindow(Time.seconds(5), Time.seconds(5))
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("money");*/
//in real code the commented-out version above is all you need for the business logic
//for learning/testing, the code below prints more detail, e.g. each window's records' event times and the Watermark when the window fires
SingleOutputStreamOperator<String> result = watermakerDS
.keyBy(Order::getUserId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
//把apply中的函数应用在窗口中的数据上
//WindowFunction<IN, OUT, KEY, W extends Window>
.apply(new WindowFunction<Order, String, Integer, TimeWindow>() {
@Override
public void apply(Integer key, TimeWindow window, Iterable<Order> orders, Collector<String> out) throws Exception {
//用来存放当前窗口的数据的格式化后的事件时间
List<String> list = new ArrayList<>();
for (Order order : orders) {
Long eventTime = order.eventTime;
String formatEventTime = df.format(eventTime);
list.add(formatEventTime);
}
String start = df.format(window.getStart());
String end = df.format(window.getEnd());
//现在就已经获取到了当前窗口的开始和结束时间,以及属于该窗口的所有数据的事件时间,把这些拼接并返回
String outStr = String.format("key:%s,窗口开始结束:[%s~%s),属于该窗口的事件时间:%s", key.toString(), start, end, list.toString());
out.collect(outStr);
}
});
//4.Sink
result.print();
//5.execute
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Order {
private String orderId;
private Integer userId;
private Integer money;
private Long eventTime;
}
}
Pitfalls when using watermarks:
1. If one partition never receives data, the low watermark never advances and windows never fire --> official fix: the idleness mechanism added in 1.11.
(Structured Streaming fires on a fixed-interval trigger, so Flink can achieve the same effect with a custom trigger.)
2. If one partition's watermark advances much faster than the others, output is still held back by the slow ones --> official fix: watermark alignment, supported since 1.15. A sketch of the idleness setting follows.
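A minimal sketch of the idleness mechanism from point 1, reusing the Order POJO from the demos in this section; the 1-minute timeout is an arbitrary illustrative choice:
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
//a partition that emits nothing for 1 minute is marked idle and no longer holds back the overall watermark
WatermarkStrategy<Order> strategy = WatermarkStrategy
        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((order, ts) -> order.getEvenTime())
        .withIdleness(Duration.ofMinutes(1));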
Custom triggers
In Flink, the Trigger and the Window are configured independently: the Window defines how the stream is sliced into chunks, while the Trigger decides when those chunks are evaluated.
If the trigger interval is 5 seconds and the window is 10 seconds, then in principle the trigger checks its firing condition every 5 seconds; when the condition holds (e.g. an element-count threshold), the data currently in the window is computed. Independently, every 10 seconds the window closes and its contents are computed.
So the computation may fire at the 5-second mark, at the 10-second mark, or both, depending on the firing condition; if the condition is purely time-based, it fires once at 5 seconds and once at 10 seconds. See the sketch right below.
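A minimal sketch of that 5s-trigger / 10s-window combination using the built-in ContinuousProcessingTimeTrigger; keyedDS stands for any KeyedStream as in the window demos above, and whether early firing fits your semantics is job-specific:
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
//10s tumbling window whose current contents are (re-)emitted every 5s
keyedDS
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))//early firing every 5s
    .sum("count");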
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.*;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
/**
* 2019-10-10 12:00:03,dog
* 2019-10-10 12:00:04,cat
* 2019-10-10 12:00:04,dog
* 2019-10-10 12:00:20,dog
* 2019-10-10 12:00:30,dog
* 2019-10-10 12:00:40,dog
* 2019-10-10 12:00:03,dog
* 2019-10-10 12:00:04,cat
* 2019-10-10 12:00:20,dog
* 2019-10-10 12:00:30,dog
*/
public class WaterMaker {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setStreamTimeCharacteristic();默认事件时间
env.setParallelism(3);
/*FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
boolean flag = true;
@Override
public void run(SourceContext<Order> ctx) throws Exception {
while (flag) {
Random random = new Random();
String orderId = UUID.randomUUID().toString();
Integer userId = random.nextInt(3);
Integer money = random.nextInt(100);
Long evenTime = System.currentTimeMillis() - random.nextInt(6) * 1000;
System.out.println("发送1条数据: " + userId + " evenTime: " + df.format(evenTime));
ctx.collect(new Order(orderId, userId, money, evenTime));
Thread.sleep(2000);
}
}
@Override
public void cancel() {
flag = false;
}
});*/
//DataStreamSource<String> source = env.socketTextStream("node1", 9999);
ParameterTool param = ParameterTool.fromArgs(args);
String topic = param.get("topic", "KafkaWordCount");
String group_id = param.get("group_id", "flink_wordcount");
boolean isWriteKafka = param.getBoolean("isWriteKafka", false);
boolean isWriteHdfs = param.getBoolean("isWriteHdfs", false);
boolean isWriteMysql = param.getBoolean("isWriteMysql", false);
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
prop.setProperty("group.id", group_id);
prop.setProperty("auto.offset.reset", "latest");
prop.setProperty("enable.auto.commit", "true");
prop.setProperty("key.deserializer", "StringDeserializer");
prop.setProperty("value.deserializer", "StringDeserializer");
FlinkKafkaConsumer<String> kafka = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
DataStreamSource<String> source = env.addSource(kafka);
SingleOutputStreamOperator<Order> result = source.rebalance().map(new RichMapFunction<String, Order>() {
@Override
public Order map(String value) throws Exception {
String[] timeAndWord = value.split(",");
Timestamp timestamp = Timestamp.valueOf(timeAndWord[0]);
Order order = new Order();
order.setOrderId(timeAndWord[1]);
order.setMoney(1);
order.setEvenTime(timestamp.getTime());
return order;
}
})
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime()))
.keyBy(t -> t.getOrderId())
//包左不包右
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
//without a trigger: if one partition has no data, or data stops entirely, the last window never fires and its data is lost
/**
* 结果是:> Order{orderId='dog', userId=null, money=2, evenTime=12:00:03}
* 结果是:> Order{orderId='cat', userId=null, money=1, evenTime=12:00:04}
* 结果是:> Order{orderId='dog', userId=null, money=1, evenTime=12:00:20}
*
*/
//ProcessingTimeoutTrigger.of(EventTimeTrigger.create(), Duration.ofSeconds(10)): when the wrapped EventTimeTrigger does not fire, the timeout fires the window instead; already-fired results are not recomputed, only keys with updates emit refreshed totals -- but expired window state is never cleaned up
//.trigger(ProcessingTimeoutTrigger.of(EventTimeTrigger.create(), Duration.ofSeconds(10)))
//periodic firing, like Spark's fixed-interval trigger
.trigger(ProcessingTimeoutTrigger.of(EventTimeTrigger.create(), Duration.ofSeconds(5),false,true))
/**
* 单并行度验证:
* 结果是:> Order{orderId='dog', userId=null, money=2, evenTime=12:00:03}
* 结果是:> Order{orderId='cat', userId=null, money=1, evenTime=12:00:04}
* 结果是:> Order{orderId='dog', userId=null, money=1, evenTime=12:00:20}
* --以上是水印触发计算
* 结果是:> Order{orderId='dog', userId=null, money=1, evenTime=12:00:30}
* 结果是:> Order{orderId='dog', userId=null, money=1, evenTime=12:00:40}
* 结果是:> Order{orderId='dog', userId=null, money=2, evenTime=12:00:30}
* --超时触发计算
*/
//ProcessingTimeTrigger fires immediately; the watermark still applies and expired data is destroyed -- same effect as Spark with no trigger (or trigger interval 0)
//.trigger(ProcessingTimeTrigger.create())
//what we actually want: event-time windows fired and destroyed by the watermark as usual, plus a periodic timeout that fires (and destroys) them when data stops coming or the watermark advances too slowly; the flags: false = do not wait for more elements (fire directly if nothing arrives within 5s), true = still process late-arriving elements (e.g. delayed by the network)
//simulated by .trigger(ProcessingTimeoutTrigger.of(EventTimeTrigger.create(), Duration.ofSeconds(5),false,true))
//.trigger(EventAndProcessingTimeOutTrigger.of(Duration.ofSeconds(5),false))
/**
 * Consideration 1: with multiple parallel subtasks, one partition may receive no data; the official 1.11 idleness mechanism handles that, but if no data arrives at all, nothing advances the low watermark and the final window never fires -- its data is lost.
 * A custom event-time trigger with a no-data timeout solves this.
 * Consideration 2: if one partition's low watermark advances much faster than the others, the fast one queues up waiting for the slow ones, and the final window may likewise never fire, losing data.
 * For uneven watermark progress the official fix is the watermark alignment strategy added in 1.15; a custom event-time trigger with periodic firing also works.
 * In short, a custom event-time trigger with periodic firing covers all three cases: 1. no data at all; 2. some partitions advancing the watermark too slowly; 3. the final window whose watermark never advances.
*/
.sum("money");
result.print("结果是:");
env.execute();
}
public static class Order{
private String orderId;
private Integer userId;
private Integer money;
private Long evenTime;
public Order(String orderId, Integer userId, Integer money, Long evenTime) {
this.orderId = orderId;
this.userId = userId;
this.money = money;
this.evenTime = evenTime;
}
public Order() {
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public Integer getUserId() {
return userId;
}
public void setUserId(Integer userId) {
this.userId = userId;
}
public Integer getMoney() {
return money;
}
public void setMoney(Integer money) {
this.money = money;
}
public Long getEvenTime() {
return evenTime;
}
public void setEvenTime(Long evenTime) {
this.evenTime = evenTime;
}
@Override
public String toString() {
FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
return "Order{" +
"orderId='" + orderId + '\'' +
", userId=" + userId +
", money=" + money +
", evenTime=" + df.format(evenTime) +
'}';
}
}
}
A hand-rolled periodic-timeout trigger (with logging)
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Timestamp;
import java.time.Duration;
public class EventAndProcessingTimeOutTrigger<T, W extends Window> extends Trigger<T, W> {
private static final Logger LOG= LoggerFactory.getLogger(EventAndProcessingTimeOutTrigger.class);
private static final long serialVersionUID = 1L;
private final long interval;
private final boolean resetTimerOnNewRecord;
private final ValueStateDescriptor<Long> timeoutStateDesc = new ValueStateDescriptor<>("timeout", LongSerializer.INSTANCE);
private EventAndProcessingTimeOutTrigger(long interval,boolean resetTimerOnNewRecord) {
this.interval=interval;
this.resetTimerOnNewRecord=resetTimerOnNewRecord;
}
@Override
public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ValueState<Long> timeoutState = ctx.getPartitionedState(this.timeoutStateDesc);
long nextFireTimestamp = ctx.getCurrentProcessingTime() + this.interval;
Long timeoutTimestamp = timeoutState.value();
if (timeoutTimestamp != null && resetTimerOnNewRecord) {
ctx.deleteProcessingTimeTimer(timeoutTimestamp);
timeoutState.clear();
timeoutTimestamp = null;
}
if (timeoutTimestamp == null) {
timeoutState.update(nextFireTimestamp);
ctx.registerProcessingTimeTimer(nextFireTimestamp);
}
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
//超时了触发 窗口计算 和推进水位线
long maxTimestamp = window.maxTimestamp();
this.clear(window, ctx);
System.out.println("maxTimestamp = " + new Timestamp(maxTimestamp)+" CurrentWatermark = "+new Timestamp(ctx.getCurrentWatermark()));
LOG.warn("LOG: maxTimestamp = " + new Timestamp(maxTimestamp)+" CurrentWatermark = "+new Timestamp(ctx.getCurrentWatermark()));
ctx.registerEventTimeTimer(maxTimestamp +this.interval);
return TriggerResult.FIRE;
/**
* 单并行度验证
* 结果是:> Order{orderId='dog', userId=null, money=2, evenTime=12:00:03}
* 结果是:> Order{orderId='cat', userId=null, money=1, evenTime=12:00:04}
* 结果是:> Order{orderId='dog', userId=null, money=1, evenTime=12:00:20}
* maxTimestamp = 2019-10-10 12:00:39.999 CurrentWatermark = 2019-10-10 12:00:29.999
* 结果是:> Order{orderId='dog', userId=null, money=1, evenTime=12:00:30}
* maxTimestamp = 2019-10-10 12:00:49.999 CurrentWatermark = 2019-10-10 12:00:29.999
* 结果是:> Order{orderId='dog', userId=null, money=1, evenTime=12:00:40}
* maxTimestamp = 2019-10-10 12:00:39.999 CurrentWatermark = 2019-10-10 12:00:29.999
* 结果是:> Order{orderId='dog', userId=null, money=2, evenTime=12:00:30}
*/
}
@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(W window, OnMergeContext ctx) throws Exception {
// only register a timer if the watermark is not yet past the end of the merged window
// this is in line with the logic in onElement(). If the watermark is past the end of
// the window onElement() will fire and setting a timer here would fire the window twice.
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
ctx.registerEventTimeTimer(windowMaxTimestamp);
}
}
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
ValueState<Long> timeoutTimestampState = ctx.getPartitionedState(this.timeoutStateDesc);
Long timeoutTimestamp = timeoutTimestampState.value();
if (timeoutTimestamp != null) {
ctx.deleteProcessingTimeTimer(timeoutTimestamp);
timeoutTimestampState.clear();
}
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
public static <T, W extends Window> EventAndProcessingTimeOutTrigger<T, W> of(Duration timeout) {
return new EventAndProcessingTimeOutTrigger<>(timeout.toMillis(),true);
}
public static <T, W extends Window> EventAndProcessingTimeOutTrigger<T, W> of(Duration timeout,boolean resetTimerOnNewRecord) {
return new EventAndProcessingTimeOutTrigger<>(timeout.toMillis(),resetTimerOnNewRecord);
}
}
Side output to capture late data (must master)
public class WaterMaker {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setStreamTimeCharacteristic();默认事件时间
env.setParallelism(1);
/*FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
boolean flag = true;
@Override
public void run(SourceContext<Order> ctx) throws Exception {
while (flag) {
Random random = new Random();
String orderId = UUID.randomUUID().toString();
Integer userId = random.nextInt(3);
Integer money = random.nextInt(100);
Long evenTime = System.currentTimeMillis() - random.nextInt(6) * 1000;
System.out.println("发送1条数据: " + userId + " evenTime: " + df.format(evenTime));
ctx.collect(new Order(orderId, userId, money, evenTime));
Thread.sleep(2000);
}
}
@Override
public void cancel() {
flag = false;
}
});*/
DataStreamSource<String> source = env.socketTextStream("node1", 9999);
/*ParameterTool param = ParameterTool.fromArgs(args);
String topic = param.get("topic", "KafkaWordCount");
String group_id = param.get("group_id", "flink_wordcount");
boolean isWriteKafka = param.getBoolean("isWriteKafka", false);
boolean isWriteHdfs = param.getBoolean("isWriteHdfs", false);
boolean isWriteMysql = param.getBoolean("isWriteMysql", false);
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
prop.setProperty("group.id", group_id);
prop.setProperty("auto.offset.reset", "latest");
prop.setProperty("enable.auto.commit", "true");
prop.setProperty("key.deserializer", "StringDeserializer");
prop.setProperty("value.deserializer", "StringDeserializer");
FlinkKafkaConsumer<String> kafka = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
DataStreamSource<String> source = env.addSource(kafka);*/
OutputTag<Order> seriousLateOutputTag = new OutputTag<>("seriousLateOutputTag", TypeInformation.of(Order.class));
SingleOutputStreamOperator<Order> result = source.rebalance().map(new RichMapFunction<String, Order>() {
@Override
public Order map(String value) throws Exception {
String[] timeAndWord = value.split(",");
Timestamp timestamp = Timestamp.valueOf(timeAndWord[0]);
Order order = new Order();
order.setOrderId(timeAndWord[1]);
order.setMoney(1);
order.setEvenTime(timestamp.getTime());
return order;
}
})
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime()))
.keyBy(t -> t.getOrderId())
//包左不包右
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
//without a trigger: if one partition has no data, or data stops entirely, the last window never fires and its data is lost
/**
* 结果是:> Order{orderId='dog', userId=null, money=2, evenTime=12:00:03}
* 结果是:> Order{orderId='cat', userId=null, money=1, evenTime=12:00:04}
* 结果是:> Order{orderId='dog', userId=null, money=1, evenTime=12:00:20}
*
*/
//ProcessingTimeoutTrigger.of(EventTimeTrigger.create(), Duration.ofSeconds(10)): when the wrapped EventTimeTrigger does not fire, the timeout fires the window instead; already-fired results are not recomputed, only keys with updates emit refreshed totals -- but expired window state is never cleaned up
//.trigger(ProcessingTimeoutTrigger.of(EventTimeTrigger.create(), Duration.ofSeconds(10)))
//periodic firing, like Spark's fixed-interval trigger
//.trigger(ProcessingTimeoutTrigger.of(EventTimeTrigger.create(), Duration.ofSeconds(5),false,true))
/**
* 单并行度验证:
* 结果是:> Order{orderId='dog', userId=null, money=2, evenTime=12:00:03}
* 结果是:> Order{orderId='cat', userId=null, money=1, evenTime=12:00:04}
* 结果是:> Order{orderId='dog', userId=null, money=1, evenTime=12:00:20}
* --以上是水印触发计算
* 结果是:> Order{orderId='dog', userId=null, money=1, evenTime=12:00:30}
* 结果是:> Order{orderId='dog', userId=null, money=1, evenTime=12:00:40}
* 结果是:> Order{orderId='dog', userId=null, money=2, evenTime=12:00:30}
* --超时触发计算
*/
//ProcessingTimeTrigger fires immediately; the watermark still applies and expired data is destroyed -- same effect as Spark with no trigger (or trigger interval 0)
//.trigger(ProcessingTimeTrigger.create())
//what we actually want: event-time windows fired and destroyed by the watermark as usual, plus a periodic timeout that fires (and destroys) them when data stops coming or the watermark advances too slowly
//simulated by .trigger(ProcessingTimeoutTrigger.of(EventTimeTrigger.create(), Duration.ofSeconds(5),false,true))
.trigger(EventAndProcessingTimeOutTrigger.of(Duration.ofSeconds(5),false))
/**
 * Consideration 1: with multiple parallel subtasks, one partition may receive no data; the official 1.11 idleness mechanism handles that, but if no data arrives at all, nothing advances the low watermark and the final window never fires -- its data is lost.
 * A custom event-time trigger with a no-data timeout solves this.
 * Consideration 2: if one partition's low watermark advances much faster than the others, the fast one queues up waiting for the slow ones, and the final window may likewise never fire, losing data.
 * For uneven watermark progress the official fix is the watermark alignment strategy added in 1.15; a custom event-time trigger with periodic firing also works.
 * In short, a custom event-time trigger with periodic firing covers all three cases: 1. no data at all; 2. some partitions advancing the watermark too slowly; 3. the final window whose watermark never advances.
*
*/
//on top of the watermark, allow an extra lateness period during which late records re-fire the window; only records arriving after that period go to the side output
.allowedLateness(Time.seconds(15))
.sideOutputLateData(seriousLateOutputTag)
.sum("money");
result.print("结果是:");
result.getSideOutput(seriousLateOutputTag).print("严重迟到丢失的数据:");
env.execute();
}
public static class Order{
private String orderId;
private Integer userId;
private Integer money;
private Long evenTime;
public Order(String orderId, Integer userId, Integer money, Long evenTime) {
this.orderId = orderId;
this.userId = userId;
this.money = money;
this.evenTime = evenTime;
}
public Order() {
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public Integer getUserId() {
return userId;
}
public void setUserId(Integer userId) {
this.userId = userId;
}
public Integer getMoney() {
return money;
}
public void setMoney(Integer money) {
this.money = money;
}
public Long getEvenTime() {
return evenTime;
}
public void setEvenTime(Long evenTime) {
this.evenTime = evenTime;
}
@Override
public String toString() {
FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
return "Order{" +
"orderId='" + orderId + '\'' +
", userId=" + userId +
", money=" + money +
", evenTime=" + df.format(evenTime) +
'}';
}
}
}
State
Stateless vs. stateful computation
- Stateless computation ignores history, e.g. map:
hello --> (hello,1)
hello --> (hello,1)
- Stateful computation depends on history, e.g. sum:
hello , (hello,1)
hello , (hello,2)
State categories
- State
- Managed State -- recommended in practice: automatically managed/optimized by Flink, supports multiple data structures
- KeyedState -- only usable on a KeyedStream, supports multiple data structures
- OperatorState -- typically used in Sources, supports ListState
- RawState -- fully managed by the user, byte[] only, only usable in custom operators
Detailed classification diagram:
ManagedState-KeyState
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* Author
 * Desc Use ValueState (KeyedState) to track the max value in a stream / in practice just use maxBy
*/
public class StateDemo01_KeyState {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//with RuntimeExecutionMode.AUTOMATIC, Flink picks stream or batch execution based on whether the source is bounded; this demo runs as batch, so keyBy sorts the data
/*
(上海,2,2)
(上海,8,8)
(上海,4,8)
(北京,1,1)
(北京,6,6)
(北京,3,6)
*/
//without it, the default is streaming: records are processed one by one, unsorted
/*
(北京,1,1)
(上海,2,2)
(北京,6,6)
(上海,8,8)
(北京,3,6)
(上海,4,8)
*/
//TODO 1.source
DataStream<Tuple2<String, Long>> tupleDS = env.fromElements(
Tuple2.of("北京", 1L),
Tuple2.of("上海", 2L),
Tuple2.of("北京", 6L),
Tuple2.of("上海", 8L),
Tuple2.of("北京", 3L),
Tuple2.of("上海", 4L)
);
//TODO 2.transformation
//需求:求各个城市的value最大值
//实际中使用maxBy即可
DataStream<Tuple2<String, Long>> result1 = tupleDS.keyBy(t -> t.f0).maxBy(1);
//学习时可以使用KeyState中的ValueState来实现maxBy的底层
DataStream<Tuple3<String, Long, Long>> result2 = tupleDS.keyBy(t -> t.f0).map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
//-1.定义一个状态用来存放最大值
private ValueState<Long> maxValueState;
//-2.状态初始化
/*
open() runs once per task instance, so the state handle is initialized once
rather than re-created for every incoming record.
*/
@Override
public void open(Configuration parameters) throws Exception {
//创建状态描述器
ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<>("maxValueState", Long.class);
//根据状态描述器获取/初始化状态
maxValueState = getRuntimeContext().getState(stateDescriptor);
}
//-3.使用状态
@Override
public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
Long currentValue = value.f1;
//获取状态
Long historyValue = maxValueState.value();
//判断状态
if (historyValue == null || currentValue > historyValue) {
historyValue = currentValue;
//更新状态
maxValueState.update(historyValue);
return Tuple3.of(value.f0, currentValue, historyValue);
} else {
return Tuple3.of(value.f0, currentValue, historyValue);
}
}
});
//TODO 3.sink
//result1.print();
//4> (北京,6)
//1> (上海,8)
result2.print();
//1> (上海,xxx,8)
//4> (北京,xxx,6)
//TODO 4.execute
env.execute();
}
}
ManagedState-OperatorState
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.Iterator;
/**
* Author
* Desc 使用OperatorState中的ListState模拟KafkaSource进行offset维护
*/
public class StateDemo02_OperatorState {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);//并行度设置为1方便观察
//下面的Checkpoint和重启策略配置先直接使用,下次课学
env.enableCheckpointing(1000);//每隔1s执行一次Checkpoint
env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//固定延迟重启策略: 程序出现异常的时候,重启2次,每次延迟3秒钟重启,超过2次,程序退出
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 3000));
//TODO 1.source
DataStreamSource<String> ds = env.addSource(new MyKafkaSource()).setParallelism(1);
//TODO 2.transformation
//TODO 3.sink
ds.print();
//TODO 4.execute
env.execute();
}
//使用OperatorState中的ListState模拟KafkaSource进行offset维护
public static class MyKafkaSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {
private boolean flag = true;
//-1.声明ListState
private ListState<Long> offsetState = null; //用来存放offset
private Long offset = 0L;//用来存放offset的值
//-2.初始化/创建ListState
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Long> stateDescriptor = new ListStateDescriptor<>("offsetState", Long.class);
offsetState = context.getOperatorStateStore().getListState(stateDescriptor);
}
//-3.使用state
@Override
public void run(SourceContext<String> ctx) throws Exception {
Iterator<Long> iterator = offsetState.get().iterator();
if(iterator.hasNext()){
offset = iterator.next();
}
while (flag){
offset += 1;
int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
ctx.collect("subTaskId:"+ subTaskId + ",当前的offset值为:"+offset);
Thread.sleep(1000);
//模拟异常
if(offset % 5 == 0){
throw new Exception("bug出现了.....");
}
}
}
//-4.state持久化
//该方法会定时执行将state状态从内存存入Checkpoint磁盘目录中
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
offsetState.clear();//clear the in-memory list state, then add the latest offset so it is persisted into the Checkpoint directory on disk
offsetState.add(offset);
}
@Override
public void cancel() {
flag = false;
}
}
}
State timers and TTL (think of Redis)
Inside a KeyedProcessFunction, ctx.timerService().registerProcessingTimeTimer(time) registers a timer; when that time arrives, onTimer() fires, as sketched below.
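A minimal sketch of such a timer; the 10s delay and the clear-state-per-key use case are illustrative assumptions, not from the original notes:
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
//counts elements per key and emits/clears the count 10s after each update
public class TimerDemo extends KeyedProcessFunction<String, String, String> {
    private transient ValueState<Long> countState;
    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Long count = countState.value();
        countState.update(count == null ? 1L : count + 1);
        //register a processing-time timer 10s from now; when it fires, onTimer runs for this key
        ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 10_000L);
    }
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        out.collect(ctx.getCurrentKey() + " -> " + countState.value());
        countState.clear();//expire the state, Redis-TTL style
    }
}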
使用 flink 进行实时计算中,会遇到一些状态数不断累积,导致状态量越来越大的情形。
例如,作业中定义了超长的时间窗口,或者在动态表上应用了无限范围的 GROUP BY 语句,以及执行了没有时间窗口限制的双流 JOIN 等等操作。
对于这些情况,经常导致堆内存出现 OOM,或者堆外内存(RocksDB)用量持续增长导致超出容器的配额上限,造成作业的频繁崩溃。从 Flink 1.6 版本开始引入了State TTL 特性,该特性可以允许对作业中定义的 Keyed 状态进行超时自动清理,对于Table API 和 SQL 模块引入了空闲状态保留时间(Idle State Retention Time)进行状态管理,下面我们具体介绍一下。
在 Flink 的官方文档 中给我们展示了State TTL的基本用法,用法示例如下:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
可以看到,要使用 State TTL 功能,首先要定义一个 StateTtlConfig 对象:
1.通过构造器模式(Builder Pattern)来创建,传入一个 Time 对象作为 TTL 时间;
2.然后设置更新类型(Update Type);
3.再设置状态可见性(State Visibility);
4.最后在声明状态描述符(State Descriptor)时启用 State TTL 功能。
StateTtlConfig 的参数说明
- TTL:表示状态的过期时间,是一个 org.apache.flink.api.common.time.Time 对象。一旦设置了 TTL,那么如果上次访问的时间戳 + TTL 超过了当前时间,则表明状态过期了(这是一个简化的说法,严谨的定义请参考 org.apache.flink.runtime.state.ttl.TtlUtils 类中关于 expired 的实现)
- UpdateType:表示状态时间戳的更新时机,是一个 Enum 对象。如果设置为 Disabled,则表明不更新时间戳;如果设置为 OnCreateAndWrite,则表明当状态创建或每次写入时都会更新时间戳;如果设置为 OnReadAndWrite,则除了在状态创建和写入时更新时间戳外,读取也会更新状态的时间戳
- StateVisibility:表示对已过期但还未被清理掉的状态如何处理,也是 Enum 对象。如果设置为 ReturnExpiredIfNotCleanedUp,那么即使这个状态的时间戳表明它已经过期了,但是只要还未被真正清理掉,就会被返回给调用方;如果设置为 NeverReturnExpired,那么一旦这个状态过期了,就永远不会被返回给调用方,只会返回空状态,避免了过期状态带来的干扰
- TimeCharacteristic 以及 TtlTimeCharacteristic:表示 State TTL 功能所适用的时间模式,仍然是 Enum 对象。前者已经被标记为 Deprecated(废弃),推荐新代码采用新的 TtlTimeCharacteristic 参数。截止到 Flink 1.8,只支持 ProcessingTime 一种时间模式,对 EventTime 模式的 State TTL 支持还在开发中
CleanupStrategies:
1.默认被动清理:当再次读到这个key时才进行清除(内存有压力时可考虑换RocksDB)
2.手动清理:如下
表示过期对象的清理策略,目前来说有三种 Enum 值。当设置为 FULL_STATE_SCAN_SNAPSHOT 时,对应的是 EmptyCleanupStrategy 类,表示对过期状态不做主动清理,当执行完整快照(Snapshot / Checkpoint)时,会生成一个较小的状态文件,但本地状态并不会减小
唯有当作业重启并从上一个快照点恢复后,本地状态才会实际减小,因此可能仍然不能解决内存压力的问题。为了应对这个问题,Flink 还提供了增量清理的枚举值,分别是针对 Heap StateBackend 的 INCREMENTAL_CLEANUP(对应 IncrementalCleanupStrategy 类),以及对 RocksDB StateBackend 有效的 ROCKSDB_COMPACTION_FILTER(对应 RocksdbCompactFilterCleanupStrategy 类)
对于增量清理功能,Flink 可以被配置为每读取若干条记录就执行一次清理操作,而且可以指定每次要清理多少条失效记录;对于 RocksDB 的状态清理,则是通过 JNI 来调用 C++ 语言编写的 FlinkCompactionFilter 来实现,底层是通过 RocksDB 提供的后台 Compaction 操作来实现对失效状态过滤的
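结合上面的三种清理策略,给一个显式配置清理方式的示意(基于 Flink 1.12 的 StateTtlConfig API,TTL 时长与各参数值均为假设值):
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.hours(1))
.cleanupFullSnapshot()//执行完整快照(Snapshot/Checkpoint)时过滤过期状态
.cleanupIncrementally(10, false)//Heap后端:每次访问状态时顺带检查10条,false表示不在每条记录处理时都额外触发清理
.cleanupInRocksdbCompactFilter(1000)//RocksDB后端:每处理1000条状态后更新一次时间戳,由后台compaction过滤过期状态
.build();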
配置中有下面几个配置项可以选择。其中 StateTtlConfig 的 newBuilder 方法是必须的,用于设置状态的生存周期。
TTL 刷新策略(默认OnCreateAndWrite)
| 策略类型 | 描述 |
|---|---|
| StateTtlConfig.UpdateType.Disabled | 禁用TTL,永不过期 |
| StateTtlConfig.UpdateType.OnCreateAndWrite | 每次写操作都会更新State的最后访问时间 |
| StateTtlConfig.UpdateType.OnReadAndWrite | 每次读写操作都会更新State的最后访问时间 |
状态可见性(默认NeverReturnExpired)
| 策略类型 | 描述 |
|---|---|
| StateTtlConfig.StateVisibility.NeverReturnExpired | 永不返回过期状态 |
| StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp | 可以返回过期但尚未被清理的状态值 |
Checkpoint
Checkpoint和State的区别
State是某个算子/任务在某一时刻的中间结果数据本身,随计算实时更新;Checkpoint是某一时刻整个作业所有算子State的全局一致性快照,会持久化到外部存储,主要用于故障恢复。
Checkpoint执行流程
0.Flink的JobManager创建CheckpointCoordinator
1.Coordinator向所有的SourceOperator发送Barrier栅栏(理解为执行Checkpoint的信号)
2.SourceOperator接收到Barrier之后,暂停当前的操作(暂停的时间很短,因为后续的写快照是异步的),并制作State快照, 然后将自己的快照保存到指定的介质中(如HDFS), 一切 ok之后向Coordinator汇报并将Barrier发送给下游的其他Operator
3.其他的如TransformationOperator接收到Barrier,重复第2步,最后将Barrier发送给Sink
4.Sink接收到Barrier之后重复第2步
5.Coordinator接收到所有的Operator的执行ok的汇报结果,认为本次快照执行成功
Chandy-Lamport algorithm分布式快照算法
Flink中的Checkpoint底层使用了Chandy-Lamport分布式快照算法,可以保证数据在分布式环境下的一致性!
https://zhuanlan.zhihu.com/p/53482103
Chandy-Lamport算法的作者之一Leslie Lamport,也是Paxos一致性算法的作者
https://www.cnblogs.com/shenguanpu/p/4048660.html
Flink中使用Chandy-Lamport algorithm分布式快照算法取得了成功,后续Spark的StructuredStreaming也借鉴了该算法
状态后端/存储介质
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>1.12.0</version>
</dependency>
Checkpoint代码演示
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/checkpointing.html
import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import java.util.Properties;
/**
* Author
* Desc 演示Flink-Checkpoint相关配置
*/
public class CheckpointDemo01 {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO ===========Checkpoint参数设置====
//===========类型1:必须参数=============
//设置Checkpoint的时间间隔为1000ms做一次Checkpoint/其实就是每隔1000ms发一次Barrier!
env.enableCheckpointing(1000);
//设置State状态存储介质/状态后端
//Memory:State存内存,Checkpoint存内存--开发不用!
//Fs:State存内存,Checkpoint存FS(本地/HDFS)--一般情况下使用
//RocksDB:State存RocksDB(内存+磁盘),Checkpoint存FS(本地/HDFS)--超大状态使用,但是对于状态的读写效率要低一点
/*if(args.length > 0){
env.setStateBackend(new FsStateBackend(args[0]));
}else {
env.setStateBackend(new FsStateBackend("file:///D:\\data\\ckp"));
}*/
if (SystemUtils.IS_OS_WINDOWS) {
env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
} else {
env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint"));
}
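//(补充示意,非原代码)如果引入了前面给出的flink-statebackend-rocksdb依赖,超大状态也可以改用RocksDB状态后端,第二个参数true表示开启增量Checkpoint:
//import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
//env.setStateBackend(new RocksDBStateBackend("hdfs://node1:8020/flink-checkpoint/rocksdb", true));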
//===========类型2:建议参数===========
//设置两个Checkpoint 之间最少等待时间,如设置Checkpoint之间最少是要等 500ms(为了避免每隔1000ms做一次Checkpoint的时候,前一次太慢和后一次重叠到一起去了)
//如:高速公路上,每隔1s关口放行一辆车,但是规定了两车之间的最小车距为500m
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//默认是0
//设置如果在做Checkpoint过程中出现错误,是否让整体任务失败:true是 false不是
//env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//默认是true
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//默认值为0,表示不容忍任何检查点失败
//设置是否清理检查点,表示 Cancel 时是否需要保留当前的 Checkpoint,默认 Checkpoint会在作业被Cancel时被删除
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,当作业被取消时,删除外部的checkpoint(默认值)
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,当作业被取消时,保留外部的checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//===========类型3:直接使用默认的即可===============
//设置checkpoint的执行模式为EXACTLY_ONCE(默认)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//设置checkpoint的超时时间,如果 Checkpoint在 60s内尚未完成说明该次Checkpoint失败,则丢弃。
env.getCheckpointConfig().setCheckpointTimeout(60000);//默认10分钟
//设置同一时间有多少个checkpoint可以同时执行
//env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默认为1
//2.Source
DataStream<String> linesDS = env.socketTextStream("node1", 9999);
//3.Transformation
//3.1切割出每个单词并直接记为1
DataStream<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//value就是每一行
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
});
//3.2分组
//注意:批处理的分组是groupBy,流处理的分组是keyBy
KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOneDS.keyBy(t -> t.f0);
//3.3聚合
DataStream<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);
DataStream<String> result = aggResult.map(new RichMapFunction<Tuple2<String, Integer>, String>() {//map返回的就是SingleOutputStreamOperator,无需强转
@Override
public String map(Tuple2<String, Integer> value) throws Exception {
return value.f0 + ":::" + value.f1;
}
});
//4.sink
result.print();
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092");
FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props);
result.addSink(kafkaSink);
//5.execute
env.execute();
// /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka
}
}
状态恢复-自动重启-全自动
import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
/**
* Author
* Desc 演示Flink-Checkpoint+重启策略实现状态恢复
*/
public class CheckpointDemo02_Restart {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO ===========Checkpoint参数设置====
//===========类型1:必须参数=============
//设置Checkpoint的时间间隔为1000ms做一次Checkpoint/其实就是每隔1000ms发一次Barrier!
env.enableCheckpointing(1000);
//设置State状态存储介质/状态后端
//Memory:State存内存,Checkpoint存内存--开发不用!
//Fs:State存内存,Checkpoint存FS(本地/HDFS)--一般情况下使用
//RocksDB:State存RocksDB(内存+磁盘),Checkpoint存FS(本地/HDFS)--超大状态使用,但是对于状态的读写效率要低一点
/*if(args.length > 0){
env.setStateBackend(new FsStateBackend(args[0]));
}else {
env.setStateBackend(new FsStateBackend("file:///D:\\data\\ckp"));
}*/
if (SystemUtils.IS_OS_WINDOWS) {
env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
} else {
env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint"));
}
//===========类型2:建议参数===========
//设置两个Checkpoint 之间最少等待时间,如设置Checkpoint之间最少是要等 500ms(为了避免每隔1000ms做一次Checkpoint的时候,前一次太慢和后一次重叠到一起去了)
//如:高速公路上,每隔1s关口放行一辆车,但是规定了两车之间的最小车距为500m
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//默认是0
//设置如果在做Checkpoint过程中出现错误,是否让整体任务失败:true是 false不是
//env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//默认是true
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//默认值为0,表示不容忍任何检查点失败
//设置是否清理检查点,表示 Cancel 时是否需要保留当前的 Checkpoint,默认 Checkpoint会在作业被Cancel时被删除
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,当作业被取消时,删除外部的checkpoint(默认值)
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,当作业被取消时,保留外部的checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//===========类型3:直接使用默认的即可===============
//设置checkpoint的执行模式为EXACTLY_ONCE(默认)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//设置checkpoint的超时时间,如果 Checkpoint在 60s内尚未完成说明该次Checkpoint失败,则丢弃。
env.getCheckpointConfig().setCheckpointTimeout(60000);//默认10分钟
//设置同一时间有多少个checkpoint可以同时执行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默认为1
//TODO ===配置重启策略:
//1.配置了Checkpoint的情况下不做任务配置:默认是无限重启并自动恢复,可以解决小问题,但是可能会隐藏真正的bug
//2.单独配置无重启策略
//env.setRestartStrategy(RestartStrategies.noRestart());
//3.固定延迟重启--开发中常用
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 最多重启3次
Time.of(5, TimeUnit.SECONDS) // 重启时间间隔
));
//上面的设置表示:如果job失败,重启3次, 每次间隔5s
//4.失败率重启--开发中偶尔使用
/*env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每个测量阶段内最大失败次数
Time.of(1, TimeUnit.MINUTES), //失败率测量的时间间隔
Time.of(3, TimeUnit.SECONDS) // 两次连续重启的时间间隔
));*/
//上面的设置表示:如果1分钟内job失败不超过三次,自动重启,每次重启间隔3s (如果1分钟内程序失败达到3次,则程序退出)
//2.Source
DataStream<String> linesDS = env.socketTextStream("node1", 9999);
//3.Transformation
//3.1切割出每个单词并直接记为1
DataStream<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//value就是每一行
String[] words = value.split(" ");
for (String word : words) {
if (word.equals("bug")) {
System.out.println("bug.....");
throw new Exception("bug.....");
}
out.collect(Tuple2.of(word, 1));
}
}
});
//3.2分组
//注意:批处理的分组是groupBy,流处理的分组是keyBy
KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOneDS.keyBy(t -> t.f0);
//3.3聚合
DataStream<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);
DataStream<String> result = aggResult.map(new RichMapFunction<Tuple2<String, Integer>, String>() {//map返回的就是SingleOutputStreamOperator,无需强转
@Override
public String map(Tuple2<String, Integer> value) throws Exception {
return value.f0 + ":::" + value.f1;
}
});
//4.sink
result.print();
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092");
FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props);
result.addSink(kafkaSink);
//5.execute
env.execute();
// /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka
}
}
状态恢复-手动重启-半自动
1.打包-用到了kafka
2.启动Flink集群
3.上传jar包配置并提交
http://node1:8081/#/submit
4.发送单词并观察hdfs目录
5.取消任务
6.重新提交任务并指定从指定的ckp目录恢复状态接着计算
hdfs://node1:8020/flink-checkpoint/checkpoint/acb9071752276e86552a30fda41e021c/chk-100
7.继续发送数据,可以发现任务从之前保存的状态接着计算
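第6步除了在Web UI提交时指定Savepoint Path外,也可以用命令行从该Checkpoint目录恢复(示意命令,jar包路径与主类名沿用下文Savepoint演示中的假设值,实际以自己打包的为准):
/export/server/flink/bin/flink run -s hdfs://node1:8020/flink-checkpoint/checkpoint/acb9071752276e86552a30fda41e021c/chk-100 --class cn.checkpoint.CheckpointDemo01 /root/ckp.jar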
Savepoint-全手动
演示
# 启动yarn session
/export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 2 -d
# 运行job-会自动执行Checkpoint
/export/server/flink/bin/flink run --class cn.checkpoint.CheckpointDemo01 /root/ckp.jar
# 手动创建savepoint--相当于手动做了一次Checkpoint
/export/server/flink/bin/flink savepoint 0e921a10eb31bb0983b637929ec87a8a hdfs://node1:8020/flink-checkpoint/savepoint/
# 停止job
/export/server/flink/bin/flink cancel 0e921a10eb31bb0983b637929ec87a8a
# 重新启动job,手动加载savepoint数据
/export/server/flink/bin/flink run -s hdfs://node1:8020/flink-checkpoint/savepoint/savepoint-0e921a-1cac737bff7a --class cn.checkpoint.CheckpointDemo01 /root/ckp.jar
# 停止yarn session
yarn application -kill application_1607782486484_0014
BroadcastState-动态更新规则配置(存内存)
flink广播变量有两种方式:
1.静态广播:
分布式缓存: Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在taskmanager节点中,防止task重复拉取。 此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。 当程序执行,Flink自动将文件或者目录复制到所有taskmanager节点的本地文件系统,仅会执行一次。用户可以通过这个指定的名称查找文件或者目录,然后从taskmanager节点的本地文件系统访问它。
//在分布式缓存中将本地的文件进行注册
env.registerCachedFile("D:/wxgz-local/resources_ceshi/too.properties", "too")
//使用时,在open中从上下文中加载
val file: File = getRuntimeContext.getDistributedCache.getFile("too")
val prop = new Properties
prop.load(new FileInputStream(file))
val value = prop.getProperty("cycle")
//如果需要配置更新,可以增加定时器,定时去拉取更新,存在延迟。==>启动预加载(Guava Cache 可以过期,避免数据一直累积)+(单独线程reload)定时更新/实时lookup
//这种方式与在open中初始化的有何不同?
2.动态广播更新
与分布式缓存的区别:
1.广播变量是基于内存的,是将变量分发到各个worker节点的内存上(避免多次复制,节省内存)
2.分布式缓存是基于磁盘的,将文件copy到各个节点上,当函数运行时可以在本地文件系统检索该文件(避免多次复制,提高执行效率)
需求
注意事项:
Broadcast State 是Map 类型,即K-V 类型。
Broadcast State 只有在广播的一侧, 即在BroadcastProcessFunction 或KeyedBroadcastProcessFunction 的processBroadcastElement 方法中可以修改。在非广播的一侧, 即在BroadcastProcessFunction 或KeyedBroadcastProcessFunction 的processElement 方法中只读。
Broadcast State 中元素的顺序,在各Task 中可能不同。基于顺序的处理,需要注意。
Broadcast State 在Checkpoint 时,每个Task 都会Checkpoint 广播状态。
Broadcast State 在运行时保存在内存中,目前还不能保存在RocksDB State Backend 中。
注意:广播流数据源必须处于running状态,否则Checkpoint会失败。所以要么保证MySQL每次加载的耗时小于Checkpoint间隔,要么把MySQL数据先打到Kafka,或采用CDC监测changelog的方式。
有一个事件流--用户的行为日志,里面有用户id,但是没有用户的详细信息
有一个配置流/规则流--用户信息流--里面有用户的详细的信息
现在要将事件流和配置流进行关联, 得出日志中用户的详细信息,如 (用户id,详细信息, 操作)
那么我们可以将配置流/规则流--用户信息流 作为状态进行广播 (因为配置流/规则流--用户信息流较小)
思考:
1.配置流不应该是只上报有更新的配置,然后更新broadcast state的值,事件流再读取?
数据
/**
* 随机事件流--数据量较大
* 用户id,时间,类型,产品id
* <userID, eventTime, eventType, productID>
*/
public static class MySource implements SourceFunction<Tuple4<String, String, String, Integer>> {
private boolean isRunning = true;
@Override
public void run(SourceContext<Tuple4<String, String, String, Integer>> ctx) throws Exception {
Random random = new Random();
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
while (isRunning){
int id = random.nextInt(4) + 1;
String user_id = "user_" + id;
String eventTime = df.format(new Date());
String eventType = "type_" + random.nextInt(3);
int productId = random.nextInt(4);
ctx.collect(Tuple4.of(user_id,eventTime,eventType,productId));
Thread.sleep(500);
}
}
@Override
public void cancel() {
isRunning = false;
}
}
/**
* 配置流/规则流/用户信息流--数量较小
* <用户id,<姓名,年龄>>
*/
/*
CREATE TABLE `user_info` (
`userID` varchar(20) NOT NULL,
`userName` varchar(10) DEFAULT NULL,
`userAge` int(11) DEFAULT NULL,
PRIMARY KEY (`userID`) USING BTREE
) ENGINE=MyISAM DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
INSERT INTO `user_info` VALUES ('user_1', '张三', 10);
INSERT INTO `user_info` VALUES ('user_2', '李四', 20);
INSERT INTO `user_info` VALUES ('user_3', '王五', 30);
INSERT INTO `user_info` VALUES ('user_4', '赵六', 40);
*/
public static class MySQLSource extends RichSourceFunction<Map<String, Tuple2<String, Integer>>> {
private boolean flag = true;
private Connection conn = null;
private PreparedStatement ps = null;
private ResultSet rs = null;
@Override
public void open(Configuration parameters) throws Exception {
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
String sql = "select `userID`, `userName`, `userAge` from `user_info`";
ps = conn.prepareStatement(sql);
}
@Override
public void run(SourceContext<Map<String, Tuple2<String, Integer>>> ctx) throws Exception {
while (flag){
Map<String, Tuple2<String, Integer>> map = new HashMap<>();
rs = ps.executeQuery();//注意:这里给成员变量rs赋值,close()中才能真正关闭它
while (rs.next()){
String userID = rs.getString("userID");
String userName = rs.getString("userName");
int userAge = rs.getInt("userAge");
//Map<String, Tuple2<String, Integer>>
map.put(userID, Tuple2.of(userName,userAge));
}
ctx.collect(map);
Thread.sleep(5000);//每隔5s更新一下用户的配置信息!
}
}
@Override
public void cancel() {
flag = false;
}
@Override
public void close() throws Exception {
if (conn != null) conn.close();
if (ps != null) ps.close();
if (rs != null) rs.close();
}
}
代码步骤
1.env
2.source
-1.构建实时数据事件流-自定义随机
<userID, eventTime, eventType, productID>
-2.构建配置流-从MySQL
<用户id,<姓名,年龄>>
3.transformation
-1.定义状态描述器
MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> descriptor =
new MapStateDescriptor<>("config",Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
-2.广播配置流
BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = configDS.broadcast(descriptor);
-3.将事件流和广播流进行连接
BroadcastConnectedStream<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>> connectDS =eventDS.connect(broadcastDS);
-4.处理连接后的流-根据配置流补全事件流中的用户的信息
4.sink
5.execute
代码实现
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
/**
* Author
* Desc
*/
public class BroadcastStateDemo {
public static void main(String[] args) throws Exception {
//TODO 1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
//TODO 2.source
//-1.构建实时数据事件流--数据量较大
//<userID, eventTime, eventType, productID>
DataStreamSource<Tuple4<String, String, String, Integer>> eventDS = env.addSource(new MySource());
//-2.配置流/规则流/用户信息流--数据量较小-从MySQL
//<用户id,<姓名,年龄>>
DataStreamSource<Map<String, Tuple2<String, Integer>>> userDS = env.addSource(new MySQLSource());
//TODO 3.transformation
//-1.定义状态描述器
//key为什么用null?因为整份配置是作为一个Map整体存放的,只需要一个固定的key,所以key类型声明为Void,始终用null这一个key
MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> descriptor =
new MapStateDescriptor<>("info", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
//-2.广播配置流
BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = userDS.broadcast(descriptor);
//-3.将事件流和广播流进行连接
BroadcastConnectedStream<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>> connectDS = eventDS.connect(broadcastDS);
//-4.处理连接后的流-根据配置流补全事件流中的用户的信息
//BroadcastProcessFunction<IN1, IN2, OUT>
SingleOutputStreamOperator<Tuple6<String, String, String, Integer, String, Integer>> result =
connectDS.process(new BroadcastProcessFunction<
//<userID, eventTime, eventType, productID> //事件流
Tuple4<String, String, String, Integer>,
//<用户id,<姓名,年龄>> //广播流
Map<String, Tuple2<String, Integer>>,
//<用户id,eventTime,eventType,productID,姓名,年龄> //结果流 需要收集的数据
Tuple6<String, String, String, Integer, String, Integer>
>() {
//处理事件流中的每一个元素
@Override
public void processElement(Tuple4<String, String, String, Integer> value, ReadOnlyContext ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
//value就是事件流中的数据
//<userID, eventTime, eventType, productID> //事件流--已经有了
//Tuple4<String, String, String, Integer>,
//目标是将value和广播流中的数据进行关联,返回结果流
//<用户id,<姓名,年龄>> //广播流--需要获取
//Map<String, Tuple2<String, Integer>>
//<用户id,eventTime,eventType,productID,姓名,年龄> //结果流 需要收集的数据
// Tuple6<String, String, String, Integer, String, Integer>
//获取广播流
ReadOnlyBroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(descriptor);
//用户id,<姓名,年龄>
Map<String, Tuple2<String, Integer>> map = broadcastState.get(null);//广播流中的数据
if (map != null) {
//根据value中的用户id去map中获取用户信息
String userId = value.f0;
Tuple2<String, Integer> tuple2 = map.get(userId);
if (tuple2 != null) {//广播流中可能还没有该用户的信息,需要判空,否则会空指针
String username = tuple2.f0;
Integer age = tuple2.f1;
//收集数据
out.collect(Tuple6.of(userId, value.f1, value.f2, value.f3, username, age));
}
}
}
//更新处理广播流中的数据
@Override
public void processBroadcastElement(Map<String, Tuple2<String, Integer>> value, Context ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
//value就是从MySQL中每隔5s查询出来并广播到状态中的最新数据!
//要把最新的数据放到state中
BroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(descriptor);
broadcastState.clear();//清空旧数据
broadcastState.put(null, value);//放入新数据
}
});
//TODO 4.sink
result.print();
//TODO 5.execute
env.execute();
}
/**
* 随机事件流--数据量较大
* 用户id,时间,类型,产品id
* <userID, eventTime, eventType, productID>
*/
public static class MySource implements SourceFunction<Tuple4<String, String, String, Integer>> {
private boolean isRunning = true;
@Override
public void run(SourceContext<Tuple4<String, String, String, Integer>> ctx) throws Exception {
Random random = new Random();
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
while (isRunning) {
int id = random.nextInt(4) + 1;
String user_id = "user_" + id;
String eventTime = df.format(new Date());
String eventType = "type_" + random.nextInt(3);
int productId = random.nextInt(4);
ctx.collect(Tuple4.of(user_id, eventTime, eventType, productId));
Thread.sleep(500);
}
}
@Override
public void cancel() {
isRunning = false;
}
}
/**
* 配置流/规则流/用户信息流--数据量较小
* <用户id,<姓名,年龄>>
*/
/*
CREATE TABLE `user_info` (
`userID` varchar(20) NOT NULL,
`userName` varchar(10) DEFAULT NULL,
`userAge` int(11) DEFAULT NULL,
PRIMARY KEY (`userID`) USING BTREE
) ENGINE=MyISAM DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
INSERT INTO `user_info` VALUES ('user_1', '张三', 10);
INSERT INTO `user_info` VALUES ('user_2', '李四', 20);
INSERT INTO `user_info` VALUES ('user_3', '王五', 30);
INSERT INTO `user_info` VALUES ('user_4', '赵六', 40);
*/
public static class MySQLSource extends RichSourceFunction<Map<String, Tuple2<String, Integer>>> {
private boolean flag = true;
private Connection conn = null;
private PreparedStatement ps = null;
private ResultSet rs = null;
@Override
public void open(Configuration parameters) throws Exception {
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
String sql = "select `userID`, `userName`, `userAge` from `user_info`";
ps = conn.prepareStatement(sql);
}
@Override
public void run(SourceContext<Map<String, Tuple2<String, Integer>>> ctx) throws Exception {
while (flag) {
Map<String, Tuple2<String, Integer>> map = new HashMap<>();
rs = ps.executeQuery();//同上:赋值给成员变量rs,close()中才能真正关闭它
while (rs.next()) {
String userID = rs.getString("userID");
String userName = rs.getString("userName");
int userAge = rs.getInt("userAge");
//Map<String, Tuple2<String, Integer>>
map.put(userID, Tuple2.of(userName, userAge));
}
ctx.collect(map);
Thread.sleep(5000);//每隔5s更新一下用户的配置信息!
}
}
@Override
public void cancel() {
flag = false;
}
@Override
public void close() throws Exception {
if (conn != null) conn.close();
if (ps != null) ps.close();
if (rs != null) rs.close();
}
}
}
//如果任务失败了且不从checkpoint恢复,广播的共享变量就丢失了,需要处理的问题:1.任务刚启动时事件流会存在匹配不上的数据;2.广播流存在内存中不可能全量加载,也会出现join不上的情况;
//需要解决的问题:
1.需要重新加载:
重新广播还是预加载?可以定时覆盖保存一份到本地,恢复时如果不从checkpoint或savepoint恢复,可以自己从之前保存的历史数据加载。
2.任务刚启动的时候数据join不上:可能是广播流先到、事件流带的是未来版本的属性,也可能是事件流先到,都会有join不上的问题。
SQL temporal join可以先保存两条流的状态,等join上再输出;但状态保存在内存中,数据量大、版本太多会占用大量内存,需要设置空闲状态过期时间。
总结:
1.维表数据量小,且可控,使用broadcast+预加载
(单独线程从本地reload或从分布式缓存加载,从本地reload每个task都要拉取一份,分布式缓存是每个节点拉取一份,从网络io考虑,分布式缓存。)
2.维表数据量大,或者两条事件流join,把行为串联,可以用热存储,把大概率先到的一条流存外部系统,然后另一条流interval join对应版本(异步io和Guava Cache减轻网络延迟和压力)。
3.上面的方式维表的更新存在延迟,如果对实时性要求高,且两条流互相等待的时间不长,可以考虑Flink SQL temporal join,实现起来代码简单。但是state的管理是个问题,设置不合理,内存会溢出。
涉及到水位线(watermark)推进的就要注意一个问题:如果某个分区没有数据,整体水位就不会推进。简单的方法是加空闲源检测机制(idleness),更进一步的方法是定义一个定期触发的trigger来推进水位。
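针对上面提到的空闲源检测,Flink 1.12 的 WatermarkStrategy 自带 withIdleness 配置,示意如下(Order 类型与各时间参数沿用前文注释中的示例,均为假设值):
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
WatermarkStrategy<Order> strategy = WatermarkStrategy
.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))//最大乱序时间3s
.withTimestampAssigner((order, timestamp) -> order.getEventTime())//指定事件时间列
.withIdleness(Duration.ofMinutes(1));//某个分区/源1分钟没有数据就标记为idle,不再拖住整体水位推进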
Flink-双流Join
join的分类
join() 算子提供的语义为"Window join",即按照指定字段和(滚动/滑动/会话)窗口进行 inner join,支持处理时间和事件时间两种时间特征。
coGroup() 只有 inner join 肯定还不够,如何实现 left/right outer join 呢?答案就是利用 coGroup() 算子。
它的调用方式类似于 join() 算子,也需要开窗,但是 CoGroupFunction 比 JoinFunction 更加灵活,可以按照用户指定的逻辑匹配左流和/或右流的数据并输出。
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply(new CoGroupFunction() {...});
join() 和 coGroup() 都是基于窗口做关联的。但是在某些情况下,两条流的数据步调未必一致。例如,订单流的数据有可能在点击流的购买动作发生之后很久才被写入,如果用窗口来圈定,很容易 join 不上。
所以 Flink 又提供了"Interval join"的语义,按照指定字段以及右流相对左流偏移的时间区间进行关联,即:right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]
interval join 也是 inner join,虽然不需要开窗,但是需要用户指定偏移区间的上下界,并且只支持事件时间。注意在运行之前,需要分别在两个流上应用 assignTimestampsAndWatermarks() 方法获取事件时间戳和水印。
//调用between进行检查
if (timeBehaviour != TimeBehaviour.EventTime) {
throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
}
为什么说window join和interval join的区别在于interval join的state有清理机制?
双流join的本质就是利用state把两条流的数据存储起来,计算时嵌套循环判断是否join得上。window join的state只保存一个窗口内的数据,窗口结束即可清理;interval join如果不清理,保留的则是这个key的所有历史state,状态会越来越大,所以会在join前把不再需要的数据清理掉,再循环join。
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html
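在进入完整代码演示之前,补充一个用 coGroup() 实现 left outer join 的最小示意(流元素类型、key 与窗口大小均为假设值):
//假设左右两条流的元素都是Tuple2<String, Integer>,按f0关联
leftStream.coGroup(rightStream)
.where(t -> t.f0)
.equalTo(t -> t.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
@Override
public void coGroup(Iterable<Tuple2<String, Integer>> left, Iterable<Tuple2<String, Integer>> right, Collector<String> out) throws Exception {
for (Tuple2<String, Integer> l : left) {
boolean matched = false;
for (Tuple2<String, Integer> r : right) {
matched = true;
out.collect(l.f0 + "," + l.f1 + "," + r.f1);
}
if (!matched) {
out.collect(l.f0 + "," + l.f1 + ",null");//右流窗口内没有匹配,左流照样输出
}
}
}
});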
代码演示-WindowJoin
import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
*
* Desc 演示Flink双流Join-windowJoin
*/
public class JoinDemo01_WindowJoin {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
//商品数据流
DataStreamSource<Goods> goodsDS = env.addSource(new GoodsSource());
//订单数据流
DataStreamSource<OrderItem> OrderItemDS = env.addSource(new OrderItemSource());
//给数据添加水印(这里简单一点直接使用系统时间作为事件时间)
/*
SingleOutputStreamOperator<Order> orderDSWithWatermark = orderDS.assignTimestampsAndWatermarks(
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))//指定maxOutOfOrderness最大无序度/最大允许的延迟时间/乱序时间
.withTimestampAssigner((order, timestamp) -> order.getEventTime())//指定事件时间列
);
*/
SingleOutputStreamOperator<Goods> goodsDSWithWatermark = goodsDS.assignTimestampsAndWatermarks(new GoodsWatermark());
SingleOutputStreamOperator<OrderItem> OrderItemDSWithWatermark = OrderItemDS.assignTimestampsAndWatermarks(new OrderItemWatermark());
//TODO 2.transformation---这里是重点
//商品类(商品id,商品名称,商品价格)
//订单明细类(订单id,商品id,商品数量)
//关联结果(商品id,商品名称,商品数量,商品价格*商品数量)
DataStream<FactOrderItem> resultDS = goodsDSWithWatermark.join(OrderItemDSWithWatermark)
.where(Goods::getGoodsId)
.equalTo(OrderItem::getGoodsId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
//<IN1, IN2, OUT>
.apply(new JoinFunction<Goods, OrderItem, FactOrderItem>() {
@Override
public FactOrderItem join(Goods first, OrderItem second) throws Exception {
FactOrderItem result = new FactOrderItem();
result.setGoodsId(first.getGoodsId());
result.setGoodsName(first.getGoodsName());
result.setCount(new BigDecimal(second.getCount()));
result.setTotalMoney(new BigDecimal(second.getCount()).multiply(first.getGoodsPrice()));
return result;
}
});
//TODO 3.sink
resultDS.print();
//TODO 4.execute
env.execute();
}
//商品类(商品id,商品名称,商品价格)
@Data
public static class Goods {
private String goodsId;
private String goodsName;
private BigDecimal goodsPrice;
public static List<Goods> GOODS_LIST;
public static Random r;
static {
r = new Random();
GOODS_LIST = new ArrayList<>();
GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));
GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
}
public static Goods randomGoods() {
int rIndex = r.nextInt(GOODS_LIST.size());
return GOODS_LIST.get(rIndex);
}
public Goods() {
}
public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
this.goodsId = goodsId;
this.goodsName = goodsName;
this.goodsPrice = goodsPrice;
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}
//订单明细类(订单id,商品id,商品数量)
@Data
public static class OrderItem {
private String itemId;
private String goodsId;
private Integer count;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}
//商品类(商品id,商品名称,商品价格)
//订单明细类(订单id,商品id,商品数量)
//关联结果(商品id,商品名称,商品数量,商品价格*商品数量)
@Data
public static class FactOrderItem {
private String goodsId;
private String goodsName;
private BigDecimal count;
private BigDecimal totalMoney;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}
//实时生成商品数据流
//构建一个商品Stream源(这个好比就是维表)
public static class GoodsSource extends RichSourceFunction<Goods> {
private Boolean isCancel;
@Override
public void open(Configuration parameters) throws Exception {
isCancel = false;
}
@Override
public void run(SourceContext sourceContext) throws Exception {
while(!isCancel) {
Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));
TimeUnit.SECONDS.sleep(1);
}
}
@Override
public void cancel() {
isCancel = true;
}
}
//实时生成订单数据流
//构建订单明细Stream源
public static class OrderItemSource extends RichSourceFunction<OrderItem> {
private Boolean isCancel;
private Random r;
@Override
public void open(Configuration parameters) throws Exception {
isCancel = false;
r = new Random();
}
@Override
public void run(SourceContext sourceContext) throws Exception {
while(!isCancel) {
Goods goods = Goods.randomGoods();
OrderItem orderItem = new OrderItem();
orderItem.setGoodsId(goods.getGoodsId());
orderItem.setCount(r.nextInt(10) + 1);
orderItem.setItemId(UUID.randomUUID().toString());
sourceContext.collect(orderItem);
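//再发一条goodsId为"111"的订单(商品表中不存在该id),模拟关联不上的数据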
orderItem.setGoodsId("111");
sourceContext.collect(orderItem);
TimeUnit.SECONDS.sleep(1);
}
}
@Override
public void cancel() {
isCancel = true;
}
}
//构建水印分配器,学习测试直接使用系统时间了
public static class GoodsWatermark implements WatermarkStrategy<Goods> {
@Override
public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return (element, recordTimestamp) -> System.currentTimeMillis();
}
@Override
public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<Goods>() {
@Override
public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
output.emitWatermark(new Watermark(System.currentTimeMillis()));
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(System.currentTimeMillis()));
}
};
}
}
//构建水印分配器,学习测试直接使用系统时间了
public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {
@Override
public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return (element, recordTimestamp) -> System.currentTimeMillis();
}
@Override
public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<OrderItem>() {
@Override
public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
output.emitWatermark(new Watermark(System.currentTimeMillis()));
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(System.currentTimeMillis()));
}
};
}
}
}
代码演示-IntervalJoin
import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
*
* Desc 演示Flink双流Join-IntervalJoin
*/
public class JoinDemo02_IntervalJoin {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
//商品数据流
DataStreamSource<Goods> goodsDS = env.addSource(new GoodsSource());
//订单数据流
DataStreamSource<OrderItem> OrderItemDS = env.addSource(new OrderItemSource());
//给数据添加水印(这里简单一点直接使用系统时间作为事件时间)
/*
SingleOutputStreamOperator<Order> orderDSWithWatermark = orderDS.assignTimestampsAndWatermarks(
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))//指定maxOutOfOrderness最大无序度/最大允许的延迟时间/乱序时间
.withTimestampAssigner((order, timestamp) -> order.getEventTime())//指定事件时间列
);
*/
SingleOutputStreamOperator<Goods> goodsDSWithWatermark = goodsDS.assignTimestampsAndWatermarks(new GoodsWatermark());
SingleOutputStreamOperator<OrderItem> OrderItemDSWithWatermark = OrderItemDS.assignTimestampsAndWatermarks(new OrderItemWatermark());
//TODO 2.transformation---这里是重点
//商品类(商品id,商品名称,商品价格)
//订单明细类(订单id,商品id,商品数量)
//关联结果(商品id,商品名称,商品数量,商品价格*商品数量)
SingleOutputStreamOperator<FactOrderItem> resultDS = goodsDSWithWatermark.keyBy(Goods::getGoodsId)
.intervalJoin(OrderItemDSWithWatermark.keyBy(OrderItem::getGoodsId))
//join的条件:
// 条件1.goodsId要相等
// 条件2.Goods的时间戳 - 2 <= OrderItem的时间戳 <= Goods的时间戳 + 1(between的区间是相对左流即Goods而言的)
.between(Time.seconds(-2), Time.seconds(1))
//ProcessJoinFunction<IN1, IN2, OUT>
.process(new ProcessJoinFunction<Goods, OrderItem, FactOrderItem>() {
@Override
public void processElement(Goods left, OrderItem right, Context ctx, Collector<FactOrderItem> out) throws Exception {
FactOrderItem result = new FactOrderItem();
result.setGoodsId(left.getGoodsId());
result.setGoodsName(left.getGoodsName());
result.setCount(new BigDecimal(right.getCount()));
result.setTotalMoney(new BigDecimal(right.getCount()).multiply(left.getGoodsPrice()));
out.collect(result);
}
});
//TODO 3.sink
resultDS.print();
//TODO 4.execute
env.execute();
}
//商品类(商品id,商品名称,商品价格)
@Data
public static class Goods {
private String goodsId;
private String goodsName;
private BigDecimal goodsPrice;
public static List<Goods> GOODS_LIST;
public static Random r;
static {
r = new Random();
GOODS_LIST = new ArrayList<>();
GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));
GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
}
public static Goods randomGoods() {
int rIndex = r.nextInt(GOODS_LIST.size());
return GOODS_LIST.get(rIndex);
}
public Goods() {
}
public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
this.goodsId = goodsId;
this.goodsName = goodsName;
this.goodsPrice = goodsPrice;
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}
//订单明细类(订单id,商品id,商品数量)
@Data
public static class OrderItem {
private String itemId;
private String goodsId;
private Integer count;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}
//商品类(商品id,商品名称,商品价格)
//订单明细类(订单id,商品id,商品数量)
//关联结果(商品id,商品名称,商品数量,商品价格*商品数量)
@Data
public static class FactOrderItem {
private String goodsId;
private String goodsName;
private BigDecimal count;
private BigDecimal totalMoney;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}
//实时生成商品数据流
//构建一个商品Stream源(这个好比就是维表)
public static class GoodsSource extends RichSourceFunction<Goods> {
private Boolean isCancel;
@Override
public void open(Configuration parameters) throws Exception {
isCancel = false;
}
@Override
public void run(SourceContext sourceContext) throws Exception {
while(!isCancel) {
Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));
TimeUnit.SECONDS.sleep(1);
}
}
@Override
public void cancel() {
isCancel = true;
}
}
//实时生成订单数据流
//构建订单明细Stream源
public static class OrderItemSource extends RichSourceFunction<OrderItem> {
private Boolean isCancel;
private Random r;
@Override
public void open(Configuration parameters) throws Exception {
isCancel = false;
r = new Random();
}
@Override
public void run(SourceContext sourceContext) throws Exception {
while(!isCancel) {
Goods goods = Goods.randomGoods();
OrderItem orderItem = new OrderItem();
orderItem.setGoodsId(goods.getGoodsId());
orderItem.setCount(r.nextInt(10) + 1);
orderItem.setItemId(UUID.randomUUID().toString());
sourceContext.collect(orderItem);
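//同样再发一条goodsId为"111"(商品表中不存在)的订单,模拟join不上的数据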
orderItem.setGoodsId("111");
sourceContext.collect(orderItem);
TimeUnit.SECONDS.sleep(1);
}
}
@Override
public void cancel() {
isCancel = true;
}
}
//构建水印分配器,学习测试直接使用系统时间了
public static class GoodsWatermark implements WatermarkStrategy<Goods> {
@Override
public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return (element, recordTimestamp) -> System.currentTimeMillis();
}
@Override
public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<Goods>() {
@Override
public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
output.emitWatermark(new Watermark(System.currentTimeMillis()));
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(System.currentTimeMillis()));
}
};
}
}
//构建水印分配器,学习测试直接使用系统时间了
public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {
@Override
public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return (element, recordTimestamp) -> System.currentTimeMillis();
}
@Override
public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<OrderItem>() {
@Override
public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
output.emitWatermark(new Watermark(System.currentTimeMillis()));
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(System.currentTimeMillis()));
}
};
}
}
}
Flink-End-to-End Exactly-Once
数据一致性语义分类
数据一致性语义详解
注意:
Exactly-Once 更准确的理解应该是:
数据只会被正确地处理一次,即只对最终结果生效一次!
并不是说数据在物理上只被处理一次——失败重试时数据可能被处理多次,但只有一次会成功地反映到结果中!
End-To-End Exactly-Once
表示从Source 到 Transformation 到 Sink 都能够保证Exactly-Once !
如何实现局部的Exactly-Once
可以使用:
1.去重
2.幂等(重复写入只生效一次,见本节末尾的SQL示意)
INSERT INTO t_student (id, `name`, age) VALUES (9, 'Gordon', 18)
> 1062 - Duplicate entry '9' for key 'PRIMARY'
> 时间: 0.001s
3.分布式快照/Checkpoint---Flink使用的是这个
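对于上面第2点的幂等,以MySQL为例可以用 ON DUPLICATE KEY UPDATE 让重放的数据重复写入也只生效一次(示意,沿用上文的t_student表):
INSERT INTO t_student (id, `name`, age) VALUES (9, 'Gordon', 18)
ON DUPLICATE KEY UPDATE `name` = VALUES(`name`), age = VALUES(age);
-- 重复执行多少次,表中该行的最终结果都一样,数据重放不会产生副作用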
如何实现End-To-End Exactly-Once
Source: 如Kafka的offset 支持数据的replay/重放/重新传输
Transformation: 借助于Checkpoint
Sink: Checkpoint + 两阶段事务提交
- 两阶段事务提交
- SourceOperator从Kafka消费消息/数据并记录offset
- TransformationOperator对数据进行处理转换并做Checkpoint
- SinkOperator将结果写入到Kafka
注意:在sink的时候会执行两阶段提交:
1.开启事务
2.各个Operator执行barrier的Checkpoint, 成功则进行预提交
3.所有Operator执行完预提交则执行真正的提交
4.如果有任何一个预提交失败则回滚到最近的Checkpoint
代码演示
kafka主题flink_kafka1 --->
Flink Source -->
Flink-Transformation做WordCount -->
结果存储到kafka主题flink_kafka2
//1.创建主题
/export/server/kafka/bin/kafka-topics.sh --zookeeper node1:2181 --create --replication-factor 2 --partitions 3 --topic flink_kafka1
/export/server/kafka/bin/kafka-topics.sh --zookeeper node1:2181 --create --replication-factor 2 --partitions 3 --topic flink_kafka2
//2.开启控制台生产者
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka1
//3.开启控制台消费者
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka2
import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* Author
* Desc 演示Flink的EndToEnd_Exactly_Once
* 需求:
* kafka主题flink_kafka1 --->Flink Source -->Flink-Transformation做WordCount -->结果存储到kafka主题flink_kafka2
*/
public class Flink_Kafka_EndToEnd_Exactly_Once {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//开启Checkpoint
//===========类型1:必须参数=============
//设置Checkpoint的时间间隔为1000ms做一次Checkpoint/其实就是每隔1000ms发一次Barrier!
env.enableCheckpointing(1000);
if (SystemUtils.IS_OS_WINDOWS) {
env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
} else {
env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint"));
}
//===========类型2:建议参数===========
//设置两个Checkpoint 之间最少等待时间,如设置Checkpoint之间最少是要等 500ms(为了避免每隔1000ms做一次Checkpoint的时候,前一次太慢和后一次重叠到一起去了)
//如:高速公路上,每隔1s关口放行一辆车,但是规定了两车之间的最小车距为500m
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//默认是0
//设置如果在做Checkpoint过程中出现错误,是否让整体任务失败:true是 false不是
//env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//默认是true
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//默认值为0,表示不容忍任何检查点失败
//设置是否清理检查点,表示 Cancel 时是否需要保留当前的 Checkpoint,默认 Checkpoint会在作业被Cancel时被删除
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,当作业被取消时,删除外部的checkpoint(默认值)
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,当作业被取消时,保留外部的checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//===========类型3:直接使用默认的即可===============
//设置checkpoint的执行模式为EXACTLY_ONCE(默认)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//设置checkpoint的超时时间,如果 Checkpoint在 60s内尚未完成说明该次Checkpoint失败,则丢弃。
env.getCheckpointConfig().setCheckpointTimeout(60000);//默认10分钟
//设置同一时间有多少个checkpoint可以同时执行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默认为1
//TODO ===配置重启策略:
//1.配置了Checkpoint的情况下不做任务配置:默认是无限重启并自动恢复,可以解决小问题,但是可能会隐藏真正的bug
//2.单独配置无重启策略
//env.setRestartStrategy(RestartStrategies.noRestart());
//3.固定延迟重启--开发中常用
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 最多重启3次
Time.of(5, TimeUnit.SECONDS) // 重启时间间隔
));
//上面的设置表示:如果job失败,重启3次, 每次间隔5s
//4.失败率重启--开发中偶尔使用
/*env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每个测量阶段内最大失败次数
Time.of(1, TimeUnit.MINUTES), //失败率测量的时间间隔
Time.of(3, TimeUnit.SECONDS) // 两次连续重启的时间间隔
));*/
//上面的设置表示:如果1分钟内job失败不超过三次,自动重启,每次重启间隔3s (如果1分钟内程序失败达到3次,则程序退出)
//TODO 1.source-主题:flink-kafka1
//准备kafka连接参数
Properties props1 = new Properties();
props1.setProperty("bootstrap.servers", "node1:9092");//集群地址
props1.setProperty("group.id", "flink");//消费者组id
props1.setProperty("auto.offset.reset", "latest");//latest有offset记录从记录位置开始消费,没有记录从最新的/最后的消息开始消费 /earliest有offset记录从记录位置开始消费,没有记录从最早的/最开始的消息开始消费
props1.setProperty("flink.partition-discovery.interval-millis", "5000");//会开启一个后台线程每隔5s检测一下Kafka的分区情况,实现动态分区检测
//props1.setProperty("enable.auto.commit", "true");//自动提交(提交到默认主题,后续学习了Checkpoint后随着Checkpoint存储在Checkpoint和默认主题中)
//props1.setProperty("auto.commit.interval.ms", "2000");//自动提交的时间间隔
//使用连接参数创建FlinkKafkaConsumer/kafkaSource
//FlinkKafkaConsumer里面已经实现了offset的Checkpoint维护!
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("flink_kafka1", new SimpleStringSchema(), props1);
kafkaSource.setCommitOffsetsOnCheckpoints(true);//默认就是true//在做Checkpoint的时候提交offset到Checkpoint(为容错)和默认主题(为了外部工具获取)中
//使用kafkaSource
DataStream<String> kafkaDS = env.addSource(kafkaSource);
//TODO 2.transformation-做WordCount
SingleOutputStreamOperator<String> result = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
private Random ran = new Random();
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] arr = value.split(" ");
for (String word : arr) {
int num = ran.nextInt(5);
if(num > 3){
System.out.println("随机异常产生了");
throw new Exception("随机异常产生了");
}
out.collect(Tuple2.of(word, 1));
}
}
}).keyBy(t -> t.f0)
.sum(1)
.map(new MapFunction<Tuple2<String, Integer>, String>() {
@Override
public String map(Tuple2<String, Integer> value) throws Exception {
return value.f0 + ":" + value.f1;
}
});
//TODO 3.sink-主题:flink-kafka2
Properties props2 = new Properties();
props2.setProperty("bootstrap.servers", "node1:9092");
props2.setProperty("transaction.timeout.ms", "5000");
FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
"flink_kafka2", // target topic
new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), // serialization schema(泛型由SimpleStringSchema推断为String)
props2, // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
result.addSink(kafkaSink);
//TODO 4.execute
env.execute();
}
}
Flink-异步IO-了解
原理
API
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/asyncio.html
注意: 如果要使用异步IO, 对应Client有一定要求:
1.该Client要支持发送异步请求,如vertx
2.如果Client不支持可以使用线程池来模拟异步请求
代码演示
DROP TABLE IF EXISTS `t_category`;
CREATE TABLE `t_category` (
`id` int(11) NOT NULL,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of t_category
-- ----------------------------
INSERT INTO `t_category` VALUES ('1', '手机');
INSERT INTO `t_category` VALUES ('2', '电脑');
INSERT INTO `t_category` VALUES ('3', '服装');
INSERT INTO `t_category` VALUES ('4', '化妆品');
INSERT INTO `t_category` VALUES ('5', '食品');
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.sql.SQLClient;
import io.vertx.ext.sql.SQLConnection;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.*;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 使用异步io的先决条件
* 1.数据库(或key/value存储)提供支持异步请求的client。
* 2.没有异步请求客户端的话也可以将同步客户端丢到线程池中执行作为异步客户端。
*/
public class ASyncIODemo {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.Source
//数据源中只有id
//DataStreamSource[1,2,3,4,5]
DataStreamSource<CategoryInfo> categoryDS = env.addSource(new RichSourceFunction<CategoryInfo>() {
private Boolean flag = true;
@Override
public void run(SourceContext<CategoryInfo> ctx) throws Exception {
Integer[] ids = {1, 2, 3, 4, 5};
for (Integer id : ids) {
ctx.collect(new CategoryInfo(id, null));
}
}
@Override
public void cancel() {
this.flag = false;
}
});
//3.Transformation
//方式一:Java-vertx中提供的异步client实现异步IO
//unorderedWait无序等待
SingleOutputStreamOperator<CategoryInfo> result1 = AsyncDataStream
.unorderedWait(categoryDS, new ASyncIOFunction1(), 1000, TimeUnit.SECONDS, 10);
//方式二:MySQL中同步client+线程池模拟异步IO
//unorderedWait无序等待
SingleOutputStreamOperator<CategoryInfo> result2 = AsyncDataStream
.unorderedWait(categoryDS, new ASyncIOFunction2(), 1000, TimeUnit.SECONDS, 10);
//4.Sink
result1.print("方式一:Java-vertx中提供的异步client实现异步IO \n");
result2.print("方式二:MySQL中同步client+线程池模拟异步IO \n");
//5.execute
env.execute();
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
class CategoryInfo {
private Integer id;
private String name;
}
//MySQL本身的客户端-需要把它变成支持异步的客户端:使用vertx或线程池
class MysqlSyncClient {
private static transient Connection connection;
private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
private static final String URL = "jdbc:mysql://localhost:3306/bigdata";
private static final String USER = "root";
private static final String PASSWORD = "root";
static {
init();
}
private static void init() {
try {
Class.forName(JDBC_DRIVER);
} catch (ClassNotFoundException e) {
System.out.println("Driver not found!" + e.getMessage());
}
try {
connection = DriverManager.getConnection(URL, USER, PASSWORD);
} catch (SQLException e) {
System.out.println("init connection failed!" + e.getMessage());
}
}
public void close() {
try {
if (connection != null) {
connection.close();
}
} catch (SQLException e) {
System.out.println("close connection failed!" + e.getMessage());
}
}
public CategoryInfo query(CategoryInfo category) {
try {
String sql = "select id,name from t_category where id = "+ category.getId();
Statement statement = connection.createStatement();
ResultSet rs = statement.executeQuery(sql);
if (rs != null && rs.next()) {
category.setName(rs.getString("name"));
}
} catch (SQLException e) {
System.out.println("query failed!" + e.getMessage());
}
return category;
}
}
/**
* 方式一:Java-vertx中提供的异步client实现异步IO
*/
class ASyncIOFunction1 extends RichAsyncFunction<CategoryInfo, CategoryInfo> {
private transient SQLClient mySQLClient;
@Override
public void open(Configuration parameters) throws Exception {
JsonObject mySQLClientConfig = new JsonObject();
mySQLClientConfig
.put("driver_class", "com.mysql.jdbc.Driver")
.put("url", "jdbc:mysql://localhost:3306/bigdata")
.put("user", "root")
.put("password", "root")
.put("max_pool_size", 20);
VertxOptions options = new VertxOptions();
options.setEventLoopPoolSize(10);
options.setWorkerPoolSize(20);
Vertx vertx = Vertx.vertx(options);
//根据上面的配置参数获取异步请求客户端
mySQLClient = JDBCClient.createNonShared(vertx, mySQLClientConfig);
}
//使用异步客户端发送异步请求
@Override
public void asyncInvoke(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
mySQLClient.getConnection(new Handler<AsyncResult<SQLConnection>>() {
@Override
public void handle(AsyncResult<SQLConnection> sqlConnectionAsyncResult) {
if (sqlConnectionAsyncResult.failed()) {
return;
}
SQLConnection connection = sqlConnectionAsyncResult.result();
connection.query("select id,name from t_category where id = " +input.getId(), new Handler<AsyncResult<io.vertx.ext.sql.ResultSet>>() {
@Override
public void handle(AsyncResult<io.vertx.ext.sql.ResultSet> resultSetAsyncResult) {
if (resultSetAsyncResult.succeeded()) {
List<JsonObject> rows = resultSetAsyncResult.result().getRows();
for (JsonObject jsonObject : rows) {
CategoryInfo categoryInfo = new CategoryInfo(jsonObject.getInteger("id"), jsonObject.getString("name"));
resultFuture.complete(Collections.singletonList(categoryInfo));
}
}
}
});
}
});
}
@Override
public void close() throws Exception {
mySQLClient.close();
}
@Override
public void timeout(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
System.out.println("async call time out!");
input.setName("未知");
resultFuture.complete(Collections.singleton(input));
}
}
/**
* 方式二:同步调用+线程池模拟异步IO
*/
class ASyncIOFunction2 extends RichAsyncFunction<CategoryInfo, CategoryInfo> {
private transient MysqlSyncClient client;
private ExecutorService executorService;//线程池
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
client = new MysqlSyncClient();
executorService = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
//异步发送请求
@Override
public void asyncInvoke(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
executorService.execute(new Runnable() {
@Override
public void run() {
resultFuture.complete(Collections.singletonList((CategoryInfo) client.query(input)));
}
});
}
@Override
public void close() throws Exception {
if (executorService != null) {
executorService.shutdown();//任务结束时释放线程池
}
}
@Override
public void timeout(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
System.out.println("async call time out!");
input.setName("未知");
resultFuture.complete(Collections.singleton(input));
}
}
Flink-Streaming File Sink(新版本弃用,整合到File Sink)
介绍
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html
https://blog.csdn.net/u013220482/article/details/100901471
代码演示
import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.util.Collector;
import java.util.concurrent.TimeUnit;
/**
* Author
* Desc 演示Flink StreamingFileSink将流式数据写入到HDFS 数据一致性由Checkpoint + 两阶段提交保证
*/
public class StreamingFileSinkDemo {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//开启Checkpoint
//===========类型1:必须参数=============
//设置Checkpoint的时间间隔为1000ms做一次Checkpoint/其实就是每隔1000ms发一次Barrier!
env.enableCheckpointing(1000);
if (SystemUtils.IS_OS_WINDOWS) {
env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
} else {
env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint"));
}
//===========类型2:建议参数===========
//设置两个Checkpoint 之间最少等待时间,如设置Checkpoint之间最少是要等 500ms(为了避免每隔1000ms做一次Checkpoint的时候,前一次太慢和后一次重叠到一起去了)
//如:高速公路上,每隔1s关口放行一辆车,但是规定了两车之前的最小车距为500m
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//默认是0
//设置如果在做Checkpoint过程中出现错误,是否让整体任务失败:true是 false不是
//env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//默认是true
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//默认值为0,表示不容忍任何检查点失败
//设置是否清理检查点,表示 Cancel 时是否需要保留当前的 Checkpoint,默认 Checkpoint会在作业被Cancel时被删除
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,当作业被取消时,删除外部的checkpoint(默认值)
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,当作业被取消时,保留外部的checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//===========类型3:直接使用默认的即可===============
//设置checkpoint的执行模式为EXACTLY_ONCE(默认)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//设置checkpoint的超时时间,如果 Checkpoint在 60s内尚未完成说明该次Checkpoint失败,则丢弃。
env.getCheckpointConfig().setCheckpointTimeout(60000);//默认10分钟
//设置同一时间有多少个checkpoint可以同时执行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默认为1
//TODO 1.source
DataStream<String> lines = env.socketTextStream("node1", 9999);
//TODO 2.transformation
//注意:下面的操作将上面的2步合成了1步,直接切割单词并记为1返回
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] arr = value.split(" ");
for (String word : arr) {
out.collect(Tuple2.of(word, 1));
}
}
});
SingleOutputStreamOperator<String> result = wordAndOne.keyBy(t -> t.f0).sum(1)
.map(new MapFunction<Tuple2<String, Integer>, String>() {
@Override
public String map(Tuple2<String, Integer> value) throws Exception {
return value.f0 + ":" + value.f1;
}
});
//TODO 3.sink
result.print();
//使用StreamingFileSink将数据sink到HDFS
OutputFileConfig config = OutputFileConfig
.builder()
.withPartPrefix("prefix")//设置文件前缀
.withPartSuffix(".txt")//设置文件后缀
.build();
StreamingFileSink<String> streamingFileSink = StreamingFileSink.
forRowFormat(new Path("hdfs://node1:8020/FlinkStreamFileSink/parquet"), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))//每隔15分钟生成一个新文件
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))//每隔5分钟没有新数据到来,也把之前的生成一个新文件
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.withOutputFileConfig(config)
.build();
result.addSink(streamingFileSink);
//TODO 4.execute
env.execute();
}
}
Flink-高级特性-File Sink
import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.util.Collector;
import java.util.concurrent.TimeUnit;
/**
* Author
* Desc 演示Flink FileSink将批/流式数据写入到HDFS 数据一致性由Checkpoint + 两阶段提交保证
*/
public class FileSinkDemo {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//开启Checkpoint
//===========类型1:必须参数=============
//设置Checkpoint的时间间隔为1000ms做一次Checkpoint/其实就是每隔1000ms发一次Barrier!
env.enableCheckpointing(1000);
if (SystemUtils.IS_OS_WINDOWS) {
env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
} else {
env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint"));
}
//===========类型2:建议参数===========
//设置两个Checkpoint 之间最少等待时间,如设置Checkpoint之间最少是要等 500ms(为了避免每隔1000ms做一次Checkpoint的时候,前一次太慢和后一次重叠到一起去了)
//如:高速公路上,每隔1s关口放行一辆车,但是规定了两车之前的最小车距为500m
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//默认是0
//设置如果在做Checkpoint过程中出现错误,是否让整体任务失败:true是 false不是
//env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//默认是true
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//默认值为0,表示不容忍任何检查点失败
//设置是否清理检查点,表示 Cancel 时是否需要保留当前的 Checkpoint,默认 Checkpoint会在作业被Cancel时被删除
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,当作业被取消时,删除外部的checkpoint(默认值)
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,当作业被取消时,保留外部的checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//===========类型3:直接使用默认的即可===============
//设置checkpoint的执行模式为EXACTLY_ONCE(默认)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//设置checkpoint的超时时间,如果 Checkpoint在 60s内尚未完成说明该次Checkpoint失败,则丢弃。
env.getCheckpointConfig().setCheckpointTimeout(60000);//默认10分钟
//设置同一时间有多少个checkpoint可以同时执行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默认为1
//TODO 1.source
DataStream<String> lines = env.socketTextStream("node1", 9999);
//TODO 2.transformation
//注意:下面的操作将上面的2步合成了1步,直接切割单词并记为1返回
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] arr = value.split(" ");
for (String word : arr) {
out.collect(Tuple2.of(word, 1));
}
}
});
SingleOutputStreamOperator<String> result = wordAndOne.keyBy(t -> t.f0).sum(1)
.map(new MapFunction<Tuple2<String, Integer>, String>() {
@Override
public String map(Tuple2<String, Integer> value) throws Exception {
return value.f0 + ":" + value.f1;
}
});
//TODO 3.sink
result.print();
//使用FileSink将数据sink到HDFS
OutputFileConfig config = OutputFileConfig
.builder()
.withPartPrefix("prefix")
.withPartSuffix(".txt")
.build();
FileSink<String> sink = FileSink
.forRowFormat(new Path("hdfs://node1:8020/FlinkFileSink/parquet"), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.withOutputFileConfig(config)
.withBucketAssigner(new DateTimeBucketAssigner("yyyy-MM-dd--HH"))
.build();
result.sinkTo(sink);
//TODO 4.execute
env.execute();
}
}
Flink监控
https://blog.lovedata.net/8156c1e1.html
什么是Metrics
Metrics分类
代码
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* Author
* Desc 演示Flink-Metrics监控
* 在Map算子中提供一个Counter,统计map处理的数据条数,运行之后再WebUI上进行监控
*/
public class MetricsDemo {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<String> lines = env.socketTextStream("node1", 9999);
//TODO 2.transformation
SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] arr = value.split(" ");
for (String word : arr) {
out.collect(word);
}
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words
.map(new RichMapFunction<String, Tuple2<String, Integer>>() {
Counter myCounter;//用来记录map处理了多少个单词
//对Counter进行初始化
@Override
public void open(Configuration parameters) throws Exception {
myCounter = getRuntimeContext().getMetricGroup().addGroup("myGroup").counter("myCounter");
}
//处理单词,将单词记为(单词,1)
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
myCounter.inc();//计数器+1
return Tuple2.of(value, 1);
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(t -> t.f0).sum(1);
//TODO 3.sink
result.print();
//TODO 4.execute
env.execute();
}
}
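Counter is only one of Flink's metric types; Gauge, Meter and Histogram are registered through the same MetricGroup. A hedged sketch of what open() could additionally register (names like myGauge are illustrative; assumes imports of org.apache.flink.metrics.Gauge, org.apache.flink.metrics.MeterView and org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram):
//Gauge: reports a current value on demand, here simply mirroring the counter
getRuntimeContext().getMetricGroup().addGroup("myGroup")
.gauge("myGauge", (Gauge<Long>) () -> myCounter.getCount());
//Meter: event rate averaged over the last 60 seconds
getRuntimeContext().getMetricGroup().addGroup("myGroup")
.meter("myMeter", new MeterView(60));
//Histogram: value distribution over a sliding window of 1000 samples
getRuntimeContext().getMetricGroup().addGroup("myGroup")
.histogram("myHistogram", new DescriptiveStatisticsHistogram(1000));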
// /export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
// /export/server/flink/bin/flink run --class cn.metrics.MetricsDemo /root/metrics.jar
// 查看WebUI提交观察UI
1.打包
2.提交到Yarn上运行
3.查看监控指标
4.也可以通过浏览器f12的找到url发送请求获取监控信息
5.也可以通过代码发送请求获取监控信息
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
public class MetricsTest {
public static void main(String[] args) {
//String result = sendGet("http://node1:8088/proxy/application_1609508087977_0010/jobs/558a5a3016661f1d732228330ebfaad5/vertices/cbc357ccb763df2852fee8c4fc7d55f2/metrics?get=0.Map.myGroup.myCounter");
String result = sendGet("http://node1:8088/proxy/application_1609508087977_0010/jobs/558a5a3016661f1d732228330ebfaad5");
System.out.println(result);
}
public static String sendGet(String url) {
String result = "";
BufferedReader in = null;
try {
String urlNameString = url;
URL realUrl = new URL(urlNameString);
URLConnection connection = realUrl.openConnection();
// 设置通用的请求属性
connection.setRequestProperty("accept", "*/*");
connection.setRequestProperty("connection", "Keep-Alive");
connection.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)");
// 建立实际的连接
connection.connect();
in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
String line;
while ((line = in.readLine()) != null) {
result += line;
}
} catch (Exception e) {
System.out.println("发送GET请求出现异常!" + e);
e.printStackTrace();
}
// 使用finally块来关闭输入流
finally {
try {
if (in != null) {
in.close();
}
} catch (Exception e2) {
e2.printStackTrace();
}
}
return result;
}
}
6.也可以整合三方工具对flink进行监控
https://blog.lovedata.net/8156c1e1.html
Flink内存管理
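This section only shows a submission command; the memory model itself is configured in flink-conf.yaml. A hedged sketch of the Flink 1.12 unified memory options (values are illustrative and should be tuned to the workload):
taskmanager.memory.process.size: 2048m    # total TaskManager process memory (heap + managed + network + overhead)
taskmanager.memory.managed.fraction: 0.4  # share handed to managed off-heap memory (RocksDB, batch operators)
taskmanager.memory.network.fraction: 0.1  # share reserved for network buffers
jobmanager.memory.process.size: 1024m     # total JobManager process memory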
# ——提交参数——
/export/server/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 -p 6 -ys 2 -c dictionary_utils.OneDictForExecutor streaming.jar
Flink性能优化
问题定位口诀
一压二查三指标,延迟吞吐是核心。
时刻关注资源量,排查首先看GC。
口诀解析
常见性能问题
1.序列化和反序列化
2.数据倾斜
3.频繁gc
4.外部系统
5.大窗口
经典场景调优
数据去重
数据倾斜
内存调优
实时场景常见问题排查
1.Data latency: (Flink has credit-based dynamic backpressure built in, but when the business cannot tolerate long delays you still intervene manually, sometimes by restarting.)
①First check the job's own processing time: if processing is fast but latency is high, the problem is upstream;
②If the job itself is slow:
Is traffic higher than usual? If so, raise the parallelism;
Does it call external systems, with hot keys slowing some threads down? --mitigate with a thread pool and a cache
2.Data skew:
①Try rebalance to spread the data;
②If a window aggregation is skewed, add a random prefix and aggregate in two stages;
③A watermark that never fires (e.g. an idle partition) also makes data pile up.
3.Out of memory
Switch to the G1 garbage collector and tune the JVM. If windows overflow at traffic peaks, reconsider data structures, storage layout and compression.
RocksDB tuning: block size and block cache size can be increased; the compaction target (merge) size can be increased; the number of background flush/compaction threads can be raised; if raw performance matters most, compression can be turned off (see the config sketch below).
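A hedged sketch of the RocksDB knobs named above, as flink-conf.yaml options (key names from the Flink 1.12 RocksDB state backend configuration; values are illustrative):
state.backend: rocksdb
state.backend.rocksdb.block.blocksize: 64KB    # larger blocks: better scan throughput, more memory per index entry
state.backend.rocksdb.block.cache-size: 256MB  # larger block cache speeds up reads
state.backend.rocksdb.thread.num: 4            # background threads for flush and compaction
state.backend.rocksdb.compaction.level.target-file-size-base: 256MB  # larger compaction output files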
1.复用对象
stream
.apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
@Override
public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
long changesCount = ...
// A new Tuple instance is created on every execution
collector.collect(new Tuple2<>(userName, changesCount));
}
});
上面的代码可以优化为下面的代码,避免Tuple2的重复创建:
stream
.apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
// Create an instance that we will reuse on every call
private Tuple2<String, Long> result = new Tuple2<>();
@Override
public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
long changesCount = ...
// Set fields on an existing object instead of creating a new one
result.f0 = userName;
// Auto-boxing!! A new Long value may be created
result.f1 = changesCount;
// Reuse the same Tuple2 object
collector.collect(result);
}
});
Flink Table&SQL
为什么需要Table&SQL
发展历史
两种Table planners
1.旧的planner( 创建环境时useOldPlanner)
2.Blink planner (创建环境时useBlinkPlanner(),新版本默认)
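A short sketch of how the planner is chosen when building the table environment (Flink 1.12 API; both builder methods exist on EnvironmentSettings):
//Blink planner, the default in newer versions
EnvironmentSettings blinkSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner().inStreamingMode().build();
//legacy planner, kept for migration scenarios
EnvironmentSettings oldSettings = EnvironmentSettings.newInstance()
.useOldPlanner().inStreamingMode().build();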
- 依赖
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/
程序结构
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html
创建环境
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html
创建表
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html
- 查询
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html
整合DataStream
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html
核心概念
动态表/无界表
连续查询/需要借助State
Flink Table & SQL 基本操作
案例1
将DataStream数据转Table和View然后使用sql进行统计查询
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.Arrays;
import static org.apache.flink.table.api.Expressions.$;
/**
* Author
* Desc 演示Flink Table&SQL 案例- 将DataStream数据转Table和View然后使用sql进行统计查询
*/
public class Demo01 {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);
//TODO 1.source
DataStream<Order> orderA = env.fromCollection(Arrays.asList(
new Order(1L, "beer", 3),
new Order(1L, "diaper", 4),
new Order(3L, "rubber", 2)));
DataStream<Order> orderB = env.fromCollection(Arrays.asList(
new Order(2L, "pen", 3),
new Order(2L, "rubber", 3),
new Order(4L, "beer", 1)));
//TODO 2.transformation
// 将DataStream数据转Table和View,然后查询
Table tableA = tenv.fromDataStream(orderA, $("user"), $("product"), $("amount"));
tableA.printSchema();
System.out.println(tableA);
tenv.createTemporaryView("tableB", orderB, $("user"), $("product"), $("amount"));
//查询:tableA中amount>2的和tableB中amount>1的数据最后合并
/*
select * from tableA where amount > 2
union
select * from tableB where amount > 1
*/
String sql = "select * from "+tableA+" where amount > 2 \n" +
"union \n" +
" select * from tableB where amount > 1";
Table resultTable = tenv.sqlQuery(sql);
resultTable.printSchema();
System.out.println(resultTable);//UnnamedTable$1
//将Table转为DataStream
//DataStream<Order> resultDS = tenv.toAppendStream(resultTable, Order.class);//union all使用toAppendStream
DataStream<Tuple2<Boolean, Order>> resultDS = tenv.toRetractStream(resultTable, Order.class);//union使用toRetractStream
//toAppendStream → 将计算后的数据append到结果DataStream中去
//toRetractStream → 将计算后的新的数据在DataStream原数据的基础上更新true或是删除false
//类似StructuredStreaming中的append/update/complete
//TODO 3.sink
resultDS.print();
//TODO 4.execute
env.execute();
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Order {
public Long user;
public String product;
public int amount;
}
}
案例2
使用Table/DSL风格和SQL风格完成WordCount
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
/**
* Author
* Desc 演示Flink Table&SQL 案例- 使用SQL和Table两种方式做WordCount
*/
public class Demo02 {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);
//TODO 1.source
DataStream<WC> wordsDS = env.fromElements(
new WC("Hello", 1),
new WC("World", 1),
new WC("Hello", 1)
);
//TODO 2.transformation
//将DataStream转为View或Table
tenv.createTemporaryView("t_words", wordsDS,$("word"), $("frequency"));
/*
select word,sum(frequency) as frequency
from t_words
group by word
*/
String sql = "select word,sum(frequency) as frequency\n " +
"from t_words\n " +
"group by word";
//执行sql
Table resultTable = tenv.sqlQuery(sql);
//转为DataStream
DataStream<Tuple2<Boolean, WC>> resultDS = tenv.toRetractStream(resultTable, WC.class);
//toAppendStream → 将计算后的数据append到结果DataStream中去
//toRetractStream → 将计算后的新的数据在DataStream原数据的基础上更新true或是删除false
//类似StructuredStreaming中的append/update/complete
//TODO 3.sink
resultDS.print();
//new WC("Hello", 1),
//new WC("World", 1),
//new WC("Hello", 1)
//输出结果
//(true,Demo02.WC(word=Hello, frequency=1))
//(true,Demo02.WC(word=World, frequency=1))
//(false,Demo02.WC(word=Hello, frequency=1))
//(true,Demo02.WC(word=Hello, frequency=2))
//TODO 4.execute
env.execute();
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class WC {
public String word;
public long frequency;
}
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
/**
* Author
* Desc 演示Flink Table&SQL 案例- 使用SQL和Table两种方式做WordCount
*/
public class Demo02_2 {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);
//TODO 1.source
DataStream<WC> wordsDS = env.fromElements(
new WC("Hello", 1),
new WC("World", 1),
new WC("Hello", 1)
);
//TODO 2.transformation
//将DataStream转为View或Table
Table table = tenv.fromDataStream(wordsDS);
//使用table风格查询/DSL
Table resultTable = table
.groupBy($("word"))
.select($("word"), $("frequency").sum().as("frequency"))
.filter($("frequency").isEqual(2));
//转换为DataStream
DataStream<Tuple2<Boolean, WC>> resultDS = tenv.toRetractStream(resultTable, WC.class);
//TODO 3.sink
resultDS.print();
//TODO 4.execute
env.execute();
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class WC {
public String word;
public long frequency;
}
}
案例3
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.table.api.Expressions.$;
/**
* Author
* Desc 演示Flink Table&SQL 案例- 使用事件时间+Watermaker+window完成订单统计
*/
public class Demo03 {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);
//TODO 1.source
DataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction<Order>() {
private Boolean isRunning = true;
@Override
public void run(SourceContext<Order> ctx) throws Exception {
Random random = new Random();
while (isRunning) {
Order order = new Order(UUID.randomUUID().toString(), random.nextInt(3), random.nextInt(101), System.currentTimeMillis());
TimeUnit.SECONDS.sleep(1);
ctx.collect(order);
}
}
@Override
public void cancel() {
isRunning = false;
}
});
//TODO 2.transformation
//需求:事件时间+Watermarker+FlinkSQL和Table的window完成订单统计
DataStream<Order> orderDSWithWatermark = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((order, recordTimestamp) -> order.getCreateTime())
);
//将DataStream-->View/Table,注意:指定列的时候需要指定哪一列是时间
tenv.createTemporaryView("t_order",orderDSWithWatermark,$("orderId"), $("userId"), $("money"), $("createTime").rowtime());
/*
select userId, count(orderId) as orderCount, max(money) as maxMoney,min(money) as minMoney
from t_order
group by userId,
tumble(createTime, INTERVAL '5' SECOND)
*/
String sql = "select userId, count(orderId) as orderCount, max(money) as maxMoney,min(money) as minMoney\n " +
"from t_order\n " +
"group by userId,\n " +
"tumble(createTime, INTERVAL '5' SECOND)";
//执行sql
Table resultTable = tenv.sqlQuery(sql);
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(resultTable, Row.class);
//TODO 3.sink
resultDS.print();
//TODO 4.execute
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Order {
private String orderId;
private Integer userId;
private Integer money;
private Long createTime;//事件时间
}
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;
/**
* Author
* Desc 演示Flink Table&SQL 案例- 使用事件时间+Watermaker+window完成订单统计-Table风格
*/
public class Demo03_2 {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
//TODO 1.source
DataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction<Order>() {
private Boolean isRunning = true;
@Override
public void run(SourceContext<Order> ctx) throws Exception {
Random random = new Random();
while (isRunning) {
Order order = new Order(UUID.randomUUID().toString(), random.nextInt(3), random.nextInt(101), System.currentTimeMillis());
TimeUnit.SECONDS.sleep(1);
ctx.collect(order);
}
}
@Override
public void cancel() {
isRunning = false;
}
});
//TODO 2.transformation
//需求:事件时间+Watermarker+FlinkSQL和Table的window完成订单统计
DataStream<Order> orderDSWithWatermark = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((order, recordTimestamp) -> order.getCreateTime())
);
//将DataStream-->View/Table,注意:指定列的时候需要指定哪一列是时间
tenv.createTemporaryView("t_order",orderDSWithWatermark,$("orderId"), $("userId"), $("money"), $("createTime").rowtime());
//Table table = tenv.fromDataStream(orderDSWithWatermark, $("orderId"), $("userId"), $("money"), $("createTime").rowtime());
//table.groupBy().select();
/*
select userId, count(orderId) as orderCount, max(money) as maxMoney,min(money) as minMoney
from t_order
group by userId,
tumble(createTime, INTERVAL '5' SECOND)
*/
Table resultTable = tenv.from("t_order")
.window(Tumble.over(lit(5).second())
.on($("createTime"))
.as("tumbleWindow"))
.groupBy($("tumbleWindow"), $("userId"))
.select(
$("userId"),
$("orderId").count().as("orderCount"),
$("money").max().as("maxMoney"),
$("money").min().as("minMoney")
);
DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(resultTable, Row.class);
//TODO 3.sink
resultDS.print();
//TODO 4.execute
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Order {
private String orderId;
private Integer userId;
private Integer money;
private Long createTime;//事件时间
}
}
Flink SQL空闲状态保留时间(idle state retention time)
One mistake Flink SQL newcomers often make, in the author's view, is forgetting to set the idle state retention time, which lets state blow up.
Why set it: grouped queries over a data stream store their intermediate results (not only aggregates) as state, so state keeps growing as grouping keys accumulate. Most of that state is time-bounded and need not be kept forever. For example, when deduplicating with the Top-N syntax, duplicates generally show up within a limited interval (say an hour or a day); beyond that, the corresponding state is useless. Flink SQL's idle state retention time feature guarantees that when a key's state has gone unchanged for longer than the threshold, it is cleaned up automatically. It is set like this:
tbEnv.getConfig().setIdleStateRetention(Duration.ofDays(1));
Note that the older setIdleStateRetentionTime() method takes two parameters, the minimum retention time minRetentionTime and the maximum retention time maxRetentionTime (chosen from the actual business needs), and the two must differ by at least 5 minutes. If the gap between them is too small, timers and ValueState updates are produced frequently and maintaining the timers becomes costly.
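For reference, a sketch of the older two-argument call described above (deprecated in favor of setIdleStateRetention; Time here is org.apache.flink.api.common.time.Time):
//state idle for between 12 and 24 hours becomes eligible for cleanup; the two values must differ by at least 5 minutes
tbEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(24));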
Flink-高级特性-新特性-FlinkSQL整合Hive
1.介绍
版本
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/
添加依赖和jar包和配置
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>2.1.0</version>
<exclusions>
<exclusion>
<artifactId>hadoop-hdfs</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.1.0</version>
</dependency>
上传资料hive中的jar包到flink/lib中
FlinkSQL整合Hive-CLI命令行整合
1.修改hive-site.xml
<property>
<name>hive.metastore.uris</name>
<value>thrift://node3:9083</value>
</property>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://node3:3306/hive?createDatabaseIfNotExist=true&amp;useSSL=false</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<property>
<name>datanucleus.schema.autoCreateAll</name>
<value>true</value>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>node3</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://node3:9083</value>
</property>
</configuration>
2.启动元数据服务
nohup /export/server/hive/bin/hive --service metastore &
3.修改flink/conf/sql-client-defaults.yaml
catalogs:
- name: myhive
type: hive
hive-conf-dir: /export/server/hive/conf
default-database: default
4.分发
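A hedged example of step 4, mirroring the scp commands used earlier for cluster setup (paths assumed to match this environment):
scp /export/server/flink/conf/sql-client-defaults.yaml node2:/export/server/flink/conf/
scp /export/server/flink/conf/sql-client-defaults.yaml node3:/export/server/flink/conf/
scp /export/server/flink/lib/*.jar node2:/export/server/flink/lib/
scp /export/server/flink/lib/*.jar node3:/export/server/flink/lib/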
5.启动flink集群
/export/server/flink/bin/start-cluster.sh
6.启动flink-sql客户端-hive在哪就在哪启
/export/server/flink/bin/sql-client.sh embedded
7.执行sql:
show catalogs;
use catalog myhive;
show tables;
select * from person;
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.hive.HiveCatalog;
/**
* Author
* Desc
*/
public class HiveDemo {
public static void main(String[] args){
//TODO 0.env
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
//TODO 指定hive的配置
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "./conf";
//TODO 根据配置创建hiveCatalog
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
//注册catalog
tableEnv.registerCatalog("myhive", hive);
//使用注册的catalog
tableEnv.useCatalog("myhive");
//向Hive表中写入数据
String insertSQL = "insert into person select * from person";
TableResult result = tableEnv.executeSql(insertSQL);
System.out.println(result.getJobClient().get().getJobStatus());
}
}
整合kafka和hive
案例4(掌握从datastream转sql,sql转datastream)
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import java.io.Serializable;
import java.sql.Timestamp;
public class KafkaToHive {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
configuration.setString(RestOptions.BIND_PORT,"8081-8089"); //让本地Flink Web UI在8081-8089范围内选用空闲端口
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env,settings);
//写Hive不需要频繁Checkpoint,间隔可以设置大一些(这里10分钟);文件在Checkpoint完成时才提交可见
env.enableCheckpointing(10*60*1000, CheckpointingMode.EXACTLY_ONCE);
env.setParallelism(3);
//socket->producer
SingleOutputStreamOperator<Order> socket = env.socketTextStream("node1", 9999).rebalance().map(new RichMapFunction<String, Order>() {
@Override
public Order map(String value) throws Exception {
String[] timeAndWord = value.split(",");
Timestamp timestamp = Timestamp.valueOf(timeAndWord[0]);
Order order = new Order();
order.setOrderId(timeAndWord[1]);
order.setMoney(1L);
order.setEvenTime(timestamp.getTime());
return order;
}
});
tbEnv.createTemporaryView("socket",socket);
tbEnv.executeSql("DROP TABLE IF EXISTS ODS");
TableResult odsResult = tbEnv.executeSql("CREATE TABLE ODS (\n" +
" `orderId` STRING,\n" +
" `userId` INT,\n" +
" `money` BIGINT,\n" +
" `evenTime` BIGINT\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'KafkaWordCount',\n" +
" 'properties.bootstrap.servers' = 'node1:9092',\n" +
" 'format' = 'json',\n" +
" 'sink.partitioner' = 'round-robin'\n" +
") ");
String sql="insert into ODS select orderId,userId,money,evenTime from socket";
TableResult result = tbEnv.executeSql(sql);
//kafka->hive
tbEnv.executeSql("DROP TABLE IF EXISTS DW");
//下面的表显式以'is_generic'='false'创建(Hive兼容表)
/**
* 可以让Flink使用Hive Catalog存储Flink SQL 元数据。
* 可以在Hive命令行中使用DESCRIBE FORMATTED命令查看表的元数据,
* 如果是is_generic=true代表是Flink专用表,这种表只能由Flink读写使用,不要用Hive去读写。
* 也可以直接使用Flink读写Hive表数据。
*/
TableResult dwResult = tbEnv.executeSql("CREATE TABLE DW (\n" +
" `orderId` STRING,\n" +
" `userId` INT,\n" +
" `money` BIGINT,\n" +
" `evenTime` BIGINT\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'KafkaWordCount',\n" +
" 'properties.bootstrap.servers' = 'node1:9092',\n" +
" 'properties.group.id' = 'sink_to_hive',\n" +
" 'scan.startup.mode' = 'group-offsets',\n" +
" 'format' = 'json'\n" +
")");
//写hive
//TODO 指定hive的配置
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "E:\\BigData\\workspace\\Flink\\Flink_study\\src\\main\\resources";
//TODO 根据配置创建hiveCatalog,通过Hive Metastore的thrift服务连接hive
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
//注册catalog
tbEnv.registerCatalog("myhive", hive);
//使用注册的catalog
tbEnv.useCatalog("myhive");
//可以设定使用hive sql
tbEnv.executeSql("DROP TABLE IF EXISTS OrderTb");
TableResult OrderTb = tbEnv.executeSql("create table if not EXISTS OrderTb(" +
" `orderId` STRING,\n" +
" `userId` INT,\n" +
" `money` BIGINT,\n" +
" `evenTime` BIGINT\n" +
")WITH (\n" +
" 'is_generic'='false' )");
//Insert overwrite is not supported for streaming write.
tbEnv.executeSql("insert into OrderTb select orderId,userId,money,evenTime from default_catalog.default_database.DW");
Table table = tbEnv.sqlQuery("select * from default_catalog.default_database.DW ");
tbEnv.toRetractStream(table, TypeInformation.of(Row.class)).print();
env.execute();
}
public static class Order implements Serializable {
private String orderId;
private Integer userId;
private Long money;
private Long evenTime;
public Order(String orderId, Integer userId, Long money, Long evenTime) {
this.orderId = orderId;
this.userId = userId;
this.money = money;
this.evenTime = evenTime;
}
public Order() {
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public Integer getUserId() {
return userId;
}
public void setUserId(Integer userId) {
this.userId = userId;
}
public Long getMoney() {
return money;
}
public void setMoney(Long money) {
this.money = money;
}
public Long getEvenTime() {
return evenTime;
}
public void setEvenTime(Long evenTime) {
this.evenTime = evenTime;
}
@Override
public String toString() {
FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
return "Order{" +
"orderId='" + orderId + '\'' +
", userId=" + userId +
", money=" + money +
", evenTime=" + df.format(evenTime) +
'}';
}
}
}
Flink-练习-双十一实时交易大屏-掌握
需求
数据
/**
* 自定义数据源实时产生订单数据Tuple2<分类, 金额>
*/
public static class MySource implements SourceFunction<Tuple2<String, Double>> {
private boolean flag = true;
private String[] categorys = {"女装", "男装", "图书", "家电", "洗护", "美妆", "运动", "游戏", "户外", "家具", "乐器", "办公"};
private Random random = new Random();
@Override
public void run(SourceContext<Tuple2<String, Double>> ctx) throws Exception {
while (flag) {
//随机生成分类和金额
int index = random.nextInt(categorys.length);//[0~length) ==> [0~length-1]
String category = categorys[index];//获取的随机分类
double price = random.nextDouble() * 100;//注意nextDouble生成的是[0~1)之间的随机小数,*100之后表示[0~100)的随机小数
ctx.collect(Tuple2.of(category, price));
Thread.sleep(20);
}
}
@Override
public void cancel() {
flag = false;
}
}
实现步骤
1.env
2.source
3.transformation--预聚合
3.1定义大小为一天的窗口,第二个参数Time.hours(-8)是时区偏移,因为中国使用UTC+08:00时区,比UTC早8小时
keyBy(t->t.f0)
window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))
3.2定义一个1s的触发器
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
3.3聚合结果.aggregate(new PriceAggregate(), new WindowResult());
3.4看一下聚合的结果
CategoryPojo(category=男装, totalPrice=17225.26, dateTime=2020-10-20 08:04:12)
4.sink-使用上面预聚合的结果,实现业务需求:
tempAggResult.keyBy(CategoryPojo::getDateTime)
//每秒钟更新一次统计结果
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
//在ProcessWindowFunction中实现该复杂业务逻辑
.process(new WindowResultProcess());
4.1.实时计算出当天零点截止到当前时间的销售总额
4.2.计算出各个分类的销售top3
4.3.每秒钟更新一次统计结果
5.execute
代码实现
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Random;
import java.util.stream.Collectors;
/**
* Author
* Desc
* 1.实时计算出当天零点截止到当前时间的销售总额 11月11日 00:00:00 ~ 23:59:59
* 2.计算出各个分类的销售top3
* 3.每秒钟更新一次统计结果
*/
public class DoubleElevenBigScreem {
public static void main(String[] args) throws Exception {
//TODO 1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);//方便观察
//TODO 2.source
DataStream<Tuple2<String, Double>> orderDS = env.addSource(new MySource());
//TODO 3.transformation--初步聚合:每隔1s聚合一下截止到当前时间的各个分类的销售总金额
DataStream<CategoryPojo> tempAggResult = orderDS
//分组
.keyBy(t -> t.f0)
//如果直接使用之前学习的窗口按照下面的写法表示:
//表示每隔1天计算一次
//.window(TumblingProcessingTimeWindows.of(Time.days(1)));
//表示每隔1s计算最近一天的数据,但是11月11日 00:01:00运行计算的是: 11月10日 00:01:00~11月11日 00:01:00 ---不对!
//.window(SlidingProcessingTimeWindows.of(Time.days(1),Time.seconds(1)));
//*例如中国使用UTC+08:00,您需要一天大小的时间窗口,
//*窗口从当地时间的00:00:00开始,您可以使用{@code of(Time.days(1), Time.hours(-8))}.
//下面的代码表示从当天的00:00:00开始计算当天的数据,缺一个触发时机/触发间隔
//3.1定义大小为一天的窗口,第二个参数表示中国使用的UTC+08:00时区比UTC时间早
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
//3.2自定义触发时机/触发间隔
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
//.sum()//简单聚合
//3.3自定义聚合和结果收集
//aggregate(AggregateFunction<T, ACC, V> aggFunction,WindowFunction<V, R, K, W> windowFunction)
.aggregate(new PriceAggregate(), new WindowResult());//aggregate支持复杂的自定义聚合
//3.4看一下聚合的结果
tempAggResult.print("初步聚合的各个分类的销售总额");
//初步聚合的各个分类的销售总额> DoubleElevenBigScreem.CategoryPojo(category=游戏, totalPrice=563.8662504982619, dateTime=2021-01-19 10:31:40)
//初步聚合的各个分类的销售总额> DoubleElevenBigScreem.CategoryPojo(category=办公, totalPrice=876.5216500403918, dateTime=2021-01-19 10:31:40)
//TODO 4.sink-使用上面初步聚合的结果(每隔1s聚合一下截止到当前时间的各个分类的销售总金额),实现业务需求:
tempAggResult.keyBy(CategoryPojo::getDateTime)
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))//每隔1s进行最终的聚合并输出结果
//.sum//简单聚合
.process(new FinalResultWindowProcess());//在ProcessWindowFunction中实现该复杂业务逻辑
//TODO 5.execute
env.execute();
}
/**
* 自定义数据源实时产生订单数据Tuple2<分类, 金额>
*/
public static class MySource implements SourceFunction<Tuple2<String, Double>> {
private boolean flag = true;
private String[] categorys = {"女装", "男装", "图书", "家电", "洗护", "美妆", "运动", "游戏", "户外", "家具", "乐器", "办公"};
private Random random = new Random();
@Override
public void run(SourceContext<Tuple2<String, Double>> ctx) throws Exception {
while (flag) {
//随机生成分类和金额
int index = random.nextInt(categorys.length);//[0~length) ==> [0~length-1]
String category = categorys[index];//获取的随机分类
double price = random.nextDouble() * 100;//注意nextDouble生成的是[0~1)之间的随机小数,*100之后表示[0~100)的随机小数
ctx.collect(Tuple2.of(category, price));
Thread.sleep(20);
}
}
@Override
public void cancel() {
flag = false;
}
}
/**
* 自定义聚合函数,指定聚合规则
* AggregateFunction<IN, ACC, OUT>
*/
private static class PriceAggregate implements AggregateFunction<Tuple2<String, Double>, Double, Double> {
//初始化累加器
@Override
public Double createAccumulator() {
return 0D;//D表示double,L表示Long
}
//把数据累加到累加器上
@Override
public Double add(Tuple2<String, Double> value, Double accumulator) {
return value.f1 + accumulator;
}
//获取累加结果
@Override
public Double getResult(Double accumulator) {
return accumulator;
}
//合并各个subtask的结果
@Override
public Double merge(Double a, Double b) {
return a + b;
}
}
/**
* 自定义窗口函数,指定窗口数据收集规则
* WindowFunction<IN, OUT, KEY, W extends Window>
*/
private static class WindowResult implements WindowFunction<Double, CategoryPojo, String, TimeWindow> {
private FastDateFormat df = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
@Override
//void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out)
public void apply(String category, TimeWindow window, Iterable<Double> input, Collector<CategoryPojo> out) throws Exception {
long currentTimeMillis = System.currentTimeMillis();
String dateTime = df.format(currentTimeMillis);
Double totalPrice = input.iterator().next();
out.collect(new CategoryPojo(category,totalPrice,dateTime));//转化成同一秒,但是万一到后面数据量大,都能在一秒内完成计算吗?转化成天安全点
}
}
/**
* 用于存储聚合的结果
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class CategoryPojo {
private String category;//分类名称
private double totalPrice;//该分类总销售额
private String dateTime;// 截止到当前时间的时间,本来应该是EventTime,但是我们这里简化了直接用当前系统时间即可
}
/**
* 自定义窗口完成销售总额统计和分类销售额top3统计并输出
* abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
*/
private static class FinalResultWindowProcess extends ProcessWindowFunction<CategoryPojo, Object, String, TimeWindow> {
//注意:
//下面的key/dateTime表示当前这1s的时间
//elements:表示截止到当前这1s的各个分类的销售数据
@Override
public void process(String dateTime, Context context, Iterable<CategoryPojo> elements, Collector<Object> out) throws Exception {
//1.实时计算出当天零点截止到当前时间的销售总额 11月11日 00:00:00 ~ 23:59:59
double total = 0D;//用来记录销售总额
//2.计算出各个分类的销售top3:如: "女装": 10000 "男装": 9000 "图书":8000
//注意:这里只需要求top3,也就是只需要排前3名就行了,其他的不用管!当然你也可以每次对进来的所有数据进行排序,但是浪费!
//所以这里直接使用小顶堆完成top3排序:
//70
//80
//90
//如果进来一个比堆顶元素还有小的,直接不要
//如果进来一个比堆顶元素大,如85,直接把堆顶元素删掉,把85加进去并继续按照小顶堆规则排序,小的在上面,大的在下面
//80
//85
//90
//创建一个小顶堆
Queue<CategoryPojo> queue = new PriorityQueue<>(3,//初识容量
//正常的排序,就是小的在前,大的在后,也就是c1>c2的时候返回1,也就是升序,也就是小顶堆
(c1, c2) -> c1.getTotalPrice() >= c2.getTotalPrice() ? 1 : -1);
for (CategoryPojo element : elements) {
double price = element.getTotalPrice();
total += price;
if(queue.size()< 3){
queue.add(element);//或offer入队
}else{
if(price >= queue.peek().getTotalPrice()){//peek表示取出堆顶元素但不删除
//queue.remove(queue.peek());
queue.poll();//移除堆顶元素
queue.add(element);//或offer入队
}
}
}
//代码走到这里那么queue存放的就是分类的销售额top3,但是是升序.需要改为逆序然后输出
List<String> top3List = queue.stream()
.sorted((c1, c2) -> c1.getTotalPrice() >= c2.getTotalPrice() ? -1 : 1)
.map(c -> "分类:" + c.getCategory() + " 金额:" + c.getTotalPrice())
.collect(Collectors.toList());
//3.每秒钟更新一次统计结果-也就是直接输出
double roundResult = new BigDecimal(total).setScale(2, RoundingMode.HALF_UP).doubleValue();//四舍五入保留2位小数
System.out.println("时间: "+dateTime +" 总金额 :" + roundResult);
System.out.println("top3: \n" + StringUtils.join(top3List,"\n"));
}
}
}
Flink-练习-订单自动好评-掌握
需求
数据
Consideration: can this be computed with windows?
Windows collect and compute data by a fixed rule that applies to all records, independent of any single one.
Every order starts its own countdown at a different moment, so window computation does not fit.
The trigger and end of the computation are decided by each record/key itself; a window cannot do that, so we keep our own state instead.
Further considerations:
1.Judge whether an order was reviewed as soon as its record arrives, or store everything in state with a countdown and judge uniformly at expiry?
It depends on reality: if most reviews time out, judge late; but already-reviewed orders then sit in state and occupy space, trading space for time. If most users review promptly, judge early instead, trading time for space.
2.ValueState or MapState?
ValueState would do: the order completion time need not be fetched separately, since the countdown already records the time.
3.In reality reviews close after many days (say 15), which makes the stored state large; it need not be handled in real time, the state can be refreshed hourly/daily instead.
4.State expiry handling (see the StateTtlConfig sketch below).
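For point 4, Flink's state TTL can expire entries automatically instead of relying only on timers. A sketch for the MapState used in the code below (assumes imports of org.apache.flink.api.common.state.StateTtlConfig and org.apache.flink.api.common.time.Time):
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(15))//keep an entry at most 15 days after the last write
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//TTL restarts on create and write
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)//never hand back expired entries
.build();
MapStateDescriptor<String, Long> mapStateDescriptor =
new MapStateDescriptor<>("mapState", String.class, Long.class);
mapStateDescriptor.enableTimeToLive(ttlConfig);//expired order entries are then cleaned up by Flink itself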
/**
* 自定义source实时产生订单数据Tuple3<用户id,订单id, 订单生成时间>
*/
public static class MySource implements SourceFunction<Tuple3<String, String, Long>> {
private boolean flag = true;
@Override
public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {
Random random = new Random();
while (flag) {
String userId = random.nextInt(5) + "";
String orderId = UUID.randomUUID().toString();
long currentTimeMillis = System.currentTimeMillis();
ctx.collect(Tuple3.of(userId, orderId, currentTimeMillis));
Thread.sleep(500);
}
}
@Override
public void cancel() {
flag = false;
}
}
实现步骤
1.env
2.source
3.transformation
设置经过interval毫秒用户未对订单做出评价,自动给与好评.为了演示方便,设置5s的时间
long interval = 5000L;
分组后使用自定义KeyedProcessFunction完成定时判断超时订单并自动好评
dataStream.keyBy(t -> t.f0).process(new TimerProcessFunction(interval));
3.1定义MapState类型的状态,key是订单号,value是订单完成时间
3.2创建MapState
MapStateDescriptor<String, Long> mapStateDesc =
new MapStateDescriptor<>("mapStateDesc", String.class, Long.class);
mapState = getRuntimeContext().getMapState(mapStateDesc);
3.3注册定时器
mapState.put(value.f0, value.f1);
ctx.timerService().registerProcessingTimeTimer(value.f1 + interval);
3.4定时器被触发时执行并输出结果
4.sink
5.execute
代码实现
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
/**
* Author
* Desc
*/
public class OrderAutomaticFavorableComments {
public static void main(String[] args) throws Exception {
//TODO 1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
//TODO 2.source
//Tuple3<用户id,订单id,订单生成时间>
DataStream<Tuple3<String, String, Long>> orderDS = env.addSource(new MySource());
//TODO 3.transformation
//设置经过interval毫秒用户未对订单做出评价,自动给与好评.为了演示方便,设置5s的时间
long interval = 5000L;//5s
//分组后使用自定义KeyedProcessFunction完成定时判断超时订单并自动好评
orderDS.keyBy(t -> t.f0)
.process(new TimerProcessFunction(interval));
//TODO 4.sink
//TODO 5.execute
env.execute();
}
/**
* 自定义source实时产生订单数据Tuple3<用户id,订单id, 订单生成时间>
*/
public static class MySource implements SourceFunction<Tuple3<String, String, Long>> {
private boolean flag = true;
@Override
public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {
Random random = new Random();
while (flag) {
String userId = random.nextInt(5) + "";
String orderId = UUID.randomUUID().toString();
long currentTimeMillis = System.currentTimeMillis();
ctx.collect(Tuple3.of(userId, orderId, currentTimeMillis));
Thread.sleep(500);
}
}
@Override
public void cancel() {
flag = false;
}
}
/**
* 自定义ProcessFunction完成订单自动好评
* 进来一条数据应该在interval时间后进行判断该订单是否超时是否需要自动好评
* abstract class KeyedProcessFunction<K, I, O>
*/
private static class TimerProcessFunction extends KeyedProcessFunction<String, Tuple3<String, String, Long>, Object> {
private long interval;//订单超时时间 传进来的是5000ms/5s
public TimerProcessFunction(long interval) {
this.interval = interval;
}
//-0.准备一个State来存储订单id和订单生成时间
private MapState<String, Long> mapState = null;
//-1.初始化
@Override
public void open(Configuration parameters) throws Exception {
MapStateDescriptor<String, Long> mapStateDescriptor = new MapStateDescriptor<>("mapState", String.class, Long.class);
mapState = getRuntimeContext().getMapState(mapStateDescriptor);
}
//-2.处理每一条数据并存入状态并注册定时器
@Override
public void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<Object> out) throws Exception {
//Tuple3<用户id,订单id, 订单生成时间> value里面是当前进来的数据里面有订单生成时间
//把订单数据保存到状态中
mapState.put(value.f1, value.f2);//xxx,2020-11-11 00:00:00 ||xx,2020-11-11 00:00:01
//该订单在value.f2 + interval时过期/到期,这时如果没有评价的话需要系统给与默认好评
//注册一个定时器在value.f2 + interval时检查是否需要默认好评
ctx.timerService().registerProcessingTimeTimer(value.f2 + interval);//2020-11-11 00:00:05 || 2020-11-11 00:00:06
}
//-3.执行定时任务
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
//检查历史订单数据(在状态中存储着)
//遍历取出状态中的订单数据
Iterator<Map.Entry<String, Long>> iterator = mapState.iterator();
while (iterator.hasNext()) {
Map.Entry<String, Long> map = iterator.next();
String orderId = map.getKey();
Long orderTime = map.getValue();
//先判断是否好评--实际中应该去调用订单评价系统看是否好评了,我们这里写个方法模拟一下
if (!isFavorable(orderId)) {//该订单没有给好评
//判断是否超时--不用考虑进来的数据是否过期,该种方式少访问一次外部好评系统
if (System.currentTimeMillis() - orderTime >= interval) {
System.out.println("orderId:" + orderId + "该订单已经超时未评价,系统自动给与好评!....");
//移除状态中的数据,避免后续重复判断
iterator.remove();//通过迭代器移除即可,再调用mapState.remove(orderId)属于重复操作
}
} else {
System.out.println("orderId:" + orderId + "该订单已经评价....");
//移除状态中的数据,避免后续重复判断
iterator.remove();//同上,通过迭代器移除即可
}
}
}
//自定义一个方法模拟订单系统返回该订单是否已经好评
public boolean isFavorable(String orderId) {
return orderId.hashCode() % 2 == 0;
}
}
}