risingwave_stream/from_proto/
batch_query.rs1use risingwave_common::catalog::ColumnId;
16use risingwave_pb::plan_common::StorageTableDesc;
17use risingwave_pb::stream_plan::BatchPlanNode;
18use risingwave_storage::table::batch_table::BatchTable;
19
20use super::*;
21use crate::executor::{BatchQueryExecutor, DummyExecutor};
22
23pub struct BatchQueryExecutorBuilder;
24
25impl ExecutorBuilder for BatchQueryExecutorBuilder {
26 type Node = BatchPlanNode;
27
28 async fn new_boxed_executor(
29 params: ExecutorParams,
30 node: &Self::Node,
31 state_store: impl StateStore,
32 ) -> StreamResult<Executor> {
33 if node.table_desc.is_none() {
34 let mut info = params.info;
36 info.identity = "DummyBatchQueryExecutor".to_owned();
37 return Ok((info, DummyExecutor).into());
38 }
39
40 let table_desc: &StorageTableDesc = node.get_table_desc()?;
41
42 let column_ids = node
43 .column_ids
44 .iter()
45 .copied()
46 .map(ColumnId::from)
47 .collect();
48
49 let table = BatchTable::new_partial(
50 state_store,
51 column_ids,
52 params.vnode_bitmap.map(Into::into),
53 table_desc,
54 );
55 assert_eq!(table.schema().data_types(), params.info.schema.data_types());
56
57 let exec = BatchQueryExecutor::new(
58 table,
59 params.env.config().developer.chunk_size,
60 params.info.schema.clone(),
61 );
62 Ok((params.info, exec).into())
63 }
64}