risingwave_connector/source/batch.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::source::filesystem::opendal_source::BatchPosixFsSplit;
16use crate::source::{SplitImpl, SplitMetaData};
17
18/// # Refreshable Batch Source/Table
19///
20/// A refreshable batch source can be refreshed - reload all data from the source, e.g., re-run a `SELECT *` query from the source.
21/// The reloaded data will be handled by `RefreshableMaterialize` to calculate a diff to send to downstream.
22///
23/// - *Batch* means the source loads all data at once, instead of continuously streaming data.
24/// - *Refreshable* part is handled by the materialize executor. When creating a table with a refreshable batch source, the table can be
25/// refreshed by running `REFRESH TABLE t` SQL command.
26///
27/// See <https://github.com/risingwavelabs/risingwave/issues/22690> for the whole picture of the user journey.
28///
29/// ## Failover
30///
31/// Batch source is considered stateless. i.e., it's consumption progress is not recorded, and cannot be resumed.
32/// The split metadata just represent "how to load the data".
33///
34/// - On startup, `SourceExecutor` will load data.
35/// - On `RefreshStart` barrier (from `REFRESH TABLE t` SQL command), it will re-load data.
36/// - On recovery, it will *do nothing*, regardless of whether it's in the middle of loading data or not before crash.
37pub trait BatchSourceSplit: SplitMetaData {
38 fn finished(&self) -> bool;
39 /// Mark the source as finished. Called after the source is exhausted.
40 /// Then `SourceExecutor` will report to meta to send a `LoadFinish` barrier,
41 /// and the `RefreshableMaterialize` will begin to calculate the diff.
42 fn finish(&mut self);
43 /// Refresh the source to make it ready for re-run.
44 /// Called when receiving `RefreshStart` barrier.
45 fn refresh(&mut self);
46}
47
48pub enum BatchSourceSplitImpl {
49 BatchPosixFs(BatchPosixFsSplit),
50}
51
52/// See [`BatchSourceSplit`] for more details.
53impl BatchSourceSplitImpl {
54 pub fn finished(&self) -> bool {
55 match self {
56 BatchSourceSplitImpl::BatchPosixFs(split) => split.finished(),
57 }
58 }
59
60 pub fn finish(&mut self) {
61 tracing::info!("finishing batch source split");
62 match self {
63 BatchSourceSplitImpl::BatchPosixFs(split) => split.finish(),
64 }
65 }
66
67 pub fn refresh(&mut self) {
68 tracing::info!("refreshing batch source split");
69 match self {
70 BatchSourceSplitImpl::BatchPosixFs(split) => split.refresh(),
71 }
72 }
73}
74
75impl From<BatchSourceSplitImpl> for SplitImpl {
76 fn from(batch_split: BatchSourceSplitImpl) -> Self {
77 match batch_split {
78 BatchSourceSplitImpl::BatchPosixFs(split) => SplitImpl::BatchPosixFs(split),
79 }
80 }
81}