risingwave_batch_executors/executor/source.rs

use std::sync::Arc;

use futures::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op, StreamChunk};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
use risingwave_common::types::DataType;
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::parser::SpecificParserConfig;
use risingwave_connector::source::monitor::SourceMetrics;
use risingwave_connector::source::reader::reader::SourceReader;
use risingwave_connector::source::{
    ConnectorProperties, SourceColumnDesc, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData,
};
use risingwave_pb::batch_plan::plan_node::NodeBody;

use super::Executor;
use crate::error::{BatchError, Result};
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
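/// Batch executor that reads a bounded list of splits from an external
/// connector source and emits the rows as insert-only [`DataChunk`]s.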
pub struct SourceExecutor {
    source: SourceReader,

    column_ids: Vec<ColumnId>,

    metrics: Arc<SourceMetrics>,
    source_id: TableId,
    split_list: Vec<SplitImpl>,

    schema: Schema,
    identity: String,

    chunk_size: usize,
}
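// Builds the executor from a `Source` plan node: resolves the connector
// configuration, derives the parser config, column descriptors and output
// schema, and restores the splits assigned to this task.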
impl BoxedExecutorBuilder for SourceExecutor {
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        inputs: Vec<BoxedExecutor>,
    ) -> Result<BoxedExecutor> {
        ensure!(inputs.is_empty(), "Source should not have input executor!");
        let source_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::Source
        )?;
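        // Bundle the plan node's WITH properties with its secret references,
        // then extract the typed connector properties from them.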
        let options_with_secret = WithOptionsSecResolved::new(
            source_node.with_properties.clone(),
            source_node.secret_refs.clone(),
        );
        let config = ConnectorProperties::extract(options_with_secret.clone(), false)
            .map_err(BatchError::connector)?;

        let info = source_node.get_info().unwrap();
        let parser_config = SpecificParserConfig::new(info, &options_with_secret)?;

        let columns: Vec<_> = source_node
            .columns
            .iter()
            .map(|c| SourceColumnDesc::from(&ColumnDesc::from(c.column_desc.as_ref().unwrap())))
            .collect();

        let column_ids: Vec<_> = source_node
            .columns
            .iter()
            .map(|column| ColumnId::from(column.get_column_desc().unwrap().column_id))
            .collect();

        let split_list = source_node
            .split
            .iter()
            .map(|split| SplitImpl::restore_from_bytes(split).unwrap())
            .collect_vec();

        let fields = source_node
            .columns
            .iter()
            .map(|prost| {
                let column_desc = prost.column_desc.as_ref().unwrap();
                let data_type = DataType::from(column_desc.column_type.as_ref().unwrap());
                let name = column_desc.name.clone();
                Field::with_name(data_type, name)
            })
            .collect();
        let schema = Schema::new(fields);
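        // Iceberg sources are expected to be read by a dedicated batch executor,
        // so they should never reach this generic source executor.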
        assert!(!matches!(config, ConnectorProperties::Iceberg(_)));
        let source_reader = SourceReader {
            config,
            columns,
            parser_config,
            connector_message_buffer_size: source
                .context()
                .get_config()
                .developer
                .connector_message_buffer_size,
        };

        Ok(Box::new(SourceExecutor {
            source: source_reader,
            column_ids,
            metrics: source.context().source_metrics(),
            source_id: TableId::new(source_node.source_id),
            split_list,
            schema,
            identity: source.plan_node().get_identity().clone(),
            chunk_size: source.context().get_config().developer.chunk_size,
        }))
    }
}
impl Executor for SourceExecutor {
    fn schema(&self) -> &risingwave_common::catalog::Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
        self.do_execute().boxed()
    }
}
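// The read path: build a source stream over the assigned splits and convert
// each connector `StreamChunk` into a batch `DataChunk`, skipping empty chunks.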
impl SourceExecutor {
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
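        // Batch tasks have no actor or fragment id, so placeholder values are
        // used for the `SourceContext`; `chunk_size` bounds each emitted chunk.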
        let source_ctx = Arc::new(SourceContext::new(
            u32::MAX,
            self.source_id,
            u32::MAX,
            "NA".to_owned(),
            self.metrics,
            SourceCtrlOpts {
                chunk_size: self.chunk_size,
                split_txn: false,
            },
            ConnectorProperties::default(),
            None,
        ));
        let (stream, _) = self
            .source
            .build_stream(Some(self.split_list), self.column_ids, source_ctx, false)
            .await?;

        #[for_await]
        for chunk in stream {
            let chunk = chunk.map_err(BatchError::connector)?;
            let data_chunk = convert_stream_chunk_to_batch_chunk(chunk)?;
            if data_chunk.capacity() > 0 {
                yield data_chunk;
            }
        }
    }
}
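/// Converts a connector [`StreamChunk`] into a batch [`DataChunk`].
///
/// Only `Insert` ops are supported by the batch source executor, and the chunk
/// must already be compacted so its underlying data chunk can be reused directly.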
fn convert_stream_chunk_to_batch_chunk(chunk: StreamChunk) -> Result<DataChunk> {
    assert!(chunk.data_chunk().is_compacted());

    if chunk.ops().iter().any(|op| *op != Op::Insert) {
        bail!("Only support insert op in batch source executor");
    }

    Ok(chunk.data_chunk().clone())
}
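
// A minimal sketch of how the conversion helper could be unit-tested. It is an
// illustration, not part of the original file, and it assumes that the common
// `StreamChunk::from_pretty` test helper from `risingwave_common` produces
// compacted, fully-visible chunks.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn convert_rejects_non_insert_ops() {
        // Insert-only chunks pass through unchanged.
        let chunk = StreamChunk::from_pretty(
            " I I
            + 1 10
            + 2 20",
        );
        let data_chunk = convert_stream_chunk_to_batch_chunk(chunk).unwrap();
        assert_eq!(data_chunk.cardinality(), 2);

        // Chunks containing deletes (or other non-insert ops) are rejected.
        let chunk = StreamChunk::from_pretty(
            " I I
            + 1 10
            - 1 10",
        );
        assert!(convert_stream_chunk_to_batch_chunk(chunk).is_err());
    }
}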