// risingwave_stream/from_proto/stream_scan.rs

use std::sync::Arc;

use anyhow::anyhow;
use risingwave_common::catalog::ColumnId;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_pb::stream_plan::{StreamScanNode, StreamScanType};
use risingwave_storage::table::batch_table::BatchTable;

use super::*;
use crate::common::table::state_table::{ReplicatedStateTable, StateTableBuilder};
use crate::executor::{
    ArrangementBackfillExecutor, BackfillExecutor, ChainExecutor, RearrangedChainExecutor,
    TroublemakerExecutor, UpstreamTableExecutor,
};

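/// Builds the executor for a `StreamScan` plan node: a plain or rearranged chain,
/// a backfill or arrangement-backfill executor, or a cross-database snapshot
/// backfill executor, depending on the scan type.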
pub struct StreamScanExecutorBuilder;

impl ExecutorBuilder for StreamScanExecutorBuilder {
    type Node = StreamScanNode;

    async fn new_boxed_executor(
        params: ExecutorParams,
        node: &Self::Node,
        state_store: impl StateStore,
    ) -> StreamResult<Executor> {
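        // Register a progress reporter for materialized-view creation with the
        // local barrier manager; every variant below reports through it.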
        let progress = params
            .local_barrier_manager
            .register_create_mview_progress(params.actor_context.id);

        let output_indices = node
            .output_indices
            .iter()
            .map(|&i| i as usize)
            .collect_vec();

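        // Dispatch on the scan type carried by the plan node to build the executor.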
        let exec = match node.stream_scan_type() {
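            // Plain chain over the snapshot and upstream inputs; `UpstreamOnly`
            // forwards only the upstream side.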
            StreamScanType::Chain | StreamScanType::UpstreamOnly => {
                let [upstream, snapshot]: [_; 2] = params.input.try_into().unwrap();
                let upstream_only = matches!(node.stream_scan_type(), StreamScanType::UpstreamOnly);
                ChainExecutor::new(snapshot, upstream, progress, upstream_only).boxed()
            }
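            // Rearranged chain over the same pair of inputs.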
            StreamScanType::Rearrange => {
                let [upstream, snapshot]: [_; 2] = params.input.try_into().unwrap();
                RearrangedChainExecutor::new(snapshot, upstream, progress).boxed()
            }

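            // Backfill from the upstream storage table (read as a batch table) while
            // consuming the upstream stream; progress is kept in an optional state table.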
            StreamScanType::Backfill => {
                let [upstream, _]: [_; 2] = params.input.try_into().unwrap();
                let table_desc: &StorageTableDesc = node.get_table_desc()?;

                let column_ids = node
                    .upstream_column_ids
                    .iter()
                    .map(ColumnId::from)
                    .collect_vec();

                let vnodes = params.vnode_bitmap.map(Arc::new);

                let state_table = if let Ok(table) = node.get_state_table() {
                    Some(
                        StateTableBuilder::new(table, state_store.clone(), vnodes.clone())
                            .enable_preload_all_rows_by_config(
                                &params.actor_context.streaming_config,
                            )
                            .build()
                            .await,
                    )
                } else {
                    None
                };

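                // Partial read of the upstream storage table, restricted to the
                // scanned columns and this actor's vnodes.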
                let upstream_table =
                    BatchTable::new_partial(state_store.clone(), column_ids, vnodes, table_desc);

                BackfillExecutor::new(
                    upstream_table,
                    upstream,
                    state_table,
                    output_indices,
                    progress,
                    params.executor_stats.clone(),
                    params.env.config().developer.chunk_size,
                    node.rate_limit.into(),
                    params.fragment_id,
                )
                .boxed()
            }
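            // Arrangement backfill: the upstream is read through a replicated state
            // table over the arrangement instead of a batch table.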
            StreamScanType::ArrangementBackfill => {
                let [upstream, _]: [_; 2] = params.input.try_into().unwrap();
                let column_ids = node
                    .upstream_column_ids
                    .iter()
                    .map(ColumnId::from)
                    .collect_vec();

                let vnodes = params.vnode_bitmap.map(Arc::new);

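                // State table that persists this executor's backfill progress.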
                let state_table = node.get_state_table().unwrap();
                let state_table =
                    StateTableBuilder::new(state_table, state_store.clone(), vnodes.clone())
                        .enable_preload_all_rows_by_config(&params.actor_context.streaming_config)
                        .build()
                        .await;

                let upstream_table = node.get_arrangement_table().unwrap();
                let versioned = upstream_table.get_version().is_ok();

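                // The executor is generic over the value-encoding serde, so build it
                // through a macro that takes the concrete serde type.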
                macro_rules! new_executor {
                    ($SD:ident) => {{
                        let upstream_table = ReplicatedStateTable::<_, $SD>::new_replicated(
                            upstream_table,
                            state_store.clone(),
                            vnodes,
                            column_ids,
                        )
                        .await;
                        ArrangementBackfillExecutor::<_, $SD>::new(
                            upstream_table,
                            upstream,
                            state_table,
                            output_indices,
                            progress,
                            params.executor_stats.clone(),
                            params.env.config().developer.chunk_size,
                            node.rate_limit.into(),
                            params.fragment_id,
                        )
                        .boxed()
                    }};
                }
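                // Use column-aware (versioned) row encoding if the arrangement table
                // carries a version, otherwise the basic value encoding.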
                if versioned {
                    new_executor!(ColumnAwareSerde)
                } else {
                    new_executor!(BasicSerde)
                }
            }
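            // Cross-database snapshot backfill: there are no stream inputs; the
            // executor reads the upstream table itself and receives barriers through
            // a dedicated subscription.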
            StreamScanType::CrossDbSnapshotBackfill => {
                let table_desc: &StorageTableDesc = node.get_table_desc()?;
                assert!(params.input.is_empty());

                let output_indices = node
                    .output_indices
                    .iter()
                    .map(|&i| i as usize)
                    .collect_vec();

                let column_ids = node
                    .upstream_column_ids
                    .iter()
                    .map(ColumnId::from)
                    .collect_vec();

                let vnodes = params.vnode_bitmap.map(Arc::new);
                let barrier_rx = params
                    .local_barrier_manager
                    .subscribe_barrier(params.actor_context.id);

                let upstream_table = BatchTable::new_partial(
                    state_store.clone(),
                    column_ids,
                    vnodes.clone(),
                    table_desc,
                );

                let state_table = node.get_state_table()?;
                let state_table = StateTableBuilder::new(state_table, state_store.clone(), vnodes)
                    .enable_preload_all_rows_by_config(&params.actor_context.streaming_config)
                    .build()
                    .await;

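                // The snapshot epoch must be provided by the plan node for cross-DB
                // backfill; fail early if it is missing.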
                let chunk_size = params.env.config().developer.chunk_size;
                let snapshot_epoch = node
                    .snapshot_backfill_epoch
                    .ok_or_else(|| anyhow!("snapshot epoch not set for {:?}", node))?;

                UpstreamTableExecutor::new(
                    upstream_table.table_id(),
                    upstream_table,
                    state_table,
                    snapshot_epoch,
                    output_indices,
                    chunk_size,
                    node.rate_limit.into(),
                    params.actor_context,
                    barrier_rx,
                    progress,
                )
                .boxed()
            }
            StreamScanType::SnapshotBackfill => {
                unreachable!(
                    "SnapshotBackfillExecutor is handled specially in `StreamActorManager::create_nodes_inner`"
                )
            }
            StreamScanType::Unspecified => unreachable!(),
        };

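        // Under the "insane" consistency mode, wrap the executor in a
        // `TroublemakerExecutor` and mark its identity as "(troubled)".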
        if crate::consistency::insane() {
            let mut info = params.info.clone();
            info.identity = format!("{} (troubled)", info.identity);
            Ok((
                params.info,
                TroublemakerExecutor::new(
                    (info, exec).into(),
                    params.env.config().developer.chunk_size,
                ),
            )
                .into())
        } else {
            Ok((params.info, exec).into())
        }
    }
}