Skip to content

Commit

Permalink
feat: Filter out invalid splits to improve flink database compaction efficiency
Browse files Browse the repository at this point in the history
  • Loading branch information
zhourui999 committed Apr 21, 2024
1 parent e27ceb4 commit 4ae3dce
Showing 1 changed file with 13 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.InnerStreamTableScanImpl;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;

Expand Down Expand Up @@ -81,8 +83,19 @@ public void run(SourceContext<Tuple2<Split, String>> ctx) throws Exception {
// For each monitored table, plan the next batch of splits and enqueue only those
// that still have work to do, tagged with the table's full name.
for (Map.Entry<Identifier, StreamTableScan> entry : scansMap.entrySet()) {
Identifier identifier = entry.getKey();
StreamTableScan scan = entry.getValue();
// Highest LSM level index derived from the table options (numLevels - 1).
// NOTE(review): unchecked cast assumes every scan in scansMap is an
// InnerStreamTableScanImpl — confirm this invariant at the call site.
int maxLevel = ((InnerStreamTableScanImpl) scan).options().numLevels() - 1;
splits.addAll(
scan.plan().splits().stream()
.filter(
split -> {
DataSplit dataSplit = (DataSplit) split;
// Drop splits that carry no data files at all.
// NOTE(review): this guard is redundant — anyMatch on an
// empty stream already returns false — but it is harmless.
if (dataSplit.dataFiles().isEmpty()) {
return false;
}
// Keep the split only if at least one of its files sits
// below the max level, i.e. compaction could still move it.
return dataSplit.dataFiles().stream()
.map(DataFileMeta::level)
.anyMatch(level -> level != maxLevel);
})
// Pair each surviving split with its table's fully-qualified name
// so downstream operators can route it to the right table.
.map(split -> new Tuple2<>(split, identifier.getFullName()))
.collect(Collectors.toList()));
}
Expand Down

0 comments on commit 4ae3dce

Please sign in to comment.