[Bug] [doris-x] sink produces duplicate aggregate rows when the source emits an update #1837
Comments
@waryars This happens in JDBC mode.
I tried the Unique table model, but the job fails on startup.
Why isn't the write handled by the primary key declared on the sink side?
@waryars
Caused by: java.sql.SQLException: errCode = 2, detailMessage = errCode = 2, detailMessage = Column[area_name] is not key column or storage model is not duplicate or column type is float or double. |
It reproduces easily once you change the table model.
I added the area_name and addr_name columns to the key columns and changed the other columns to sale_num float, sale_amt double, but it still fails with: Column[sale_num] is not key column or storage model is not duplicate or column type is float or double. The error is thrown when the second record is inserted into the source table:
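For reference, Doris does not allow float/double columns as key columns, and on an Aggregate model every value column needs an aggregation type, which is consistent with the errors above. A minimal sketch of a Unique-key variant of the sink table, assuming Doris 1.x DDL syntax (column names taken from this issue; this is an illustration, not a confirmed fix):

```sql
-- Sketch only: a Unique key version of test_sink. The key columns
-- (tdate, cname) identify a row and later writes replace earlier ones;
-- sale_num/sale_amt remain value columns, so their types are not forced
-- into the key.
CREATE TABLE test_sink (
  tdate varchar(10),
  cname varchar(50),
  area_name varchar(50),
  addr_name varchar(50),
  sale_num bigint,
  sale_amt decimal(18,2)
) UNIQUE KEY(tdate, cname)
DISTRIBUTED BY HASH(tdate, cname) BUCKETS 5
PROPERTIES ("replication_num" = "1");
```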
Search before asking
What happened
In a mysql-binlog -> kafka -> doris pipeline, when the source MySQL table is hit by an update statement, the target table ends up with duplicate rows.
Operations on the source side:
insert into test_binlog(id,cname,sale_amt,ttime)
values(154,'hua',10.20,now());
insert into test_binlog(id,cname,sale_amt,ttime)
values(155,'hua',10.20,now());
insert into test_binlog(id,cname,sale_amt,ttime)
values(156,'hua',10.20,now());
Result on the target side:
mysql> select * from test_sink;
+------------+-------+-----------+-----------+----------+----------+
| tdate | cname | area_name | addr_name | sale_num | sale_amt |
+------------+-------+-----------+-----------+----------+----------+
| 2023-10-24 | hua | 广东省 | 湛江市 | 3 | 30.60 |
+------------+-------+-----------+-----------+----------+----------+
1 row in set (0.01 sec)
Operations on the source side:
delete from test_binlog where id=154;
update test_binlog set sale_amt=20.20 where id=155;
Result on the target side:
mysql> select * from test_sink;
+------------+-------+-----------+-----------+----------+----------+
| tdate | cname | area_name | addr_name | sale_num | sale_amt |
+------------+-------+-----------+-----------+----------+----------+
| 2023-10-24 | hua | 广东省 | 湛江市 | 2 | 30.40 |
| 2023-10-24 | hua | 广东省 | 湛江市 | 1 | 10.20 |
+------------+-------+-----------+-----------+----------+----------+
2 rows in set (0.01 sec)
After the update, an extra row appears.
What you expected to happen
No duplicate aggregate rows should appear.
How to reproduce
1. mysql-binlog -> kafka configuration:
[root@t-hadoop01 binlog]# cat binlog_kafka.sql
CREATE TABLE source
(
id int,
cname varchar,
sale_amt decimal(18,2),
ttime timestamp
) WITH (
'connector' = 'binlog-x'
,'username' = 'root'
,'password' = 'xxxxx'
,'cat' = 'insert,delete,update'
,'url' = 'jdbc:mysql://172.16.44.83:3306/ambari?useSSL=false'
,'host' = '172.16.44.83'
,'port' = '3306'
,'table' = 'ambari.test_binlog'
,'timestamp-format.standard' = 'SQL'
);
CREATE TABLE sink
(
id int,
cname varchar,
sale_amt decimal(18,2),
ttime timestamp
) WITH (
'connector' = 'kafka-x'
,'topic' = 'flinkcdc-mysql'
,'properties.bootstrap.servers' = '172.16.56.254:34715'
,'value.format' = 'debezium-json'
);
insert into sink
select *
from source u;
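With 'value.format' = 'debezium-json', each source change is serialized as a Debezium-style envelope. A hedged sketch of the value written to topic flinkcdc-mysql for the update of id=155 (field layout follows Flink's debezium-json format; the real message also carries metadata such as ts_ms, and the timestamp shown here is hypothetical):

```json
{
  "before": {"id": 155, "cname": "hua", "sale_amt": 10.20, "ttime": "2023-10-24 10:00:00"},
  "after":  {"id": 155, "cname": "hua", "sale_amt": 20.20, "ttime": "2023-10-24 10:00:00"},
  "op": "u"
}
```

Downstream, Flink interprets this single message as an UPDATE_BEFORE row plus an UPDATE_AFTER row.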
2. kafka -> doris configuration:
[root@t-hadoop01 doris]# cat dim_doris.sql
CREATE TABLE source (
id int,
cname varchar,
sale_amt decimal(18,2),
ttime timestamp,
PROCTIME AS PROCTIME()
) WITH (
'connector' = 'kafka-x'
,'topic' = 'flinkcdc-mysql'
,'properties.bootstrap.servers' = '172.16.56.254:34715'
,'scan.startup.mode' = 'latest-offset'
,'value.format' = 'debezium-json'
);
CREATE TABLE test_side (
id int,
cname varchar,
area_name varchar,
addr_name varchar,
PRIMARY KEY (cname) NOT ENFORCED
) WITH (
'connector' = 'doris-x',
'url' = 'jdbc:mysql://172.16.44.86:9030',
'schema' = 'dorisdb',
'table-name' = 'test_side',
'username' = 'root',
'password' = '',
'lookup.cache-type' = 'lru',
'lookup.cache-period' = '3000',
'lookup.cache.max-rows' = '20000',
'lookup.cache.ttl' = '3000'
);
CREATE TABLE sink
(
tdate varchar,
cname varchar,
area_name varchar,
addr_name varchar,
sale_num bigint,
sale_amt decimal(18,2),
PRIMARY KEY (tdate,cname) NOT ENFORCED
) WITH (
'password' = '',
'connector' = 'doris-x',
'sink.buffer-flush.interval' = '1000',
'sink.all-replace' = 'true',
'sink.buffer-flush.max-rows' = '100',
'schema' = 'dorisdb',
'table-name' = 'test_sink',
'sink.parallelism' = '1',
'url' = 'jdbc:mysql://172.16.44.86:9030',
'username' = 'root'
);
INSERT INTO sink
SELECT
date_format(a.ttime,'yyyy-MM-dd') as tdate,
a.cname,
max(b.area_name) as area_name,
max(b.addr_name) as addr_name,
count(*) as sale_num,
sum(a.sale_amt) as sale_amt
FROM source a
left join test_side FOR SYSTEM_TIME AS OF a.PROCTIME AS b
on (a.cname=b.cname)
group by date_format(a.ttime,'yyyy-MM-dd'),a.cname;
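A note on why the update yields a second row: because the source is a debezium-json changelog, the COUNT/SUM aggregation emits UPDATE_BEFORE/UPDATE_AFTER pairs keyed by (tdate, cname). If the sink cannot apply the UPDATE_BEFORE as a delete/replace on its primary key, the old aggregate row survives next to the new one. In a plain Flink SQL client (EXPLAIN CHANGELOG_MODE exists in Flink 1.15+; unverified whether ChunJun exposes it) the changelog can be inspected with:

```sql
-- Sketch: inspect the changelog kinds the aggregation produces.
EXPLAIN CHANGELOG_MODE
SELECT
  date_format(a.ttime, 'yyyy-MM-dd') AS tdate,
  a.cname,
  count(*) AS sale_num,
  sum(a.sale_amt) AS sale_amt
FROM source a
GROUP BY date_format(a.ttime, 'yyyy-MM-dd'), a.cname;
-- The plan is expected to show update_before/update_after (retract) messages;
-- the doris-x sink would need to turn update_before into a delete keyed on
-- (tdate, cname) for the result to stay correct.
```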
3. Table definitions:
3.1 Source MySQL table:
CREATE TABLE `test_binlog` (
  `id` int(11) DEFAULT NULL,
  `cname` varchar(50) DEFAULT NULL,
  `sale_amt` decimal(18,2) DEFAULT NULL,
  `ttime` datetime DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
3.2 Target Doris tables:
Dimension table definition and data:
CREATE TABLE `test_side` (
  `id` int(11) NULL COMMENT 'id',
  `cname` varchar(50) NULL COMMENT 'xxx',
  `area_name` varchar(50) NULL COMMENT 'xxx',
  `addr_name` varchar(50) NULL COMMENT 'xxx'
) ENGINE=OLAP
DUPLICATE KEY(`id`, `cname`)
COMMENT 'test1表'
DISTRIBUTED BY HASH(`id`, `cname`) BUCKETS 5
PROPERTIES (
  "replication_allocation" = "tag.location.default: 1",
  "in_memory" = "false",
  "storage_format" = "V2",
  "disable_auto_compaction" = "false"
);
insert into test_side(id,cname,area_name,addr_name) values(1,'war','广东省','珠海市');
insert into test_side(id,cname,area_name,addr_name) values(2,'pha','广东省','广州市');
insert into test_side(id,cname,area_name,addr_name) values(3,'hua','广东省','湛江市');
Sink table definition:
create table test_sink (
  tdate varchar(10) comment 'id',
  cname varchar(50) comment '名称',
  area_name varchar(50) comment 'xx',
  addr_name varchar(50) comment 'xx',
  sale_num bigint,
  sale_amt decimal(18,2)
) duplicate key(tdate, cname)
comment 'test表'
distributed by hash(tdate, cname) buckets 5
PROPERTIES ("replication_num" = "1");
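Note that with this DUPLICATE KEY model, rows sharing the same (tdate, cname) coexist by design, so even a sink that correctly re-emits the updated aggregate will append rather than replace. A quick illustration (hypothetical manual inserts that mimic the symptom above):

```sql
-- On a DUPLICATE KEY table, identical keys do not collapse into one row:
INSERT INTO test_sink VALUES ('2023-10-24', 'hua', '广东省', '湛江市', 3, 30.60);
INSERT INTO test_sink VALUES ('2023-10-24', 'hua', '广东省', '湛江市', 2, 30.40);
-- SELECT * FROM test_sink WHERE cname = 'hua'; -- returns both rows
```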
4. Run:
1. sh bin/chunjun-local.sh -job chunjun-examples/sql/binlog/binlog_kafka.sql
2. sh bin/chunjun-local.sh -job chunjun-examples/sql/doris/dim_doris.sql
5. Reproducing the problem:
Perform the source-side operations from the "What happened" section above: insert the three test_binlog rows, confirm the single aggregate row in test_sink, then run the delete (id=154) and the update (id=155). After the update, test_sink shows the extra duplicate row.
Anything else
No response
Version
master
Are you willing to submit PR?
Code of Conduct