risingwave_stream/from_proto/
mview.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::catalog::ConflictBehavior;
18use risingwave_common::util::sort_util::ColumnOrder;
19use risingwave_common::util::value_encoding::BasicSerde;
20use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
21use risingwave_pb::stream_plan::{ArrangeNode, MaterializeNode};
22
23use super::*;
24use crate::executor::MaterializeExecutor;
25use crate::executor::materialize::RefreshableMaterializeArgs;
26
27pub struct MaterializeExecutorBuilder;
28
29impl ExecutorBuilder for MaterializeExecutorBuilder {
30    type Node = MaterializeNode;
31
32    async fn new_boxed_executor(
33        params: ExecutorParams,
34        node: &Self::Node,
35        store: impl StateStore,
36    ) -> StreamResult<Executor> {
37        let [input]: [_; 1] = params.input.try_into().unwrap();
38
39        let order_key = node
40            .column_orders
41            .iter()
42            .map(ColumnOrder::from_protobuf)
43            .collect();
44
45        let table = node.get_table()?;
46        let versioned = table.version.is_some();
47        let refreshable = table.refreshable;
48
49        let conflict_behavior =
50            ConflictBehavior::from_protobuf(&table.handle_pk_conflict_behavior());
51        let version_column_indices: Vec<u32> = table.version_column_indices.clone();
52
53        let exec = if refreshable {
54            // Create refresh args for refreshable tables
55            let refresh_args = RefreshableMaterializeArgs::<_, ColumnAwareSerde>::new(
56                store.clone(),
57                table,
58                node.staging_table.as_ref().unwrap(),
59                node.refresh_progress_table.as_ref().unwrap(),
60                params.vnode_bitmap.clone().map(Arc::new),
61            )
62            .await;
63
64            // Use unified MaterializeExecutor with refresh args
65            MaterializeExecutor::<_, ColumnAwareSerde>::new(
66                input,
67                params.info.schema.clone(),
68                store,
69                order_key,
70                params.actor_context,
71                params.vnode_bitmap.map(Arc::new),
72                table,
73                params.watermark_epoch,
74                conflict_behavior,
75                version_column_indices.clone(),
76                params.executor_stats.clone(),
77                Some(refresh_args),
78                params.local_barrier_manager.clone(),
79            )
80            .await
81            .boxed()
82        } else {
83            // Use standard MaterializeExecutor for regular tables (no refresh args)
84            if versioned {
85                MaterializeExecutor::<_, ColumnAwareSerde>::new(
86                    input,
87                    params.info.schema.clone(),
88                    store,
89                    order_key,
90                    params.actor_context,
91                    params.vnode_bitmap.map(Arc::new),
92                    table,
93                    params.watermark_epoch,
94                    conflict_behavior,
95                    version_column_indices.clone(),
96                    params.executor_stats.clone(),
97                    None, // No refresh args for regular tables
98                    params.local_barrier_manager.clone(),
99                )
100                .await
101                .boxed()
102            } else {
103                MaterializeExecutor::<_, BasicSerde>::new(
104                    input,
105                    params.info.schema.clone(),
106                    store,
107                    order_key,
108                    params.actor_context,
109                    params.vnode_bitmap.map(Arc::new),
110                    table,
111                    params.watermark_epoch,
112                    conflict_behavior,
113                    version_column_indices.clone(),
114                    params.executor_stats.clone(),
115                    None, // No refresh args for regular tables
116                    params.local_barrier_manager.clone(),
117                )
118                .await
119                .boxed()
120            }
121        };
122
123        Ok((params.info, exec).into())
124    }
125}
126
127pub struct ArrangeExecutorBuilder;
128
129impl ExecutorBuilder for ArrangeExecutorBuilder {
130    type Node = ArrangeNode;
131
132    async fn new_boxed_executor(
133        params: ExecutorParams,
134        node: &Self::Node,
135        store: impl StateStore,
136    ) -> StreamResult<Executor> {
137        let [input]: [_; 1] = params.input.try_into().unwrap();
138
139        let keys = node
140            .get_table_info()?
141            .arrange_key_orders
142            .iter()
143            .map(ColumnOrder::from_protobuf)
144            .collect();
145
146        let table = node.get_table()?;
147
148        // FIXME: Lookup is now implemented without cell-based table API and relies on all vnodes
149        // being `SINGLETON_VNODE`, so we need to make the Arrange a singleton.
150        let vnodes = params.vnode_bitmap.map(Arc::new);
151        let conflict_behavior =
152            ConflictBehavior::from_protobuf(&table.handle_pk_conflict_behavior());
153        let version_column_indices: Vec<u32> = table.version_column_indices.clone();
154        let exec = MaterializeExecutor::<_, BasicSerde>::new(
155            input,
156            params.info.schema.clone(),
157            store,
158            keys,
159            params.actor_context,
160            vnodes,
161            table,
162            params.watermark_epoch,
163            conflict_behavior,
164            version_column_indices,
165            params.executor_stats.clone(),
166            None, // ArrangeExecutor doesn't support refresh functionality
167            params.local_barrier_manager.clone(),
168        )
169        .await;
170
171        Ok((params.info, exec).into())
172    }
173}