// risingwave_batch_executors/executor/source.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use futures::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op, StreamChunk};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
use risingwave_common::id::SourceId;
use risingwave_common::types::DataType;
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::parser::SpecificParserConfig;
use risingwave_connector::source::monitor::SourceMetrics;
use risingwave_connector::source::reader::reader::SourceReader;
use risingwave_connector::source::{
    ConnectorProperties, SourceColumnDesc, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData,
};
use risingwave_pb::batch_plan::plan_node::NodeBody;

use super::Executor;
use crate::error::{BatchError, Result};
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};

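/// Batch executor that reads data from an external source through a connector
/// [`SourceReader`], yielding the rows as [`DataChunk`]s.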
pub struct SourceExecutor {
    source: SourceReader,

    // Used to create the reader.
    column_ids: Vec<ColumnId>,
    metrics: Arc<SourceMetrics>,
    source_id: SourceId,
    split_list: Vec<SplitImpl>,

    schema: Schema,
    identity: String,

    chunk_size: usize,
}

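// Builds a `SourceExecutor` from the `Source` node of a batch plan. All the
// information needed (connector properties, parser config, columns, and
// pre-assigned splits) is carried in the plan node.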
impl BoxedExecutorBuilder for SourceExecutor {
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        inputs: Vec<BoxedExecutor>,
    ) -> Result<BoxedExecutor> {
        ensure!(inputs.is_empty(), "Source should not have input executor!");
        let source_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::Source
        )?;

        // Prepare the connector source: resolve the WITH options together with
        // their secret references, then extract typed connector properties.
        let options_with_secret = WithOptionsSecResolved::new(
            source_node.with_properties.clone(),
            source_node.secret_refs.clone(),
        );
        let config = ConnectorProperties::extract(options_with_secret.clone(), false)
            .map_err(BatchError::connector)?;

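        // Derive the parser (format/encode) configuration from the source
        // info and the resolved options.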
        let info = source_node.get_info().unwrap();
        let parser_config = SpecificParserConfig::new(info, &options_with_secret)?;

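        // Convert the plan's column descriptors into connector-facing
        // `SourceColumnDesc`s for the reader.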
        let columns: Vec<_> = source_node
            .columns
            .iter()
            .map(|c| SourceColumnDesc::from(&ColumnDesc::from(c.column_desc.as_ref().unwrap())))
            .collect();

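        // Ids of the columns to read from the source.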
        let column_ids: Vec<_> = source_node
            .columns
            .iter()
            .map(|column| ColumnId::from(column.get_column_desc().unwrap().column_id))
            .collect();

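        // Restore the pre-assigned splits serialized into the plan node.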
        let split_list = source_node
            .split
            .iter()
            .map(|split| SplitImpl::restore_from_bytes(split).unwrap())
            .collect_vec();

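        // Build the executor's output schema from the same column descriptors.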
        let fields = source_node
            .columns
            .iter()
            .map(|prost| {
                let column_desc = prost.column_desc.as_ref().unwrap();
                let data_type = DataType::from(column_desc.column_type.as_ref().unwrap());
                let name = column_desc.name.clone();
                Field::with_name(data_type, name)
            })
            .collect();
        let schema = Schema::new(fields);

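        // Iceberg is read by a dedicated batch executor and must never reach
        // this generic source executor.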
        assert!(!matches!(config, ConnectorProperties::Iceberg(_)));
        let source_reader = SourceReader {
            config,
            columns,
            parser_config,
            connector_message_buffer_size: source
                .context()
                .get_config()
                .developer
                .connector_message_buffer_size,
        };

        Ok(Box::new(SourceExecutor {
            source: source_reader,
            column_ids,
            metrics: source.context().source_metrics(),
            source_id: source_node.source_id,
            split_list,
            schema,
            identity: source.plan_node().get_identity().clone(),
            chunk_size: source.context().get_config().developer.chunk_size,
        }))
    }
}

impl Executor for SourceExecutor {
    fn schema(&self) -> &risingwave_common::catalog::Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
        self.do_execute().boxed()
    }
}

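// The execution body: build a connector stream over the pre-assigned splits
// and forward its output as batch chunks.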
impl SourceExecutor {
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        let source_ctx = Arc::new(SourceContext::new(
            u32::MAX.into(), // dummy actor id; a batch task is not a streaming actor
            self.source_id,
            u32::MAX.into(), // dummy fragment id, for the same reason
            "NA".to_owned(), // the source name is not passed in the batch plan
            self.metrics,
            SourceCtrlOpts {
                chunk_size: self.chunk_size,
                split_txn: false,
            },
            ConnectorProperties::default(),
            None,
        ));
        let (stream, _) = self
            .source
            .build_stream(Some(self.split_list), self.column_ids, source_ctx, false)
            .await?;

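        // Convert each streaming chunk into a batch chunk, skipping empty ones.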
        #[for_await]
        for chunk in stream {
            let chunk = chunk.map_err(BatchError::connector)?;
            let data_chunk = convert_stream_chunk_to_batch_chunk(chunk)?;
            if data_chunk.capacity() > 0 {
                yield data_chunk;
            }
        }
    }
}

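/// Converts a [`StreamChunk`] read from the source into a plain [`DataChunk`],
/// rejecting any non-`Insert` operations, which a batch scan cannot represent.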
fn convert_stream_chunk_to_batch_chunk(chunk: StreamChunk) -> Result<DataChunk> {
    // A chunk read from the source must already be visibility-compacted.
    assert!(chunk.data_chunk().is_vis_compacted());

    if chunk.ops().iter().any(|op| *op != Op::Insert) {
        bail!("only insert ops are supported in the batch source executor");
    }

    Ok(chunk.data_chunk().clone())
}