需求背景
由于我司的项目中需要接入不同的数据源的数据到数仓中,在选择了众多的产品中最后选择了Apache SeaTunnel,对比参考
目前我这边使用的接口,暂时没有接口认证,如果需要接口认证的方式接入数据,再做讨论及测试
实际使用
Apache SeaTunnel版本: 2.3.4
话不多说,先贴最终的运行文件,由于我使用的json
的rest-api
提交方式,所以结果如下图所示:
使用rest
和conf
的区别就在于job执行的环境不同,conf使用的是ClientJobExecutionEnvironment
(经测试也支持json格式),而rest方式则使用的是RestJobExecutionEnvironment
接口返回的数据格式
{
"code": "0000",
"msg": "成功",
"data": {
"records": [
{
"id": "1798895733824393218",
"taskContent": "许可证02",
"taskType": "许可证"
}
]
}
}
// 实际数据分页的很多,以上是示例
接入配置
{
"env": {
"job.mode": "BATCH",
"job.name": "SeaTunnel_Job"
},
"source": [
{
"result_table_name": "Table13367210156032",
"plugin_name": "Http",
"url": "http://*.*.*.*:*/day_plan_repair/page",
"method": "GET", // Http请求方式 只支持GET和POST两种方式
"format": "json", // 默认值是text 只支持json和text两种方式
"json_field": { // 可以看看作是从上述接口返回的数据中取数据的路径和key的映射关系,value则是取值的JsonPath
"id": "$.data.records[*].id",
"taskContent": "$.data.records[*].taskContent",
"taskType": "$.data.records[*].taskType"
},
// "pageing": {
// "page_field": "current", // 当前页的key,就是分页接口的请求参数中的当前页的key,
// "batch_size": 10 // 每页取多少数据
// },
"schema": {
"fields": {
"id": "BIGINT", // 主键列问题,详见下面的问题
"taskContent": "STRING",
"taskType": "STRING"
}
}
}
],
"transform": [
{
"field_mapper": { // key是source中的schema.field中的值,value是sink中使用的值,例如下面的save_mode_create_template里的${rowtype_fields}使用的就是value,可以更改value作为sink的新命名列
"id": "id",
"taskContent": "task_content",
"taskType": "task_type"
},
"result_table_name": "Table13367210156033",
"source_table_name": "Table13367210156032",
"plugin_name": "FieldMapper"
}
],
"sink": [
{
"source_table_name": "Table13367210156033",
"plugin_name": "Doris",
"fenodes ": "*.*.*.*:*",
"database": "test",
"password": "****",
"username": "****",
"table": "ods_day_plan",
"sink.label-prefix": "test-ods_day_plan", // Stream Load 导入使用的标签前缀。在 2pc 场景下,需要全局唯一性来保证 SeaTunnel 的 EOS 语义
"sink.enable-2pc": false, // 是否开启两阶段提交(2pc),默认为true,保证Exactly-Once语义。Doris的二阶段提交详见https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual
"data_save_mode": "APPEND_DATA", // 数据保存模式 DROP_DATA、APPEND_DATA、CUSTOM_PROCESSING、ERROR_WHEN_DATA_EXISTS官方提供了四种,我使用保留数据库结构,追加数据,可以详见源码中的DataSaveMode枚举
"schema_save_mode": "CREATE_SCHEMA_WHEN_NOT_EXIST", // Scheme保存模式 RECREATE_SCHEMA、CREATE_SCHEMA_WHEN_NOT_EXIST、ERROR_WHEN_SCHEMA_NOT_EXIST 我使用的是当Schema不存在时创建;具体释义详见SchemaSaveMode枚举
"save_mode_create_template": "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n ${rowtype_fields}\n ) ENGINE=OLAP\n UNIQUE KEY (id)\n DISTRIBUTED BY HASH (id)\n PROPERTIES (\n \"replication_allocation\" = \"tag.location.default: 1\",\n \"in_memory\" = \"false\",\n \"storage_format\" = \"V2\",\n \"disable_auto_compaction\" = \"false\"\n )",
"sink.enable-delete": true, //是否启用删除,此配置只有Doris的表模型是Unique模型,同时需要Doris表开启批量删除功能(默认开启 0.15+ 版本)
"doris.config": {
"format": "json",
"read_json_by_line": "true"
}
}
]
}
实际使用中遇到的问题
Handle save mode failed
具体的报错日志中包含
Caused by: java.sql.SQLException: errCode = 2, detailMessage = Syntax error in line 21:
UNIQUE KEY ()
^
Encountered: )
Expected: IDENTIFIER
解决方案:详见链接[issue](https://github.com/apache/seatunnel/issues/6646)
使用了上述配置文件中的`save_mode_create_template`字段解决,目标中值的可以自行根据业务配置。
NoSuchMethodError
java.lang.NoSuchMethodError: retrofit2.Retrofit$Builder.client(Lshaded/okhttp3/OkHttpClient;)Lretrofit2/Retrofit$Builder;
at org.influxdb.impl.InfluxDBImpl.<init>(InfluxDBImpl.java:179) ~[connector-influxdb-2.3.4.jar:2.3.4]
at org.influxdb.impl.InfluxDBImpl.<init>(InfluxDBImpl.java:120) ~[connector-influxdb-2.3.4.jar:2.3.4]
at org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient.getInfluxDB(InfluxDBClient.java:72) ~[connector-influxdb-2.3.4.jar:2.3.4]
在使用influxdb的连接时,遇到了**jar包冲突**的问题,最终发现在创建http链接的时候,`retrofit2`的依赖与`datahub`连接器中的存在版本冲突,我这里没有使用到`datahub`,所以**删除datahub的连接器**即可解决问题
Apache Doris BIGINT类型精度丢失问题
详见帖子
配置主键
Doris配置save_mode_create_template
包含主键时,主键类型必须是数字或日期类型。
上面的source配置的schema中的id,接口返回的实际类型是字符串类型,但是是雪花算法的全数字类型,所以使用BIGINT
类型自动转换
原因是Sink配置中的save_mode_create_template
的UNIQUE KEY
使用的id作为主键,Doris要求主键列类型必须是数字或者日期类型!!
个人经验
- 当sink、source、transform只有一个时,可以省略
result_table_name、source_table_name
配置项 - 下载源码,修改源码,在源码中增加log日志,并打包替换SeaTunnel运行时的jar,以方便根据日志得到自己想知道的结果或者方便理解代码
- 根据1的运用,熟知代码后可以进行二次开发,例如需要token认证的接口该怎么处理,值得深思。
- 另外source配置中的json_field中的value的JsonPath值,不支持 列表中复杂类型取值的问题Array