risingwave_stream/executor/source/
executor_core.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use risingwave_common::catalog::{ColumnId, TableId};
18use risingwave_connector::source::reader::desc::SourceDescBuilder;
19use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData};
20use risingwave_storage::StateStore;
21
22use super::SourceStateTableHandler;
23
24/// [`StreamSourceCore`] stores the necessary information for the source executor to execute on the
25/// external connector.
26pub struct StreamSourceCore<S: StateStore> {
27    pub(crate) source_id: TableId,
28    pub(crate) source_name: String,
29
30    pub(crate) column_ids: Vec<ColumnId>,
31
32    /// `source_desc_builder` will be taken (`mem::take`) on execution. A `SourceDesc` (currently
33    /// named `SourceDescV2`) will be constructed and used for execution.
34    pub(crate) source_desc_builder: Option<SourceDescBuilder>,
35
36    /// Split info for stream source. A source executor might read data from several splits of
37    /// external connector.
38    pub(crate) latest_split_info: HashMap<SplitId, SplitImpl>,
39
40    /// Stores information of the splits.
41    pub(crate) split_state_store: SourceStateTableHandler<S>,
42
43    /// Contains the latests offsets for the splits that are updated *in the current epoch*.
44    /// It is cleared after each barrier.
45    ///
46    /// Source messages will only write the cache.
47    /// It is read on split change and rebuild stream reader on error.
48    pub(crate) updated_splits_in_epoch: HashMap<SplitId, SplitImpl>,
49}
50
51impl<S> StreamSourceCore<S>
52where
53    S: StateStore,
54{
55    pub fn new(
56        source_id: TableId,
57        source_name: String,
58        column_ids: Vec<ColumnId>,
59        source_desc_builder: SourceDescBuilder,
60        split_state_store: SourceStateTableHandler<S>,
61    ) -> Self {
62        Self {
63            source_id,
64            source_name,
65            column_ids,
66            source_desc_builder: Some(source_desc_builder),
67            latest_split_info: HashMap::new(),
68            split_state_store,
69            updated_splits_in_epoch: HashMap::new(),
70        }
71    }
72
73    pub fn init_split_state(&mut self, splits: Vec<SplitImpl>) {
74        self.latest_split_info = splits
75            .into_iter()
76            .map(|split| (split.id(), split))
77            .collect();
78    }
79}