Flink导入mysql数据到doris
经过各种实践,发现比较适合中小公司的方式。分为全量和增量。
1、参考链接:
1、Flink Doris Connector:https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector
2、JSON格式数据导入:https://doris.apache.org/zh-CN/docs/dev/data-operate/import/import-way/load-json-format
2、全量导入
通过外部表同步数据
3、增量导入
1、dataocean监听kafka然后insert,这种方式,官方不建议,大量的insert会把doris-be打崩溃。(目前正在用的方式)。
2、binlog load的方式,以前旧版本尝试有问题,使用新版本尝试看看。经过新版本的尝试,发现这种方式不适合,详见:binlog方式调研。
3、Flink CDC,但是这种方式得学习和搭建Flink,使得成本变高了。准备尝试这种方式了,后期使用这种方式。
4、Flink的方式
1、订阅kafka的方式
canal发个kafka的消息,可以通过如下方式同步给doris。
但是问题是:无法控制同步哪些数据到哪个表
CREATE ROUTINE LOAD ocean.test1 ON cloud_uc_company
COLUMNS(id, name, delete_at)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.id\",\"$.name\",\"$.delete_at\"]",
"strip_outer_array" = "true",
"json_root" = "$.data"
)
FROM KAFKA
(
"kafka_broker_list" = "10.20.1.26:9092",
"kafka_topic" = "topic-periodfour",
'property.group.id' = 'doris_ods_log',
'property.client.id' = 'doris_ods_log',
'property.kafka_default_offsets' = 'OFFSET_BEGINNING'
);
2、Flink代码可以拿到kafka消息
2.1、原理:
这种方式是拿到canal发送给里的kafka消json消息,然后取出其中的data部分,然后再写到doris里去。这种方式其实是通过doris的json序列化器,将data部分序列化成json字符串,然后再写入doris。
2.2、碰到的问题:
1、问题描述:
DorisRuntimeException: stream load error: [ANALYSIS_ERROR]errCode = 2, detailMessage = current running txns on db 11008 is 100, larger than limit 100, see more in null
解决方式:
这是因为同一个库并发导入超过了100,可通过调整 fe.conf的参数
max_running_txn_num_per_db
来解决。
执行sql:admin set frontend config ('max_running_txn_num_per_db' = '1000');
2、问题描述:
DorisRuntimeException: stream load error: [INTERNAL_ERROR]too many filtered rows, see more in http://10.20.0.92:8040/api/_load_error_log?file=__shard_182/error_log_insert_stmt_8f4f21b2fa64f604-8fbb0ec0fba8129d_8f4f21b2fa64f604_8fbb0ec0fba8129d。
打开链接,看到了错误信息:Reason: JSON data is array-object,strip_outer_array
must be TRUE。
解决方式:
需要增加代码:设置pro.setProperty("strip_outer_array", "true");
2.3、代码:
package com.bm001.flinkdemo;
import com.google.gson.Gson;
import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.DorisRecordSerializer;
import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer;
import org.apache.doris.flink.sink.writer.RowDataSerializer;
import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import java.io.IOException;
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
/**
* 这种方法是将kafka的json消息,获取其中的data部分,
* 然后再通过doris的json序列化器,将data部分序列化成json字符串,然后再写入doris
* <p>
* 每个同步的表都需要增加一个任务
*/
public class Kafka2DorisDataStreamDemo {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "10.20.1.26:9092");
props.put("group.id", "group-flinkdemo");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//source config
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("topic-periodfour", new SimpleStringSchema(), props);
//sink config
DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
DorisOptions dorisOptions = dorisBuilder.setFenodes("10.20.0.91:8030")
.setTableIdentifier("ocean.cloud_uc_company")
.setUsername("root")
.setPassword("root12345")
.build();
Properties pro = new Properties();
//json data format
pro.setProperty("format", "json");
pro.setProperty("read_json_by_line", "true");
pro.setProperty("strip_outer_array", "true"); //如果serialize返回的是json数组格式,此处必须要设置为true
DorisExecutionOptions executionOptions = DorisExecutionOptions.builder().setLabelPrefix("label-doris" + UUID.randomUUID()) //streamload label prefix,
.setStreamLoadProp(pro).build();
builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionOptions)
// .setSerializer(new SimpleStringSerializer()) //serialize according to string
.setSerializer(new DorisRecordSerializer<String>() {
@Override
public byte[] serialize(String s) throws IOException {
CanalMessage canalMessage = new Gson().fromJson(s, CanalMessage.class);
String type = canalMessage.getType();
if ("INSERT".equals(type) || "UPDATE".equals(type)) {
if ("uc_company".equals(canalMessage.getTable())) {
// Map<String, Object> dataMap = canalMessage.getData().get(0);
// return new Gson().toJson(dataMap).getBytes();
return new Gson().toJson(canalMessage.getData()).getBytes();
}
}
return null;
}
}).setDorisOptions(dorisOptions);
//build stream
DataStreamSource<String> dataStreamSource = env.addSource(flinkKafkaConsumer);
dataStreamSource.sinkTo(builder.build());
env.execute("flink kafka to doris by datastream");
}
}