risingwave_stream/from_proto/
batch_query.rs1use risingwave_common::catalog::ColumnId;
16use risingwave_pb::plan_common::StorageTableDesc;
17use risingwave_pb::stream_plan::BatchPlanNode;
18use risingwave_storage::table::batch_table::BatchTable;
19
20use super::*;
21use crate::executor::{BatchQueryExecutor, DummyExecutor};
22
23pub struct BatchQueryExecutorBuilder;
24
25impl_stream_node_body!(BatchPlan(BatchPlanNode) => BatchQueryExecutorBuilder);
26
27impl ExecutorBuilder for BatchQueryExecutorBuilder {
28 type Node = BatchPlanNode;
29
30 async fn new_boxed_executor(
31 params: ExecutorParams,
32 node: &Self::Node,
33 state_store: impl StateStore,
34 ) -> StreamResult<Executor> {
35 if node.table_desc.is_none() {
36 let mut info = params.info;
38 info.identity = "DummyBatchQueryExecutor".to_owned();
39 return Ok((info, DummyExecutor).into());
40 }
41
42 let table_desc: &StorageTableDesc = node.get_table_desc()?;
43
44 let column_ids = node
45 .column_ids
46 .iter()
47 .copied()
48 .map(ColumnId::from)
49 .collect();
50
51 let table = BatchTable::new_partial(
52 state_store,
53 column_ids,
54 params.vnode_bitmap.map(Into::into),
55 table_desc,
56 );
57 assert_eq!(table.schema().data_types(), params.info.schema.data_types());
58
59 let exec = BatchQueryExecutor::new(
60 table,
61 params.config.developer.chunk_size,
62 params.info.schema.clone(),
63 );
64 Ok((params.info, exec).into())
65 }
66}