Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix-6388] Fix kafka source can not consumer all history data when use batch mode #6685

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
&& record.offset()
>= sourceSplit
.getEndOffset()) {
// signal to the source that we have
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @SbloodyS for create this PR. Could you explain to us the purpose of move context.signalNoMoreElement(); to here? And why this change can fix kafka source can not consumer all history data when use batch mode?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, In Batch mode The pollNext will only be executed once because KafkaConsumer.poll can only be consume limited data restricted by max.poll.records and its timeout. Moving it internally can enable poolNext to execute multiple times to consume data. @Hisoka-X

Copy link
Member

@Hisoka-X Hisoka-X Apr 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add test case to reproduce this bug? Like we can set max.poll.records to 1?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add test case to reproduce this bug? Like we can set max.poll.records to 1?

Can you tell me where to add it? I can't find any relavant unit tests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// reached the end of the data.
context.signalNoMoreElement();
break;
}
}
Expand Down Expand Up @@ -198,11 +201,6 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
}
completableFuture.join();
});

if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
// signal to the source that we have reached the end of the data.
context.signalNoMoreElement();
}
}

@Override
Expand Down