risingwave_stream/executor/source/executor_core.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use risingwave_common::catalog::ColumnId;
18use risingwave_common::id::SourceId;
19use risingwave_connector::source::reader::desc::SourceDescBuilder;
20use risingwave_connector::source::{BatchSourceSplitImpl, SplitId, SplitImpl, SplitMetaData};
21use risingwave_storage::StateStore;
22
23use super::SourceStateTableHandler;
24
/// [`StreamSourceCore`] stores the necessary information for the source executor to execute on the
/// external connector.
///
/// It bundles the source identity, its column IDs, the descriptor builder (consumed once when
/// execution starts), the in-memory split bookkeeping, and the handler of the state table used to
/// store split information.
pub struct StreamSourceCore<S: StateStore> {
    /// ID of the source this core belongs to.
    pub(crate) source_id: SourceId,
    /// Name of the source.
    pub(crate) source_name: String,

    /// Column IDs associated with this source.
    /// NOTE(review): presumably the columns the executor outputs — confirm against callers.
    pub(crate) column_ids: Vec<ColumnId>,

    /// Builder for the source descriptor. It is wrapped in `Option` so the executor can take it
    /// (e.g. via `Option::take`) when execution starts; a `SourceDesc` is then constructed from
    /// it and used for execution.
    /// NOTE(review): earlier docs said the descriptor was "currently named `SourceDescV2`";
    /// confirm whether that name is still accurate.
    pub(crate) source_desc_builder: Option<SourceDescBuilder>,

    /// Split info for the stream source, keyed by split ID. A source executor might read data
    /// from several splits of the external connector.
    pub(crate) latest_split_info: HashMap<SplitId, SplitImpl>,

    /// Handler of the state table that stores information of the splits.
    pub(crate) split_state_store: SourceStateTableHandler<S>,

    /// Contains the latest offsets for the splits that are updated *in the current epoch*.
    /// It is cleared after each barrier.
    ///
    /// Source messages only write to this cache.
    /// It is read on split change and when rebuilding the stream reader on error.
    pub(crate) updated_splits_in_epoch: HashMap<SplitId, SplitImpl>,
}
51
52impl<S> StreamSourceCore<S>
53where
54 S: StateStore,
55{
56 pub fn new(
57 source_id: SourceId,
58 source_name: String,
59 column_ids: Vec<ColumnId>,
60 source_desc_builder: SourceDescBuilder,
61 split_state_store: SourceStateTableHandler<S>,
62 ) -> Self {
63 Self {
64 source_id,
65 source_name,
66 column_ids,
67 source_desc_builder: Some(source_desc_builder),
68 latest_split_info: HashMap::new(),
69 split_state_store,
70 updated_splits_in_epoch: HashMap::new(),
71 }
72 }
73
74 pub fn init_split_state(&mut self, splits: Vec<SplitImpl>) {
75 self.latest_split_info = splits
76 .into_iter()
77 .map(|split| (split.id(), split))
78 .collect();
79 }
80
81 /// # Panics
82 /// If the source is not a batch source.
83 pub fn get_batch_split(&self) -> BatchSourceSplitImpl {
84 debug_assert_eq!(
85 self.latest_split_info.len(),
86 1,
87 "batch source should have only one split"
88 );
89 self.latest_split_info
90 .values()
91 .next()
92 .unwrap()
93 .clone()
94 .into_batch_split()
95 .unwrap()
96 }
97}