Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spark] Skip reading log entries beyond endOffset, if specified while getting file changes for CDC in streaming queries #3110

Conversation

anishshri-db
Copy link
Contributor

@anishshri-db anishshri-db commented May 17, 2024

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

Skip reading log entries beyond endOffset, if specified while getting file changes for CDC in streaming queries

How was this patch tested?

Existing unit tests

Also verified using logs to ensure that additional Delta logs are not read


24/05/16 01:21:01 INFO StateStore: StateStore stopped
Run completed in 54 seconds, 237 milliseconds.
Total number of tests run: 1
Suites: completed 1, aborted 0
Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
All tests passed.

Before:

10457:24/05/16 01:38:37 INFO DeltaSource: [queryId = 199ce] [batchId = 0] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-c3309e79-80cf-4819-8b03-2fc607cc2679/_delta_log with startVersion=0, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"75194ee3-4ff6-431e-8e7c-32006684d5ad","reservoirVersion":1,"index":-1,"isStartingVersion":false} took timeMs=52 ms
11114:24/05/16 01:38:39 INFO DeltaSource: [queryId = 199ce] [batchId = 1] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-c3309e79-80cf-4819-8b03-2fc607cc2679/_delta_log with startVersion=1, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"75194ee3-4ff6-431e-8e7c-32006684d5ad","reservoirVersion":2,"index":-1,"isStartingVersion":false} took timeMs=25 ms
11518:24/05/16 01:38:39 INFO DeltaSource: [queryId = 199ce] [batchId = 2] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-c3309e79-80cf-4819-8b03-2fc607cc2679/_delta_log with startVersion=2, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"75194ee3-4ff6-431e-8e7c-32006684d5ad","reservoirVersion":3,"index":-1,"isStartingVersion":false} took timeMs=24 ms

After:

10498:24/05/16 01:32:10 INFO DeltaSource: [queryId = ede3f] [batchId = 0] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-270c3d6e-40df-4e6f-b1da-c293af5d6741/_delta_log with startVersion=0, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"516bafe0-e0ea-4380-afcb-44e416302a07","reservoirVersion":1,"index":-1,"isStartingVersion":false} took timeMs=39 ms
11155:24/05/16 01:32:11 INFO DeltaSource: [queryId = ede3f] [batchId = 1] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-270c3d6e-40df-4e6f-b1da-c293af5d6741/_delta_log with startVersion=1, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"516bafe0-e0ea-4380-afcb-44e416302a07","reservoirVersion":2,"index":-1,"isStartingVersion":false} took timeMs=14 ms
11579:24/05/16 01:32:12 INFO DeltaSource: [queryId = ede3f] [batchId = 2] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-270c3d6e-40df-4e6f-b1da-c293af5d6741/_delta_log with startVersion=2, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"516bafe0-e0ea-4380-afcb-44e416302a07","reservoirVersion":3,"index":-1,"isStartingVersion":false} took timeMs=13 ms

Difference is even more if we are processing/reading through large number of backlog versions.

In Cx setup, before the change - batches are taking > 300s. After the change, batches complete is < 15s.

Does this PR introduce any user-facing changes?

No

@anishshri-db
Copy link
Contributor Author

Don't think test failure is related. Happening on other PRs too - https://github.com/delta-io/delta/actions/runs/9122276234/job/25082822738?pr=3112

@allisonport-db allisonport-db merged commit 0ee9fd0 into delta-io:master May 22, 2024
9 of 10 checks passed
longvu-db pushed a commit to longvu-db/delta that referenced this pull request May 28, 2024
… getting file changes for CDC in streaming queries (delta-io#3110)

#### Which Delta project/connector is this regarding?
- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description
Skip reading log entries beyond endOffset, if specified while getting
file changes for CDC in streaming queries

## How was this patch tested?
Existing unit tests

Also verified using logs to ensure that additional Delta logs are not
read

```

24/05/16 01:21:01 INFO StateStore: StateStore stopped
Run completed in 54 seconds, 237 milliseconds.
Total number of tests run: 1
Suites: completed 1, aborted 0
Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

Before:
```
10457:24/05/16 01:38:37 INFO DeltaSource: [queryId = 199ce] [batchId = 0] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-c3309e79-80cf-4819-8b03-2fc607cc2679/_delta_log with startVersion=0, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"75194ee3-4ff6-431e-8e7c-32006684d5ad","reservoirVersion":1,"index":-1,"isStartingVersion":false} took timeMs=52 ms
11114:24/05/16 01:38:39 INFO DeltaSource: [queryId = 199ce] [batchId = 1] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-c3309e79-80cf-4819-8b03-2fc607cc2679/_delta_log with startVersion=1, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"75194ee3-4ff6-431e-8e7c-32006684d5ad","reservoirVersion":2,"index":-1,"isStartingVersion":false} took timeMs=25 ms
11518:24/05/16 01:38:39 INFO DeltaSource: [queryId = 199ce] [batchId = 2] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-c3309e79-80cf-4819-8b03-2fc607cc2679/_delta_log with startVersion=2, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"75194ee3-4ff6-431e-8e7c-32006684d5ad","reservoirVersion":3,"index":-1,"isStartingVersion":false} took timeMs=24 ms
```

After:
```
10498:24/05/16 01:32:10 INFO DeltaSource: [queryId = ede3f] [batchId = 0] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-270c3d6e-40df-4e6f-b1da-c293af5d6741/_delta_log with startVersion=0, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"516bafe0-e0ea-4380-afcb-44e416302a07","reservoirVersion":1,"index":-1,"isStartingVersion":false} took timeMs=39 ms
11155:24/05/16 01:32:11 INFO DeltaSource: [queryId = ede3f] [batchId = 1] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-270c3d6e-40df-4e6f-b1da-c293af5d6741/_delta_log with startVersion=1, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"516bafe0-e0ea-4380-afcb-44e416302a07","reservoirVersion":2,"index":-1,"isStartingVersion":false} took timeMs=14 ms
11579:24/05/16 01:32:12 INFO DeltaSource: [queryId = ede3f] [batchId = 2] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-270c3d6e-40df-4e6f-b1da-c293af5d6741/_delta_log with startVersion=2, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"516bafe0-e0ea-4380-afcb-44e416302a07","reservoirVersion":3,"index":-1,"isStartingVersion":false} took timeMs=13 ms
```

Difference is even more if we are processing/reading through large
number of backlog versions.

In Cx setup, before the change - batches are taking > 300s. After the
change, batches complete is < 15s.

## Does this PR introduce _any_ user-facing changes?
No
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants