risingwave_batch_executors/executor/source.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use futures::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op, StreamChunk};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
use risingwave_common::id::SourceId;
use risingwave_common::types::DataType;
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::parser::SpecificParserConfig;
use risingwave_connector::source::monitor::SourceMetrics;
use risingwave_connector::source::reader::reader::SourceReader;
use risingwave_connector::source::{
    ConnectorProperties, SourceColumnDesc, SourceContext, SourceCtrlOpts, SourceReaderEvent,
    SplitImpl, SplitMetaData,
};
use risingwave_pb::batch_plan::plan_node::NodeBody;

use super::Executor;
use crate::error::{BatchError, Result};
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};

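/// Batch executor that reads from an external connector source.
///
/// It scans the splits carried in the batch plan node and emits the decoded
/// rows as plain [`DataChunk`]s; no offset state is kept across the scan.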
pub struct SourceExecutor {
    source: SourceReader,

    // used to create reader
    column_ids: Vec<ColumnId>,
    metrics: Arc<SourceMetrics>,
    source_id: SourceId,
    split_list: Vec<SplitImpl>,

    schema: Schema,
    identity: String,

    chunk_size: usize,
}

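// Builds the executor from a `NodeBody::Source` plan node: the connector
// properties (with resolved secrets), the parser config, the column layout,
// and the splits to read are all extracted from the plan node itself.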
impl BoxedExecutorBuilder for SourceExecutor {
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        inputs: Vec<BoxedExecutor>,
    ) -> Result<BoxedExecutor> {
        ensure!(inputs.is_empty(), "Source should not have input executor!");
        let source_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::Source
        )?;

        // prepare connector source
        let options_with_secret = WithOptionsSecResolved::new(
            source_node.with_properties.clone(),
            source_node.secret_refs.clone(),
        );
        let config = ConnectorProperties::extract(options_with_secret.clone(), false)
            .map_err(BatchError::connector)?;

        let info = source_node.get_info().unwrap();
        let parser_config = SpecificParserConfig::new(info, &options_with_secret)?;

        let columns: Vec<_> = source_node
            .columns
            .iter()
            .map(|c| SourceColumnDesc::from(&ColumnDesc::from(c.column_desc.as_ref().unwrap())))
            .collect();

        let column_ids: Vec<_> = source_node
            .columns
            .iter()
            .map(|column| ColumnId::from(column.get_column_desc().unwrap().column_id))
            .collect();

        let split_list = source_node
            .split
            .iter()
            .map(|split| SplitImpl::restore_from_bytes(split).unwrap())
            .collect_vec();

        let fields = source_node
            .columns
            .iter()
            .map(|prost| {
                let column_desc = prost.column_desc.as_ref().unwrap();
                let data_type = DataType::from(column_desc.column_type.as_ref().unwrap());
                let name = column_desc.name.clone();
                Field::with_name(data_type, name)
            })
            .collect();
        let schema = Schema::new(fields);

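        // Iceberg batch reads are expected to go through a dedicated scan
        // executor, so an Iceberg connector reaching this generic source
        // executor would be a planning bug.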
        assert!(!matches!(config, ConnectorProperties::Iceberg(_)));
        let source_reader = SourceReader {
            config,
            columns,
            parser_config,
            connector_message_buffer_size: source
                .context()
                .get_config()
                .developer
                .connector_message_buffer_size,
        };

        Ok(Box::new(SourceExecutor {
            source: source_reader,
            column_ids,
            metrics: source.context().source_metrics(),
            source_id: source_node.source_id,
            split_list,
            schema,
            identity: source.plan_node().get_identity().clone(),
            chunk_size: source.context().get_config().developer.chunk_size,
        }))
    }
}

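// The `Executor` impl just exposes the schema and identity taken from the plan
// node and boxes the `do_execute` stream below.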
impl Executor for SourceExecutor {
    fn schema(&self) -> &risingwave_common::catalog::Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
        self.do_execute().boxed()
    }
}

impl SourceExecutor {
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
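        // A batch scan has no streaming actor or fragment identity, so the
        // source context is built with placeholder ids, a placeholder source
        // name, and default connector properties.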
        let source_ctx = Arc::new(SourceContext::new(
            u32::MAX.into(),
            self.source_id,
            u32::MAX.into(),
            "NA".to_owned(), // source name was not passed in batch plan
            self.metrics,
            SourceCtrlOpts {
                chunk_size: self.chunk_size,
                split_txn: false,
            },
            ConnectorProperties::default(),
            None,
        ));
        let (stream, _) = self
            .source
            .build_stream(Some(self.split_list), self.column_ids, source_ctx, false)
            .await?;

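        // The reader yields data chunks interleaved with split-progress reports;
        // a one-shot batch scan does not track offsets, so progress events are
        // simply skipped.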
        #[for_await]
        for event in stream {
            let chunk = match event.map_err(BatchError::connector)? {
                SourceReaderEvent::DataChunk(chunk) => chunk,
                SourceReaderEvent::SplitProgress(_) => continue,
            };
            let data_chunk = convert_stream_chunk_to_batch_chunk(chunk)?;
            if data_chunk.capacity() > 0 {
                yield data_chunk;
            }
        }
    }
}

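/// Downgrades a [`StreamChunk`] read from the source into a plain [`DataChunk`].
/// A batch scan is expected to see append-only data, so any op other than
/// `Insert` is rejected.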
fn convert_stream_chunk_to_batch_chunk(chunk: StreamChunk) -> Result<DataChunk> {
    // chunk read from source must be compact
    assert!(chunk.data_chunk().is_vis_compacted());

    if chunk.ops().iter().any(|op| *op != Op::Insert) {
        bail!("Only support insert op in batch source executor");
    }

    Ok(chunk.data_chunk().clone())
}