risingwave_stream/from_proto/
mview.rs1use std::sync::Arc;
16
17use risingwave_common::catalog::ConflictBehavior;
18use risingwave_common::util::sort_util::ColumnOrder;
19use risingwave_common::util::value_encoding::BasicSerde;
20use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
21use risingwave_pb::stream_plan::{ArrangeNode, MaterializeNode};
22
23use super::*;
24use crate::executor::{MaterializeExecutor, RefreshableMaterializeArgs};
25
26pub struct MaterializeExecutorBuilder;
27
28impl_stream_node_body!(Materialize(MaterializeNode) => MaterializeExecutorBuilder);
29
30impl ExecutorBuilder for MaterializeExecutorBuilder {
31 type Node = MaterializeNode;
32
33 async fn new_boxed_executor(
34 params: ExecutorParams,
35 node: &Self::Node,
36 store: impl StateStore,
37 ) -> StreamResult<Executor> {
38 let [input]: [_; 1] = params.input.try_into().unwrap();
39
40 let order_key = node
41 .column_orders
42 .iter()
43 .map(ColumnOrder::from_protobuf)
44 .collect();
45
46 let table = node.get_table()?;
47 let versioned = table.version.is_some();
48 let refreshable = table.refreshable;
49
50 let mut conflict_behavior =
51 ConflictBehavior::from_protobuf(&table.handle_pk_conflict_behavior());
52 if params
53 .config
54 .developer
55 .materialize_force_overwrite_on_no_check
56 && conflict_behavior == ConflictBehavior::NoCheck
57 {
58 conflict_behavior = ConflictBehavior::Overwrite;
59 }
60 let version_column_indices: Vec<u32> = table.version_column_indices.clone();
61
62 let exec = if refreshable {
63 let refresh_args = RefreshableMaterializeArgs::<_, ColumnAwareSerde>::new(
65 store.clone(),
66 table,
67 node.staging_table.as_ref().unwrap(),
68 node.refresh_progress_table.as_ref().unwrap(),
69 params.vnode_bitmap.clone().map(Arc::new),
70 )
71 .await;
72
73 MaterializeExecutor::<_, ColumnAwareSerde>::new(
75 input,
76 params.info.schema.clone(),
77 store,
78 order_key,
79 params.actor_context,
80 params.vnode_bitmap.map(Arc::new),
81 table,
82 params.watermark_epoch,
83 conflict_behavior,
84 version_column_indices.clone(),
85 params.executor_stats.clone(),
86 Some(refresh_args),
87 node.cleaned_by_ttl_watermark,
88 params.local_barrier_manager.clone(),
89 )
90 .await
91 .boxed()
92 } else {
93 if versioned {
95 MaterializeExecutor::<_, ColumnAwareSerde>::new(
96 input,
97 params.info.schema.clone(),
98 store,
99 order_key,
100 params.actor_context,
101 params.vnode_bitmap.map(Arc::new),
102 table,
103 params.watermark_epoch,
104 conflict_behavior,
105 version_column_indices.clone(),
106 params.executor_stats.clone(),
107 None, node.cleaned_by_ttl_watermark,
109 params.local_barrier_manager.clone(),
110 )
111 .await
112 .boxed()
113 } else {
114 MaterializeExecutor::<_, BasicSerde>::new(
115 input,
116 params.info.schema.clone(),
117 store,
118 order_key,
119 params.actor_context,
120 params.vnode_bitmap.map(Arc::new),
121 table,
122 params.watermark_epoch,
123 conflict_behavior,
124 version_column_indices.clone(),
125 params.executor_stats.clone(),
126 None, node.cleaned_by_ttl_watermark,
128 params.local_barrier_manager.clone(),
129 )
130 .await
131 .boxed()
132 }
133 };
134
135 Ok((params.info, exec).into())
136 }
137}
138
139pub struct ArrangeExecutorBuilder;
140
141impl_stream_node_body!(Arrange(ArrangeNode) => ArrangeExecutorBuilder);
142
143impl ExecutorBuilder for ArrangeExecutorBuilder {
144 type Node = ArrangeNode;
145
146 async fn new_boxed_executor(
147 params: ExecutorParams,
148 node: &Self::Node,
149 store: impl StateStore,
150 ) -> StreamResult<Executor> {
151 let [input]: [_; 1] = params.input.try_into().unwrap();
152
153 let keys = node
154 .get_table_info()?
155 .arrange_key_orders
156 .iter()
157 .map(ColumnOrder::from_protobuf)
158 .collect();
159
160 let table = node.get_table()?;
161
162 let vnodes = params.vnode_bitmap.map(Arc::new);
165 let conflict_behavior =
166 ConflictBehavior::from_protobuf(&table.handle_pk_conflict_behavior());
167 let version_column_indices: Vec<u32> = table.version_column_indices.clone();
168 let exec = MaterializeExecutor::<_, BasicSerde>::new(
169 input,
170 params.info.schema.clone(),
171 store,
172 keys,
173 params.actor_context,
174 vnodes,
175 table,
176 params.watermark_epoch,
177 conflict_behavior,
178 version_column_indices,
179 params.executor_stats.clone(),
180 None, false, params.local_barrier_manager.clone(),
183 )
184 .await;
185
186 Ok((params.info, exec).into())
187 }
188}