risingwave_stream/from_proto/
stream_scan.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::anyhow;
18use risingwave_common::catalog::ColumnId;
19use risingwave_common::util::value_encoding::BasicSerde;
20use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
21use risingwave_pb::plan_common::StorageTableDesc;
22use risingwave_pb::stream_plan::{StreamScanNode, StreamScanType};
23use risingwave_storage::table::batch_table::BatchTable;
24
25use super::*;
26use crate::common::table::state_table::{ReplicatedStateTable, StateTableBuilder};
27use crate::executor::{
28    ArrangementBackfillExecutor, BackfillExecutor, ChainExecutor, RearrangedChainExecutor,
29    TroublemakerExecutor, UpstreamTableExecutor,
30};
31
32pub struct StreamScanExecutorBuilder;
33
34impl ExecutorBuilder for StreamScanExecutorBuilder {
35    type Node = StreamScanNode;
36
37    async fn new_boxed_executor(
38        params: ExecutorParams,
39        node: &Self::Node,
40        state_store: impl StateStore,
41    ) -> StreamResult<Executor> {
42        // For reporting the progress.
43        let progress = params
44            .local_barrier_manager
45            .register_create_mview_progress(&params.actor_context);
46
47        let output_indices = node
48            .output_indices
49            .iter()
50            .map(|&i| i as usize)
51            .collect_vec();
52
53        let exec = match node.stream_scan_type() {
54            StreamScanType::Chain | StreamScanType::UpstreamOnly => {
55                let [upstream, snapshot]: [_; 2] = params.input.try_into().unwrap();
56                let upstream_only = matches!(node.stream_scan_type(), StreamScanType::UpstreamOnly);
57                ChainExecutor::new(snapshot, upstream, progress, upstream_only).boxed()
58            }
59            StreamScanType::Rearrange => {
60                let [upstream, snapshot]: [_; 2] = params.input.try_into().unwrap();
61                RearrangedChainExecutor::new(snapshot, upstream, progress).boxed()
62            }
63
64            StreamScanType::Backfill => {
65                let [upstream, _]: [_; 2] = params.input.try_into().unwrap();
66                let table_desc: &StorageTableDesc = node.get_table_desc()?;
67
68                let column_ids = node
69                    .upstream_column_ids
70                    .iter()
71                    .map(ColumnId::from)
72                    .collect_vec();
73
74                let vnodes = params.vnode_bitmap.map(Arc::new);
75
76                let state_table = if let Ok(table) = node.get_state_table() {
77                    Some(
78                        StateTableBuilder::new(table, state_store.clone(), vnodes.clone())
79                            .enable_preload_all_rows_by_config(&params.config)
80                            .build()
81                            .await,
82                    )
83                } else {
84                    None
85                };
86
87                let upstream_table =
88                    BatchTable::new_partial(state_store.clone(), column_ids, vnodes, table_desc);
89
90                BackfillExecutor::new(
91                    upstream_table,
92                    upstream,
93                    state_table,
94                    output_indices,
95                    progress,
96                    params.executor_stats.clone(),
97                    params.config.developer.chunk_size,
98                    node.rate_limit.into(),
99                    params.fragment_id,
100                )
101                .boxed()
102            }
103            StreamScanType::ArrangementBackfill => {
104                let [upstream, _]: [_; 2] = params.input.try_into().unwrap();
105                let column_ids = node
106                    .upstream_column_ids
107                    .iter()
108                    .map(ColumnId::from)
109                    .collect_vec();
110
111                let vnodes = params.vnode_bitmap.map(Arc::new);
112
113                let state_table = node.get_state_table().unwrap();
114                let state_table =
115                    StateTableBuilder::new(state_table, state_store.clone(), vnodes.clone())
116                        .enable_preload_all_rows_by_config(&params.config)
117                        .build()
118                        .await;
119
120                let upstream_table = node.get_arrangement_table().unwrap();
121                let versioned = upstream_table.get_version().is_ok();
122
123                macro_rules! new_executor {
124                    ($SD:ident) => {{
125                        // TODO: can it be ConsistentOldValue?
126                        let upstream_table = ReplicatedStateTable::new_replicated(
127                            upstream_table,
128                            state_store.clone(),
129                            vnodes,
130                            column_ids,
131                        )
132                        .await;
133                        ArrangementBackfillExecutor::<_, $SD>::new(
134                            upstream_table,
135                            upstream,
136                            state_table,
137                            output_indices,
138                            progress,
139                            params.executor_stats.clone(),
140                            params.config.developer.chunk_size,
141                            node.rate_limit.into(),
142                            params.fragment_id,
143                        )
144                        .boxed()
145                    }};
146                }
147                if versioned {
148                    new_executor!(ColumnAwareSerde)
149                } else {
150                    new_executor!(BasicSerde)
151                }
152            }
153            StreamScanType::CrossDbSnapshotBackfill => {
154                let table_desc: &StorageTableDesc = node.get_table_desc()?;
155                assert!(params.input.is_empty());
156
157                let output_indices = node
158                    .output_indices
159                    .iter()
160                    .map(|&i| i as usize)
161                    .collect_vec();
162
163                let column_ids = node
164                    .upstream_column_ids
165                    .iter()
166                    .map(ColumnId::from)
167                    .collect_vec();
168
169                let vnodes = params.vnode_bitmap.map(Arc::new);
170                let barrier_rx = params
171                    .local_barrier_manager
172                    .subscribe_barrier(params.actor_context.id);
173
174                let upstream_table = BatchTable::new_partial(
175                    state_store.clone(),
176                    column_ids,
177                    vnodes.clone(),
178                    table_desc,
179                );
180
181                let state_table = node.get_state_table()?;
182                let state_table = StateTableBuilder::new(state_table, state_store.clone(), vnodes)
183                    .enable_preload_all_rows_by_config(&params.config)
184                    .build()
185                    .await;
186
187                let chunk_size = params.config.developer.chunk_size;
188                let snapshot_epoch = node
189                    .snapshot_backfill_epoch
190                    .ok_or_else(|| anyhow!("snapshot epoch not set for {:?}", node))?;
191
192                UpstreamTableExecutor::new(
193                    upstream_table.table_id(),
194                    upstream_table,
195                    state_table,
196                    snapshot_epoch,
197                    output_indices,
198                    chunk_size,
199                    node.rate_limit.into(),
200                    params.actor_context,
201                    barrier_rx,
202                    progress,
203                )
204                .boxed()
205            }
206            StreamScanType::SnapshotBackfill => {
207                unreachable!(
208                    "SnapshotBackfillExecutor is handled specially when in `StreamActorManager::create_nodes_inner`"
209                )
210            }
211            StreamScanType::Unspecified => unreachable!(),
212        };
213
214        if crate::consistency::insane() {
215            let mut info = params.info.clone();
216            info.identity = format!("{} (troubled)", info.identity);
217            Ok((
218                params.info,
219                TroublemakerExecutor::new((info, exec).into(), params.config.developer.chunk_size),
220            )
221                .into())
222        } else {
223            Ok((params.info, exec).into())
224        }
225    }
226}