risingwave_stream/from_proto/
stream_scan.rs1use std::sync::Arc;
16
17use anyhow::anyhow;
18use risingwave_common::catalog::ColumnId;
19use risingwave_common::util::value_encoding::BasicSerde;
20use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
21use risingwave_pb::plan_common::StorageTableDesc;
22use risingwave_pb::stream_plan::{StreamScanNode, StreamScanType};
23use risingwave_storage::table::batch_table::BatchTable;
24
25use super::*;
26use crate::common::table::state_table::{ReplicatedStateTable, StateTableBuilder};
27use crate::executor::{
28 ArrangementBackfillExecutor, BackfillExecutor, ChainExecutor, RearrangedChainExecutor,
29 TroublemakerExecutor, UpstreamTableExecutor,
30};
31
/// Builder for the executor behind a `StreamScanNode`.
///
/// Carries no state of its own; the construction logic lives in its
/// `ExecutorBuilder` implementation, which picks the concrete executor from
/// the node's `StreamScanType`.
32pub struct StreamScanExecutorBuilder;
33
34impl ExecutorBuilder for StreamScanExecutorBuilder {
35 type Node = StreamScanNode;
36
37 async fn new_boxed_executor(
38 params: ExecutorParams,
39 node: &Self::Node,
40 state_store: impl StateStore,
41 ) -> StreamResult<Executor> {
42 let progress = params
44 .local_barrier_manager
45 .register_create_mview_progress(¶ms.actor_context);
46
47 let output_indices = node
48 .output_indices
49 .iter()
50 .map(|&i| i as usize)
51 .collect_vec();
52
53 let exec = match node.stream_scan_type() {
54 StreamScanType::Chain | StreamScanType::UpstreamOnly => {
55 let [upstream, snapshot]: [_; 2] = params.input.try_into().unwrap();
56 let upstream_only = matches!(node.stream_scan_type(), StreamScanType::UpstreamOnly);
57 ChainExecutor::new(snapshot, upstream, progress, upstream_only).boxed()
58 }
59 StreamScanType::Rearrange => {
60 let [upstream, snapshot]: [_; 2] = params.input.try_into().unwrap();
61 RearrangedChainExecutor::new(snapshot, upstream, progress).boxed()
62 }
63
64 StreamScanType::Backfill => {
65 let [upstream, _]: [_; 2] = params.input.try_into().unwrap();
66 let table_desc: &StorageTableDesc = node.get_table_desc()?;
67
68 let column_ids = node
69 .upstream_column_ids
70 .iter()
71 .map(ColumnId::from)
72 .collect_vec();
73
74 let vnodes = params.vnode_bitmap.map(Arc::new);
75
76 let state_table = if let Ok(table) = node.get_state_table() {
77 Some(
78 StateTableBuilder::new(table, state_store.clone(), vnodes.clone())
79 .enable_preload_all_rows_by_config(¶ms.config)
80 .build()
81 .await,
82 )
83 } else {
84 None
85 };
86
87 let upstream_table =
88 BatchTable::new_partial(state_store.clone(), column_ids, vnodes, table_desc);
89
90 BackfillExecutor::new(
91 upstream_table,
92 upstream,
93 state_table,
94 output_indices,
95 progress,
96 params.executor_stats.clone(),
97 params.config.developer.chunk_size,
98 node.rate_limit.into(),
99 params.fragment_id,
100 )
101 .boxed()
102 }
103 StreamScanType::ArrangementBackfill => {
104 let [upstream, _]: [_; 2] = params.input.try_into().unwrap();
105 let column_ids = node
106 .upstream_column_ids
107 .iter()
108 .map(ColumnId::from)
109 .collect_vec();
110
111 let vnodes = params.vnode_bitmap.map(Arc::new);
112
113 let state_table = node.get_state_table().unwrap();
114 let state_table =
115 StateTableBuilder::new(state_table, state_store.clone(), vnodes.clone())
116 .enable_preload_all_rows_by_config(¶ms.config)
117 .build()
118 .await;
119
120 let upstream_table = node.get_arrangement_table().unwrap();
121 let versioned = upstream_table.get_version().is_ok();
122
123 macro_rules! new_executor {
124 ($SD:ident) => {{
125 let upstream_table = ReplicatedStateTable::new_replicated(
127 upstream_table,
128 state_store.clone(),
129 vnodes,
130 column_ids,
131 )
132 .await;
133 ArrangementBackfillExecutor::<_, $SD>::new(
134 upstream_table,
135 upstream,
136 state_table,
137 output_indices,
138 progress,
139 params.executor_stats.clone(),
140 params.config.developer.chunk_size,
141 node.rate_limit.into(),
142 params.fragment_id,
143 )
144 .boxed()
145 }};
146 }
147 if versioned {
148 new_executor!(ColumnAwareSerde)
149 } else {
150 new_executor!(BasicSerde)
151 }
152 }
153 StreamScanType::CrossDbSnapshotBackfill => {
154 let table_desc: &StorageTableDesc = node.get_table_desc()?;
155 assert!(params.input.is_empty());
156
157 let output_indices = node
158 .output_indices
159 .iter()
160 .map(|&i| i as usize)
161 .collect_vec();
162
163 let column_ids = node
164 .upstream_column_ids
165 .iter()
166 .map(ColumnId::from)
167 .collect_vec();
168
169 let vnodes = params.vnode_bitmap.map(Arc::new);
170 let barrier_rx = params
171 .local_barrier_manager
172 .subscribe_barrier(params.actor_context.id);
173
174 let upstream_table = BatchTable::new_partial(
175 state_store.clone(),
176 column_ids,
177 vnodes.clone(),
178 table_desc,
179 );
180
181 let state_table = node.get_state_table()?;
182 let state_table = StateTableBuilder::new(state_table, state_store.clone(), vnodes)
183 .enable_preload_all_rows_by_config(¶ms.config)
184 .build()
185 .await;
186
187 let chunk_size = params.config.developer.chunk_size;
188 let snapshot_epoch = node
189 .snapshot_backfill_epoch
190 .ok_or_else(|| anyhow!("snapshot epoch not set for {:?}", node))?;
191
192 UpstreamTableExecutor::new(
193 upstream_table.table_id(),
194 upstream_table,
195 state_table,
196 snapshot_epoch,
197 output_indices,
198 chunk_size,
199 node.rate_limit.into(),
200 params.actor_context,
201 barrier_rx,
202 progress,
203 )
204 .boxed()
205 }
206 StreamScanType::SnapshotBackfill => {
207 unreachable!(
208 "SnapshotBackfillExecutor is handled specially when in `StreamActorManager::create_nodes_inner`"
209 )
210 }
211 StreamScanType::Unspecified => unreachable!(),
212 };
213
214 if crate::consistency::insane() {
215 let mut info = params.info.clone();
216 info.identity = format!("{} (troubled)", info.identity);
217 Ok((
218 params.info,
219 TroublemakerExecutor::new((info, exec).into(), params.config.developer.chunk_size),
220 )
221 .into())
222 } else {
223 Ok((params.info, exec).into())
224 }
225 }
226}