kuhuo
kuhuo
发布于 2024-07-19 / 59 阅读
0
0

使用 SeaTunnel 进行 HTTP 同步到 Doris 实战经验分享

实际使用

Apache SeaTunnel版本:2.3.4

话不多说,先贴最终的运行文件,由于我使用的jsonrest-api提交方式,所以结果如下图所示:

使用restconf的区别就在于job执行的环境不同,conf使用的是ClientJobExecutionEnvironment(经测试也支持json格式),而rest方式则使用的是RestJobExecutionEnvironment

接口返回的数据格式

{
  "code": "0000",
  "msg": "成功",
  "data": {
    "records": [
      {
        "id": "1798895733824393218",
        "taskContent": "许可证02",
        "taskType": "许可证"
      }
    ]
  }
}
// 实际数据分页的很多,以上是示例

接入配置

{
  "env": {
    "job.mode": "BATCH",
    "job.name": "SeaTunnel_Job"
  },
  "source": [
    {
      "result_table_name": "Table13367210156032",
      "plugin_name": "Http",
      "url": "http://*.*.*.*:*/day_plan_repair/page",
      "method": "GET",  // Http请求方式 只支持GET和POST两种方式
      "format": "json", // 默认值是text 只支持json和text两种方式
      "json_field": {   // 可以看看作是从上述接口返回的数据中取数据的路径和key的映射关系,value则是取值的JsonPath
        "id": "$.data.records[*].id",
        "taskContent": "$.data.records[*].taskContent",
        "taskType": "$.data.records[*].taskType"
      },
      // "pageing": {
      //   "page_field": "current", // 当前页的key,就是分页接口的请求参数中的当前页的key,
      //   "batch_size": 10         // 每页取多少数据
      // },
      "schema": {
        "fields": {
          "id": "BIGINT", // 主键列问题,详见下面的问题
          "taskContent": "STRING",
          "taskType": "STRING"
        }
      }
    }
  ],
  "transform": [
    {
      "field_mapper": { // key是source中的schema.field中的值,value是sink中使用的值,例如下面的save_mode_create_template里的${rowtype_fields}使用的就是value,可以更改value作为sink的新命名列
        "id": "id", 
        "taskContent": "task_content",
        "taskType": "task_type"
      },
      "result_table_name": "Table13367210156033",
      "source_table_name": "Table13367210156032",
      "plugin_name": "FieldMapper"
    }
  ],
  "sink": [
    {
      "source_table_name": "Table13367210156033",
      "plugin_name": "Doris",
      "fenodes ": "*.*.*.*:*",
      "database": "test",
      "password": "****",
      "username": "****",
      "table": "ods_day_plan",
      "sink.label-prefix": "test-ods_day_plan", // Stream Load 导入使用的标签前缀。在 2pc 场景下,需要全局唯一性来保证 SeaTunnel 的 EOS 语义
      "sink.enable-2pc": false, // 是否开启两阶段提交(2pc),默认为true,保证Exactly-Once语义。Doris的二阶段提交详见https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual
      "data_save_mode": "APPEND_DATA", // 数据保存模式 DROP_DATA、APPEND_DATA、CUSTOM_PROCESSING、ERROR_WHEN_DATA_EXISTS官方提供了四种,我使用保留数据库结构,追加数据,可以详见源码中的DataSaveMode枚举
      "schema_save_mode": "CREATE_SCHEMA_WHEN_NOT_EXIST", // Scheme保存模式 RECREATE_SCHEMA、CREATE_SCHEMA_WHEN_NOT_EXIST、ERROR_WHEN_SCHEMA_NOT_EXIST 我使用的是当Schema不存在时创建;具体释义详见SchemaSaveMode枚举
      "save_mode_create_template": "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n ${rowtype_fields}\n ) ENGINE=OLAP\n UNIQUE KEY (id)\n DISTRIBUTED BY HASH (id)\n PROPERTIES (\n \"replication_allocation\" = \"tag.location.default: 1\",\n \"in_memory\" = \"false\",\n  \"storage_format\" = \"V2\",\n \"disable_auto_compaction\" = \"false\"\n )",
      "sink.enable-delete": true, //是否启用删除,此配置只有Doris的表模型是Unique模型,同时需要Doris表开启批量删除功能(默认开启 0.15+ 版本)
      "doris.config": {
        "format": "json",
        "read_json_by_line": "true"
      }
    }
  ]
}


实际使用中遇到的问题

Handle save mode failed

具体的报错日志中包含
Caused by: java.sql.SQLException: errCode = 2, detailMessage = Syntax error in line 21:
 UNIQUE KEY ()
             ^
