Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement OperatorInputStream and OperatorOutputStream #4626

Merged
merged 2 commits into from
May 19, 2024

Conversation

tisonkun
Copy link
Member

@tisonkun tisonkun commented May 17, 2024

This closes #4616.

@tisonkun tisonkun requested a review from Xuanwo as a code owner May 17, 2024 11:18
@github-actions github-actions bot requested a review from morristai May 17, 2024 11:18
@tisonkun tisonkun marked this pull request as draft May 17, 2024 11:36
@tisonkun tisonkun marked this pull request as ready for review May 17, 2024 11:45
@@ -39,11 +41,23 @@ pub struct StdBytesIterator {
impl StdBytesIterator {
/// NOTE: don't allow users to create StdIterator directly.
#[inline]
pub(crate) fn new(r: oio::BlockingReader, range: std::ops::Range<u64>) -> Self {
pub(crate) fn new(r: oio::BlockingReader, range: impl RangeBounds<u64>) -> Self {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to wait for #4594.

I'm still working on this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it blocked? I don't see the dependency.

Copy link
Member

@Xuanwo Xuanwo May 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some services may behavior wrong while handling limit = u64::MAX, it's a bug need to fix. Can we revert changes to core first in this PR?

Copy link
Member Author

@tisonkun tisonkun May 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get the point. But change range to support .. is significant to this PR. So I'd instead wait for your change and later rebase on those changes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it and thanks a lot.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or if you can rebase on my patch? I don't think the bug you mention would conflict with this patch but it's instead another issue to fix.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makese sense, let me adapt those logic here.

Signed-off-by: tison <wander4096@gmail.com>
@tisonkun tisonkun changed the title feat: implement OperatorInputStream feat: implement OperatorInputStream and OperatorOutputStream May 18, 2024
Comment on lines +161 to +165
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add to test scope for some handy debugging, like:

System.out.println(FileUtils.readFileToString(tempDir.resolve(path).toFile(), StandardCharsets.UTF_8));

This doesn't affect the formal release.

Signed-off-by: tison <wander4096@gmail.com>
}
}

private static final int MAX_BYTES = 16384;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Later we can make it configuable.

Copy link
Member

@Xuanwo Xuanwo May 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please configure by op.writer_with(path).chunk(size), and remove internal bytes in OperatorOutputStream.

We can implement this in coming PRs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned in #4626 (comment), they are differnt concept of buffering. I agree that we can improve the configurability here, though.

if (offset > MAX_BYTES) {
throw new IOException("INTERNAL ERROR: " + offset + " > " + MAX_BYTES);
} else if (offset < MAX_BYTES) {
final byte[] bytes = Arrays.copyOf(this.bytes, offset);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is normally called on close and thus only copy once. But users may like to flush to the underneath writer manually.

I'm not sure if we can avoid this copy with some JNI regioned array methods.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

@Xuanwo Xuanwo May 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is normally called on close and thus only copy once. But users may like to flush to the underneath writer manually.

OpenDAL doesn't provide flush semantics and only guarantees data persistence after closing. Therefore, flush should be a no-op, and this behavior needs to be clearly documented in our public API.

public void write(int b) throws IOException {
bytes[offset++] = (byte) b;
if (offset >= MAX_BYTES) {
flush();
Copy link
Member

@Xuanwo Xuanwo May 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this, opendal core handling buffer on it own.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need it.

This sends the Java buffer to the Rust. Otherwise the Rust operator never receive the buffer. We should buffer it a bit in the Java side because if we send every byte, it would introduce expensive JNI call total cost.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is, they are different concept of buffering.

if (offset > MAX_BYTES) {
throw new IOException("INTERNAL ERROR: " + offset + " > " + MAX_BYTES);
} else if (offset < MAX_BYTES) {
final byte[] bytes = Arrays.copyOf(this.bytes, offset);
Copy link
Member

@Xuanwo Xuanwo May 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is normally called on close and thus only copy once. But users may like to flush to the underneath writer manually.

OpenDAL doesn't provide flush semantics and only guarantees data persistence after closing. Therefore, flush should be a no-op, and this behavior needs to be clearly documented in our public API.

}
}

private static final int MAX_BYTES = 16384;
Copy link
Member

@Xuanwo Xuanwo May 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please configure by op.writer_with(path).chunk(size), and remove internal bytes in OperatorOutputStream.

We can implement this in coming PRs.

@tisonkun tisonkun merged commit ac69437 into apache:main May 19, 2024
213 checks passed
@tisonkun tisonkun deleted the java-input-stream branch May 19, 2024 14:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

bindings/java: read and write big files through streams
2 participants