Skip to main content

risingwave_stream/from_proto/
stream_scan.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::anyhow;
18use risingwave_common::catalog::ColumnId;
19use risingwave_common::util::value_encoding::BasicSerde;
20use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
21use risingwave_pb::plan_common::StorageTableDesc;
22use risingwave_pb::stream_plan::{StreamScanNode, StreamScanType};
23use risingwave_storage::table::batch_table::BatchTable;
24
25use super::*;
26use crate::common::table::state_table::{ReplicatedStateTable, StateTableBuilder};
27use crate::executor::{
28    ArrangementBackfillExecutor, BackfillExecutor, ChainExecutor, RearrangedChainExecutor,
29    TroublemakerExecutor, UpstreamTableExecutor,
30};
31
/// Builds stream executors for `StreamScan` plan nodes.
///
/// The concrete executor depends on the node's `stream_scan_type`; see the
/// `ExecutorBuilder` implementation for this type.
pub struct StreamScanExecutorBuilder;

// Associates the `StreamScan(StreamScanNode)` node body with this builder.
// NOTE(review): the exact expansion lives in the `impl_stream_node_body!` macro
// definition elsewhere in the crate — confirm there.
impl_stream_node_body!(StreamScan(StreamScanNode) => StreamScanExecutorBuilder);
35
36impl ExecutorBuilder for StreamScanExecutorBuilder {
37    type Node = StreamScanNode;
38
39    async fn new_boxed_executor(
40        params: ExecutorParams,
41        node: &Self::Node,
42        state_store: impl StateStore,
43    ) -> StreamResult<Executor> {
44        // For reporting the progress.
45        let progress = params
46            .local_barrier_manager
47            .register_create_mview_progress(&params.actor_context);
48
49        let output_indices = node
50            .output_indices
51            .iter()
52            .map(|&i| i as usize)
53            .collect_vec();
54
55        let exec = match node.stream_scan_type() {
56            StreamScanType::Chain | StreamScanType::UpstreamOnly => {
57                let [upstream, snapshot]: [_; 2] = params.input.try_into().unwrap();
58                let upstream_only = matches!(node.stream_scan_type(), StreamScanType::UpstreamOnly);
59                ChainExecutor::new(snapshot, upstream, progress, upstream_only).boxed()
60            }
61            StreamScanType::Rearrange => {
62                let [upstream, snapshot]: [_; 2] = params.input.try_into().unwrap();
63                RearrangedChainExecutor::new(snapshot, upstream, progress).boxed()
64            }
65
66            StreamScanType::Backfill => {
67                let [upstream, _]: [_; 2] = params.input.try_into().unwrap();
68                let table_desc: &StorageTableDesc = node.get_table_desc()?;
69
70                let column_ids = node
71                    .upstream_column_ids
72                    .iter()
73                    .map(ColumnId::from)
74                    .collect_vec();
75
76                let vnodes = params.vnode_bitmap.map(Arc::new);
77
78                let state_table = if let Ok(table) = node.get_state_table() {
79                    Some(
80                        StateTableBuilder::new(table, state_store.clone(), vnodes.clone())
81                            .enable_preload_all_rows_by_config(&params.config)
82                            .build()
83                            .await,
84                    )
85                } else {
86                    None
87                };
88
89                let upstream_table =
90                    BatchTable::new_partial(state_store.clone(), column_ids, vnodes, table_desc);
91
92                BackfillExecutor::new(
93                    upstream_table,
94                    upstream,
95                    state_table,
96                    output_indices,
97                    progress,
98                    params.executor_stats.clone(),
99                    params.config.developer.chunk_size,
100                    node.rate_limit.into(),
101                    params.fragment_id,
102                )
103                .boxed()
104            }
105            StreamScanType::ArrangementBackfill => {
106                let [upstream, _]: [_; 2] = params.input.try_into().unwrap();
107                let column_ids = node
108                    .upstream_column_ids
109                    .iter()
110                    .map(ColumnId::from)
111                    .collect_vec();
112
113                let vnodes = params.vnode_bitmap.map(Arc::new);
114
115                let state_table = node.get_state_table().unwrap();
116                let state_table =
117                    StateTableBuilder::new(state_table, state_store.clone(), vnodes.clone())
118                        .enable_preload_all_rows_by_config(&params.config)
119                        .build()
120                        .await;
121
122                let upstream_table = node.get_arrangement_table().unwrap();
123                let versioned = upstream_table.get_version().is_ok();
124
125                macro_rules! new_executor {
126                    ($SD:ident) => {{
127                        // TODO: can it be ConsistentOldValue?
128                        let upstream_table = ReplicatedStateTable::new_replicated(
129                            upstream_table,
130                            state_store.clone(),
131                            vnodes,
132                            column_ids,
133                        )
134                        .await;
135                        ArrangementBackfillExecutor::<_, $SD>::new(
136                            upstream_table,
137                            upstream,
138                            state_table,
139                            output_indices,
140                            progress,
141                            params.executor_stats.clone(),
142                            params.config.developer.chunk_size,
143                            node.rate_limit.into(),
144                            params.fragment_id,
145                        )
146                        .boxed()
147                    }};
148                }
149                if versioned {
150                    new_executor!(ColumnAwareSerde)
151                } else {
152                    new_executor!(BasicSerde)
153                }
154            }
155            StreamScanType::CrossDbSnapshotBackfill => {
156                let table_desc: &StorageTableDesc = node.get_table_desc()?;
157                assert!(params.input.is_empty());
158
159                let output_indices = node
160                    .output_indices
161                    .iter()
162                    .map(|&i| i as usize)
163                    .collect_vec();
164
165                let column_ids = node
166                    .upstream_column_ids
167                    .iter()
168                    .map(ColumnId::from)
169                    .collect_vec();
170
171                let vnodes = params.vnode_bitmap.map(Arc::new);
172                let barrier_rx = params
173                    .local_barrier_manager
174                    .subscribe_barrier(params.actor_context.id);
175
176                let upstream_table = BatchTable::new_partial(
177                    state_store.clone(),
178                    column_ids,
179                    vnodes.clone(),
180                    table_desc,
181                );
182
183                let state_table = node.get_state_table()?;
184                let state_table = StateTableBuilder::new(state_table, state_store.clone(), vnodes)
185                    .enable_preload_all_rows_by_config(&params.config)
186                    .build()
187                    .await;
188
189                let chunk_size = params.config.developer.chunk_size;
190                let snapshot_epoch = node
191                    .snapshot_backfill_epoch
192                    .ok_or_else(|| anyhow!("snapshot epoch not set for {:?}", node))?;
193
194                UpstreamTableExecutor::new(
195                    upstream_table.table_id(),
196                    upstream_table,
197                    state_table,
198                    snapshot_epoch,
199                    output_indices,
200                    chunk_size,
201                    node.rate_limit.into(),
202                    params.actor_context,
203                    barrier_rx,
204                    progress,
205                )
206                .boxed()
207            }
208            StreamScanType::SnapshotBackfill => {
209                unreachable!(
210                    "SnapshotBackfillExecutor is handled specially when in `StreamActorManager::create_nodes_inner`"
211                )
212            }
213            StreamScanType::Unspecified => unreachable!(),
214        };
215
216        if crate::consistency::insane() {
217            let mut info = params.info.clone();
218            info.identity = format!("{} (troubled)", info.identity);
219            Ok((
220                params.info,
221                TroublemakerExecutor::new((info, exec).into(), params.config.developer.chunk_size),
222            )
223                .into())
224        } else {
225            Ok((params.info, exec).into())
226        }
227    }
228}