// risingwave_stream/from_proto/stream_scan.rs
use std::sync::Arc;
16
17use anyhow::anyhow;
18use risingwave_common::catalog::ColumnId;
19use risingwave_common::util::value_encoding::BasicSerde;
20use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
21use risingwave_pb::plan_common::StorageTableDesc;
22use risingwave_pb::stream_plan::{StreamScanNode, StreamScanType};
23use risingwave_storage::table::batch_table::BatchTable;
24
25use super::*;
26use crate::common::table::state_table::{ReplicatedStateTable, StateTable};
27use crate::executor::{
28 ArrangementBackfillExecutor, BackfillExecutor, ChainExecutor, RearrangedChainExecutor,
29 TroublemakerExecutor, UpstreamTableExecutor,
30};
31
/// Builds the executor for a [`StreamScanNode`]: depending on the scan type this is a
/// `ChainExecutor`, `RearrangedChainExecutor`, `BackfillExecutor`,
/// `ArrangementBackfillExecutor`, or `UpstreamTableExecutor`.
pub struct StreamScanExecutorBuilder;
33
34impl ExecutorBuilder for StreamScanExecutorBuilder {
35 type Node = StreamScanNode;
36
37 async fn new_boxed_executor(
38 params: ExecutorParams,
39 node: &Self::Node,
40 state_store: impl StateStore,
41 ) -> StreamResult<Executor> {
42 let progress = params
44 .local_barrier_manager
45 .register_create_mview_progress(params.actor_context.id);
46
47 let output_indices = node
48 .output_indices
49 .iter()
50 .map(|&i| i as usize)
51 .collect_vec();
52
53 let exec = match node.stream_scan_type() {
54 StreamScanType::Chain | StreamScanType::UpstreamOnly => {
55 let [upstream, snapshot]: [_; 2] = params.input.try_into().unwrap();
56 let upstream_only = matches!(node.stream_scan_type(), StreamScanType::UpstreamOnly);
57 ChainExecutor::new(snapshot, upstream, progress, upstream_only).boxed()
58 }
59 StreamScanType::Rearrange => {
60 let [upstream, snapshot]: [_; 2] = params.input.try_into().unwrap();
61 RearrangedChainExecutor::new(snapshot, upstream, progress).boxed()
62 }
63
64 StreamScanType::Backfill => {
65 let [upstream, _]: [_; 2] = params.input.try_into().unwrap();
66 let table_desc: &StorageTableDesc = node.get_table_desc()?;
67
68 let column_ids = node
69 .upstream_column_ids
70 .iter()
71 .map(ColumnId::from)
72 .collect_vec();
73
74 let vnodes = params.vnode_bitmap.map(Arc::new);
75
76 let state_table = if let Ok(table) = node.get_state_table() {
77 Some(
78 StateTable::from_table_catalog(table, state_store.clone(), vnodes.clone())
79 .await,
80 )
81 } else {
82 None
83 };
84
85 let upstream_table =
86 BatchTable::new_partial(state_store.clone(), column_ids, vnodes, table_desc);
87
88 BackfillExecutor::new(
89 upstream_table,
90 upstream,
91 state_table,
92 output_indices,
93 progress,
94 params.executor_stats.clone(),
95 params.env.config().developer.chunk_size,
96 node.rate_limit.into(),
97 params.fragment_id,
98 )
99 .boxed()
100 }
101 StreamScanType::ArrangementBackfill => {
102 let [upstream, _]: [_; 2] = params.input.try_into().unwrap();
103 let column_ids = node
104 .upstream_column_ids
105 .iter()
106 .map(ColumnId::from)
107 .collect_vec();
108
109 let vnodes = params.vnode_bitmap.map(Arc::new);
110
111 let state_table = node.get_state_table().unwrap();
112 let state_table = StateTable::from_table_catalog(
113 state_table,
114 state_store.clone(),
115 vnodes.clone(),
116 )
117 .await;
118
119 let upstream_table = node.get_arrangement_table().unwrap();
120 let versioned = upstream_table.get_version().is_ok();
121
122 macro_rules! new_executor {
123 ($SD:ident) => {{
124 let upstream_table =
125 ReplicatedStateTable::<_, $SD>::from_table_catalog_with_output_column_ids(
126 upstream_table,
127 state_store.clone(),
128 vnodes,
129 column_ids,
130 )
131 .await;
132 ArrangementBackfillExecutor::<_, $SD>::new(
133 upstream_table,
134 upstream,
135 state_table,
136 output_indices,
137 progress,
138 params.executor_stats.clone(),
139 params.env.config().developer.chunk_size,
140 node.rate_limit.into(),
141 params.fragment_id,
142 )
143 .boxed()
144 }};
145 }
146 if versioned {
147 new_executor!(ColumnAwareSerde)
148 } else {
149 new_executor!(BasicSerde)
150 }
151 }
152 StreamScanType::CrossDbSnapshotBackfill => {
153 let table_desc: &StorageTableDesc = node.get_table_desc()?;
154 assert!(params.input.is_empty());
155
156 let output_indices = node
157 .output_indices
158 .iter()
159 .map(|&i| i as usize)
160 .collect_vec();
161
162 let column_ids = node
163 .upstream_column_ids
164 .iter()
165 .map(ColumnId::from)
166 .collect_vec();
167
168 let vnodes = params.vnode_bitmap.map(Arc::new);
169 let barrier_rx = params
170 .local_barrier_manager
171 .subscribe_barrier(params.actor_context.id);
172
173 let upstream_table = BatchTable::new_partial(
174 state_store.clone(),
175 column_ids,
176 vnodes.clone(),
177 table_desc,
178 );
179
180 let state_table = node.get_state_table()?;
181 let state_table =
182 StateTable::from_table_catalog(state_table, state_store.clone(), vnodes).await;
183
184 let chunk_size = params.env.config().developer.chunk_size;
185 let snapshot_epoch = node
186 .snapshot_backfill_epoch
187 .ok_or_else(|| anyhow!("snapshot epoch not set for {:?}", node))?;
188
189 UpstreamTableExecutor::new(
190 upstream_table.table_id(),
191 upstream_table,
192 state_table,
193 snapshot_epoch,
194 output_indices,
195 chunk_size,
196 node.rate_limit.into(),
197 params.actor_context,
198 barrier_rx,
199 progress,
200 )
201 .boxed()
202 }
203 StreamScanType::SnapshotBackfill => {
204 unreachable!(
205 "SnapshotBackfillExecutor is handled specially when in `StreamActorManager::create_nodes_inner`"
206 )
207 }
208 StreamScanType::Unspecified => unreachable!(),
209 };
210
211 if crate::consistency::insane() {
212 let mut info = params.info.clone();
213 info.identity = format!("{} (troubled)", info.identity);
214 Ok((
215 params.info,
216 TroublemakerExecutor::new(
217 (info, exec).into(),
218 params.env.config().developer.chunk_size,
219 ),
220 )
221 .into())
222 } else {
223 Ok((params.info, exec).into())
224 }
225 }
226}