risingwave_connector/source/
batch.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::source::filesystem::opendal_source::BatchPosixFsSplit;
16use crate::source::{SplitImpl, SplitMetaData};
17
/// # Refreshable Batch Source/Table
///
/// A refreshable batch source can be refreshed - reload all data from the source, e.g., re-run a `SELECT *` query from the source.
/// The reloaded data will be handled by `RefreshableMaterialize` to calculate a diff to send to downstream.
///
/// - *Batch* means the source loads all data at once, instead of continuously streaming data.
/// - *Refreshable* part is handled by the materialize executor. When creating a table with a refreshable batch source, the table can be
///   refreshed by running `REFRESH TABLE t` SQL command.
///
/// See <https://github.com/risingwavelabs/risingwave/issues/22690> for the whole picture of the user journey.
///
/// ## Failover
///
/// Batch source is considered stateless, i.e., its consumption progress is not recorded, and cannot be resumed.
/// The split metadata just represents "how to load the data".
///
/// - On startup, `SourceExecutor` will load data.
/// - On `RefreshStart` barrier (from `REFRESH TABLE t` SQL command), it will re-load data.
/// - On recovery, it will *do nothing*, regardless of whether it's in the middle of loading data or not before crash.
pub trait BatchSourceSplit: SplitMetaData {
    /// Reports whether the source is currently marked as finished.
    ///
    /// See [`finish`](Self::finish) for how the finished mark is set and
    /// [`refresh`](Self::refresh) for preparing the source for a re-run.
    fn finished(&self) -> bool;
    /// Mark the source as finished. Called after the source is exhausted.
    /// Then `SourceExecutor` will report to meta to send a `LoadFinish` barrier,
    /// and the `RefreshableMaterialize` will begin to calculate the diff.
    fn finish(&mut self);
    /// Refresh the source to make it ready for re-run.
    /// Called when receiving `RefreshStart` barrier.
    fn refresh(&mut self);
}
47
/// Enum over the concrete batch source split types.
///
/// Each variant wraps one split type. The inherent methods on this enum forward to the
/// wrapped split, and the enum can be converted into a [`SplitImpl`] via `From`.
pub enum BatchSourceSplitImpl {
    /// A batch split represented by [`BatchPosixFsSplit`].
    BatchPosixFs(BatchPosixFsSplit),
}
51
52/// See [`BatchSourceSplit`] for more details.
53impl BatchSourceSplitImpl {
54    pub fn finished(&self) -> bool {
55        match self {
56            BatchSourceSplitImpl::BatchPosixFs(split) => split.finished(),
57        }
58    }
59
60    pub fn finish(&mut self) {
61        tracing::info!("finishing batch source split");
62        match self {
63            BatchSourceSplitImpl::BatchPosixFs(split) => split.finish(),
64        }
65    }
66
67    pub fn refresh(&mut self) {
68        tracing::info!("refreshing batch source split");
69        match self {
70            BatchSourceSplitImpl::BatchPosixFs(split) => split.refresh(),
71        }
72    }
73}
74
75impl From<BatchSourceSplitImpl> for SplitImpl {
76    fn from(batch_split: BatchSourceSplitImpl) -> Self {
77        match batch_split {
78            BatchSourceSplitImpl::BatchPosixFs(split) => SplitImpl::BatchPosixFs(split),
79        }
80    }
81}