实际使用
Apache SeaTunnel版本:2.3.4
话不多说,先贴最终的运行文件,由于我使用的json
的rest-api
提交方式,所以结果如下图所示:
使用rest
和conf
的区别就在于job执行的环境不同,conf使用的是ClientJobExecutionEnvironment
(经测试也支持json格式),而rest方式则使用的是RestJobExecutionEnvironment
接口返回的数据格式
{
"code": "0000",
"msg": "成功",
"data": {
"records": [
{
"id": "1798895733824393218",
"taskContent": "许可证02",
"taskType": "许可证"
}
]
}
}
// 实际数据分页的很多,以上是示例
接入配置
{
"env": {
"job.mode": "BATCH",
"job.name": "SeaTunnel_Job"
},
"source": [
{
"result_table_name": "Table13367210156032",
"plugin_name": "Http",
"url": "http://*.*.*.*:*/day_plan_repair/page",
"method": "GET", // Http请求方式 只支持GET和POST两种方式
"format": "json", // 默认值是text 只支持json和text两种方式
"json_field": { // 可以看看作是从上述接口返回的数据中取数据的路径和key的映射关系,value则是取值的JsonPath
"id": "$.data.records[*].id",
"taskContent": "$.data.records[*].taskContent",
"taskType": "$.data.records[*].taskType"
},
// "pageing": {
// "page_field": "current", // 当前页的key,就是分页接口的请求参数中的当前页的key,
// "batch_size": 10 // 每页取多少数据
// },
"schema": {
"fields": {
"id": "BIGINT", // 主键列问题,详见下面的问题
"taskContent": "STRING",
"taskType": "STRING"
}
}
}
],
"transform": [
{
"field_mapper": { // key是source中的schema.field中的值,value是sink中使用的值,例如下面的save_mode_create_template里的${rowtype_fields}使用的就是value,可以更改value作为sink的新命名列
"id": "id",
"taskContent": "task_content",
"taskType": "task_type"
},
"result_table_name": "Table13367210156033",
"source_table_name": "Table13367210156032",
"plugin_name": "FieldMapper"
}
],
"sink": [
{
"source_table_name": "Table13367210156033",
"plugin_name": "Doris",
"fenodes ": "*.*.*.*:*",
"database": "test",
"password": "****",
"username": "****",
"table": "ods_day_plan",
"sink.label-prefix": "test-ods_day_plan", // Stream Load 导入使用的标签前缀。在 2pc 场景下,需要全局唯一性来保证 SeaTunnel 的 EOS 语义
"sink.enable-2pc": false, // 是否开启两阶段提交(2pc),默认为true,保证Exactly-Once语义。Doris的二阶段提交详见https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual
"data_save_mode": "APPEND_DATA", // 数据保存模式 DROP_DATA、APPEND_DATA、CUSTOM_PROCESSING、ERROR_WHEN_DATA_EXISTS官方提供了四种,我使用保留数据库结构,追加数据,可以详见源码中的DataSaveMode枚举
"schema_save_mode": "CREATE_SCHEMA_WHEN_NOT_EXIST", // Scheme保存模式 RECREATE_SCHEMA、CREATE_SCHEMA_WHEN_NOT_EXIST、ERROR_WHEN_SCHEMA_NOT_EXIST 我使用的是当Schema不存在时创建;具体释义详见SchemaSaveMode枚举
"save_mode_create_template": "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n ${rowtype_fields}\n ) ENGINE=OLAP\n UNIQUE KEY (id)\n DISTRIBUTED BY HASH (id)\n PROPERTIES (\n \"replication_allocation\" = \"tag.location.default: 1\",\n \"in_memory\" = \"false\",\n \"storage_format\" = \"V2\",\n \"disable_auto_compaction\" = \"false\"\n )",
"sink.enable-delete": true, //是否启用删除,此配置只有Doris的表模型是Unique模型,同时需要Doris表开启批量删除功能(默认开启 0.15+ 版本)
"doris.config": {
"format": "json",
"read_json_by_line": "true"
}
}
]
}
实际使用中遇到的问题
Handle save mode failed
具体的报错日志中包含
Caused by: java.sql.SQLException: errCode = 2, detailMessage = Syntax error in line 21:
UNIQUE KEY ()
^
Encountered: )
Expected: IDENTIFIER
解决方案:
详见链接[issue](https://github.com/apache/seatunnel/issues/6646)
使用了上述配置文件中的save_mode_create_template字段解决,目标中值的
可以自行根据业务配置。
NoSuchMethodError
java.lang.NoSuchMethodError: retrofit2.Retrofit$Builder.client(Lshaded/okhttp3/OkHttpClient;)Lretrofit2/Retrofit$Builder;
at org.influxdb.impl.InfluxDBImpl.<init>(InfluxDBImpl.java:179) ~[connector-influxdb-2.3.4.jar:2.3.4]
at org.influxdb.impl.InfluxDBImpl.<init>(InfluxDBImpl.java:120) ~[connector-influxdb-2.3.4.jar:2.3.4]
at org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient.getInfluxDB(InfluxDBClient.java:72) ~[connector-influxdb-2.3.4.jar:2.3.4]
在使用influxdb的连接时,遇到了jar包冲突的问题,最终发现在创建http链接
的时候,retrofit2的依赖与datahub连接器中的存在版本冲突,我这里没有
使用到datahub,所以删除datahub的连接器即可解决问题!
Apache Doris BIGINT类型精度丢失问题
详见帖子
配置主键
Doris配置save_mode_create_template
包含主键时,主键类型必须是数字或日期类型。
上面的source配置的schema中的id,接口返回的实际类型是字符串类型,但是是雪花算法的全数字类型,所以使用BIGINT
类型自动转换
原因是Sink配置中的save_mode_create_template
的UNIQUE KEY
使用的id作为主键,Doris要求主键列类型必须是数字或者日期类型!!
个人经验
当sink、source、transform只有一个时,可以省略
result_table_name、source_table_name
配置项下载源码,修改源码,在源码中增加log日志,并打包替换SeaTunnel运行时的jar,以方便根据日志得到自己想知道的结果或者方便理解代码
根据1的运用,熟知代码后可以进行二次开发,例如需要token认证的接口该怎么处理,值得深思。
另外source配置中的json_field中的value的JsonPath值,不支持 列表中复杂类型取值的问题Array或Map<String, Object>。也可以考虑二开解决
// 举例: { "code": "0000", "msg": "成功", "data": { "records": [ { "id": "1798895733824393218", "taskContent": "许可证02", "taskType": "许可证", "region_list": [ // 此格式中的region_list无法解析和同步 $.data.records[*].region_list[*].id 会报数据和总数不匹配的错误 { "id":"1", "name": "11" }, { "id":"1", "name": "11" } ] } ] } }
附上 测试代码 (使用的是JDK17)
private static final Option[] DEFAULT_OPTIONS = { Option.SUPPRESS_EXCEPTIONS, Option.ALWAYS_RETURN_LIST, Option.DEFAULT_PATH_LEAF_TO_NULL }; private JsonPath[] jsonPaths; private final Configuration jsonConfiguration = Configuration.defaultConfiguration().addOptions(DEFAULT_OPTIONS); @Test public void test5() { String data = """ { "code": "0000", "msg": "成功", "data": { "records": [ { "id": "1798895733824393218", "taskContent": "12312312313" } ] } } """; Map<String, String> map = new HashMap<>(); map.put("id", "$.data.records[*].id"); map.put("taskContent", "$.data.records[*].taskContent"); JsonField jsonField = JsonField.builder().fields(map).build(); initJsonPath(jsonField); data = JsonUtils.toJsonNode(parseToMap(decodeJSON(data), jsonField)).toString(); log.error(data); } // 以下代码都是HttpSourceReader中的代码 private void initJsonPath(JsonField jsonField) { jsonPaths = new JsonPath[jsonField.getFields().size()]; for (int index = 0; index < jsonField.getFields().keySet().size(); index++) { jsonPaths[index] = JsonPath.compile( jsonField.getFields().values().toArray(new String[] {})[index]); } } private List<Map<String, String>> parseToMap(List<List<String>> datas, JsonField jsonField) { List<Map<String, String>> decodeDatas = new ArrayList<>(datas.size()); String[] keys = jsonField.getFields().keySet().toArray(new String[] {}); for (List<String> data : datas) { Map<String, String> decodeData = new HashMap<>(jsonField.getFields().size()); final int[] index = {0}; data.forEach( field -> { decodeData.put(keys[index[0]], field); index[0]++; }); decodeDatas.add(decodeData); } return decodeDatas; } private List<List<String>> decodeJSON(String data) { ReadContext jsonReadContext = JsonPath.using(jsonConfiguration).parse(data); List<List<String>> results = new ArrayList<>(jsonPaths.length); for (JsonPath path : jsonPaths) { List<String> result = jsonReadContext.read(path); results.add(result); } for (int i = 1; i < results.size(); i++) { List<?> result0 = results.get(0); List<?> result = results.get(i); if (result0.size() != result.size()) { throw new HttpConnectorException( HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT, String.format( "[%s](%d) and [%s](%d) the number of parsing records is inconsistent.", jsonPaths[0].getPath(), result0.size(), jsonPaths[i].getPath(), result.size())); } } return dataFlip(results); } private List<List<String>> dataFlip(List<List<String>> results) { List<List<String>> datas = new ArrayList<>(); for (int i = 0; i < results.size(); i++) { List<String> result = results.get(i); if (i == 0) { for (Object o : result) { String val = o == null ? null : o.toString(); List<String> row = new ArrayList<>(jsonPaths.length); row.add(val); datas.add(row); } } else { for (int j = 0; j < result.size(); j++) { Object o = result.get(j); String val = o == null ? null : o.toString(); List<String> row = datas.get(j); row.add(val); } } } return datas; }
以上是我的一些经验分享,希望对大家有帮助!