
[Bug] [doris-x] sink: duplicate aggregation rows when the source side performs an update #1837

Open
waryars opened this issue Oct 23, 2023 · 7 comments
Labels
bug Something isn't working

Comments


waryars commented Oct 23, 2023

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

In the mysql-binlog -> kafka -> doris pipeline, when an update statement runs on the source MySQL, duplicate rows appear on the target side.

Operations on the source side:
insert into test_binlog(id,cname,sale_amt,ttime)
values(154,'hua',10.20,now());
insert into test_binlog(id,cname,sale_amt,ttime)
values(155,'hua',10.20,now());
insert into test_binlog(id,cname,sale_amt,ttime)
values(156,'hua',10.20,now());

Result on the target side:
mysql> select * from test_sink;
+------------+-------+-----------+-----------+----------+----------+
| tdate | cname | area_name | addr_name | sale_num | sale_amt |
+------------+-------+-----------+-----------+----------+----------+
| 2023-10-24 | hua | 广东省 | 湛江市 | 3 | 30.60 |
+------------+-------+-----------+-----------+----------+----------+
1 row in set (0.01 sec)

Operations on the source side:
delete from test_binlog where id=154;
update test_binlog set sale_amt=20.20 where id=155;

Result on the target side:
mysql> select * from test_sink;
+------------+-------+-----------+-----------+----------+----------+
| tdate | cname | area_name | addr_name | sale_num | sale_amt |
+------------+-------+-----------+-----------+----------+----------+
| 2023-10-24 | hua | 广东省 | 湛江市 | 2 | 30.40 |
| 2023-10-24 | hua | 广东省 | 湛江市 | 1 | 10.20 |
+------------+-------+-----------+-----------+----------+----------+
2 rows in set (0.01 sec)

After the update, an extra row appears.

What you expected to happen

Duplicate aggregated data should not appear.

How to reproduce

1. mysql-binlog -> kafka job configuration:

[root@t-hadoop01 binlog]# cat binlog_kafka.sql
CREATE TABLE source
(
id int,
cname varchar,
sale_amt decimal(18,2),
ttime timestamp
) WITH (
'connector' = 'binlog-x'
,'username' = 'root'
,'password' = 'xxxxx'
,'cat' = 'insert,delete,update'
,'url' = 'jdbc:mysql://172.16.44.83:3306/ambari?useSSL=false'
,'host' = '172.16.44.83'
,'port' = '3306'
,'table' = 'ambari.test_binlog'
,'timestamp-format.standard' = 'SQL'
);

CREATE TABLE sink
(
id int,
cname varchar,
sale_amt decimal(18,2),
ttime timestamp
) WITH (
'connector' = 'kafka-x'
,'topic' = 'flinkcdc-mysql'
,'properties.bootstrap.servers' = '172.16.56.254:34715'
,'value.format' = 'debezium-json'
);

insert into sink
select *
from source u;
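With 'value.format' = 'debezium-json', the kafka-x sink encodes each change as a Debezium-style envelope (before/after images plus an op code), so a single UPDATE on the source arrives downstream as two changelog rows. The sketch below is illustrative only (the field values come from this issue's example; it is not connector code):

```python
import json

# Illustrative: what the UPDATE of id=155 looks like as a debezium-json
# event on the Kafka topic. The before/after/op envelope is the standard
# Debezium change-event shape; the payload values mirror this issue.
update_event = json.dumps({
    "before": {"id": 155, "cname": "hua", "sale_amt": 10.20, "ttime": "2023-10-24 10:00:00"},
    "after":  {"id": 155, "cname": "hua", "sale_amt": 20.20, "ttime": "2023-10-24 10:00:00"},
    "op": "u",
})

def decode(raw: str):
    """Turn one Debezium event into Flink-style changelog rows.
    An update yields an UPDATE_BEFORE (retract) and an UPDATE_AFTER row."""
    evt = json.loads(raw)
    if evt["op"] == "c":
        return [("+I", evt["after"])]
    if evt["op"] == "d":
        return [("-D", evt["before"])]
    if evt["op"] == "u":
        return [("-U", evt["before"]), ("+U", evt["after"])]
    return []

rows = decode(update_event)
# One source UPDATE becomes two changelog rows for the downstream job.
```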

2. kafka -> doris job configuration:
[root@t-hadoop01 doris]# cat dim_doris.sql
CREATE TABLE source (
id int,
cname varchar,
sale_amt decimal(18,2),
ttime timestamp,
PROCTIME AS PROCTIME()
) WITH (
'connector' = 'kafka-x'
,'topic' = 'flinkcdc-mysql'
,'properties.bootstrap.servers' = '172.16.56.254:34715'
,'scan.startup.mode' = 'latest-offset'
,'value.format' = 'debezium-json'
);

CREATE TABLE test_side (
id int,
cname varchar,
area_name varchar,
addr_name varchar,
PRIMARY KEY (cname) NOT ENFORCED
) WITH (
'connector' = 'doris-x',
'url' = 'jdbc:mysql://172.16.44.86:9030',
'schema' = 'dorisdb',
'table-name' = 'test_side',
'username' = 'root',
'password' = '',
'lookup.cache-type' = 'lru',
'lookup.cache-period' = '3000',
'lookup.cache.max-rows' = '20000',
'lookup.cache.ttl' = '3000'
);

CREATE TABLE sink
(
tdate varchar,
cname varchar,
area_name varchar,
addr_name varchar,
sale_num bigint,
sale_amt decimal(18,2),
PRIMARY KEY (tdate,cname) NOT ENFORCED
) WITH (
'password' = '',
'connector' = 'doris-x',
'sink.buffer-flush.interval' = '1000',
'sink.all-replace' = 'true',
'sink.buffer-flush.max-rows' = '100',
'schema' = 'dorisdb',
'table-name' = 'test_sink',
'sink.parallelism' = '1',
'url' = 'jdbc:mysql://172.16.44.86:9030',
'username' = 'root'
);

INSERT INTO sink
SELECT
date_format(a.ttime,'yyyy-MM-dd') as tdate,
a.cname,
max(b.area_name) as area_name,
max(b.addr_name) as addr_name,
count(*) as sale_num,
sum(a.sale_amt) as sale_amt
FROM source a
left join test_side FOR SYSTEM_TIME AS OF a.PROCTIME AS b
on (a.cname=b.cname)
group by date_format(a.ttime,'yyyy-MM-dd'),a.cname;
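The GROUP BY above produces a retract stream: every source change first retracts the old aggregate (-U) and then emits the new one (+U). A sink that honours the declared primary key keeps one row per (tdate, cname); a sink whose writes degrade to appends (as with a Doris DUPLICATE KEY table) cannot apply the retraction, so stale aggregates survive. This is a minimal simulation of that mechanism, not connector code, and the exact surviving rows in the real sink also depend on batching and replace behaviour:

```python
# Changelog produced by the aggregation for the operations in this issue:
# three inserts, then delete id=154, then update id=155 (10.20 -> 20.20).
changelog = [
    ("+I", ("2023-10-24", "hua"), (3, 30.60)),
    ("-U", ("2023-10-24", "hua"), (3, 30.60)),  # delete id=154 retracts old aggregate
    ("+U", ("2023-10-24", "hua"), (2, 20.40)),
    ("-U", ("2023-10-24", "hua"), (2, 20.40)),  # update id=155 retracts again
    ("+U", ("2023-10-24", "hua"), (2, 30.40)),
]

upsert_sink = {}   # honours the (tdate, cname) primary key
append_sink = []   # models a table that can only add rows

for op, key, value in changelog:
    if op in ("+I", "+U"):
        upsert_sink[key] = value
        append_sink.append((key, value))
    else:  # "-U"/"-D": the upsert sink drops the old row; appends cannot
        upsert_sink.pop(key, None)

# upsert_sink ends with one final row per key; append_sink keeps every
# intermediate aggregate, which is the duplication observed in test_sink.
```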

3. Table definitions:
3.1 Source MySQL table:
CREATE TABLE test_binlog (
id int(11) DEFAULT NULL,
cname varchar(50) DEFAULT NULL,
sale_amt decimal(18,2) DEFAULT NULL,
ttime datetime DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3.2 Target Doris tables:
Dimension table definition and its data:
CREATE TABLE test_side (
id int(11) NULL COMMENT 'id',
cname varchar(50) NULL COMMENT 'xxx',
area_name varchar(50) NULL COMMENT 'xxx',
addr_name varchar(50) NULL COMMENT 'xxx'
) ENGINE=OLAP
DUPLICATE KEY(id, cname)
COMMENT 'test1表'
DISTRIBUTED BY HASH(id, cname) BUCKETS 5
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
);

insert into test_side(id,cname,area_name,addr_name) values(1,'war','广东省','珠海市');
insert into test_side(id,cname,area_name,addr_name) values(2,'pha','广东省','广州市');
insert into test_side(id,cname,area_name,addr_name) values(3,'hua','广东省','湛江市');

Sink table definition:
create table test_sink(
    tdate varchar(10) comment 'id',
    cname varchar(50) comment '名称',
    area_name varchar(50) comment 'xx',
    addr_name varchar(50) comment 'xx',
    sale_num bigint,
    sale_amt decimal(18,2)
) duplicate key(tdate,cname) comment 'test表'
distributed by hash(tdate,cname) buckets 5
PROPERTIES ("replication_num" = "1");

4. Run the jobs:
1. sh bin/chunjun-local.sh -job chunjun-examples/sql/binlog/binlog_kafka.sql
2. sh bin/chunjun-local.sh -job chunjun-examples/sql/doris/dim_doris.sql

5. Reproduce:
Run the same statements as in "What happened" above: insert ids 154-156, then delete id=154 and update id=155. After the update, test_sink contains an extra aggregated row.

Anything else

No response

Version

master

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

waryars added the bug label on Oct 23, 2023
zoudaokoulife commented Feb 19, 2024

@waryars In JDBC mode, the DUPLICATE KEY(id, cname) in the Doris CREATE TABLE statement only tells the storage layer which columns the underlying data is sorted by (a more fitting name would be "Sorted Column"; see https://doris.apache.org/zh-CN/docs/data-table/data-model/).
If you want update semantics, change the test_sink table to use
UNIQUE KEY(tdate, cname)


waryars commented Feb 21, 2024

I tried the Unique table model; the job fails with an error right at startup.


waryars commented Feb 21, 2024

Why isn't the write handled according to the primary key specified on the sink side?

@zoudaokoulife

@waryars
1. What is the error message when you use the unique model? Please post it; it works fine in my local verification.
2. Regarding "Why isn't the write handled according to the primary key specified on the sink side?": the write logic is constrained by the connector's data model. In the Doris data model above, the key you declare only controls sorting, so writes follow Doris's own semantics.
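The distinction between the two key models can be sketched as follows. This is a toy model of the behaviour, not Doris internals: under DUPLICATE KEY the key only defines a sort order and every write appends, while under UNIQUE KEY rows sharing the key are merged so the latest write wins.

```python
# Toy model (not Doris internals) of the two Doris key models:
# DUPLICATE KEY = sorted append-only; UNIQUE KEY = merge on key.
def write(table_rows, key_cols, new_row, model):
    key = tuple(new_row[c] for c in key_cols)
    if model == "UNIQUE":
        # merge-on-key: the latest row for a key replaces the previous one
        table_rows = [r for r in table_rows
                      if tuple(r[c] for c in key_cols) != key]
    table_rows.append(new_row)
    if model == "DUPLICATE":
        # the "key" only determines physical sort order
        table_rows.sort(key=lambda r: tuple(r[c] for c in key_cols))
    return table_rows

updates = [{"tdate": "2023-10-24", "cname": "hua", "sale_num": 3},
           {"tdate": "2023-10-24", "cname": "hua", "sale_num": 2}]

dup_rows = []
for r in updates:
    dup_rows = write(dup_rows, ("tdate", "cname"), r, "DUPLICATE")
# DUPLICATE KEY: both versions survive -- the duplication in this issue.

uniq_rows = []
for r in updates:
    uniq_rows = write(uniq_rows, ("tdate", "cname"), r, "UNIQUE")
# UNIQUE KEY: exactly one row per (tdate, cname) remains.
```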

@zoudaokoulife zoudaokoulife reopened this Feb 21, 2024

waryars commented Feb 21, 2024

Caused by: java.sql.SQLException: errCode = 2, detailMessage = errCode = 2, detailMessage = Column[area_name] is not key column or storage model is not duplicate or column type is float or double.
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912)


waryars commented Feb 21, 2024

Just change the table model and it reproduces easily.


waryars commented Feb 21, 2024

I also added the area_name and addr_name columns to the key, and changed the types of the remaining columns to sale_num float and sale_amt double.

But it still fails with:

Column[sale_num] is not key column or storage model is not duplicate or column type is float or double.

The error occurs when the second record is inserted into the source table:
insert into test_binlog(id,cname,sale_amt,ttime)
values(154,'hua',10.20,now());
insert into test_binlog(id,cname,sale_amt,ttime)
values(155,'hua',10.20,now());
insert into test_binlog(id,cname,sale_amt,ttime)
values(156,'hua',10.20,now());
