risingwave_stream/from_proto/
mview.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::catalog::ConflictBehavior;
18use risingwave_common::util::sort_util::ColumnOrder;
19use risingwave_common::util::value_encoding::BasicSerde;
20use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
21use risingwave_pb::stream_plan::{ArrangeNode, MaterializeNode};
22
23use super::*;
24use crate::executor::{MaterializeExecutor, RefreshableMaterializeArgs};
25
26pub struct MaterializeExecutorBuilder;
27
28impl ExecutorBuilder for MaterializeExecutorBuilder {
29    type Node = MaterializeNode;
30
31    async fn new_boxed_executor(
32        params: ExecutorParams,
33        node: &Self::Node,
34        store: impl StateStore,
35    ) -> StreamResult<Executor> {
36        let [input]: [_; 1] = params.input.try_into().unwrap();
37
38        let order_key = node
39            .column_orders
40            .iter()
41            .map(ColumnOrder::from_protobuf)
42            .collect();
43
44        let table = node.get_table()?;
45        let versioned = table.version.is_some();
46        let refreshable = table.refreshable;
47
48        let conflict_behavior =
49            ConflictBehavior::from_protobuf(&table.handle_pk_conflict_behavior());
50        let version_column_indices: Vec<u32> = table.version_column_indices.clone();
51
52        let exec = if refreshable {
53            // Create refresh args for refreshable tables
54            let refresh_args = RefreshableMaterializeArgs::<_, ColumnAwareSerde>::new(
55                store.clone(),
56                table,
57                node.staging_table.as_ref().unwrap(),
58                node.refresh_progress_table.as_ref().unwrap(),
59                params.vnode_bitmap.clone().map(Arc::new),
60            )
61            .await;
62
63            // Use unified MaterializeExecutor with refresh args
64            MaterializeExecutor::<_, ColumnAwareSerde>::new(
65                input,
66                params.info.schema.clone(),
67                store,
68                order_key,
69                params.actor_context,
70                params.vnode_bitmap.map(Arc::new),
71                table,
72                params.watermark_epoch,
73                conflict_behavior,
74                version_column_indices.clone(),
75                params.executor_stats.clone(),
76                Some(refresh_args),
77                params.local_barrier_manager.clone(),
78            )
79            .await
80            .boxed()
81        } else {
82            // Use standard MaterializeExecutor for regular tables (no refresh args)
83            if versioned {
84                MaterializeExecutor::<_, ColumnAwareSerde>::new(
85                    input,
86                    params.info.schema.clone(),
87                    store,
88                    order_key,
89                    params.actor_context,
90                    params.vnode_bitmap.map(Arc::new),
91                    table,
92                    params.watermark_epoch,
93                    conflict_behavior,
94                    version_column_indices.clone(),
95                    params.executor_stats.clone(),
96                    None, // No refresh args for regular tables
97                    params.local_barrier_manager.clone(),
98                )
99                .await
100                .boxed()
101            } else {
102                MaterializeExecutor::<_, BasicSerde>::new(
103                    input,
104                    params.info.schema.clone(),
105                    store,
106                    order_key,
107                    params.actor_context,
108                    params.vnode_bitmap.map(Arc::new),
109                    table,
110                    params.watermark_epoch,
111                    conflict_behavior,
112                    version_column_indices.clone(),
113                    params.executor_stats.clone(),
114                    None, // No refresh args for regular tables
115                    params.local_barrier_manager.clone(),
116                )
117                .await
118                .boxed()
119            }
120        };
121
122        Ok((params.info, exec).into())
123    }
124}
125
126pub struct ArrangeExecutorBuilder;
127
128impl ExecutorBuilder for ArrangeExecutorBuilder {
129    type Node = ArrangeNode;
130
131    async fn new_boxed_executor(
132        params: ExecutorParams,
133        node: &Self::Node,
134        store: impl StateStore,
135    ) -> StreamResult<Executor> {
136        let [input]: [_; 1] = params.input.try_into().unwrap();
137
138        let keys = node
139            .get_table_info()?
140            .arrange_key_orders
141            .iter()
142            .map(ColumnOrder::from_protobuf)
143            .collect();
144
145        let table = node.get_table()?;
146
147        // FIXME: Lookup is now implemented without cell-based table API and relies on all vnodes
148        // being `SINGLETON_VNODE`, so we need to make the Arrange a singleton.
149        let vnodes = params.vnode_bitmap.map(Arc::new);
150        let conflict_behavior =
151            ConflictBehavior::from_protobuf(&table.handle_pk_conflict_behavior());
152        let version_column_indices: Vec<u32> = table.version_column_indices.clone();
153        let exec = MaterializeExecutor::<_, BasicSerde>::new(
154            input,
155            params.info.schema.clone(),
156            store,
157            keys,
158            params.actor_context,
159            vnodes,
160            table,
161            params.watermark_epoch,
162            conflict_behavior,
163            version_column_indices,
164            params.executor_stats.clone(),
165            None, // ArrangeExecutor doesn't support refresh functionality
166            params.local_barrier_manager.clone(),
167        )
168        .await;
169
170        Ok((params.info, exec).into())
171    }
172}