risingwave_stream/from_proto/
mview.rs1use std::sync::Arc;
16
17use risingwave_common::catalog::ConflictBehavior;
18use risingwave_common::util::sort_util::ColumnOrder;
19use risingwave_common::util::value_encoding::BasicSerde;
20use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
21use risingwave_pb::stream_plan::{ArrangeNode, MaterializeNode};
22
23use super::*;
24use crate::executor::MaterializeExecutor;
25
26pub struct MaterializeExecutorBuilder;
27
28impl ExecutorBuilder for MaterializeExecutorBuilder {
29 type Node = MaterializeNode;
30
31 async fn new_boxed_executor(
32 params: ExecutorParams,
33 node: &Self::Node,
34 store: impl StateStore,
35 ) -> StreamResult<Executor> {
36 let [input]: [_; 1] = params.input.try_into().unwrap();
37
38 let order_key = node
39 .column_orders
40 .iter()
41 .map(ColumnOrder::from_protobuf)
42 .collect();
43
44 let table = node.get_table()?;
45 let versioned = table.version.is_some();
46
47 let conflict_behavior =
48 ConflictBehavior::from_protobuf(&table.handle_pk_conflict_behavior());
49 let version_column_index = table.version_column_index;
50
51 macro_rules! new_executor {
52 ($SD:ident) => {
53 MaterializeExecutor::<_, $SD>::new(
54 input,
55 params.info.schema.clone(),
56 store,
57 order_key,
58 params.actor_context,
59 params.vnode_bitmap.map(Arc::new),
60 table,
61 params.watermark_epoch,
62 conflict_behavior,
63 version_column_index,
64 params.executor_stats.clone(),
65 )
66 .await
67 .boxed()
68 };
69 }
70
71 let exec = if versioned {
72 new_executor!(ColumnAwareSerde)
73 } else {
74 new_executor!(BasicSerde)
75 };
76
77 Ok((params.info, exec).into())
78 }
79}
80
81pub struct ArrangeExecutorBuilder;
82
83impl ExecutorBuilder for ArrangeExecutorBuilder {
84 type Node = ArrangeNode;
85
86 async fn new_boxed_executor(
87 params: ExecutorParams,
88 node: &Self::Node,
89 store: impl StateStore,
90 ) -> StreamResult<Executor> {
91 let [input]: [_; 1] = params.input.try_into().unwrap();
92
93 let keys = node
94 .get_table_info()?
95 .arrange_key_orders
96 .iter()
97 .map(ColumnOrder::from_protobuf)
98 .collect();
99
100 let table = node.get_table()?;
101
102 let vnodes = params.vnode_bitmap.map(Arc::new);
105 let conflict_behavior =
106 ConflictBehavior::from_protobuf(&table.handle_pk_conflict_behavior());
107 let version_column_index = table.version_column_index;
108 let exec = MaterializeExecutor::<_, BasicSerde>::new(
109 input,
110 params.info.schema.clone(),
111 store,
112 keys,
113 params.actor_context,
114 vnodes,
115 table,
116 params.watermark_epoch,
117 conflict_behavior,
118 version_column_index,
119 params.executor_stats.clone(),
120 )
121 .await;
122
123 Ok((params.info, exec).into())
124 }
125}