refactor datafilemeta
Aitozi committed Apr 10, 2024
1 parent 3ae9538 commit f9fc9e8
Showing 15 changed files with 105 additions and 40 deletions.
34 changes: 17 additions & 17 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -23,11 +23,11 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.stats.BinaryTableStats;
import org.apache.paimon.stats.FieldStatsArraySerializer;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BooleanType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.IntType;
@@ -77,13 +77,13 @@ public class DataFileMeta {

private final List<String> extraFiles;
private final Timestamp creationTime;
private final Boolean isAppend;

// rowCount = addRowCount + deleteRowCount
// Why don't we keep addRowCount and deleteRowCount?
// Because previous versions of DataFileMeta only kept rowCount,
// and we have to maintain compatibility with them.
private final @Nullable Long deleteRowCount;
private final @Nullable FileSource fileSource;
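A minimal illustration of the rowCount bookkeeping described in the comment above; the helper class below is hypothetical, not part of DataFileMeta, and only shows why persisting rowCount plus the nullable deleteRowCount is sufficient:

    final class RowCounts {
        // Hypothetical helper: addRowCount is derived rather than stored,
        // because older versions of DataFileMeta persisted only rowCount.
        static long addRowCount(long rowCount, long deleteRowCount) {
            // rowCount = addRowCount + deleteRowCount
            return rowCount - deleteRowCount;
        }
    }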

public static DataFileMeta forAppend(
String fileName,
@@ -93,7 +93,7 @@ public static DataFileMeta forAppend(
long minSequenceNumber,
long maxSequenceNumber,
long schemaId,
boolean isAppend) {
FileSource fileSource) {
return new DataFileMeta(
fileName,
fileSize,
@@ -107,7 +107,7 @@ public static DataFileMeta forAppend(
schemaId,
DUMMY_LEVEL,
0L,
isAppend);
fileSource);
}

public DataFileMeta(
@@ -123,7 +123,7 @@ public DataFileMeta(
long schemaId,
int level,
@Nullable Long deleteRowCount,
Boolean isAppend) {
FileSource fileSource) {
this(
fileName,
fileSize,
@@ -139,7 +139,7 @@ public DataFileMeta(
Collections.emptyList(),
Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(),
deleteRowCount,
isAppend);
fileSource);
}

public DataFileMeta(
@@ -157,7 +157,7 @@ public DataFileMeta(
List<String> extraFiles,
Timestamp creationTime,
@Nullable Long deleteRowCount,
Boolean isAppend) {
FileSource fileSource) {
this.fileName = fileName;
this.fileSize = fileSize;

@@ -176,7 +176,7 @@ public DataFileMeta(
this.creationTime = creationTime;

this.deleteRowCount = deleteRowCount;
this.isAppend = isAppend;
this.fileSource = fileSource;
}

public String fileName() {
@@ -199,8 +199,8 @@ public Optional<Long> deleteRowCount() {
return Optional.ofNullable(deleteRowCount);
}

public Optional<Boolean> isAppend() {
return Optional.ofNullable(isAppend);
public Optional<FileSource> fileSource() {
return Optional.ofNullable(fileSource);
}
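Since older files carry no persisted source, the accessor returns an Optional; a hedged sketch of the call pattern the rest of this commit relies on (the helper class and method name are made up for illustration):

    import org.apache.paimon.io.DataFileMeta;
    import org.apache.paimon.manifest.FileSource;

    final class FileSourceDefaults {
        // Hypothetical helper: files written before this commit have no persisted
        // source, so an empty Optional is treated as APPEND, matching the expire
        // and file-in-use logic changed later in this diff.
        static boolean isAppendFile(DataFileMeta meta) {
            return meta.fileSource().orElse(FileSource.APPEND) == FileSource.APPEND;
        }
    }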

public BinaryRow minKey() {
@@ -289,7 +289,7 @@ public DataFileMeta upgrade(int newLevel) {
extraFiles,
creationTime,
deleteRowCount,
isAppend);
fileSource);
}

public List<Path> collectFiles(DataFilePathFactory pathFactory) {
@@ -315,7 +315,7 @@ public DataFileMeta copy(List<String> newExtraFiles) {
newExtraFiles,
creationTime,
deleteRowCount,
isAppend);
fileSource);
}

@Override
@@ -341,7 +341,7 @@ public boolean equals(Object o) {
&& Objects.equals(extraFiles, that.extraFiles)
&& Objects.equals(creationTime, that.creationTime)
&& Objects.equals(deleteRowCount, that.deleteRowCount)
&& Objects.equals(isAppend, that.isAppend);
&& Objects.equals(fileSource, that.fileSource);
}

@Override
@@ -361,7 +361,7 @@ public int hashCode() {
extraFiles,
creationTime,
deleteRowCount,
isAppend);
fileSource);
}

@Override
@@ -370,7 +370,7 @@ public String toString() {
"{fileName: %s, fileSize: %d, rowCount: %d, "
+ "minKey: %s, maxKey: %s, keyStats: %s, valueStats: %s, "
+ "minSequenceNumber: %d, maxSequenceNumber: %d, "
+ "schemaId: %d, level: %d, extraFiles: %s, creationTime: %s, deleteRowCount: %d, isAppend: %s}",
+ "schemaId: %d, level: %d, extraFiles: %s, creationTime: %s, deleteRowCount: %d, fileSource: %s}",
fileName,
fileSize,
rowCount,
@@ -385,7 +385,7 @@ public String toString() {
extraFiles,
creationTime,
deleteRowCount,
isAppend);
fileSource);
}

public static RowType schema() {
@@ -404,7 +404,7 @@ public static RowType schema() {
fields.add(new DataField(11, "_EXTRA_FILES", new ArrayType(false, newStringType(false))));
fields.add(new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS()));
fields.add(new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true)));
fields.add(new DataField(14, "_IS_APPEND", new BooleanType(true)));
fields.add(new DataField(14, "_FILE_SOURCE", newBytesType(true)));
return new RowType(fields);
}

@@ -21,6 +21,7 @@
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.stats.BinaryTableStats;
import org.apache.paimon.utils.ObjectSerializer;

@@ -55,7 +56,7 @@ public InternalRow toRow(DataFileMeta meta) {
toStringArrayData(meta.extraFiles()),
meta.creationTime(),
meta.deleteRowCount().orElse(null),
meta.isAppend().orElse(null));
meta.fileSource().map(FileSource::toByteValue).orElse(null));
}

@Override
@@ -75,6 +76,6 @@ public DataFileMeta fromRow(InternalRow row) {
fromStringArrayData(row.getArray(11)),
row.getTimestamp(12, 3),
row.isNullAt(13) ? null : row.getLong(13),
row.isNullAt(14) ? null : row.getBoolean(14));
row.isNullAt(14) ? null : FileSource.fromByteValue(row.getByte(14)));
}
}
@@ -28,6 +28,7 @@
import org.apache.paimon.format.TableStatsExtractor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.stats.BinaryTableStats;
import org.apache.paimon.stats.FieldStatsArraySerializer;
import org.apache.paimon.types.RowType;
@@ -172,6 +173,6 @@ public DataFileMeta result() throws IOException {
schemaId,
level,
deleteRecordCount,
!isCompact);
isCompact ? FileSource.COMPACT : FileSource.APPEND);
}
}
@@ -23,6 +23,7 @@
import org.apache.paimon.format.TableStatsExtractor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.stats.BinaryTableStats;
import org.apache.paimon.stats.FieldStatsArraySerializer;
@@ -88,6 +89,6 @@ public DataFileMeta result() throws IOException {
seqNumCounter.getValue() - super.recordCount(),
seqNumCounter.getValue() - 1,
schemaId,
!isCompact);
isCompact ? FileSource.COMPACT : FileSource.APPEND);
}
}
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.manifest;

/** Source of a file. */
public enum FileSource {
APPEND((byte) 0),

COMPACT((byte) 1);

private final byte value;

FileSource(byte value) {
this.value = value;
}

public byte toByteValue() {
return value;
}

public static FileSource fromByteValue(byte value) {
switch (value) {
case 0:
return APPEND;
case 1:
return COMPACT;
default:
throw new UnsupportedOperationException(
"Unsupported byte value '" + value + "' for value kind.");
}
}
}
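For reference, a small sketch (not part of this commit) showing how the enum is meant to be used: writers derive the source from whether the file comes out of a compaction, and the meta serializer persists it as a single byte:

    import org.apache.paimon.manifest.FileSource;

    public class FileSourceRoundTrip {
        public static void main(String[] args) {
            // Writers pick the source the same way the file-writer changes above do.
            boolean isCompact = false;
            FileSource source = isCompact ? FileSource.COMPACT : FileSource.APPEND;

            // The enum is stored as one byte and restored on read.
            byte persisted = source.toByteValue();            // 0 for APPEND, 1 for COMPACT
            FileSource restored = FileSource.fromByteValue(persisted);

            System.out.println(source == restored);           // prints: true
        }
    }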
@@ -30,6 +30,7 @@
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.stats.BinaryTableStats;
import org.apache.paimon.stats.FieldStatsArraySerializer;
@@ -166,7 +167,7 @@ private static DataFileMeta constructFileMeta(
0,
0,
((FileStoreTable) table).schema().id(),
false); // todo review
FileSource.APPEND);
}

public static BinaryRow writePartitionValue(
@@ -22,6 +22,7 @@
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.utils.SnapshotManager;
@@ -188,17 +189,17 @@ public int expireUntil(long earliestId, long endExclusiveId) {
continue;
}

boolean deleteDeltaFile =
!changelogDecoupled || snapshot.changelogManifestList() != null;

if (!deleteDeltaFile) {
// if the manifest does not carry the file source information, we just
// skip cleaning it and let the changelog cleaner handle it.
if (changelogDecoupled && snapshot.changelogManifestList() == null) {
// if we do not have the file source information, e.g. an old-version table file,
// we just skip cleaning it here and let the changelog cleaner handle it.
Predicate<ManifestEntry> enriched =
manifestEntry -> {
return skipper.test(manifestEntry)
|| (manifestEntry.file().isAppend().orElse(true));
};
manifestEntry ->
skipper.test(manifestEntry)
|| (manifestEntry
.file()
.fileSource()
.orElse(FileSource.APPEND)
== FileSource.APPEND);
snapshotDeletion.cleanUnusedDataFiles(snapshot, enriched);
} else {
snapshotDeletion.cleanUnusedDataFiles(snapshot, skipper);
@@ -27,6 +27,7 @@
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
@@ -642,7 +643,9 @@ private static Set<Path> getSnapshotFileInUse(
entries = scan.withManifestList(snapshot.deltaManifests(manifestList)).plan().files();
for (ManifestEntry entry : entries) {
// append delete files are delayed to be deleted, so they are still in use
if (entry.kind() == FileKind.DELETE && entry.file().isAppend().orElse(true)) {
if (entry.kind() == FileKind.DELETE
&& entry.file().fileSource().orElse(FileSource.APPEND)
== FileSource.APPEND) {
result.add(
new Path(
pathFactory.bucketPath(entry.partition(), entry.bucket()),
@@ -703,7 +706,7 @@ private static Set<Path> getChangelogFileInUse(
if (changelog.changelogManifestList() == null && changelog.deltaManifestList() != null) {
for (ManifestEntry entry :
scan.withManifestList(changelog.deltaManifests(manifestList)).plan().files()) {
if (entry.file().isAppend().orElse(true)) {
if (entry.file().fileSource().orElse(FileSource.APPEND) == FileSource.APPEND) {
result.add(
new Path(
pathFactory.bucketPath(entry.partition(), entry.bucket()),
@@ -20,6 +20,7 @@

import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.mergetree.LevelSortedRun;
import org.apache.paimon.mergetree.SortedRun;

@@ -357,6 +358,7 @@ private LevelSortedRun level(int level, long size) {
}

static DataFileMeta file(long size) {
return new DataFileMeta("", size, 1, null, null, null, null, 0, 0, 0, 0, 0L, null);
return new DataFileMeta(
"", size, 1, null, null, null, null, 0, 0, 0, 0, 0L, FileSource.APPEND);
}
}
@@ -30,6 +30,7 @@
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.schema.Schema;
@@ -201,7 +202,7 @@ public void testExpireExtraFiles() throws IOException {
extraFiles,
Timestamp.now(),
0L,
null);
FileSource.APPEND);
ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1, dataFile);
ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition, 0, 1, dataFile);

@@ -20,6 +20,7 @@

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.utils.Pair;

@@ -56,7 +57,7 @@ public static DataFileMeta newFileFromSequence(
0,
0,
0L,
null);
FileSource.APPEND);
}

@Test