Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 16, 2024
1 parent 5a56a6b commit 2a0fd59
Show file tree
Hide file tree
Showing 9 changed files with 829 additions and 30 deletions.
58 changes: 46 additions & 12 deletions paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.utils.TagManager;

import org.slf4j.Logger;
Expand Down Expand Up @@ -96,17 +98,20 @@ public class TestFileStore extends KeyValueFileStore {

private long commitIdentifier;

private String branch;

private TestFileStore(
String root,
CoreOptions options,
RowType partitionType,
RowType keyType,
RowType valueType,
KeyValueFieldsExtractor keyValueFieldsExtractor,
MergeFunctionFactory<KeyValue> mfFactory) {
MergeFunctionFactory<KeyValue> mfFactory,
String branch) {
super(
FileIOFinder.find(new Path(root)),
new SchemaManager(FileIOFinder.find(new Path(root)), options.path()),
new SchemaManager(FileIOFinder.find(new Path(root)), options.path(), branch),
0L,
false,
options,
Expand All @@ -125,6 +130,7 @@ private TestFileStore(
this.commitUser = UUID.randomUUID().toString();

this.commitIdentifier = 0L;
this.branch = branch;
}

public AbstractFileStoreWrite<KeyValue> newWrite() {
Expand Down Expand Up @@ -285,7 +291,7 @@ public List<Snapshot> commitDataImpl(
.write(kv);
}

FileStoreCommit commit = newCommit(commitUser);
FileStoreCommit commit = newCommit(commitUser, branch);
ManifestCommittable committable =
new ManifestCommittable(
identifier == null ? commitIdentifier++ : identifier, watermark);
Expand All @@ -306,12 +312,12 @@ public List<Snapshot> commitDataImpl(
}

SnapshotManager snapshotManager = snapshotManager();
Long snapshotIdBeforeCommit = snapshotManager.latestSnapshotId();
Long snapshotIdBeforeCommit = snapshotManager.latestSnapshotId(branch);
if (snapshotIdBeforeCommit == null) {
snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
}
commitFunction.accept(commit, committable);
Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId();
Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId(branch);
if (snapshotIdAfterCommit == null) {
snapshotIdAfterCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
}
Expand Down Expand Up @@ -451,18 +457,18 @@ public void assertCleaned() throws IOException {
// - latest should < true_latest
// - earliest should < true_earliest
SnapshotManager snapshotManager = snapshotManager();
Path snapshotDir = snapshotManager.snapshotDirectory();
Path snapshotDir = snapshotManager.snapshotDirectory(branch);
Path earliest = new Path(snapshotDir, SnapshotManager.EARLIEST);
Path latest = new Path(snapshotDir, SnapshotManager.LATEST);
if (actualFiles.remove(earliest)) {
long earliestId = snapshotManager.readHint(SnapshotManager.EARLIEST);
long earliestId = snapshotManager.readHint(SnapshotManager.EARLIEST, branch);
fileIO.delete(earliest, false);
assertThat(earliestId <= snapshotManager.earliestSnapshotId()).isTrue();
assertThat(earliestId <= snapshotManager.earliestSnapshotId(branch)).isTrue();
}
if (actualFiles.remove(latest)) {
long latestId = snapshotManager.readHint(SnapshotManager.LATEST);
long latestId = snapshotManager.readHint(SnapshotManager.LATEST, branch);
fileIO.delete(latest, false);
assertThat(latestId <= snapshotManager.latestSnapshotId()).isTrue();
assertThat(latestId <= snapshotManager.latestSnapshotId(branch)).isTrue();
}
actualFiles.remove(latest);

Expand All @@ -481,7 +487,9 @@ private Set<Path> getFilesInUse() {
Set<Path> result = new HashSet<>();

SchemaManager schemaManager = new SchemaManager(fileIO, options.path());
schemaManager.listAllIds().forEach(id -> result.add(schemaManager.toSchemaPath(id)));
schemaManager
.listAllIds(branch)
.forEach(id -> result.add(schemaManager.toSchemaPath(branch, id)));

SnapshotManager snapshotManager = snapshotManager();
Long latestSnapshotId = snapshotManager.latestSnapshotId();
Expand Down Expand Up @@ -565,6 +573,7 @@ public static class Builder {
private final MergeFunctionFactory<KeyValue> mfFactory;

private CoreOptions.ChangelogProducer changelogProducer;
private final String branch;

public Builder(
String format,
Expand All @@ -575,6 +584,28 @@ public Builder(
RowType valueType,
KeyValueFieldsExtractor keyValueFieldsExtractor,
MergeFunctionFactory<KeyValue> mfFactory) {
this(
format,
root,
numBuckets,
partitionType,
keyType,
valueType,
keyValueFieldsExtractor,
mfFactory,
BranchManager.DEFAULT_MAIN_BRANCH);
}

public Builder(
String format,
String root,
int numBuckets,
RowType partitionType,
RowType keyType,
RowType valueType,
KeyValueFieldsExtractor keyValueFieldsExtractor,
MergeFunctionFactory<KeyValue> mfFactory,
String branch) {
this.format = format;
this.root = root;
this.numBuckets = numBuckets;
Expand All @@ -585,6 +616,7 @@ public Builder(
this.mfFactory = mfFactory;

this.changelogProducer = CoreOptions.ChangelogProducer.NONE;
this.branch = StringUtils.isEmpty(branch) ? BranchManager.DEFAULT_MAIN_BRANCH : branch;
}

public Builder changelogProducer(CoreOptions.ChangelogProducer changelogProducer) {
Expand Down Expand Up @@ -612,6 +644,7 @@ public TestFileStore build() {

// disable dynamic-partition-overwrite in FileStoreCommit layer test
conf.set(CoreOptions.DYNAMIC_PARTITION_OVERWRITE, false);
conf.set(CoreOptions.BRANCH, branch);

return new TestFileStore(
root,
Expand All @@ -620,7 +653,8 @@ public TestFileStore build() {
keyType,
valueType,
keyValueFieldsExtractor,
mfFactory);
mfFactory,
branch);
}
}
}

0 comments on commit 2a0fd59

Please sign in to comment.