risingwave_stream/executor/source/
executor_core.rs1use std::collections::HashMap;
16
17use risingwave_common::catalog::{ColumnId, TableId};
18use risingwave_connector::WithPropertiesExt;
19use risingwave_connector::source::reader::desc::SourceDescBuilder;
20use risingwave_connector::source::{BatchSourceSplitImpl, SplitId, SplitImpl, SplitMetaData};
21use risingwave_storage::StateStore;
22
23use super::SourceStateTableHandler;
24
25pub struct StreamSourceCore<S: StateStore> {
28 pub(crate) source_id: TableId,
29 pub(crate) source_name: String,
30
31 pub(crate) column_ids: Vec<ColumnId>,
32
33 pub(crate) source_desc_builder: Option<SourceDescBuilder>,
36
37 pub(crate) latest_split_info: HashMap<SplitId, SplitImpl>,
40
41 pub(crate) split_state_store: SourceStateTableHandler<S>,
43
44 pub(crate) updated_splits_in_epoch: HashMap<SplitId, SplitImpl>,
50
51 pub(crate) is_batch_source: bool,
53}
54
55impl<S> StreamSourceCore<S>
56where
57 S: StateStore,
58{
59 pub fn new(
60 source_id: TableId,
61 source_name: String,
62 column_ids: Vec<ColumnId>,
63 source_desc_builder: SourceDescBuilder,
64 split_state_store: SourceStateTableHandler<S>,
65 ) -> Self {
66 let is_batch_source = source_desc_builder.with_properties().is_batch_connector();
67 Self {
68 source_id,
69 source_name,
70 column_ids,
71 source_desc_builder: Some(source_desc_builder),
72 latest_split_info: HashMap::new(),
73 split_state_store,
74 updated_splits_in_epoch: HashMap::new(),
75 is_batch_source,
76 }
77 }
78
79 pub fn init_split_state(&mut self, splits: Vec<SplitImpl>) {
80 self.latest_split_info = splits
81 .into_iter()
82 .map(|split| (split.id(), split))
83 .collect();
84 }
85
86 pub fn get_batch_split(&self) -> BatchSourceSplitImpl {
89 debug_assert_eq!(
90 self.latest_split_info.len(),
91 1,
92 "batch source should have only one split"
93 );
94 self.latest_split_info
95 .values()
96 .next()
97 .unwrap()
98 .clone()
99 .into_batch_split()
100 .unwrap()
101 }
102}