risingwave_stream/from_proto/
mview.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::catalog::ConflictBehavior;
18use risingwave_common::util::sort_util::ColumnOrder;
19use risingwave_common::util::value_encoding::BasicSerde;
20use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
21use risingwave_pb::stream_plan::{ArrangeNode, MaterializeNode};
22
23use super::*;
24use crate::executor::{MaterializeExecutor, RefreshableMaterializeArgs};
25
26pub struct MaterializeExecutorBuilder;
27
28impl ExecutorBuilder for MaterializeExecutorBuilder {
29    type Node = MaterializeNode;
30
31    async fn new_boxed_executor(
32        params: ExecutorParams,
33        node: &Self::Node,
34        store: impl StateStore,
35    ) -> StreamResult<Executor> {
36        let [input]: [_; 1] = params.input.try_into().unwrap();
37
38        let order_key = node
39            .column_orders
40            .iter()
41            .map(ColumnOrder::from_protobuf)
42            .collect();
43
44        let table = node.get_table()?;
45        let versioned = table.version.is_some();
46        let refreshable = table.refreshable;
47
48        let mut conflict_behavior =
49            ConflictBehavior::from_protobuf(&table.handle_pk_conflict_behavior());
50        if params
51            .config
52            .developer
53            .materialize_force_overwrite_on_no_check
54            && conflict_behavior == ConflictBehavior::NoCheck
55        {
56            conflict_behavior = ConflictBehavior::Overwrite;
57        }
58        let version_column_indices: Vec<u32> = table.version_column_indices.clone();
59
60        let exec = if refreshable {
61            // Create refresh args for refreshable tables
62            let refresh_args = RefreshableMaterializeArgs::<_, ColumnAwareSerde>::new(
63                store.clone(),
64                table,
65                node.staging_table.as_ref().unwrap(),
66                node.refresh_progress_table.as_ref().unwrap(),
67                params.vnode_bitmap.clone().map(Arc::new),
68            )
69            .await;
70
71            // Use unified MaterializeExecutor with refresh args
72            MaterializeExecutor::<_, ColumnAwareSerde>::new(
73                input,
74                params.info.schema.clone(),
75                store,
76                order_key,
77                params.actor_context,
78                params.vnode_bitmap.map(Arc::new),
79                table,
80                params.watermark_epoch,
81                conflict_behavior,
82                version_column_indices.clone(),
83                params.executor_stats.clone(),
84                Some(refresh_args),
85                node.cleaned_by_ttl_watermark,
86                params.local_barrier_manager.clone(),
87            )
88            .await
89            .boxed()
90        } else {
91            // Use standard MaterializeExecutor for regular tables (no refresh args)
92            if versioned {
93                MaterializeExecutor::<_, ColumnAwareSerde>::new(
94                    input,
95                    params.info.schema.clone(),
96                    store,
97                    order_key,
98                    params.actor_context,
99                    params.vnode_bitmap.map(Arc::new),
100                    table,
101                    params.watermark_epoch,
102                    conflict_behavior,
103                    version_column_indices.clone(),
104                    params.executor_stats.clone(),
105                    None, // No refresh args for regular tables
106                    node.cleaned_by_ttl_watermark,
107                    params.local_barrier_manager.clone(),
108                )
109                .await
110                .boxed()
111            } else {
112                MaterializeExecutor::<_, BasicSerde>::new(
113                    input,
114                    params.info.schema.clone(),
115                    store,
116                    order_key,
117                    params.actor_context,
118                    params.vnode_bitmap.map(Arc::new),
119                    table,
120                    params.watermark_epoch,
121                    conflict_behavior,
122                    version_column_indices.clone(),
123                    params.executor_stats.clone(),
124                    None, // No refresh args for regular tables
125                    node.cleaned_by_ttl_watermark,
126                    params.local_barrier_manager.clone(),
127                )
128                .await
129                .boxed()
130            }
131        };
132
133        Ok((params.info, exec).into())
134    }
135}
136
137pub struct ArrangeExecutorBuilder;
138
139impl ExecutorBuilder for ArrangeExecutorBuilder {
140    type Node = ArrangeNode;
141
142    async fn new_boxed_executor(
143        params: ExecutorParams,
144        node: &Self::Node,
145        store: impl StateStore,
146    ) -> StreamResult<Executor> {
147        let [input]: [_; 1] = params.input.try_into().unwrap();
148
149        let keys = node
150            .get_table_info()?
151            .arrange_key_orders
152            .iter()
153            .map(ColumnOrder::from_protobuf)
154            .collect();
155
156        let table = node.get_table()?;
157
158        // FIXME: Lookup is now implemented without cell-based table API and relies on all vnodes
159        // being `SINGLETON_VNODE`, so we need to make the Arrange a singleton.
160        let vnodes = params.vnode_bitmap.map(Arc::new);
161        let conflict_behavior =
162            ConflictBehavior::from_protobuf(&table.handle_pk_conflict_behavior());
163        let version_column_indices: Vec<u32> = table.version_column_indices.clone();
164        let exec = MaterializeExecutor::<_, BasicSerde>::new(
165            input,
166            params.info.schema.clone(),
167            store,
168            keys,
169            params.actor_context,
170            vnodes,
171            table,
172            params.watermark_epoch,
173            conflict_behavior,
174            version_column_indices,
175            params.executor_stats.clone(),
176            None,  // ArrangeExecutor doesn't support refresh functionality
177            false, // ArrangeExecutor doesn't support TTL watermark
178            params.local_barrier_manager.clone(),
179        )
180        .await;
181
182        Ok((params.info, exec).into())
183    }
184}