risingwave_stream/executor/source/executor_core.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use risingwave_common::catalog::{ColumnId, TableId};
18use risingwave_connector::source::reader::desc::SourceDescBuilder;
19use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData};
20use risingwave_storage::StateStore;
21
22use super::SourceStateTableHandler;
23
24/// [`StreamSourceCore`] stores the necessary information for the source executor to execute on the
25/// external connector.
26pub struct StreamSourceCore<S: StateStore> {
27 pub(crate) source_id: TableId,
28 pub(crate) source_name: String,
29
30 pub(crate) column_ids: Vec<ColumnId>,
31
32 /// `source_desc_builder` will be taken (`mem::take`) on execution. A `SourceDesc` (currently
33 /// named `SourceDescV2`) will be constructed and used for execution.
34 pub(crate) source_desc_builder: Option<SourceDescBuilder>,
35
36 /// Split info for stream source. A source executor might read data from several splits of
37 /// external connector.
38 pub(crate) latest_split_info: HashMap<SplitId, SplitImpl>,
39
40 /// Stores information of the splits.
41 pub(crate) split_state_store: SourceStateTableHandler<S>,
42
43 /// Contains the latests offsets for the splits that are updated *in the current epoch*.
44 /// It is cleared after each barrier.
45 ///
46 /// Source messages will only write the cache.
47 /// It is read on split change and rebuild stream reader on error.
48 pub(crate) updated_splits_in_epoch: HashMap<SplitId, SplitImpl>,
49}
50
51impl<S> StreamSourceCore<S>
52where
53 S: StateStore,
54{
55 pub fn new(
56 source_id: TableId,
57 source_name: String,
58 column_ids: Vec<ColumnId>,
59 source_desc_builder: SourceDescBuilder,
60 split_state_store: SourceStateTableHandler<S>,
61 ) -> Self {
62 Self {
63 source_id,
64 source_name,
65 column_ids,
66 source_desc_builder: Some(source_desc_builder),
67 latest_split_info: HashMap::new(),
68 split_state_store,
69 updated_splits_in_epoch: HashMap::new(),
70 }
71 }
72
73 pub fn init_split_state(&mut self, splits: Vec<SplitImpl>) {
74 self.latest_split_info = splits
75 .into_iter()
76 .map(|split| (split.id(), split))
77 .collect();
78 }
79}