risingwave_batch_executors/executor/source.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use futures::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op, StreamChunk};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
use risingwave_common::types::DataType;
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::parser::SpecificParserConfig;
use risingwave_connector::source::monitor::SourceMetrics;
use risingwave_connector::source::reader::reader::SourceReader;
use risingwave_connector::source::{
    ConnectorProperties, SourceColumnDesc, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData,
};
use risingwave_pb::batch_plan::plan_node::NodeBody;

use super::Executor;
use crate::error::{BatchError, Result};
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};

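/// Batch executor that scans an external connector source: it reads the splits
/// assigned in the batch plan and emits the rows as batch [`DataChunk`]s.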
pub struct SourceExecutor {
    source: SourceReader,

    // Parameters used to create the source reader.
    column_ids: Vec<ColumnId>,
    metrics: Arc<SourceMetrics>,
    source_id: TableId,
    split_list: Vec<SplitImpl>,

    schema: Schema,
    identity: String,

    chunk_size: usize,
}

impl BoxedExecutorBuilder for SourceExecutor {
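    /// Builds a [`SourceExecutor`] from the plan's `Source` node: it resolves
    /// connector properties and secrets, restores the assigned splits, and
    /// derives the output schema.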
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        inputs: Vec<BoxedExecutor>,
    ) -> Result<BoxedExecutor> {
        ensure!(inputs.is_empty(), "Source should not have input executors!");
        let source_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::Source
        )?;

        // Prepare the connector source: resolve properties with secrets and
        // extract the connector config.
        let options_with_secret = WithOptionsSecResolved::new(
            source_node.with_properties.clone(),
            source_node.secret_refs.clone(),
        );
        let config = ConnectorProperties::extract(options_with_secret.clone(), false)
            .map_err(BatchError::connector)?;

        let info = source_node.get_info().unwrap();
        let parser_config = SpecificParserConfig::new(info, &options_with_secret)?;

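        // Map the plan's column descriptors to `SourceColumnDesc`s understood by
        // the connector reader.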
        let columns: Vec<_> = source_node
            .columns
            .iter()
            .map(|c| SourceColumnDesc::from(&ColumnDesc::from(c.column_desc.as_ref().unwrap())))
            .collect();

        let column_ids: Vec<_> = source_node
            .columns
            .iter()
            .map(|column| ColumnId::from(column.get_column_desc().unwrap().column_id))
            .collect();

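        // Restore the split assignment serialized in the batch plan; the scan is
        // bounded to exactly these splits.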
        let split_list = source_node
            .split
            .iter()
            .map(|split| SplitImpl::restore_from_bytes(split).unwrap())
            .collect_vec();

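        // Derive the executor's output schema from the plan's column descriptors.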
        let fields = source_node
            .columns
            .iter()
            .map(|prost| {
                let column_desc = prost.column_desc.as_ref().unwrap();
                let data_type = DataType::from(column_desc.column_type.as_ref().unwrap());
                let name = column_desc.name.clone();
                Field::with_name(data_type, name)
            })
            .collect();
        let schema = Schema::new(fields);

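        // Iceberg sources are expected to be handled by a dedicated batch
        // executor, so they must never reach this generic source path.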
        assert!(!matches!(config, ConnectorProperties::Iceberg(_)));
        let source_reader = SourceReader {
            config,
            columns,
            parser_config,
            connector_message_buffer_size: source
                .context()
                .get_config()
                .developer
                .connector_message_buffer_size,
        };

        Ok(Box::new(SourceExecutor {
            source: source_reader,
            column_ids,
            metrics: source.context().source_metrics(),
            source_id: TableId::new(source_node.source_id),
            split_list,
            schema,
            identity: source.plan_node().get_identity().clone(),
            chunk_size: source.context().get_config().developer.chunk_size,
        }))
    }
}

impl Executor for SourceExecutor {
    fn schema(&self) -> &risingwave_common::catalog::Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
        self.do_execute().boxed()
    }
}

impl SourceExecutor {
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
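        // Build a one-off source context. A batch task is not part of a streaming
        // actor graph, so placeholder actor/fragment ids (`u32::MAX`) are used.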
        let source_ctx = Arc::new(SourceContext::new(
            u32::MAX,
            self.source_id,
            u32::MAX,
            "NA".to_owned(), // the source name is not carried in the batch plan
            self.metrics,
            SourceCtrlOpts {
                chunk_size: self.chunk_size,
                split_txn: false,
            },
            ConnectorProperties::default(),
            None,
        ));
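        // Build the underlying connector stream over the assigned splits.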
        let (stream, _) = self
            .source
            .build_stream(Some(self.split_list), self.column_ids, source_ctx, false)
            .await?;

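        // Drain the stream, converting each stream chunk into a batch chunk and
        // skipping empty ones.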
        #[for_await]
        for chunk in stream {
            let chunk = chunk.map_err(BatchError::connector)?;
            let data_chunk = convert_stream_chunk_to_batch_chunk(chunk)?;
            if data_chunk.capacity() > 0 {
                yield data_chunk;
            }
        }
    }
}

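/// Converts a [`StreamChunk`] read from the source into a plain [`DataChunk`].
///
/// Only `Insert` ops are expected from a batch source read; anything else is
/// rejected.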
fn convert_stream_chunk_to_batch_chunk(chunk: StreamChunk) -> Result<DataChunk> {
    // Chunks read from the source must already be compacted.
    assert!(chunk.data_chunk().is_compacted());

    if chunk.ops().iter().any(|op| *op != Op::Insert) {
        bail!("Only insert ops are supported in the batch source executor");
    }

    Ok(chunk.data_chunk().clone())
}