risingwave_stream/executor/source/
executor_core.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use risingwave_common::catalog::{ColumnId, TableId};
18use risingwave_connector::WithPropertiesExt;
19use risingwave_connector::source::reader::desc::SourceDescBuilder;
20use risingwave_connector::source::{BatchSourceSplitImpl, SplitId, SplitImpl, SplitMetaData};
21use risingwave_storage::StateStore;
22
23use super::SourceStateTableHandler;
24
25/// [`StreamSourceCore`] stores the necessary information for the source executor to execute on the
26/// external connector.
/// [`StreamSourceCore`] stores the necessary information for the source executor to execute on the
/// external connector.
pub struct StreamSourceCore<S: StateStore> {
    /// Id of the source this core belongs to.
    pub(crate) source_id: TableId,
    /// Name of the source this core belongs to.
    pub(crate) source_name: String,

    /// Column ids of the source.
    // NOTE(review): presumably the columns the executor emits, in output order — confirm.
    pub(crate) column_ids: Vec<ColumnId>,

    /// `source_desc_builder` will be taken (`mem::take`) on execution. A `SourceDesc` (currently
    /// named `SourceDescV2`) will be constructed and used for execution.
    ///
    /// Set to `Some` by `new`; expected to be `None` once it has been taken.
    pub(crate) source_desc_builder: Option<SourceDescBuilder>,

    /// Split info for stream source. A source executor might read data from several splits of
    /// external connector.
    ///
    /// Keyed by each split's own id (`SplitMetaData::id`).
    pub(crate) latest_split_info: HashMap<SplitId, SplitImpl>,

    /// Stores information of the splits.
    pub(crate) split_state_store: SourceStateTableHandler<S>,

    /// Contains the latest offsets for the splits that are updated *in the current epoch*.
    /// It is cleared after each barrier.
    ///
    /// Source messages will only write the cache.
    /// It is read on split change and rebuild stream reader on error.
    pub(crate) updated_splits_in_epoch: HashMap<SplitId, SplitImpl>,

    /// Whether this is a refreshable batch source. Derived from the source's properties via
    /// `is_batch_connector()` at construction time.
    pub(crate) is_batch_source: bool,
}
54
55impl<S> StreamSourceCore<S>
56where
57    S: StateStore,
58{
59    pub fn new(
60        source_id: TableId,
61        source_name: String,
62        column_ids: Vec<ColumnId>,
63        source_desc_builder: SourceDescBuilder,
64        split_state_store: SourceStateTableHandler<S>,
65    ) -> Self {
66        let is_batch_source = source_desc_builder.with_properties().is_batch_connector();
67        Self {
68            source_id,
69            source_name,
70            column_ids,
71            source_desc_builder: Some(source_desc_builder),
72            latest_split_info: HashMap::new(),
73            split_state_store,
74            updated_splits_in_epoch: HashMap::new(),
75            is_batch_source,
76        }
77    }
78
79    pub fn init_split_state(&mut self, splits: Vec<SplitImpl>) {
80        self.latest_split_info = splits
81            .into_iter()
82            .map(|split| (split.id(), split))
83            .collect();
84    }
85
86    /// # Panics
87    /// If the source is not a batch source.
88    pub fn get_batch_split(&self) -> BatchSourceSplitImpl {
89        debug_assert_eq!(
90            self.latest_split_info.len(),
91            1,
92            "batch source should have only one split"
93        );
94        self.latest_split_info
95            .values()
96            .next()
97            .unwrap()
98            .clone()
99            .into_batch_split()
100            .unwrap()
101    }
102}