-
Notifications
You must be signed in to change notification settings - Fork 4.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HIVE-28258: Use Iceberg semantics for Merge task #5251
base: master
Are you sure you want to change the base?
Conversation
...erg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
Outdated
Show resolved
Hide resolved
...erg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
Outdated
Show resolved
Hide resolved
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
Outdated
Show resolved
Hide resolved
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
Outdated
Show resolved
Hide resolved
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
Outdated
Show resolved
Hide resolved
...erg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
Outdated
Show resolved
Hide resolved
|
||
@SuppressWarnings("checkstyle:VisibilityModifier") | ||
public abstract class AbstractMapredIcebergRecordReader<T> implements RecordReader<Void, T> { | ||
|
||
protected final org.apache.hadoop.mapreduce.RecordReader<Void, ?> innerReader; | ||
|
||
public AbstractMapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<?> mapreduceInputFormat, | ||
IcebergSplit split, JobConf job, Reporter reporter) throws IOException { | ||
InputSplit split, JobConf job, Reporter reporter) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is that change? do we support non IcebergSplit in IcebergRecordReader?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is required since I have created a new split by the name of IcebergMergeSplit which extends FileSplit.
@@ -85,7 +90,8 @@ private static final class MapredIcebergRecordReader<T> extends AbstractMapredIc | |||
private final long splitLength; // for getPos() | |||
|
|||
MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> mapreduceInputFormat, | |||
IcebergSplit split, JobConf job, Reporter reporter) throws IOException { | |||
org.apache.hadoop.mapreduce.InputSplit split, JobConf job, Reporter reporter) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IcebergSplit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is required since I have created a new split by the name of IcebergMergeSplit which extends FileSplit.
ql/src/java/org/apache/hadoop/hive/ql/plan/MergeTaskProperties.java
Outdated
Show resolved
Hide resolved
if (inputFormat instanceof CombineHiveInputFormat.MergeSplit) { | ||
if (mrwork == null) { | ||
mrwork = (MapWork) Utilities.getMergeWork(jobConf); | ||
if (mrwork == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can it be null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can be null, hence we do Utilities.getMapWork()
@@ -768,4 +763,8 @@ public String toString() { | |||
public interface AvoidSplitCombination { | |||
boolean shouldSkipCombine(Path path, Configuration conf) throws IOException; | |||
} | |||
|
|||
public interface MergeSplit { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is that generic or only Iceberg specific? btw, interface always public
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is that used at all?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is generic interface for creating merge splits and its used.
import org.apache.iceberg.ContentFile; | ||
import org.apache.iceberg.util.SerializationUtil; | ||
|
||
public class IcebergMergeSplit extends FileSplit implements org.apache.hadoop.mapred.InputSplit { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should it extend IcebergSplit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are inherent problems of extending it with IcebergSplit. The required split for merge task must be of type FileSplit however IcebergSplit extends InputSplit only. Also the IcebergSplitContainer is also extended by IcebergSplit which is not used here.
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
Outdated
Show resolved
Hide resolved
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
Outdated
Show resolved
Hide resolved
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
Outdated
Show resolved
Hide resolved
@@ -436,49 +526,34 @@ private CloseableIterable<T> openGeneric(FileScanTask task, Schema readSchema) { | |||
|
|||
@SuppressWarnings("unchecked") | |||
private CloseableIterable<T> open(FileScanTask currentTask, Schema readSchema) { | |||
switch (inMemoryDataModel) { | |||
switch (getInMemoryDataModel()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not just make fields protected to avoid all those getters
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To avoid checkstyle errors. Checkstyle rules prevent usage of protected variables.
Quality Gate passedIssues Measures |
What changes were proposed in this pull request?
Use Iceberg semantics for Merge task
Why are the changes needed?
To use Iceberg readers and writers for merge task.
Does this PR introduce any user-facing change?
No
Is the change a dependency upgrade?
No
How was this patch tested?
Existing tests - iceberg_merge_files.q is present.