Submitting Flink Jobs: A Summary of Problems Encountered
I. Submitting a Job
1. Command
./bin/flink run [options] <job-jar> <arguments>
Run ./bin/flink run --help to see the full list of options.
2. Examples
2.1 Without arguments:
./bin/flink run -c com.yclxiao.cdc.Cdc2DorisSQLDemoWithCheckpoint ./flinkdemo-1.0-SNAPSHOT.jar
2.2 With arguments:
Each -key marks a parameter name, and the token that follows it is that parameter's value:
./bin/flink run -c com.yclxiao.cdc.Cdc2DorisSQLDemoWithCheckpoint ./flinkdemo-1.0-SNAPSHOT.jar -name ycl -age 11
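Inside the job, parse them with Flink's built-in ParameterTool class:
import org.apache.flink.api.java.utils.ParameterTool;
ParameterTool params = ParameterTool.fromArgs(args);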
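To make the -key value convention concrete, here is a stdlib-only sketch of what parsing those program arguments amounts to. It is a simplified illustration, not ParameterTool's actual implementation (ParameterTool also accepts --key and value-less flags, among other things). The class ArgsSketch and its choice to reject a key without a value are assumptions for illustration. In a real job you would call ParameterTool.fromArgs(args) and then read values with params.get("name") and params.getInt("age").

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified "-key value" parser for illustration only.
// NOT Flink's ParameterTool; in a real job use ParameterTool.fromArgs(args).
final class ArgsSketch {

    // Turns ["-name", "ycl", "-age", "11"] into {name=ycl, age=11}.
    // Assumption: every key must be followed by a value.
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            String key = args[i];
            if (!key.startsWith("-")) {
                throw new IllegalArgumentException("Expected a -key at position " + i + ": " + key);
            }
            if (i + 1 >= args.length) {
                throw new IllegalArgumentException("Missing value for " + key);
            }
            // Strip one or two leading dashes: "-name" and "--name" both become "name".
            String name = key.startsWith("--") ? key.substring(2) : key.substring(1);
            params.put(name, args[i + 1]);
        }
        return params;
    }

    public static void main(String[] args) {
        // Same program arguments as in the example command.
        Map<String, String> params = parse(new String[] {"-name", "ycl", "-age", "11"});
        String name = params.get("name");
        int age = Integer.parseInt(params.get("age"));
        System.out.println(name + " " + age); // prints "ycl 11"
    }
}
```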
2.3 Submitting from a checkpoint
Add the -s option, pointing at a checkpoint directory:
-s /Users/yclxiao/Project/bigdata/flinkdemo/checkpoints/143ac6febfce4274d24bdff6ec83d1c8/chk-170
Full command:
./bin/flink run -c com.yclxiao.cdc.Cdc2DorisSQLDemoWithCheckpoint -s /Users/yclxiao/Project/bigdata/flinkdemo/checkpoints/143ac6febfce4274d24bdff6ec83d1c8/chk-170 ./flinkdemo-1.0-SNAPSHOT.jar -name ycl -age 11
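When choosing the -s path by hand, the usual candidate is the chk-N directory with the highest N inside the job's checkpoint directory (143ac6febfce4274d24bdff6ec83d1c8 in the command above). The hypothetical helper LatestCheckpoint below picks it from a list of directory names, comparing N numerically. It is only a sketch: it does not verify that the chosen directory contains the _metadata file that a completed checkpoint has, so check that before submitting.

```java
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: pick the newest chk-N directory name for "flink run -s".
final class LatestCheckpoint {
    private static final Pattern CHK = Pattern.compile("chk-(\\d+)");

    // Returns the entry with the largest N among names matching "chk-N";
    // other entries (e.g. "shared", "taskowned") are ignored.
    static Optional<String> latest(List<String> dirNames) {
        String best = null;
        long bestId = -1;
        for (String name : dirNames) {
            Matcher m = CHK.matcher(name);
            if (m.matches()) {
                long id = Long.parseLong(m.group(1));
                if (id > bestId) {
                    bestId = id;
                    best = name;
                }
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        // Numeric comparison matters: a plain string sort would rank "chk-99" above "chk-170".
        List<String> names = List.of("chk-99", "chk-170", "shared", "taskowned");
        System.out.println(latest(names).orElse("none")); // prints "chk-170"
    }
}
```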
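II. Problems Encountered When Submitting Jobs
Below is a summary of the problems first, followed by a detailed explanation of each.
1. Summary
The problems in brief: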
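- Insufficient resources. Fix: adjust the cluster configuration file.
- The SPI files under META-INF were not included when packaging. Fix: add a Maven plugin to pom.xml.
- POM dependency configuration: in Flink SQL scenarios, some dependencies bundled into the job jar conflict with duplicate copies in the cluster's lib directory. Fix: jars that already exist in the Flink cluster's lib directory should not be bundled into the job jar.
- Special cases on public cloud.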
2. Details
2.1 Insufficient resources
Error message:
2023-06-19 15:42:24,452 WARN org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Could not fulfill resource requirements of job 032dc3ac91b6128aaedae625b36e0575. Free slots: 0
2023-06-19 15:42:24,452 WARN org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge [] - Could not acquire the minimum required resources, failing slot requests. Acquired: [ResourceRequirement{resourceProfile=ResourceProfile{cpuCores=1, taskHeapMemory=2.283gb (2451151214 bytes), taskOffHeapMemory=0 bytes, managedMemory=2.026gb (2175669399 bytes), networkMemory=518.720mb (543917349 bytes)}, numberOfRequiredSlots=1}]. Current slot pool status: Registered TMs: 1, registered slots: 1 free slots: 0
2023-06-19 15:42:24,454 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: acct_profit[1] -> (Calc[2], Calc[9]) (1/1) (2ff961f809129e63bb6b9b164dd56ca4) switched from SCHEDULED to FAILED on [unassigned resource].
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
2023-06-19 15:42:23,440 WARN org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Could not fulfill resource requirements of job 032dc3ac91b6128aaedae625b36e0575. Free slots: 0
2023-06-19 15:42:23,440 WARN org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge [] - Could not acquire the minimum required resources, failing slot requests. Acquired: [ResourceRequirement{resourceProfile=ResourceProfile{cpuCores=1, taskHeapMemory=2.283gb (2451151214 bytes), taskOffHeapMemory=0 bytes, managedMemory=2.026gb (2175669399 bytes), networkMemory=518.720mb (543917349 bytes)}, numberOfRequiredSlots=1}]. Current slot pool status: Registered TMs: 1, registered slots: 1 free slots: 0
2023-06-19 15:42:23,441 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: acct_profit[1] -> (Calc[2], Calc[9]) (1/1) (f35db0e3c29aef8507f6d6f7d19e4e90) switched from SCHEDULED to FAILED on [unassigned resource].
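Possible causes: the memory settings are too small, the parallelism is not matched to the available slots, or too few slots are configured. For reference, a working flink-conf.yaml: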
jobmanager.memory.process.size: 2600m
taskmanager.memory.process.size: 2728m
taskmanager.memory.flink.size: 2280m
taskmanager.numberOfTaskSlots: 10
parallelism.default: 4
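As a rough sanity check on the slot side of this problem, the sketch below encodes one rule: with Flink's default slot sharing (all operators in a single slot sharing group), a job needs as many slots as its highest operator parallelism, while the cluster offers TaskManagers times taskmanager.numberOfTaskSlots slots, minus whatever other jobs already hold. The class SlotCheck and the example numbers are hypothetical, and memory per slot is not modeled.

```java
import java.util.Arrays;

// Back-of-the-envelope slot check, assuming Flink's default slot sharing
// (all operators in one slot sharing group). Memory per slot is not modeled.
final class SlotCheck {

    // With default slot sharing, a job needs as many slots as its
    // highest operator parallelism.
    static int requiredSlots(int... operatorParallelism) {
        return Arrays.stream(operatorParallelism).max().orElse(0);
    }

    // Free slots = TaskManagers * taskmanager.numberOfTaskSlots - slots used by other jobs.
    static int freeSlots(int taskManagers, int slotsPerTaskManager, int usedSlots) {
        return taskManagers * slotsPerTaskManager - usedSlots;
    }

    public static void main(String[] args) {
        // Hypothetical: operators at parallelism 2 on 1 TaskManager with 1 slot.
        System.out.println(requiredSlots(2, 2) <= freeSlots(1, 1, 0)); // prints "false"
        // Reference config: parallelism.default = 4 on 1 TaskManager with 10 slots.
        System.out.println(requiredSlots(4, 4) <= freeSlots(1, 10, 0)); // prints "true"
    }
}
```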
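2.2 mysql-cdc connector not found
This happens because the META-INF contents of the dependency jars (in particular the META-INF/services SPI files) were not merged when building the fat jar. Add the following to pom.xml: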
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.4</version>
    <configuration>
        <filters>
            <filter>
                <artifact>*:*</artifact>
                <!-- Drop jar signature files, which are no longer valid inside a fat jar -->
                <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                </excludes>
            </filter>
        </filters>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- Merge META-INF/services files from all dependencies so every connector's SPI entries survive -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <!-- Main class written to the manifest; replace with your own job class -->
                        <mainClass>com.bm001.datacompute.cdc.api.CloudAcctProfit2DwsHdjProfitRecordAPI</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
See also these articles:
https://wii.pub/2021/08/23/tools/maven/problems/merge-meta-info/
https://blog.csdn.net/RL_LEEE/article/details/128134800
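Why does the missing merge make mysql-cdc disappear? Flink discovers table connectors through Java SPI: each connector jar ships a META-INF/services/org.apache.flink.table.factories.Factory file listing its factory classes. When several jars are shaded into one without merging, only one copy of that file ends up in the fat jar, so the other connectors' entries are lost. The sketch below contrasts the two outcomes on plain lists of strings. ServiceFileMerge and the factory class names are hypothetical, and this is not the shade plugin's actual code.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Illustrates why META-INF/services files must be merged when building a fat jar.
// Each inner list stands for the content of one jar's
// META-INF/services/org.apache.flink.table.factories.Factory file.
final class ServiceFileMerge {

    // Without merging: only one copy of the file ends up in the fat jar
    // (this sketch keeps the first one), so the other jars' entries are lost.
    static List<String> withoutMerging(List<List<String>> filesPerJar) {
        if (filesPerJar.isEmpty()) {
            return List.of();
        }
        return filesPerJar.get(0);
    }

    // With merging (roughly what ServicesResourceTransformer achieves):
    // every entry is kept; this sketch also drops duplicates and preserves order.
    static List<String> merged(List<List<String>> filesPerJar) {
        Set<String> entries = new LinkedHashSet<>();
        for (List<String> file : filesPerJar) {
            entries.addAll(file);
        }
        return new ArrayList<>(entries);
    }

    public static void main(String[] args) {
        // Hypothetical factory class names, one per connector jar.
        List<List<String>> jars = List.of(
                List.of("com.example.doris.DorisFactory"),
                List.of("com.example.cdc.MySqlCdcFactory"));
        System.out.println(withoutMerging(jars)); // [com.example.doris.DorisFactory]
        System.out.println(merged(jars)); // [com.example.doris.DorisFactory, com.example.cdc.MySqlCdcFactory]
    }
}
```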
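2.3 Duplicate jars
Sometimes a jar is needed for local development and running, but not when the job runs on the cluster, because the cluster's lib directory already contains it. If it is bundled anyway, you get an error like: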
Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'default' that implement 'org.apache.flink.table.delegation.ExecutorFactory' found in the classpath.
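Fix: when packaging, set the dependency's scope in the pom to provided:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>${flink.version}</version>
    <!-- Already in the cluster's lib directory: compile against it, but don't bundle it -->
    <scope>provided</scope>
</dependency>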
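The error comes from Flink's identifier-based factory lookup: it gathers every discovered factory that implements the requested interface and whose identifier matches, and it refuses to pick one when more than one matches. When the same jar is both bundled into the job jar and present in lib, the same factory can be discovered twice. Below is a simplified model of that lookup. FactoryLookup, Discovered and the source labels are hypothetical; this is not Flink's FactoryUtil code.

```java
import java.util.List;
import java.util.stream.Collectors;

// Simplified model of identifier-based factory lookup; NOT Flink's FactoryUtil code.
final class FactoryLookup {

    // A discovered factory: its identifier and where it was loaded from (both hypothetical).
    static final class Discovered {
        final String identifier;
        final String source;

        Discovered(String identifier, String source) {
            this.identifier = identifier;
            this.source = source;
        }

        @Override
        public String toString() {
            return identifier + "@" + source;
        }
    }

    static Discovered find(List<Discovered> discovered, String identifier) {
        List<Discovered> matches = discovered.stream()
                .filter(f -> f.identifier.equals(identifier))
                .collect(Collectors.toList());
        if (matches.isEmpty()) {
            throw new IllegalStateException("No factory for identifier '" + identifier + "'");
        }
        if (matches.size() > 1) {
            // Analogous to Flink's "Multiple factories for identifier ..." ValidationException.
            throw new IllegalStateException("Multiple factories for identifier '" + identifier + "': " + matches);
        }
        return matches.get(0);
    }

    public static void main(String[] args) {
        // Same factory present in both the job jar and the cluster's lib directory.
        List<Discovered> bundled = List.of(
                new Discovered("default", "job jar"),
                new Discovered("default", "cluster lib"));
        try {
            find(bundled, "default");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        // With scope=provided only the lib copy is discovered, and the lookup succeeds.
        System.out.println(find(List.of(new Discovered("default", "cluster lib")), "default").source);
    }
}
```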
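3. Problems when submitting to a public cloud
The job ran fine locally and on the test server, but some problems may appear when it is deployed to ECS instances on a public cloud.
3.1 Invalid argument: 0.0.0.0:8081
The "invalid argument 0.0.0.0" error is most likely because Netty was not allowed to bind to 0.0.0.0, presumably a restriction imposed by the cloud environment itself, since the test environment did not have this problem. Configuring the machine's own IP address instead fixed it: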
rest.address: xx.xx.xx.xx
rest.bind-address: xx.xx.xx.xx
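If you would rather not look up the address by hand, the sketch below shows the selection rule: skip the wildcard 0.0.0.0 and loopback addresses and take the first remaining one. The class BindAddress and the address 172.16.0.12 are made up. On a real host the candidates could be gathered with java.net.NetworkInterface.getNetworkInterfaces(). Which address is right on a given cloud instance (private or public) is an assumption you need to verify yourself.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;

// Hypothetical helper: choose a concrete address for rest.address / rest.bind-address
// instead of the wildcard 0.0.0.0.
final class BindAddress {

    // Returns the first candidate that is neither the wildcard address nor loopback,
    // or null if none qualifies. Candidates are expected to be IP literals;
    // a host name here would trigger a DNS lookup.
    static String pick(List<String> candidates) {
        for (String candidate : candidates) {
            InetAddress addr;
            try {
                addr = InetAddress.getByName(candidate);
            } catch (UnknownHostException e) {
                throw new IllegalArgumentException("Not a valid address: " + candidate, e);
            }
            if (!addr.isAnyLocalAddress() && !addr.isLoopbackAddress()) {
                return addr.getHostAddress();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // 172.16.0.12 is a made-up private address standing in for the ECS instance's IP.
        String ip = pick(List.of("0.0.0.0", "127.0.0.1", "172.16.0.12"));
        System.out.println("rest.address: " + ip);
        System.out.println("rest.bind-address: " + ip);
    }
}
```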
3.2 Change the temporary file directory; otherwise temporary files take up space on the system disk
io.tmp.dirs: /data/software/flink-15.4/tmp
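Before pointing io.tmp.dirs at a new directory, it can help to confirm that the directory exists and sits on the larger data disk. Here is a minimal sketch, assuming a hypothetical TmpDirCheck class. It creates the directory if missing and reports the usable space of the volume that holds it, which should reflect the data disk rather than the system disk.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical pre-check before pointing io.tmp.dirs at a data-disk directory.
final class TmpDirCheck {

    // Creates the directory if needed and returns the usable bytes on its volume.
    static long prepare(Path dir) {
        try {
            Files.createDirectories(dir); // no-op if it already exists
            return Files.getFileStore(dir).getUsableSpace();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Default path taken from the config line above; pass your own as the first argument.
        Path dir = Paths.get(args.length > 0 ? args[0] : "/data/software/flink-15.4/tmp");
        long bytes = prepare(dir);
        System.out.printf("%s: %d MB usable%n", dir, bytes / (1024 * 1024));
    }
}
```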
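3.3 Insufficient database user privileges on the cloud
Error:
Caused by: java.sql.SQLSyntaxErrorException: Access denied; you need (at least one of) the RELOAD privilege(s) for this operation
Fix: grant the user the missing privilege (RELOAD in this case) in the cloud provider's console.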
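To check what the CDC account can do before submitting, run SHOW GRANTS FOR CURRENT_USER(); on the source MySQL and look for RELOAD. RELOAD is a global privilege, so it only counts when granted ON *.*. The sketch below applies that check to the returned rows. GrantCheck, the user name and the database name are hypothetical, and the string parsing is a simplification of MySQL's output format.

```java
import java.util.List;
import java.util.Locale;

// Hypothetical check of SHOW GRANTS output for a global privilege such as RELOAD.
final class GrantCheck {

    // Global privileges only count when granted ON *.*; ALL PRIVILEGES includes them.
    static boolean hasGlobalPrivilege(List<String> grantRows, String privilege) {
        String wanted = privilege.toUpperCase(Locale.ROOT);
        for (String row : grantRows) {
            String upper = row.toUpperCase(Locale.ROOT);
            int on = upper.indexOf(" ON *.* ");
            if (!upper.startsWith("GRANT ") || on < 0) {
                continue;
            }
            // Text between "GRANT " and " ON *.* " is the comma-separated privilege list.
            for (String p : upper.substring(6, on).split(",")) {
                String trimmed = p.trim();
                if (trimmed.equals(wanted) || trimmed.equals("ALL PRIVILEGES") || trimmed.equals("ALL")) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Example rows in the shape returned by SHOW GRANTS (user and database names made up).
        List<String> rows = List.of(
                "GRANT USAGE ON *.* TO `flink_cdc`@`%`",
                "GRANT SELECT, INSERT ON `acct`.* TO `flink_cdc`@`%`");
        System.out.println(hasGlobalPrivilege(rows, "RELOAD")); // prints "false"
    }
}
```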