Skip to main content

risingwave_stream/from_proto/
batch_query.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::ColumnId;
16use risingwave_pb::plan_common::StorageTableDesc;
17use risingwave_pb::stream_plan::BatchPlanNode;
18use risingwave_storage::table::batch_table::BatchTable;
19
20use super::*;
21use crate::executor::{BatchQueryExecutor, DummyExecutor};
22
23pub struct BatchQueryExecutorBuilder;
24
25impl_stream_node_body!(BatchPlan(BatchPlanNode) => BatchQueryExecutorBuilder);
26
27impl ExecutorBuilder for BatchQueryExecutorBuilder {
28    type Node = BatchPlanNode;
29
30    async fn new_boxed_executor(
31        params: ExecutorParams,
32        node: &Self::Node,
33        state_store: impl StateStore,
34    ) -> StreamResult<Executor> {
35        if node.table_desc.is_none() {
36            // used in sharing cdc source backfill as a dummy batch plan node
37            let mut info = params.info;
38            info.identity = "DummyBatchQueryExecutor".to_owned();
39            return Ok((info, DummyExecutor).into());
40        }
41
42        let table_desc: &StorageTableDesc = node.get_table_desc()?;
43
44        let column_ids = node
45            .column_ids
46            .iter()
47            .copied()
48            .map(ColumnId::from)
49            .collect();
50
51        let table = BatchTable::new_partial(
52            state_store,
53            column_ids,
54            params.vnode_bitmap.map(Into::into),
55            table_desc,
56        );
57        assert_eq!(table.schema().data_types(), params.info.schema.data_types());
58
59        let exec = BatchQueryExecutor::new(
60            table,
61            params.config.developer.chunk_size,
62            params.info.schema.clone(),
63        );
64        Ok((params.info, exec).into())
65    }
66}