risingwave_stream/from_proto/
mview.rs1use std::sync::Arc;
16
17use risingwave_common::catalog::ConflictBehavior;
18use risingwave_common::util::sort_util::ColumnOrder;
19use risingwave_common::util::value_encoding::BasicSerde;
20use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
21use risingwave_pb::stream_plan::{ArrangeNode, MaterializeNode};
22
23use super::*;
24use crate::executor::MaterializeExecutor;
25use crate::executor::materialize::RefreshableMaterializeArgs;
26
27pub struct MaterializeExecutorBuilder;
28
29impl ExecutorBuilder for MaterializeExecutorBuilder {
30 type Node = MaterializeNode;
31
32 async fn new_boxed_executor(
33 params: ExecutorParams,
34 node: &Self::Node,
35 store: impl StateStore,
36 ) -> StreamResult<Executor> {
37 let [input]: [_; 1] = params.input.try_into().unwrap();
38
39 let order_key = node
40 .column_orders
41 .iter()
42 .map(ColumnOrder::from_protobuf)
43 .collect();
44
45 let table = node.get_table()?;
46 let versioned = table.version.is_some();
47 let refreshable = table.refreshable;
48
49 let conflict_behavior =
50 ConflictBehavior::from_protobuf(&table.handle_pk_conflict_behavior());
51 let version_column_indices: Vec<u32> = table.version_column_indices.clone();
52
53 let exec = if refreshable {
54 let refresh_args = RefreshableMaterializeArgs::<_, ColumnAwareSerde>::new(
56 store.clone(),
57 table,
58 node.staging_table.as_ref().unwrap(),
59 node.refresh_progress_table.as_ref().unwrap(),
60 params.vnode_bitmap.clone().map(Arc::new),
61 )
62 .await;
63
64 MaterializeExecutor::<_, ColumnAwareSerde>::new(
66 input,
67 params.info.schema.clone(),
68 store,
69 order_key,
70 params.actor_context,
71 params.vnode_bitmap.map(Arc::new),
72 table,
73 params.watermark_epoch,
74 conflict_behavior,
75 version_column_indices.clone(),
76 params.executor_stats.clone(),
77 Some(refresh_args),
78 params.local_barrier_manager.clone(),
79 )
80 .await
81 .boxed()
82 } else {
83 if versioned {
85 MaterializeExecutor::<_, ColumnAwareSerde>::new(
86 input,
87 params.info.schema.clone(),
88 store,
89 order_key,
90 params.actor_context,
91 params.vnode_bitmap.map(Arc::new),
92 table,
93 params.watermark_epoch,
94 conflict_behavior,
95 version_column_indices.clone(),
96 params.executor_stats.clone(),
97 None, params.local_barrier_manager.clone(),
99 )
100 .await
101 .boxed()
102 } else {
103 MaterializeExecutor::<_, BasicSerde>::new(
104 input,
105 params.info.schema.clone(),
106 store,
107 order_key,
108 params.actor_context,
109 params.vnode_bitmap.map(Arc::new),
110 table,
111 params.watermark_epoch,
112 conflict_behavior,
113 version_column_indices.clone(),
114 params.executor_stats.clone(),
115 None, params.local_barrier_manager.clone(),
117 )
118 .await
119 .boxed()
120 }
121 };
122
123 Ok((params.info, exec).into())
124 }
125}
126
127pub struct ArrangeExecutorBuilder;
128
129impl ExecutorBuilder for ArrangeExecutorBuilder {
130 type Node = ArrangeNode;
131
132 async fn new_boxed_executor(
133 params: ExecutorParams,
134 node: &Self::Node,
135 store: impl StateStore,
136 ) -> StreamResult<Executor> {
137 let [input]: [_; 1] = params.input.try_into().unwrap();
138
139 let keys = node
140 .get_table_info()?
141 .arrange_key_orders
142 .iter()
143 .map(ColumnOrder::from_protobuf)
144 .collect();
145
146 let table = node.get_table()?;
147
148 let vnodes = params.vnode_bitmap.map(Arc::new);
151 let conflict_behavior =
152 ConflictBehavior::from_protobuf(&table.handle_pk_conflict_behavior());
153 let version_column_indices: Vec<u32> = table.version_column_indices.clone();
154 let exec = MaterializeExecutor::<_, BasicSerde>::new(
155 input,
156 params.info.schema.clone(),
157 store,
158 keys,
159 params.actor_context,
160 vnodes,
161 table,
162 params.watermark_epoch,
163 conflict_behavior,
164 version_column_indices,
165 params.executor_stats.clone(),
166 None, params.local_barrier_manager.clone(),
168 )
169 .await;
170
171 Ok((params.info, exec).into())
172 }
173}