risingwave_stream/from_proto/
stream_scan.rs1use std::sync::Arc;
16
17use anyhow::anyhow;
18use risingwave_common::catalog::ColumnId;
19use risingwave_common::util::value_encoding::BasicSerde;
20use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
21use risingwave_pb::plan_common::StorageTableDesc;
22use risingwave_pb::stream_plan::{StreamScanNode, StreamScanType};
23use risingwave_storage::table::batch_table::BatchTable;
24
25use super::*;
26use crate::common::table::state_table::{ReplicatedStateTable, StateTable};
27use crate::executor::{
28 ArrangementBackfillExecutor, BackfillExecutor, ChainExecutor, RearrangedChainExecutor,
29 TroublemakerExecutor, UpstreamTableExecutor,
30};
31
/// Builder that constructs the executor for a [`StreamScanNode`] — the
/// `Chain`-family executors (chain / rearranged chain / backfill variants)
/// used to fill a creating materialized view from an upstream table or MV.
pub struct StreamScanExecutorBuilder;
33
34impl ExecutorBuilder for StreamScanExecutorBuilder {
35 type Node = StreamScanNode;
36
37 async fn new_boxed_executor(
38 params: ExecutorParams,
39 node: &Self::Node,
40 state_store: impl StateStore,
41 ) -> StreamResult<Executor> {
42 let progress = params
44 .local_barrier_manager
45 .register_create_mview_progress(params.actor_context.id);
46
47 let output_indices = node
48 .output_indices
49 .iter()
50 .map(|&i| i as usize)
51 .collect_vec();
52
53 let exec = match node.stream_scan_type() {
54 StreamScanType::Chain | StreamScanType::UpstreamOnly => {
55 let [upstream, snapshot]: [_; 2] = params.input.try_into().unwrap();
56 let upstream_only = matches!(node.stream_scan_type(), StreamScanType::UpstreamOnly);
57 ChainExecutor::new(snapshot, upstream, progress, upstream_only).boxed()
58 }
59 StreamScanType::Rearrange => {
60 let [upstream, snapshot]: [_; 2] = params.input.try_into().unwrap();
61 RearrangedChainExecutor::new(snapshot, upstream, progress).boxed()
62 }
63
64 StreamScanType::Backfill => {
65 let [upstream, _]: [_; 2] = params.input.try_into().unwrap();
66 let table_desc: &StorageTableDesc = node.get_table_desc()?;
67
68 let column_ids = node
69 .upstream_column_ids
70 .iter()
71 .map(ColumnId::from)
72 .collect_vec();
73
74 let vnodes = params.vnode_bitmap.map(Arc::new);
75
76 let state_table = if let Ok(table) = node.get_state_table() {
77 Some(
78 StateTable::from_table_catalog(table, state_store.clone(), vnodes.clone())
79 .await,
80 )
81 } else {
82 None
83 };
84
85 let upstream_table =
86 BatchTable::new_partial(state_store.clone(), column_ids, vnodes, table_desc);
87
88 BackfillExecutor::new(
89 upstream_table,
90 upstream,
91 state_table,
92 output_indices,
93 progress,
94 params.executor_stats.clone(),
95 params.env.config().developer.chunk_size,
96 node.rate_limit.into(),
97 )
98 .boxed()
99 }
100 StreamScanType::ArrangementBackfill => {
101 let [upstream, _]: [_; 2] = params.input.try_into().unwrap();
102 let column_ids = node
103 .upstream_column_ids
104 .iter()
105 .map(ColumnId::from)
106 .collect_vec();
107
108 let vnodes = params.vnode_bitmap.map(Arc::new);
109
110 let state_table = node.get_state_table().unwrap();
111 let state_table = StateTable::from_table_catalog(
112 state_table,
113 state_store.clone(),
114 vnodes.clone(),
115 )
116 .await;
117
118 let upstream_table = node.get_arrangement_table().unwrap();
119 let versioned = upstream_table.get_version().is_ok();
120
121 macro_rules! new_executor {
122 ($SD:ident) => {{
123 let upstream_table =
124 ReplicatedStateTable::<_, $SD>::from_table_catalog_with_output_column_ids(
125 upstream_table,
126 state_store.clone(),
127 vnodes,
128 column_ids,
129 )
130 .await;
131 ArrangementBackfillExecutor::<_, $SD>::new(
132 upstream_table,
133 upstream,
134 state_table,
135 output_indices,
136 progress,
137 params.executor_stats.clone(),
138 params.env.config().developer.chunk_size,
139 node.rate_limit.into()
140 )
141 .boxed()
142 }};
143 }
144 if versioned {
145 new_executor!(ColumnAwareSerde)
146 } else {
147 new_executor!(BasicSerde)
148 }
149 }
150 StreamScanType::CrossDbSnapshotBackfill => {
151 let table_desc: &StorageTableDesc = node.get_table_desc()?;
152 assert!(params.input.is_empty());
153
154 let output_indices = node
155 .output_indices
156 .iter()
157 .map(|&i| i as usize)
158 .collect_vec();
159
160 let column_ids = node
161 .upstream_column_ids
162 .iter()
163 .map(ColumnId::from)
164 .collect_vec();
165
166 let vnodes = params.vnode_bitmap.map(Arc::new);
167 let barrier_rx = params
168 .local_barrier_manager
169 .subscribe_barrier(params.actor_context.id);
170
171 let upstream_table = BatchTable::new_partial(
172 state_store.clone(),
173 column_ids,
174 vnodes.clone(),
175 table_desc,
176 );
177
178 let state_table = node.get_state_table()?;
179 let state_table =
180 StateTable::from_table_catalog(state_table, state_store.clone(), vnodes).await;
181
182 let chunk_size = params.env.config().developer.chunk_size;
183 let snapshot_epoch = node
184 .snapshot_backfill_epoch
185 .ok_or_else(|| anyhow!("snapshot epoch not set for {:?}", node))?;
186
187 UpstreamTableExecutor::new(
188 upstream_table.table_id(),
189 upstream_table,
190 state_table,
191 snapshot_epoch,
192 output_indices,
193 chunk_size,
194 node.rate_limit.into(),
195 params.actor_context,
196 barrier_rx,
197 progress,
198 )
199 .boxed()
200 }
201 StreamScanType::SnapshotBackfill => {
202 unreachable!(
203 "SnapshotBackfillExecutor is handled specially when in `StreamActorManager::create_nodes_inner`"
204 )
205 }
206 StreamScanType::Unspecified => unreachable!(),
207 };
208
209 if crate::consistency::insane() {
210 let mut info = params.info.clone();
211 info.identity = format!("{} (troubled)", info.identity);
212 Ok((
213 params.info,
214 TroublemakerExecutor::new(
215 (info, exec).into(),
216 params.env.config().developer.chunk_size,
217 ),
218 )
219 .into())
220 } else {
221 Ok((params.info, exec).into())
222 }
223 }
224}