SeaTunnel 2.3.1 Deployment and Usage
Related big data files can be found on my CSDN or GitHub.
1 Deployment
1.1 Download and extract
https://dlcdn.apache.org/incubator/seatunnel/2.3.1/apache-seatunnel-incubating-2.3.1-bin.tar.gz
After downloading, upload the archive to your server and extract it.
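If the server itself has internet access, you can also download it directly there (same URL as above):

```bash
# fetch the release tarball straight onto the server
wget https://dlcdn.apache.org/incubator/seatunnel/2.3.1/apache-seatunnel-incubating-2.3.1-bin.tar.gz
```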
```bash
# extract into /opt/module
tar -zxvf apache-seatunnel-incubating-2.3.1-bin.tar.gz -C /opt/module
```
1.2 Download the corresponding connectors
Download the connectors you need from the Apache Maven repository (each jar sits under its own path) and place them in the seatunnel-2.3.1/connectors/seatunnel directory:
https://repo.maven.apache.org/maven2/org/apache/seatunnel/
```
connector-assert-2.3.1.jar
connector-cdc-mysql-2.3.1.jar
connector-console-2.3.1.jar        # bundled with the distribution
connector-doris-2.3.1.jar
connector-elasticsearch-2.3.1.jar
connector-fake-2.3.1.jar           # bundled with the distribution
connector-file-hadoop-2.3.1.jar
connector-file-local-2.3.1.jar
connector-hive-2.3.1.jar
connector-iceberg-2.3.1.jar
connector-jdbc-2.3.1.jar
connector-kafka-2.3.1.jar
connector-redis-2.3.1.jar
```
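The connectors follow the standard Maven repository layout, so fetching one looks roughly like this (the exact path is an assumption based on that layout; verify it in the repository browser first):

```bash
# hypothetical example: grab the JDBC connector and drop it into the plugin directory
wget https://repo.maven.apache.org/maven2/org/apache/seatunnel/connector-jdbc/2.3.1/connector-jdbc-2.3.1.jar \
  -P /opt/module/seatunnel-2.3.1/connectors/seatunnel/
```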
Configure which plugins the install script should fetch:

```bash
vim seatunnel-2.3.1/config/plugin_config
```
When the install script runs, it downloads the corresponding jars from Maven Central. Keep the list short, since the downloads are slow; I kept these:

```
--connectors-v2--
connector-assert
connector-cdc-mysql
connector-jdbc
connector-fake
connector-console
--end--
```
1.3 Install SeaTunnel

```bash
sh bin/install-plugin.sh 2.3.1
```
The whole process is very slow… presumably because everything is downloaded from Maven Central.
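If the downloads are painfully slow, one common workaround (assuming the install script resolves jars through Maven and therefore honors your local Maven settings) is to point Maven at a nearby mirror in ~/.m2/settings.xml:

```xml
<!-- ~/.m2/settings.xml: route Maven Central traffic through a faster mirror.
     The Aliyun mirror below is only an example; substitute whatever is close to you. -->
<settings>
  <mirrors>
    <mirror>
      <id>aliyun</id>
      <mirrorOf>central</mirrorOf>
      <url>https://maven.aliyun.com/repository/public</url>
    </mirror>
  </mirrors>
</settings>
```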
⭐1.4 Add some extra jars
- To use Hive, put these 3 jars into the seatunnel-2.3.1/lib directory:

```
hive-exec-2.3.9.jar
# download link:
# https://repo.maven.apache.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
# Note: it must be hive-exec-2.3.9.jar — do not copy a newer jar
# from your own Hive lib directory; use this one

seatunnel-hadoop3-3.1.4-uber-2.3.1.jar
# download link:
# https://repo.maven.apache.org/maven2/org/apache/seatunnel/seatunnel-hadoop3-3.1.4-uber/2.3.1/seatunnel-hadoop3-3.1.4-uber-2.3.1.jar

seatunnel-hadoop3-3.1.4-uber-2.3.1-optional.jar
# download link:
# https://repo.maven.apache.org/maven2/org/apache/seatunnel/seatunnel-hadoop3-3.1.4-uber/2.3.1/seatunnel-hadoop3-3.1.4-uber-2.3.1-optional.jar
```
Along the way, for unrelated reasons, I also copied libfb303-0.9.3.jar from the Hive installation's lib directory into SeaTunnel's lib directory.
- To use MySQL, copy the MySQL driver over as well. It should be an 8.x-series driver; I used mysql-connector-java-8.0.21.jar (see the sketch after this list).
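Putting the pieces together, the copy steps look roughly like this (the source paths are hypothetical; adjust them to wherever your downloaded jars live):

```bash
# drop the extra jars into SeaTunnel's lib directory
cp hive-exec-2.3.9.jar                              /opt/module/seatunnel-2.3.1/lib/
cp seatunnel-hadoop3-3.1.4-uber-2.3.1.jar           /opt/module/seatunnel-2.3.1/lib/
cp seatunnel-hadoop3-3.1.4-uber-2.3.1-optional.jar  /opt/module/seatunnel-2.3.1/lib/
cp mysql-connector-java-8.0.21.jar                  /opt/module/seatunnel-2.3.1/lib/
```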
2 Test examples
2.1 Official demo: fake to console
seatunnel-2.3.1/config/v2.batch.config.template
```hocon
env {
  execution.parallelism = 2
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  FakeSource {
    parallelism = 2
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

sink {
  Console {
  }
}
```
Run it with:

```bash
cd /opt/module/seatunnel-2.3.1
./bin/seatunnel.sh --config ./config/v2.batch.config.template -e local
```
If the run succeeds, you will see the generated test data printed to the console.
2.2 MySQL to console
I created a new directory, /opt/module/seatunnel-2.3.1/job, to hold job configurations.
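Creating it is a one-liner:

```bash
mkdir -p /opt/module/seatunnel-2.3.1/job
```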
mysql_2console.conf
```hocon
env {
  execution.parallelism = 2
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  Jdbc {
    url = "jdbc:mysql://hadoop102/dim_db?useUnicode=true&characterEncoding=utf8&useSSL=false"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "root"
    password = "111111"
    query = "select * from dim_basicdata_date_a_d where date < '2010-12-31'"
  }
}

sink {
  Console {
  }
}
```
The query reads from a date dimension table. DDL:
```sql
CREATE DATABASE dim_db DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;

drop table if exists dim_db.dim_basicdata_date_a_d;
create table if not exists dim_db.dim_basicdata_date_a_d
(
    `date`        varchar(40) comment 'date',
    `year`        varchar(40) comment 'year',
    `quarter`     varchar(40) comment 'quarter (1/2/3/4)',
    `season`      varchar(40) comment 'season (spring/summer/autumn/winter)',
    `month`       varchar(40) comment 'month',
    `day`         varchar(40) comment 'day',
    `week`        varchar(40) comment 'week of the year',
    `weekday`     varchar(40) comment 'day of week (1-Mon/2-Tue/3-Wed/4-Thu/5-Fri/6-Sat/7-Sun)',
    `is_workday`  varchar(40) comment 'is workday (1-yes, 0-no)',
    `date_type`   varchar(40) comment 'date type (workday / statutory workday [shifted rest day] / weekend / holiday)',
    `update_date` varchar(40) comment 'update date'
);
```
Insert a few rows yourself to try it out.
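For example, a couple of made-up sample rows (the values are illustrative only):

```sql
insert into dim_db.dim_basicdata_date_a_d values
('2010-01-01', '2010', '1', 'winter', '01', '01', '53', '5', '0', 'holiday', '2023-01-01'),
('2010-01-04', '2010', '1', 'winter', '01', '04', '1',  '1', '1', 'workday', '2023-01-01');
```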
Run it with:

```bash
cd /opt/module/seatunnel-2.3.1
./bin/seatunnel.sh --config ./job/mysql_2console.conf -e local
```
2.3 Hive to console
Create a Hive table:
```sql
CREATE database db_hive;

drop table if exists db_hive.dim_basicdata_date_a_d;
create table if not exists db_hive.dim_basicdata_date_a_d
(
    `date`        string comment 'date',
    `year`        string comment 'year',
    `quarter`     string comment 'quarter (1/2/3/4)',
    `season`      string comment 'season (spring/summer/autumn/winter)',
    `month`       string comment 'month',
    `day`         string comment 'day',
    `week`        string comment 'week of the year',
    `weekday`     string comment 'day of week (1-Mon/2-Tue/3-Wed/4-Thu/5-Fri/6-Sat/7-Sun)',
    `is_workday`  string comment 'is workday (1-yes, 0-no)',
    `date_type`   string comment 'date type (workday / statutory workday [shifted rest day] / weekend / holiday)',
    `update_date` string comment 'update date'
);
```
Insert a few rows yourself.
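As with MySQL, a made-up sample row is enough for testing (Hive 2.x supports INSERT ... VALUES on plain tables, though it is slow):

```sql
insert into db_hive.dim_basicdata_date_a_d values
('2010-01-01', '2010', '1', 'winter', '01', '01', '53', '5', '0', 'holiday', '2023-01-01');
```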
Create the config file hive_2console.conf:
```hocon
env {
  execution.parallelism = 2
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  Hive {
    table_name = "db_hive.dim_basicdata_date_a_d"
    metastore_uri = "thrift://hadoop102:9083"
  }
}

sink {
  Console {
  }
}
```
My Hive setup accesses metadata over JDBC, so metastore_uri = "jdbc:hive2://hadoop102:10000" also works fine for me.
Check hive-site.xml; you may already have these properties configured:
```xml
<property>
    <name>hive.metastore.uris</name>
    <value>thrift://hadoop102:9083</value>
</property>

<property>
    <name>hive.server2.thrift.bind.host</name>
    <value>hadoop102</value>
</property>

<property>
    <name>hive.server2.thrift.port</name>
    <value>10000</value>
</property>
```
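If you did change hive-site.xml, make sure the metastore (and HiveServer2, if you use the JDBC route) is running before launching the job. On a standard Hive install that would be something like:

```bash
# start the metastore and HiveServer2 in the background
nohup hive --service metastore   > /tmp/metastore.log   2>&1 &
nohup hive --service hiveserver2 > /tmp/hiveserver2.log 2>&1 &
```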
Run it with:

```bash
cd /opt/module/seatunnel-2.3.1
./bin/seatunnel.sh --config ./job/hive_2console.conf -e local
```
2.4 MySQL to Hive
Create the config file
dim_basicdate_mysql_2hive.conf
```hocon
env {
  execution.parallelism = 2
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  Jdbc {
    url = "jdbc:mysql://hadoop102/dim_db?useUnicode=true&characterEncoding=utf8&useSSL=false"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "root"
    password = "111111"
    query = "select * from dim_basicdata_date_a_d"
  }
}

sink {
  Hive {
    table_name = "db_hive.dim_basicdata_date_a_d"
    metastore_uri = "thrift://hadoop102:9083"
  }
}
```
Run it with:

```bash
cd /opt/module/seatunnel-2.3.1
./bin/seatunnel.sh --config ./job/dim_basicdate_mysql_2hive.conf -e local
```
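Once the job finishes, a quick sanity check from the Hive CLI or beeline confirms the rows landed:

```sql
-- the row count should match the MySQL source table
select count(*) from db_hive.dim_basicdata_date_a_d;
select * from db_hive.dim_basicdata_date_a_d limit 5;
```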