
HIVE-28258: Use Iceberg semantics for Merge task #5251

Open · wants to merge 1 commit into master

Conversation

SourabhBadhya (Contributor)

What changes were proposed in this pull request?

Use Iceberg semantics for Merge task

Why are the changes needed?

To use Iceberg readers and writers for the merge task.

Does this PR introduce any user-facing change?

No

Is the change a dependency upgrade?

No

How was this patch tested?

Existing tests: iceberg_merge_files.q is already present.


@SuppressWarnings("checkstyle:VisibilityModifier")
public abstract class AbstractMapredIcebergRecordReader<T> implements RecordReader<Void, T> {

  protected final org.apache.hadoop.mapreduce.RecordReader<Void, ?> innerReader;

  public AbstractMapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<?> mapreduceInputFormat,
-                                          IcebergSplit split, JobConf job, Reporter reporter) throws IOException {
+                                          InputSplit split, JobConf job, Reporter reporter) throws IOException {
Member: Why is that change needed? Do we support non-IcebergSplit splits in IcebergRecordReader?

SourabhBadhya (Contributor Author): This is required since I have created a new split, IcebergMergeSplit, which extends FileSplit.
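
For illustration, here is a minimal sketch of why widening the parameter type helps; the class names below are hypothetical and not part of the PR. Both IcebergSplit and a FileSplit-based split such as IcebergMergeSplit are InputSplits, so a reader constructor declared against InputSplit can accept either.

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Hypothetical illustration, not the PR's code: a reader whose constructor is
// declared against the common supertype InputSplit can be handed either an
// IcebergSplit or a FileSplit-derived split such as IcebergMergeSplit.
class SplitWideningSketch {

  static class ReaderSketch {
    ReaderSketch(InputSplit split) throws IOException, InterruptedException {
      // Only the generic InputSplit contract is needed here.
      System.out.println("split length = " + split.getLength());
    }
  }

  public static void main(String[] args) throws Exception {
    // A plain FileSplit stands in for IcebergMergeSplit (which extends FileSplit).
    InputSplit fileBacked = new FileSplit(new Path("/tmp/data.orc"), 0L, 128L, new String[0]);
    new ReaderSketch(fileBacked);
  }
}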

@@ -85,7 +90,8 @@ private static final class MapredIcebergRecordReader<T> extends AbstractMapredIcebergRecordReader<T>
  private final long splitLength; // for getPos()

  MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> mapreduceInputFormat,
-                           IcebergSplit split, JobConf job, Reporter reporter) throws IOException {
+                           org.apache.hadoop.mapreduce.InputSplit split, JobConf job, Reporter reporter)
Member: IcebergSplit

SourabhBadhya (Contributor Author): This is required since I have created a new split, IcebergMergeSplit, which extends FileSplit.

if (inputFormat instanceof CombineHiveInputFormat.MergeSplit) {
  if (mrwork == null) {
    mrwork = (MapWork) Utilities.getMergeWork(jobConf);
    if (mrwork == null) {
Member: Can it be null?

SourabhBadhya (Contributor Author): It can be null; hence we fall back to Utilities.getMapWork().
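
A hedged sketch of the fallback being described, assuming Hive's Utilities.getMergeWork and Utilities.getMapWork helpers as used in the excerpt; the wrapping method below is illustrative only.

import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.mapred.JobConf;

// Illustrative sketch: getMergeWork() may return null when no merge work is
// registered for the job, in which case the regular map work is used instead.
final class MergeWorkLookupSketch {

  static MapWork resolveWork(JobConf jobConf) {
    MapWork mrwork = (MapWork) Utilities.getMergeWork(jobConf);
    if (mrwork == null) {
      // Fall back to the plain map work, as the author describes.
      mrwork = Utilities.getMapWork(jobConf);
    }
    return mrwork;
  }
}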

@@ -768,4 +763,8 @@ public String toString() {
  public interface AvoidSplitCombination {
    boolean shouldSkipCombine(Path path, Configuration conf) throws IOException;
  }

+ public interface MergeSplit {
deniskuzZ (Member), Jun 3, 2024: Is that generic or Iceberg-specific only? By the way, an interface is always public.

Member: Is that used at all?

SourabhBadhya (Contributor Author): This is a generic interface for creating merge splits, and it is used.
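
The excerpt does not show the interface body. Purely as a hypothetical, a factory-style contract of this kind could be shaped as follows; the interface name, method name, and parameters are assumptions for illustration and are not taken from the PR.

import java.io.IOException;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;

// NOT the PR's interface: a hypothetical factory-style contract an input format
// could implement so that the combine logic can ask it to build merge splits.
interface MergeSplitFactorySketch {

  // Hypothetical method; the real MergeSplit interface in the PR may differ.
  FileSplit createMergeSplit(JobConf conf, FileSplit originalSplit) throws IOException;
}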

import org.apache.iceberg.ContentFile;
import org.apache.iceberg.util.SerializationUtil;

public class IcebergMergeSplit extends FileSplit implements org.apache.hadoop.mapred.InputSplit {
Member: Should it extend IcebergSplit?

SourabhBadhya (Contributor Author): There are inherent problems with extending IcebergSplit. The split required for the merge task must be of type FileSplit, whereas IcebergSplit extends only InputSplit. Also, IcebergSplit extends IcebergSplitContainer, which is not used here.
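
A hedged skeleton of the design described above: only the extends/implements relationship and the ContentFile and SerializationUtil imports come from the excerpt; the field, constructor, and (de)serialization details below are assumptions for illustration.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.util.SerializationUtil;

// Skeleton sketch (not the PR's class): the merge split has to be a FileSplit,
// which is what Hive's merge-task machinery expects, so it cannot simply extend
// IcebergSplit, which only extends InputSplit and also implements the unused
// IcebergSplitContainer. The contentFile field and its serialization are
// illustrative assumptions based on the imports shown in the excerpt.
class IcebergMergeSplitSketch extends FileSplit implements org.apache.hadoop.mapred.InputSplit {

  private transient ContentFile<?> contentFile;

  // Hadoop instantiates splits reflectively, so a no-arg constructor is needed.
  IcebergMergeSplitSketch() {
  }

  @Override
  public void write(DataOutput out) throws IOException {
    super.write(out);
    byte[] bytes = SerializationUtil.serializeToBytes(contentFile);
    out.writeInt(bytes.length);
    out.write(bytes);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    super.readFields(in);
    byte[] bytes = new byte[in.readInt()];
    in.readFully(bytes);
    contentFile = (ContentFile<?>) SerializationUtil.deserializeFromBytes(bytes);
  }
}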

@@ -436,49 +526,34 @@ private CloseableIterable<T> openGeneric(FileScanTask task, Schema readSchema) {

  @SuppressWarnings("unchecked")
  private CloseableIterable<T> open(FileScanTask currentTask, Schema readSchema) {
-    switch (inMemoryDataModel) {
+    switch (getInMemoryDataModel()) {
Member: Why not just make the fields protected to avoid all those getters?

SourabhBadhya (Contributor Author): To avoid checkstyle errors; the checkstyle rules prevent the use of protected variables.
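
For illustration, the pattern being chosen here; all names and the String type are stand-ins for the real fields. Keeping the field private and exposing a getter satisfies a VisibilityModifier-style checkstyle rule that forbids non-private instance fields.

// Hypothetical example of the trade-off discussed above: instead of a protected
// field (flagged by the VisibilityModifier checkstyle rule), the field stays
// private and subclasses go through a getter such as getInMemoryDataModel().
abstract class DataModelHolderSketch {

  private final String inMemoryDataModel; // private, so checkstyle is satisfied

  DataModelHolderSketch(String inMemoryDataModel) {
    this.inMemoryDataModel = inMemoryDataModel;
  }

  // Subclasses call this instead of reading the field directly,
  // e.g. switch (getInMemoryDataModel()) { ... }
  String getInMemoryDataModel() {
    return inMemoryDataModel;
  }
}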

sonarcloud bot commented Jun 7, 2024

Quality Gate passed

Issues: 32 new issues, 0 accepted issues
Measures: 0 security hotspots; no data about coverage; no data about duplication

See analysis details on SonarCloud

4 participants