risingwave_stream/executor/source/
executor_core.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use risingwave_common::catalog::ColumnId;
18use risingwave_common::id::SourceId;
19use risingwave_connector::source::reader::desc::SourceDescBuilder;
20use risingwave_connector::source::{BatchSourceSplitImpl, SplitId, SplitImpl, SplitMetaData};
21use risingwave_storage::StateStore;
22
23use super::SourceStateTableHandler;
24
25/// [`StreamSourceCore`] stores the necessary information for the source executor to execute on the
26/// external connector.
27pub struct StreamSourceCore<S: StateStore> {
28    pub(crate) source_id: SourceId,
29    pub(crate) source_name: String,
30
31    pub(crate) column_ids: Vec<ColumnId>,
32
33    /// `source_desc_builder` will be taken (`mem::take`) on execution. A `SourceDesc` (currently
34    /// named `SourceDescV2`) will be constructed and used for execution.
35    pub(crate) source_desc_builder: Option<SourceDescBuilder>,
36
37    /// Split info for stream source. A source executor might read data from several splits of
38    /// external connector.
39    pub(crate) latest_split_info: HashMap<SplitId, SplitImpl>,
40
41    /// Stores information of the splits.
42    pub(crate) split_state_store: SourceStateTableHandler<S>,
43
44    /// Contains the latest offsets for the splits that are updated *in the current epoch*.
45    /// It is cleared after each barrier.
46    ///
47    /// Source messages will only write the cache.
48    /// It is read on split change and rebuild stream reader on error.
49    pub(crate) updated_splits_in_epoch: HashMap<SplitId, SplitImpl>,
50}
51
52impl<S> StreamSourceCore<S>
53where
54    S: StateStore,
55{
56    pub fn new(
57        source_id: SourceId,
58        source_name: String,
59        column_ids: Vec<ColumnId>,
60        source_desc_builder: SourceDescBuilder,
61        split_state_store: SourceStateTableHandler<S>,
62    ) -> Self {
63        Self {
64            source_id,
65            source_name,
66            column_ids,
67            source_desc_builder: Some(source_desc_builder),
68            latest_split_info: HashMap::new(),
69            split_state_store,
70            updated_splits_in_epoch: HashMap::new(),
71        }
72    }
73
74    pub fn init_split_state(&mut self, splits: Vec<SplitImpl>) {
75        self.latest_split_info = splits
76            .into_iter()
77            .map(|split| (split.id(), split))
78            .collect();
79    }
80
81    /// # Panics
82    /// If the source is not a batch source.
83    pub fn get_batch_split(&self) -> BatchSourceSplitImpl {
84        debug_assert_eq!(
85            self.latest_split_info.len(),
86            1,
87            "batch source should have only one split"
88        );
89        self.latest_split_info
90            .values()
91            .next()
92            .unwrap()
93            .clone()
94            .into_batch_split()
95            .unwrap()
96    }
97}