Skip to main content

risingwave_stream/from_proto/
mview.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::catalog::ConflictBehavior;
18use risingwave_common::util::sort_util::ColumnOrder;
19use risingwave_common::util::value_encoding::BasicSerde;
20use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
21use risingwave_pb::stream_plan::{ArrangeNode, MaterializeNode};
22
23use super::*;
24use crate::executor::{MaterializeExecutor, RefreshableMaterializeArgs};
25
26pub struct MaterializeExecutorBuilder;
27
28impl_stream_node_body!(Materialize(MaterializeNode) => MaterializeExecutorBuilder);
29
30impl ExecutorBuilder for MaterializeExecutorBuilder {
31    type Node = MaterializeNode;
32
33    async fn new_boxed_executor(
34        params: ExecutorParams,
35        node: &Self::Node,
36        store: impl StateStore,
37    ) -> StreamResult<Executor> {
38        let [input]: [_; 1] = params.input.try_into().unwrap();
39
40        let order_key = node
41            .column_orders
42            .iter()
43            .map(ColumnOrder::from_protobuf)
44            .collect();
45
46        let table = node.get_table()?;
47        let versioned = table.version.is_some();
48        let refreshable = table.refreshable;
49
50        let mut conflict_behavior =
51            ConflictBehavior::from_protobuf(&table.handle_pk_conflict_behavior());
52        if params
53            .config
54            .developer
55            .materialize_force_overwrite_on_no_check
56            && conflict_behavior == ConflictBehavior::NoCheck
57        {
58            conflict_behavior = ConflictBehavior::Overwrite;
59        }
60        let version_column_indices: Vec<u32> = table.version_column_indices.clone();
61
62        let exec = if refreshable {
63            // Create refresh args for refreshable tables
64            let refresh_args = RefreshableMaterializeArgs::<_, ColumnAwareSerde>::new(
65                store.clone(),
66                table,
67                node.staging_table.as_ref().unwrap(),
68                node.refresh_progress_table.as_ref().unwrap(),
69                params.vnode_bitmap.clone().map(Arc::new),
70            )
71            .await;
72
73            // Use unified MaterializeExecutor with refresh args
74            MaterializeExecutor::<_, ColumnAwareSerde>::new(
75                input,
76                params.info.schema.clone(),
77                store,
78                order_key,
79                params.actor_context,
80                params.vnode_bitmap.map(Arc::new),
81                table,
82                params.watermark_epoch,
83                conflict_behavior,
84                version_column_indices.clone(),
85                params.executor_stats.clone(),
86                Some(refresh_args),
87                node.cleaned_by_ttl_watermark,
88                params.local_barrier_manager.clone(),
89            )
90            .await
91            .boxed()
92        } else {
93            // Use standard MaterializeExecutor for regular tables (no refresh args)
94            if versioned {
95                MaterializeExecutor::<_, ColumnAwareSerde>::new(
96                    input,
97                    params.info.schema.clone(),
98                    store,
99                    order_key,
100                    params.actor_context,
101                    params.vnode_bitmap.map(Arc::new),
102                    table,
103                    params.watermark_epoch,
104                    conflict_behavior,
105                    version_column_indices.clone(),
106                    params.executor_stats.clone(),
107                    None, // No refresh args for regular tables
108                    node.cleaned_by_ttl_watermark,
109                    params.local_barrier_manager.clone(),
110                )
111                .await
112                .boxed()
113            } else {
114                MaterializeExecutor::<_, BasicSerde>::new(
115                    input,
116                    params.info.schema.clone(),
117                    store,
118                    order_key,
119                    params.actor_context,
120                    params.vnode_bitmap.map(Arc::new),
121                    table,
122                    params.watermark_epoch,
123                    conflict_behavior,
124                    version_column_indices.clone(),
125                    params.executor_stats.clone(),
126                    None, // No refresh args for regular tables
127                    node.cleaned_by_ttl_watermark,
128                    params.local_barrier_manager.clone(),
129                )
130                .await
131                .boxed()
132            }
133        };
134
135        Ok((params.info, exec).into())
136    }
137}
138
139pub struct ArrangeExecutorBuilder;
140
141impl_stream_node_body!(Arrange(ArrangeNode) => ArrangeExecutorBuilder);
142
143impl ExecutorBuilder for ArrangeExecutorBuilder {
144    type Node = ArrangeNode;
145
146    async fn new_boxed_executor(
147        params: ExecutorParams,
148        node: &Self::Node,
149        store: impl StateStore,
150    ) -> StreamResult<Executor> {
151        let [input]: [_; 1] = params.input.try_into().unwrap();
152
153        let keys = node
154            .get_table_info()?
155            .arrange_key_orders
156            .iter()
157            .map(ColumnOrder::from_protobuf)
158            .collect();
159
160        let table = node.get_table()?;
161
162        // FIXME: Lookup is now implemented without cell-based table API and relies on all vnodes
163        // being `SINGLETON_VNODE`, so we need to make the Arrange a singleton.
164        let vnodes = params.vnode_bitmap.map(Arc::new);
165        let conflict_behavior =
166            ConflictBehavior::from_protobuf(&table.handle_pk_conflict_behavior());
167        let version_column_indices: Vec<u32> = table.version_column_indices.clone();
168        let exec = MaterializeExecutor::<_, BasicSerde>::new(
169            input,
170            params.info.schema.clone(),
171            store,
172            keys,
173            params.actor_context,
174            vnodes,
175            table,
176            params.watermark_epoch,
177            conflict_behavior,
178            version_column_indices,
179            params.executor_stats.clone(),
180            None,  // ArrangeExecutor doesn't support refresh functionality
181            false, // ArrangeExecutor doesn't support TTL watermark
182            params.local_barrier_manager.clone(),
183        )
184        .await;
185
186        Ok((params.info, exec).into())
187    }
188}