risingwave_stream/from_proto/
batch_query.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::ColumnId;
16use risingwave_pb::plan_common::StorageTableDesc;
17use risingwave_pb::stream_plan::BatchPlanNode;
18use risingwave_storage::table::batch_table::BatchTable;
19
20use super::*;
21use crate::executor::{BatchQueryExecutor, DummyExecutor};
22
23pub struct BatchQueryExecutorBuilder;
24
25impl ExecutorBuilder for BatchQueryExecutorBuilder {
26    type Node = BatchPlanNode;
27
28    async fn new_boxed_executor(
29        params: ExecutorParams,
30        node: &Self::Node,
31        state_store: impl StateStore,
32    ) -> StreamResult<Executor> {
33        if node.table_desc.is_none() {
34            // used in sharing cdc source backfill as a dummy batch plan node
35            let mut info = params.info;
36            info.identity = "DummyBatchQueryExecutor".to_owned();
37            return Ok((info, DummyExecutor).into());
38        }
39
40        let table_desc: &StorageTableDesc = node.get_table_desc()?;
41
42        let column_ids = node
43            .column_ids
44            .iter()
45            .copied()
46            .map(ColumnId::from)
47            .collect();
48
49        let table = BatchTable::new_partial(
50            state_store,
51            column_ids,
52            params.vnode_bitmap.map(Into::into),
53            table_desc,
54        );
55        assert_eq!(table.schema().data_types(), params.info.schema.data_types());
56
57        let exec = BatchQueryExecutor::new(
58            table,
59            params.env.config().developer.chunk_size,
60            params.info.schema.clone(),
61        );
62        Ok((params.info, exec).into())
63    }
64}