Encountered: )
Expected: IDENTIFIER
解决方案:
详见链接[issue](https://github.com/apache/seatunnel/issues/6646)

使用了上述配置文件中的save_mode_create_template字段解决,目标中值的
可以自行根据业务配置。

NoSuchMethodError

java.lang.NoSuchMethodError: retrofit2.Retrofit$Builder.client(Lshaded/okhttp3/OkHttpClient;)Lretrofit2/Retrofit$Builder;
 at org.influxdb.impl.InfluxDBImpl.<init>(InfluxDBImpl.java:179) ~[connector-influxdb-2.3.4.jar:2.3.4]
 at org.influxdb.impl.InfluxDBImpl.<init>(InfluxDBImpl.java:120) ~[connector-influxdb-2.3.4.jar:2.3.4]
 at org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient.getInfluxDB(InfluxDBClient.java:72) ~[connector-influxdb-2.3.4.jar:2.3.4]
在使用influxdb的连接时,遇到了jar包冲突的问题,最终发现在创建http链接
的时候,retrofit2的依赖与datahub连接器中的存在版本冲突,我这里没有
使用到datahub,所以删除datahub的连接器即可解决问题!

Apache Doris BIGINT类型精度丢失问题

详见帖子

配置主键

Doris配置save_mode_create_template包含主键时,主键类型必须是数字或日期类型。

上面的source配置的schema中的id,接口返回的实际类型是字符串类型,但是是雪花算法的全数字类型,所以使用BIGINT类型自动转换

原因是Sink配置中的save_mode_create_templateUNIQUE KEY使用的id作为主键,Doris要求主键列类型必须是数字或者日期类型!!

个人经验

  1. 当sink、source、transform只有一个时,可以省略result_table_name、source_table_name配置项

  2. 下载源码,修改源码,在源码中增加log日志,并打包替换SeaTunnel运行时的jar,以方便根据日志得到自己想知道的结果或者方便理解代码

  3. 根据1的运用,熟知代码后可以进行二次开发,例如需要token认证的接口该怎么处理,值得深思。

  4. 另外source配置中的json_field中的value的JsonPath值,不支持 列表中复杂类型取值的问题Array或Map<String, Object>。也可以考虑二开解决

    // 举例:
    {
      "code": "0000",
      "msg": "成功",
      "data": {
        "records": [
          {
            "id": "1798895733824393218",
            "taskContent": "许可证02",
            "taskType": "许可证",
            "region_list": [ // 此格式中的region_list无法解析和同步 $.data.records[*].region_list[*].id 会报数据和总数不匹配的错误
              {
                "id":"1",
                "name": "11"
              },
              {
                "id":"1",
                "name": "11"
              }
            ]
          }
        ]
      }
    }
    

    附上 测试代码 (使用的是JDK17)

        private static final Option[] DEFAULT_OPTIONS = {
                Option.SUPPRESS_EXCEPTIONS, Option.ALWAYS_RETURN_LIST, Option.DEFAULT_PATH_LEAF_TO_NULL
        };
        private JsonPath[] jsonPaths;
        private final Configuration jsonConfiguration = Configuration.defaultConfiguration().addOptions(DEFAULT_OPTIONS);
    
        @Test
        public void test5() {
            String data = """
                    {
                        "code": "0000",
                        "msg": "成功",
                        "data": {
                            "records": [
                                {
                                    "id": "1798895733824393218",
                                    "taskContent": "12312312313"
                                }
                            ]
                        }
                    }
                    """;
            Map<String, String> map = new HashMap<>();
            map.put("id", "$.data.records[*].id");
            map.put("taskContent", "$.data.records[*].taskContent");
            JsonField jsonField = JsonField.builder().fields(map).build();
            initJsonPath(jsonField);
            data = JsonUtils.toJsonNode(parseToMap(decodeJSON(data), jsonField)).toString();
            log.error(data);
        }
        // 以下代码都是HttpSourceReader中的代码
        private void initJsonPath(JsonField jsonField) {
            jsonPaths = new JsonPath[jsonField.getFields().size()];
            for (int index = 0; index < jsonField.getFields().keySet().size(); index++) {
                jsonPaths[index] =
                        JsonPath.compile(
                                jsonField.getFields().values().toArray(new String[] {})[index]);
            }
        }
    
        private List<Map<String, String>> parseToMap(List<List<String>> datas, JsonField jsonField) {
            List<Map<String, String>> decodeDatas = new ArrayList<>(datas.size());
            String[] keys = jsonField.getFields().keySet().toArray(new String[] {});
    
            for (List<String> data : datas) {
                Map<String, String> decodeData = new HashMap<>(jsonField.getFields().size());
                final int[] index = {0};
                data.forEach(
                        field -> {
                            decodeData.put(keys[index[0]], field);
                            index[0]++;
                        });
                decodeDatas.add(decodeData);
            }
    
            return decodeDatas;
        }
    
        private List<List<String>> decodeJSON(String data) {
            ReadContext jsonReadContext = JsonPath.using(jsonConfiguration).parse(data);
            List<List<String>> results = new ArrayList<>(jsonPaths.length);
            for (JsonPath path : jsonPaths) {
                List<String> result = jsonReadContext.read(path);
                results.add(result);
            }
            for (int i = 1; i < results.size(); i++) {
                List<?> result0 = results.get(0);
                List<?> result = results.get(i);
                if (result0.size() != result.size()) {
                    throw new HttpConnectorException(
                            HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT,
                            String.format(
                                    "[%s](%d) and [%s](%d) the number of parsing records is inconsistent.",
                                    jsonPaths[0].getPath(),
                                    result0.size(),
                                    jsonPaths[i].getPath(),
                                    result.size()));
                }
            }
    
            return dataFlip(results);
        }
    
        private List<List<String>> dataFlip(List<List<String>> results) {
    
            List<List<String>> datas = new ArrayList<>();
            for (int i = 0; i < results.size(); i++) {
                List<String> result = results.get(i);
                if (i == 0) {
                    for (Object o : result) {
                        String val = o == null ? null : o.toString();
                        List<String> row = new ArrayList<>(jsonPaths.length);
                        row.add(val);
                        datas.add(row);
                    }
                } else {
                    for (int j = 0; j < result.size(); j++) {
                        Object o = result.get(j);
                        String val = o == null ? null : o.toString();
                        List<String> row = datas.get(j);
                        row.add(val);
                    }
                }
            }
            return datas;
        }
    

    以上是我的一些经验分享,希望对大家有帮助!


评论