risingwave_batch_executors/executor/source.rs

use std::sync::Arc;

use futures::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op, StreamChunk};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
use risingwave_common::id::SourceId;
use risingwave_common::types::DataType;
use risingwave_common::{bail, ensure, try_match_expand};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::parser::SpecificParserConfig;
use risingwave_connector::source::monitor::SourceMetrics;
use risingwave_connector::source::reader::reader::SourceReader;
use risingwave_connector::source::{
    ConnectorProperties, SourceColumnDesc, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData,
};
use risingwave_pb::batch_plan::plan_node::NodeBody;

use super::Executor;
use crate::error::{BatchError, Result};
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};

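/// Batch executor that scans an external connector source: it builds a
/// [`SourceReader`] from the plan node, reads the splits assigned to this task,
/// and converts the resulting stream chunks into plain [`DataChunk`]s. Only
/// append-only (`Insert`) data is accepted here.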
pub struct SourceExecutor {
    source: SourceReader,

    column_ids: Vec<ColumnId>,

    metrics: Arc<SourceMetrics>,
    source_id: SourceId,
    split_list: Vec<SplitImpl>,

    schema: Schema,
    identity: String,

    chunk_size: usize,
}

impl BoxedExecutorBuilder for SourceExecutor {
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        inputs: Vec<BoxedExecutor>,
    ) -> Result<BoxedExecutor> {
        ensure!(inputs.is_empty(), "Source should not have input executor!");
        let source_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::Source
        )?;

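        // Combine the WITH options with their resolved secret references, then
        // extract the typed connector properties from them.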
        let options_with_secret = WithOptionsSecResolved::new(
            source_node.with_properties.clone(),
            source_node.secret_refs.clone(),
        );
        let config = ConnectorProperties::extract(options_with_secret.clone(), false)
            .map_err(BatchError::connector)?;

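        // The format/encode info carried by the plan node determines how raw
        // messages from the connector are parsed.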
        let info = source_node.get_info().unwrap();
        let parser_config = SpecificParserConfig::new(info, &options_with_secret)?;

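        // Convert the protobuf column descriptors into the descriptors the
        // source reader works with.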
        let columns: Vec<_> = source_node
            .columns
            .iter()
            .map(|c| SourceColumnDesc::from(&ColumnDesc::from(c.column_desc.as_ref().unwrap())))
            .collect();

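        // Ids of the columns this executor outputs; handed to the reader when
        // the stream is built in `do_execute`.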
        let column_ids: Vec<_> = source_node
            .columns
            .iter()
            .map(|column| ColumnId::from(column.get_column_desc().unwrap().column_id))
            .collect();

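        // Restore the pre-assigned splits from their serialized form in the
        // plan node.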
        let split_list = source_node
            .split
            .iter()
            .map(|split| SplitImpl::restore_from_bytes(split).unwrap())
            .collect_vec();

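        // Derive the executor's output schema from the column descriptors.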
        let fields = source_node
            .columns
            .iter()
            .map(|prost| {
                let column_desc = prost.column_desc.as_ref().unwrap();
                let data_type = DataType::from(column_desc.column_type.as_ref().unwrap());
                let name = column_desc.name.clone();
                Field::with_name(data_type, name)
            })
            .collect();
        let schema = Schema::new(fields);

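        // Iceberg tables are scanned by a dedicated executor and must never be
        // routed here.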
        assert!(!matches!(config, ConnectorProperties::Iceberg(_)));
        let source_reader = SourceReader {
            config,
            columns,
            parser_config,
            connector_message_buffer_size: source
                .context()
                .get_config()
                .developer
                .connector_message_buffer_size,
        };

        Ok(Box::new(SourceExecutor {
            source: source_reader,
            column_ids,
            metrics: source.context().source_metrics(),
            source_id: source_node.source_id,
            split_list,
            schema,
            identity: source.plan_node().get_identity().clone(),
            chunk_size: source.context().get_config().developer.chunk_size,
        }))
    }
}

impl Executor for SourceExecutor {
    fn schema(&self) -> &risingwave_common::catalog::Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
        self.do_execute().boxed()
    }
}

impl SourceExecutor {
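    /// Builds the connector stream for the assigned splits and yields each
    /// chunk as a batch [`DataChunk`], skipping chunks that carry no rows.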
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
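        // Batch reads run outside the streaming graph, so there is no real
        // actor or fragment to attribute the read to: use dummy ids and a
        // placeholder source name.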
        let source_ctx = Arc::new(SourceContext::new(
            u32::MAX.into(),
            self.source_id,
            u32::MAX.into(),
            "NA".to_owned(),
            self.metrics,
            SourceCtrlOpts {
                chunk_size: self.chunk_size,
                split_txn: false,
            },
            ConnectorProperties::default(),
            None,
        ));
        let (stream, _) = self
            .source
            .build_stream(Some(self.split_list), self.column_ids, source_ctx, false)
            .await?;

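        // Forward the stream, converting each stream chunk into a batch chunk
        // and dropping empty ones so no zero-row batches are yielded.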
        #[for_await]
        for chunk in stream {
            let chunk = chunk.map_err(BatchError::connector)?;
            let data_chunk = convert_stream_chunk_to_batch_chunk(chunk)?;
            if data_chunk.capacity() > 0 {
                yield data_chunk;
            }
        }
    }
}

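/// Reinterprets a [`StreamChunk`] as a [`DataChunk`]: the chunk must already be
/// visibility-compacted, and only `Insert` ops are accepted, so the underlying
/// data chunk can be reused as-is.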
fn convert_stream_chunk_to_batch_chunk(chunk: StreamChunk) -> Result<DataChunk> {
    assert!(chunk.data_chunk().is_vis_compacted());

    if chunk.ops().iter().any(|op| *op != Op::Insert) {
        bail!("Only support insert op in batch source executor");
    }

    Ok(chunk.data_chunk().clone())
}