risingwave_batch_executors/executor/source.rs

use std::sync::Arc;

use futures::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op, StreamChunk};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
use risingwave_common::id::SourceId;
use risingwave_common::types::DataType;
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::parser::SpecificParserConfig;
use risingwave_connector::source::monitor::SourceMetrics;
use risingwave_connector::source::reader::reader::SourceReader;
use risingwave_connector::source::{
    ConnectorProperties, SourceColumnDesc, SourceContext, SourceCtrlOpts, SourceReaderEvent,
    SplitImpl, SplitMetaData,
};
use risingwave_pb::batch_plan::plan_node::NodeBody;

use super::Executor;
use crate::error::{BatchError, Result};
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};

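/// Batch executor that reads from an external source connector over the fixed list
/// of splits assigned in the plan and emits the rows it reads as [`DataChunk`]s.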
pub struct SourceExecutor {
    source: SourceReader,

    column_ids: Vec<ColumnId>,
    metrics: Arc<SourceMetrics>,
    source_id: SourceId,
    split_list: Vec<SplitImpl>,

    schema: Schema,
    identity: String,

    chunk_size: usize,
}

impl BoxedExecutorBuilder for SourceExecutor {
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        inputs: Vec<BoxedExecutor>,
    ) -> Result<BoxedExecutor> {
        ensure!(inputs.is_empty(), "Source should not have input executor!");
        let source_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::Source
        )?;

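        // Resolve the connector's WITH options together with their secret references,
        // then extract the typed connector properties from them.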
        let options_with_secret = WithOptionsSecResolved::new(
            source_node.with_properties.clone(),
            source_node.secret_refs.clone(),
        );
        let config = ConnectorProperties::extract(options_with_secret.clone(), false)
            .map_err(BatchError::connector)?;

        let info = source_node.get_info().unwrap();
        let parser_config = SpecificParserConfig::new(info, &options_with_secret)?;
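
        // Column descriptors for the source reader and the ids of the columns this
        // executor should output, both derived from the plan node.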
        let columns: Vec<_> = source_node
            .columns
            .iter()
            .map(|c| SourceColumnDesc::from(&ColumnDesc::from(c.column_desc.as_ref().unwrap())))
            .collect();

        let column_ids: Vec<_> = source_node
            .columns
            .iter()
            .map(|column| ColumnId::from(column.get_column_desc().unwrap().column_id))
            .collect();

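        // Restore the splits assigned to this executor from their serialized form.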
        let split_list = source_node
            .split
            .iter()
            .map(|split| SplitImpl::restore_from_bytes(split).unwrap())
            .collect_vec();

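        // The output schema is built from the column descriptors carried by the plan node.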
        let fields = source_node
            .columns
            .iter()
            .map(|prost| {
                let column_desc = prost.column_desc.as_ref().unwrap();
                let data_type = DataType::from(column_desc.column_type.as_ref().unwrap());
                let name = column_desc.name.clone();
                Field::with_name(data_type, name)
            })
            .collect();
        let schema = Schema::new(fields);

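        // Iceberg sources are not supported by this executor and must not reach here.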
        assert!(!matches!(config, ConnectorProperties::Iceberg(_)));
        let source_reader = SourceReader {
            config,
            columns,
            parser_config,
            connector_message_buffer_size: source
                .context()
                .get_config()
                .developer
                .connector_message_buffer_size,
        };

        Ok(Box::new(SourceExecutor {
            source: source_reader,
            column_ids,
            metrics: source.context().source_metrics(),
            source_id: source_node.source_id,
            split_list,
            schema,
            identity: source.plan_node().get_identity().clone(),
            chunk_size: source.context().get_config().developer.chunk_size,
        }))
    }
}

impl Executor for SourceExecutor {
    fn schema(&self) -> &risingwave_common::catalog::Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
        self.do_execute().boxed()
    }
}

impl SourceExecutor {
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
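        // Build a source context for this one-off batch read, using placeholder ids
        // and name ("NA") and default connector properties.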
        let source_ctx = Arc::new(SourceContext::new(
            u32::MAX.into(),
            self.source_id,
            u32::MAX.into(),
            "NA".to_owned(),
            self.metrics,
            SourceCtrlOpts {
                chunk_size: self.chunk_size,
                split_txn: false,
            },
            ConnectorProperties::default(),
            None,
        ));
        let (stream, _) = self
            .source
            .build_stream(Some(self.split_list), self.column_ids, source_ctx, false)
            .await?;

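        // Forward only data chunks; split-progress events are skipped and empty
        // chunks are not yielded.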
        #[for_await]
        for event in stream {
            let chunk = match event.map_err(BatchError::connector)? {
                SourceReaderEvent::DataChunk(chunk) => chunk,
                SourceReaderEvent::SplitProgress(_) => continue,
            };
            let data_chunk = convert_stream_chunk_to_batch_chunk(chunk)?;
            if data_chunk.capacity() > 0 {
                yield data_chunk;
            }
        }
    }
}

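/// Converts a [`StreamChunk`] coming from the source into a batch [`DataChunk`].
/// The chunk must already be visibility-compacted, and only `Insert` ops are
/// accepted; any other op makes the conversion fail.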
fn convert_stream_chunk_to_batch_chunk(chunk: StreamChunk) -> Result<DataChunk> {
    assert!(chunk.data_chunk().is_vis_compacted());

    if chunk.ops().iter().any(|op| *op != Op::Insert) {
        bail!("Only support insert op in batch source executor");
    }

    Ok(chunk.data_chunk().clone())
}