New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Fix-6388] Fix kafka source can not consumer all history data when use batch mode #6685
base: dev
Are you sure you want to change the base?
Conversation
@@ -171,6 +171,9 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception { | |||
&& record.offset() | |||
>= sourceSplit | |||
.getEndOffset()) { | |||
// signal to the source that we have |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @SbloodyS for create this PR. Could you explain to us the purpose of move context.signalNoMoreElement();
to here? And why this change can fix kafka source can not consumer all history data when use batch mode?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, In Batch mode The pollNext
will only be executed once because KafkaConsumer.poll
can only be consume limited data restricted by max.poll.records
and its timeout. Moving it internally can enable poolNext
to execute multiple times to consume data. @Hisoka-X
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add test case to reproduce this bug? Like we can set max.poll.records
to 1?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add test case to reproduce this bug? Like we can set
max.poll.records
to 1?
Can you tell me where to add it? I can't find any relavant unit tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Purpose of this pull request
fix #6388
Does this PR introduce any user-facing change?
How was this patch tested?
Check list
New License Guide
release-note
.