risingwave_stream/from_proto/
mview.rs1use std::sync::Arc;
16
17use risingwave_common::catalog::ConflictBehavior;
18use risingwave_common::util::sort_util::ColumnOrder;
19use risingwave_common::util::value_encoding::BasicSerde;
20use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
21use risingwave_pb::stream_plan::{ArrangeNode, MaterializeNode};
22
23use super::*;
24use crate::executor::{MaterializeExecutor, RefreshableMaterializeArgs};
25
26pub struct MaterializeExecutorBuilder;
27
28impl ExecutorBuilder for MaterializeExecutorBuilder {
29 type Node = MaterializeNode;
30
31 async fn new_boxed_executor(
32 params: ExecutorParams,
33 node: &Self::Node,
34 store: impl StateStore,
35 ) -> StreamResult<Executor> {
36 let [input]: [_; 1] = params.input.try_into().unwrap();
37
38 let order_key = node
39 .column_orders
40 .iter()
41 .map(ColumnOrder::from_protobuf)
42 .collect();
43
44 let table = node.get_table()?;
45 let versioned = table.version.is_some();
46 let refreshable = table.refreshable;
47
48 let mut conflict_behavior =
49 ConflictBehavior::from_protobuf(&table.handle_pk_conflict_behavior());
50 if params
51 .config
52 .developer
53 .materialize_force_overwrite_on_no_check
54 && conflict_behavior == ConflictBehavior::NoCheck
55 {
56 conflict_behavior = ConflictBehavior::Overwrite;
57 }
58 let version_column_indices: Vec<u32> = table.version_column_indices.clone();
59
60 let exec = if refreshable {
61 let refresh_args = RefreshableMaterializeArgs::<_, ColumnAwareSerde>::new(
63 store.clone(),
64 table,
65 node.staging_table.as_ref().unwrap(),
66 node.refresh_progress_table.as_ref().unwrap(),
67 params.vnode_bitmap.clone().map(Arc::new),
68 )
69 .await;
70
71 MaterializeExecutor::<_, ColumnAwareSerde>::new(
73 input,
74 params.info.schema.clone(),
75 store,
76 order_key,
77 params.actor_context,
78 params.vnode_bitmap.map(Arc::new),
79 table,
80 params.watermark_epoch,
81 conflict_behavior,
82 version_column_indices.clone(),
83 params.executor_stats.clone(),
84 Some(refresh_args),
85 node.cleaned_by_ttl_watermark,
86 params.local_barrier_manager.clone(),
87 )
88 .await
89 .boxed()
90 } else {
91 if versioned {
93 MaterializeExecutor::<_, ColumnAwareSerde>::new(
94 input,
95 params.info.schema.clone(),
96 store,
97 order_key,
98 params.actor_context,
99 params.vnode_bitmap.map(Arc::new),
100 table,
101 params.watermark_epoch,
102 conflict_behavior,
103 version_column_indices.clone(),
104 params.executor_stats.clone(),
105 None, node.cleaned_by_ttl_watermark,
107 params.local_barrier_manager.clone(),
108 )
109 .await
110 .boxed()
111 } else {
112 MaterializeExecutor::<_, BasicSerde>::new(
113 input,
114 params.info.schema.clone(),
115 store,
116 order_key,
117 params.actor_context,
118 params.vnode_bitmap.map(Arc::new),
119 table,
120 params.watermark_epoch,
121 conflict_behavior,
122 version_column_indices.clone(),
123 params.executor_stats.clone(),
124 None, node.cleaned_by_ttl_watermark,
126 params.local_barrier_manager.clone(),
127 )
128 .await
129 .boxed()
130 }
131 };
132
133 Ok((params.info, exec).into())
134 }
135}
136
137pub struct ArrangeExecutorBuilder;
138
139impl ExecutorBuilder for ArrangeExecutorBuilder {
140 type Node = ArrangeNode;
141
142 async fn new_boxed_executor(
143 params: ExecutorParams,
144 node: &Self::Node,
145 store: impl StateStore,
146 ) -> StreamResult<Executor> {
147 let [input]: [_; 1] = params.input.try_into().unwrap();
148
149 let keys = node
150 .get_table_info()?
151 .arrange_key_orders
152 .iter()
153 .map(ColumnOrder::from_protobuf)
154 .collect();
155
156 let table = node.get_table()?;
157
158 let vnodes = params.vnode_bitmap.map(Arc::new);
161 let conflict_behavior =
162 ConflictBehavior::from_protobuf(&table.handle_pk_conflict_behavior());
163 let version_column_indices: Vec<u32> = table.version_column_indices.clone();
164 let exec = MaterializeExecutor::<_, BasicSerde>::new(
165 input,
166 params.info.schema.clone(),
167 store,
168 keys,
169 params.actor_context,
170 vnodes,
171 table,
172 params.watermark_epoch,
173 conflict_behavior,
174 version_column_indices,
175 params.executor_stats.clone(),
176 None, false, params.local_barrier_manager.clone(),
179 )
180 .await;
181
182 Ok((params.info, exec).into())
183 }
184}