[core] incremental-between-scan-mode should respect changelog-producer (
JingsongLi committed May 6, 2024
1 parent 3112c97 commit 6137fd0
Showing 6 changed files with 36 additions and 22 deletions.
3 changes: 3 additions & 0 deletions docs/content/flink/sql-query.md
@@ -91,6 +91,9 @@ SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;
SELECT * FROM t /*+ OPTIONS('incremental-between-timestamp' = '1692169000000,1692169900000') */;
```

By default, changelog files are scanned for tables that produce changelog files; otherwise, newly changed files are scanned.
You can also force a scan mode by specifying `'incremental-between-scan-mode'`.
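
For example, a minimal sketch that forces the changelog scan mode through the same dynamic-options hint shown above (the snapshot range `'12,20'` is only illustrative):

```sql
-- Force reading changelog files for the incremental range; this assumes the
-- table was written with a changelog producer.
SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20', 'incremental-between-scan-mode' = 'changelog') */;
```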

In Batch SQL, `DELETE` records are not allowed to be returned, so records of `-D` will be dropped.
If you want to see `DELETE` records, you can use the audit_log table:

3 changes: 3 additions & 0 deletions docs/content/spark/sql-query.md
@@ -72,6 +72,9 @@ For example:
- '5,10' means changes between snapshot 5 and snapshot 10.
- 'TAG1,TAG3' means changes between TAG1 and TAG3.

By default, changelog files are scanned for tables that produce changelog files; otherwise, newly changed files are scanned.
You can also force a scan mode by specifying `'incremental-between-scan-mode'`.
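
For example, a minimal sketch that forces the delta scan mode, assuming the `spark.paimon.` prefix can be used to pass these options as Spark SQL configurations (the snapshot range `'5,10'` is only illustrative):

```sql
-- Read newly changed files between snapshot 5 (exclusive) and snapshot 10,
-- even if the table produces changelog files.
SET spark.paimon.incremental-between=5,10;
SET spark.paimon.incremental-between-scan-mode=delta;
SELECT * FROM t;
```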

Requires Spark 3.2+.

Paimon supports incremental queries in Spark SQL, implemented as a Spark table-valued function.
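
For instance, a minimal sketch, assuming the table-valued function is named `paimon_incremental_query` and takes the table name followed by the start and end snapshot ids:

```sql
-- Changes between snapshot 5 (exclusive) and snapshot 10 (inclusive).
SELECT * FROM paimon_incremental_query('t', 5, 10);
```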
4 changes: 2 additions & 2 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -274,9 +274,9 @@
</tr>
<tr>
<td><h5>incremental-between-scan-mode</h5></td>
<td style="word-wrap: break-word;">delta</td>
<td style="word-wrap: break-word;">auto</td>
<td><p>Enum</p></td>
<td>Scan kind when Read incremental changes between start snapshot (exclusive) and end snapshot, 'delta' for scan newly changed files between snapshots, 'changelog' scan changelog files between snapshots.<br /><br />Possible values:<ul><li>"delta": Scan newly changed files between snapshots.</li><li>"changelog": Scan changelog files between snapshots.</li></ul></td>
<td>Scan kind when reading incremental changes between start snapshot (exclusive) and end snapshot.<br /><br />Possible values:<ul><li>"auto": Scan changelog files for the table which produces changelog files. Otherwise, scan newly changed files.</li><li>"delta": Scan newly changed files between snapshots.</li><li>"changelog": Scan changelog files between snapshots.</li></ul></td>
</tr>
<tr>
<td><h5>incremental-between-timestamp</h5></td>
@@ -883,10 +883,9 @@ public class CoreOptions implements Serializable {
public static final ConfigOption<IncrementalBetweenScanMode> INCREMENTAL_BETWEEN_SCAN_MODE =
key("incremental-between-scan-mode")
.enumType(IncrementalBetweenScanMode.class)
.defaultValue(IncrementalBetweenScanMode.DELTA)
.defaultValue(IncrementalBetweenScanMode.AUTO)
.withDescription(
"Scan kind when Read incremental changes between start snapshot (exclusive) and end snapshot, "
+ "'delta' for scan newly changed files between snapshots, 'changelog' scan changelog files between snapshots.");
"Scan kind when reading incremental changes between start snapshot (exclusive) and end snapshot.");

public static final ConfigOption<String> INCREMENTAL_BETWEEN_TIMESTAMP =
key("incremental-between-timestamp")
@@ -2081,6 +2080,9 @@ public String getValue() {

/** Specifies this scan type for incremental scan . */
public enum IncrementalBetweenScanMode implements DescribedEnum {
AUTO(
"auto",
"Scan changelog files for the table which produces changelog files. Otherwise, scan newly changed files."),
DELTA("delta", "Scan newly changed files between snapshots."),
CHANGELOG("changelog", "Scan changelog files between snapshots.");

@@ -192,6 +192,12 @@ protected StartingScanner createStartingScanner(boolean isStreaming) {
options.incrementalBetweenScanMode();
ScanMode scanMode;
switch (scanType) {
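// AUTO resolves to CHANGELOG when the table is configured with a changelog producer, and to DELTA otherwise.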
case AUTO:
scanMode =
options.changelogProducer() == ChangelogProducer.NONE
? ScanMode.DELTA
: ScanMode.CHANGELOG;
break;
case DELTA:
scanMode = ScanMode.DELTA;
break;
@@ -24,13 +24,18 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.SnapshotManager;

import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_SCAN_MODE;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link IncrementalStartingScanner}. */
@@ -55,27 +60,22 @@ public void testScan() throws Exception {
write.compact(binaryRow(1), 0, false);
commit.commit(1, write.prepareCommit(true, 1));

assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);
write.close();
commit.close();

IncrementalStartingScanner deltaScanner =
new IncrementalStartingScanner(snapshotManager, 1L, 4L, ScanMode.DELTA);
StartingScanner.ScannedResult deltaResult =
(StartingScanner.ScannedResult) deltaScanner.scan(snapshotReader);
assertThat(deltaResult.currentSnapshotId()).isEqualTo(4);
assertThat(getResult(table.newRead(), toSplits(deltaResult.splits())))
.hasSameElementsAs(Arrays.asList("+I 2|20|200", "+I 1|10|100", "+I 3|40|500"));
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4);

IncrementalStartingScanner changeLogScanner =
new IncrementalStartingScanner(snapshotManager, 1L, 4L, ScanMode.CHANGELOG);
StartingScanner.ScannedResult changeLogResult =
(StartingScanner.ScannedResult) changeLogScanner.scan(snapshotReader);
assertThat(changeLogResult.currentSnapshotId()).isEqualTo(4);
assertThat(getResult(table.newRead(), toSplits(changeLogResult.splits())))
Map<String, String> dynamicOptions = new HashMap<>();
dynamicOptions.put(INCREMENTAL_BETWEEN.key(), "1,4");
List<Split> splits = table.copy(dynamicOptions).newScan().plan().splits();
assertThat(getResult(table.newRead(), splits))
.hasSameElementsAs(
Arrays.asList("+I 2|20|200", "+I 1|10|100", "+I 3|40|400", "+U 3|40|500"));

write.close();
commit.close();
dynamicOptions.put(INCREMENTAL_BETWEEN_SCAN_MODE.key(), "delta");
splits = table.copy(dynamicOptions).newScan().plan().splits();
assertThat(getResult(table.newRead(), splits))
.hasSameElementsAs(Arrays.asList("+I 2|20|200", "+I 1|10|100", "+I 3|40|500"));
}

@Override
