[kernel-spark][Part 1] CDC streaming offset management (initial snapshot)#6075
Conversation
626af46 to
2f3d3db
Compare
e89755b to
ae0540c
Compare
murali-db
left a comment
There was a problem hiding this comment.
[AI] The refactoring of IndexedFile with static factory methods and the extraction of applyBoundaryFiltering are clean improvements. The DSv1-vs-DSv2 parameterized tests provide good confidence in parity.
A few items to address:
-
PR description is empty. The "Description", "How was this patch tested?", and "Does this PR introduce any user-facing changes?" sections are all blank. Per project conventions — what does this change do? What breaks without it? Please fill these in.
-
Snapshot cache doesn't account for CDC mode — see inline comment on
getSnapshotFiles. This is the main code concern. -
Minor nits inline on naming and doc accuracy.
79f2695 to
7a4bdb7
Compare
Range-diff: master (7a4bdb7 -> f9ee471)
... (truncated, output exceeded 60000 bytes) Reproduce locally: |
|
Hi @murali-db, I have addressed the AI comments. PTAL. |
Range-diff: master (f9ee471 -> 2001a72)
... (truncated, output exceeded 60000 bytes) Reproduce locally: |
| validateCDFEnabledOnTable(); | ||
| CloseableIterator<IndexedFile> result; | ||
| if (isInitialSnapshot) { | ||
| Snapshot snapshot = snapshotManager.loadSnapshotAt(fromVersion); |
There was a problem hiding this comment.
Can we reuse the initialSnapshot vs refreshing one here?
There was a problem hiding this comment.
This is a good catch, thank you! We don't need a fresh snapshot.
huan233usc
left a comment
There was a problem hiding this comment.
One more question, and plz add CDCDataFileTest.java as well
Range-diff: master (a13ae25 -> 9f42cb1)
... (truncated, output exceeded 60000 bytes) Reproduce locally: |
Range-diff: master (9f42cb1 -> 320b3ac)
... (truncated, output exceeded 60000 bytes) Reproduce locally: |
Range-diff: master (320b3ac -> 3398dde)
... (truncated, output exceeded 60000 bytes) Reproduce locally: |
Range-diff: master (3398dde -> 41bc0a2)
... (truncated, output exceeded 60000 bytes) Reproduce locally: |
Range-diff: master (41bc0a2 -> 2123524)
... (truncated, output exceeded 60000 bytes) Reproduce locally: |
Range-diff: master (2123524 -> 18d8afb)
... (truncated, output exceeded 60000 bytes) Reproduce locally: |
Range-diff: master (18d8afb -> 07859c6)
... (truncated, output exceeded 60000 bytes) Reproduce locally: |
Range-diff: master (07859c6 -> 7a4827d)
... (truncated, output exceeded 60000 bytes) Reproduce locally: |
Range-diff: master (7a4827d -> 6b26f5b)
... (truncated, output exceeded 60000 bytes) Reproduce locally: |
Range-diff: master (6b26f5b -> da38d13)
... (truncated, output exceeded 60000 bytes) Reproduce locally: |
Range-diff: master (da38d13 -> ab609f8)
... (truncated, output exceeded 60000 bytes) Reproduce locally: |
…hot) (delta-io#6075) ## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta/pull/6075/files) to review incremental changes. - [**stack/cdf1**](delta-io#6075) [[Files changed](https://github.com/delta-io/delta/pull/6075/files)] - [stack/cdf2](delta-io#6076) [[Files changed](https://github.com/delta-io/delta/pull/6076/files/ab609f8e2185d3ba863c00304195c24b60f7d04b..301cf056c94305a2ca5d96dc3dcdfd88d5dbc37b)] - [stack/cdf2.5](delta-io#6391) [[Files changed](https://github.com/delta-io/delta/pull/6391/files/301cf056c94305a2ca5d96dc3dcdfd88d5dbc37b..d2ce6b47593a08ed491742e024056b8f565dd33f)] - [stack/cdf3](delta-io#6336) [[Files changed](https://github.com/delta-io/delta/pull/6336/files/70544ed3a42014a51dd0d8d97e7c4f2333e6a221..fb5a4f6e478f68410739ce68e9a3fd92b2091be1)] - [stack/cdf4](delta-io#6359) [[Files changed](https://github.com/delta-io/delta/pull/6359/files/fb5a4f6e478f68410739ce68e9a3fd92b2091be1..e52ab30e7fe3923ba302e489368231b836ff4314)] - [stack/cdf5](delta-io#6362) [[Files changed](https://github.com/delta-io/delta/pull/6362/files/e52ab30e7fe3923ba302e489368231b836ff4314..45bc4f043f996ad56d6937f6f0b5d1876fe1130a)] - [stack/cdf6](delta-io#6363) [[Files changed](https://github.com/delta-io/delta/pull/6363/files/45bc4f043f996ad56d6937f6f0b5d1876fe1130a..7d8e01ec6211eba6712b4c54e08eb68f45e542b6)] - [stack/cdf-outofrange](delta-io#6388) [[Files changed](https://github.com/delta-io/delta/pull/6388/files/216a484e5f6e7b20fa26d428703ca05dcbbb6b5a..fee69f12b67cc91d1aeed01a759f64940198405e)] - [stack/cdf7](delta-io#6370) [[Files changed](https://github.com/delta-io/delta/pull/6370/files/fee69f12b67cc91d1aeed01a759f64940198405e..7f5fc84dee6dc3386a7b18db5a0d1d7533024842)] --------- <!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [x] Kernel - [ ] Other (fill in here) ## Description Adds initial snapshot write-time-CDC support to the DSv2 streaming read path (SparkMicroBatchStream), bringing it closer to DSv1 feature parity. <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. -->

🥞 Stacked PR
Use this link to review incremental changes.
Which Delta project/connector is this regarding?
Description
Adds initial snapshot write-time-CDC support to the DSv2 streaming read path (SparkMicroBatchStream), bringing it closer to DSv1 feature parity.
How was this patch tested?
Does this PR introduce any user-facing changes?