risingwave_stream/from_proto/
mview.rs1use std::sync::Arc;
16
17use risingwave_common::catalog::ConflictBehavior;
18use risingwave_common::util::sort_util::ColumnOrder;
19use risingwave_common::util::value_encoding::BasicSerde;
20use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
21use risingwave_pb::stream_plan::{ArrangeNode, MaterializeNode};
22
23use super::*;
24use crate::executor::{MaterializeExecutor, RefreshableMaterializeArgs};
25
26pub struct MaterializeExecutorBuilder;
27
28impl ExecutorBuilder for MaterializeExecutorBuilder {
29 type Node = MaterializeNode;
30
31 async fn new_boxed_executor(
32 params: ExecutorParams,
33 node: &Self::Node,
34 store: impl StateStore,
35 ) -> StreamResult<Executor> {
36 let [input]: [_; 1] = params.input.try_into().unwrap();
37
38 let order_key = node
39 .column_orders
40 .iter()
41 .map(ColumnOrder::from_protobuf)
42 .collect();
43
44 let table = node.get_table()?;
45 let versioned = table.version.is_some();
46 let refreshable = table.refreshable;
47
48 let conflict_behavior =
49 ConflictBehavior::from_protobuf(&table.handle_pk_conflict_behavior());
50 let version_column_indices: Vec<u32> = table.version_column_indices.clone();
51
52 let exec = if refreshable {
53 let refresh_args = RefreshableMaterializeArgs::<_, ColumnAwareSerde>::new(
55 store.clone(),
56 table,
57 node.staging_table.as_ref().unwrap(),
58 node.refresh_progress_table.as_ref().unwrap(),
59 params.vnode_bitmap.clone().map(Arc::new),
60 )
61 .await;
62
63 MaterializeExecutor::<_, ColumnAwareSerde>::new(
65 input,
66 params.info.schema.clone(),
67 store,
68 order_key,
69 params.actor_context,
70 params.vnode_bitmap.map(Arc::new),
71 table,
72 params.watermark_epoch,
73 conflict_behavior,
74 version_column_indices.clone(),
75 params.executor_stats.clone(),
76 Some(refresh_args),
77 params.local_barrier_manager.clone(),
78 )
79 .await
80 .boxed()
81 } else {
82 if versioned {
84 MaterializeExecutor::<_, ColumnAwareSerde>::new(
85 input,
86 params.info.schema.clone(),
87 store,
88 order_key,
89 params.actor_context,
90 params.vnode_bitmap.map(Arc::new),
91 table,
92 params.watermark_epoch,
93 conflict_behavior,
94 version_column_indices.clone(),
95 params.executor_stats.clone(),
96 None, params.local_barrier_manager.clone(),
98 )
99 .await
100 .boxed()
101 } else {
102 MaterializeExecutor::<_, BasicSerde>::new(
103 input,
104 params.info.schema.clone(),
105 store,
106 order_key,
107 params.actor_context,
108 params.vnode_bitmap.map(Arc::new),
109 table,
110 params.watermark_epoch,
111 conflict_behavior,
112 version_column_indices.clone(),
113 params.executor_stats.clone(),
114 None, params.local_barrier_manager.clone(),
116 )
117 .await
118 .boxed()
119 }
120 };
121
122 Ok((params.info, exec).into())
123 }
124}
125
126pub struct ArrangeExecutorBuilder;
127
128impl ExecutorBuilder for ArrangeExecutorBuilder {
129 type Node = ArrangeNode;
130
131 async fn new_boxed_executor(
132 params: ExecutorParams,
133 node: &Self::Node,
134 store: impl StateStore,
135 ) -> StreamResult<Executor> {
136 let [input]: [_; 1] = params.input.try_into().unwrap();
137
138 let keys = node
139 .get_table_info()?
140 .arrange_key_orders
141 .iter()
142 .map(ColumnOrder::from_protobuf)
143 .collect();
144
145 let table = node.get_table()?;
146
147 let vnodes = params.vnode_bitmap.map(Arc::new);
150 let conflict_behavior =
151 ConflictBehavior::from_protobuf(&table.handle_pk_conflict_behavior());
152 let version_column_indices: Vec<u32> = table.version_column_indices.clone();
153 let exec = MaterializeExecutor::<_, BasicSerde>::new(
154 input,
155 params.info.schema.clone(),
156 store,
157 keys,
158 params.actor_context,
159 vnodes,
160 table,
161 params.watermark_epoch,
162 conflict_behavior,
163 version_column_indices,
164 params.executor_stats.clone(),
165 None, params.local_barrier_manager.clone(),
167 )
168 .await;
169
170 Ok((params.info, exec).into())
171 }
172}