risingwave_stream/executor/backfill/snapshot_backfill/
vnode_stream.rs1use std::collections::HashMap;
16use std::mem::{replace, take};
17use std::pin::Pin;
18use std::task::{Context, Poll, ready};
19
20use futures::stream::{FuturesUnordered, Peekable, StreamFuture};
21use futures::{Stream, StreamExt, TryStreamExt};
22use pin_project::pin_project;
23use risingwave_common::array::{Op, StreamChunk};
24use risingwave_common::hash::VirtualNode;
25use risingwave_common::row::{OwnedRow, Row, RowExt};
26use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
27use risingwave_storage::table::ChangeLogRow;
28
29use crate::executor::StreamExecutorResult;
30
31pub(super) trait ChangeLogRowStream =
32 Stream<Item = StreamExecutorResult<ChangeLogRow>> + Sized + 'static;
33
34#[pin_project]
35struct StreamWithVnode<St: ChangeLogRowStream> {
36 #[pin]
37 stream: Peekable<St>,
38 vnode: VirtualNode,
39 row_count: usize,
40}
41
42impl<St: ChangeLogRowStream> Stream for StreamWithVnode<St> {
43 type Item = St::Item;
44
45 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
46 let this = self.project();
47 let poll_result = this.stream.poll_next(cx);
48 if let Poll::Ready(Some(Ok(change_log_row))) = &poll_result {
49 match change_log_row {
50 ChangeLogRow::Insert(_) | ChangeLogRow::Delete(_) => {
51 *this.row_count += 1;
52 }
53 ChangeLogRow::Update { .. } => {
54 *this.row_count += 2;
55 }
56 }
57 }
58 poll_result
59 }
60}
61
62type ChangeLogRowVnodeStream<St> = Pin<Box<StreamWithVnode<St>>>;
63
64pub(super) struct VnodeStream<St: ChangeLogRowStream> {
65 streams: FuturesUnordered<StreamFuture<ChangeLogRowVnodeStream<St>>>,
66 pk_indices: Vec<usize>,
67 finished_vnode: HashMap<VirtualNode, usize>,
68 data_chunk_builder: DataChunkBuilder,
69 ops: Vec<Op>,
70}
71
72impl<St: ChangeLogRowStream> VnodeStream<St> {
73 pub(super) fn new(
74 vnode_streams: impl IntoIterator<Item = (VirtualNode, St, usize)>,
75 pk_indices: Vec<usize>,
76 data_chunk_builder: DataChunkBuilder,
77 ) -> Self {
78 assert!(data_chunk_builder.is_empty());
79 assert!(data_chunk_builder.batch_size() > 2);
80 let streams = FuturesUnordered::from_iter(vnode_streams.into_iter().map(
81 |(vnode, stream, row_count)| {
82 let stream = stream.peekable();
83 Box::pin(StreamWithVnode {
84 stream,
85 vnode,
86 row_count,
87 })
88 .into_future()
89 },
90 ));
91 let ops = Vec::with_capacity(data_chunk_builder.batch_size());
92 Self {
93 streams,
94 pk_indices,
95 finished_vnode: HashMap::new(),
96 data_chunk_builder,
97 ops,
98 }
99 }
100
101 pub(super) fn take_finished_vnodes(&mut self) -> HashMap<VirtualNode, usize> {
102 assert!(self.streams.is_empty());
103 assert!(self.data_chunk_builder.is_empty());
104 take(&mut self.finished_vnode)
105 }
106}
107
108impl<St: ChangeLogRowStream> VnodeStream<St> {
109 fn poll_next_row(
110 &mut self,
111 cx: &mut Context<'_>,
112 ) -> Poll<StreamExecutorResult<Option<ChangeLogRow>>> {
113 loop {
114 let ready_item = match ready!(self.streams.poll_next_unpin(cx)) {
115 None => Ok(None),
116 Some((None, stream)) => {
117 self.finished_vnode
118 .try_insert(stream.vnode, stream.row_count)
119 .expect("non-duplicate");
120 continue;
121 }
122 Some((Some(Ok(item)), stream)) => {
123 self.streams.push(stream.into_future());
127 Ok(Some(item))
128 }
129 Some((Some(Err(e)), _stream)) => Err(e),
130 };
131 break Poll::Ready(ready_item);
132 }
133 }
134
135 pub(super) fn consume_builder(&mut self) -> Option<StreamChunk> {
136 self.data_chunk_builder.consume_all().map(|chunk| {
137 let ops = replace(
138 &mut self.ops,
139 Vec::with_capacity(self.data_chunk_builder.batch_size()),
140 );
141 StreamChunk::from_parts(ops, chunk)
142 })
143 }
144
145 pub(super) async fn for_vnode_pk_progress(
146 &mut self,
147 mut on_vnode_progress: impl FnMut(VirtualNode, usize, Option<OwnedRow>),
148 ) -> StreamExecutorResult<()> {
149 assert!(self.data_chunk_builder.is_empty());
150 for (vnode, row_count) in &self.finished_vnode {
151 on_vnode_progress(*vnode, *row_count, None);
152 }
153 let streams = take(&mut self.streams);
154 for vnode_stream_future in streams {
164 let mut vnode_stream = vnode_stream_future.into_inner().expect("should exist");
165 match vnode_stream.as_mut().project().stream.peek().await {
166 Some(Ok(change_log_row)) => {
167 let row = match change_log_row {
168 ChangeLogRow::Insert(row) | ChangeLogRow::Delete(row) => row,
169 ChangeLogRow::Update {
170 new_value,
171 old_value,
172 } => {
173 if cfg!(debug_assertions) {
174 assert_eq!(
175 old_value.project(&self.pk_indices),
176 new_value.project(&self.pk_indices)
177 );
178 }
179 new_value
180 }
181 };
182 let pk = row.project(&self.pk_indices).to_owned_row();
183 on_vnode_progress(vnode_stream.vnode, vnode_stream.row_count, Some(pk));
184 self.streams.push(vnode_stream.into_future());
185 }
186 Some(Err(_)) => {
187 return Err(vnode_stream.try_next().await.expect_err("checked Err"));
188 }
189 None => {
190 self.finished_vnode
191 .try_insert(vnode_stream.vnode, vnode_stream.row_count)
192 .expect("non-duplicate");
193 on_vnode_progress(vnode_stream.vnode, vnode_stream.row_count, None);
194 }
195 }
196 }
197 Ok(())
198 }
199}
200
201impl<St: ChangeLogRowStream> Stream for VnodeStream<St> {
202 type Item = StreamExecutorResult<StreamChunk>;
203
204 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
207 let this = self.get_mut();
208 let capacity = this.data_chunk_builder.batch_size();
209 loop {
210 match ready!(this.poll_next_row(cx)) {
211 Ok(Some(change_log_row)) => {
212 let may_chunk = match change_log_row {
213 ChangeLogRow::Insert(row) => {
214 this.ops.push(Op::Insert);
215 this.data_chunk_builder.append_one_row(row)
216 }
217 ChangeLogRow::Delete(row) => {
218 this.ops.push(Op::Delete);
219 this.data_chunk_builder.append_one_row(row)
220 }
221 ChangeLogRow::Update {
222 new_value,
223 old_value,
224 } => {
225 if this.data_chunk_builder.can_append_update() {
226 this.ops.extend([Op::UpdateDelete, Op::UpdateInsert]);
227 assert!(
228 this.data_chunk_builder.append_one_row(old_value).is_none()
229 );
230 this.data_chunk_builder.append_one_row(new_value)
231 } else {
232 let chunk = this
233 .data_chunk_builder
234 .consume_all()
235 .expect("should be Some when not can_append");
236 let ops = replace(&mut this.ops, Vec::with_capacity(capacity));
237 this.ops.extend([Op::UpdateDelete, Op::UpdateInsert]);
238 assert!(
239 this.data_chunk_builder.append_one_row(old_value).is_none()
240 );
241 assert!(
242 this.data_chunk_builder.append_one_row(new_value).is_none()
243 );
244 break Poll::Ready(Some(Ok(StreamChunk::from_parts(ops, chunk))));
245 }
246 }
247 };
248 if let Some(chunk) = may_chunk {
249 let ops = replace(&mut this.ops, Vec::with_capacity(capacity));
250 break Poll::Ready(Some(Ok(StreamChunk::from_parts(ops, chunk))));
251 }
252 }
253 Ok(None) => {
254 break if let Some(chunk) = this.data_chunk_builder.consume_all() {
255 let ops = take(&mut this.ops);
256 Poll::Ready(Some(Ok(StreamChunk::from_parts(ops, chunk))))
257 } else {
258 Poll::Ready(None)
259 };
260 }
261 Err(e) => {
262 break Poll::Ready(Some(Err(e)));
263 }
264 }
265 }
266 }
267}
268
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::future::poll_fn;
    use std::sync::LazyLock;
    use std::task::Poll;

    use anyhow::anyhow;
    use futures::{Future, FutureExt, pin_mut};
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
    use risingwave_storage::table::ChangeLogRow;
    use tokio::sync::mpsc::unbounded_channel;
    use tokio_stream::StreamExt;
    use tokio_stream::wrappers::UnboundedReceiverStream;

    use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;

    /// Schema shared by all tests: a single `Int64` column.
    static DATA_TYPES: LazyLock<Vec<DataType>> = LazyLock::new(|| vec![DataType::Int64]);

    /// Builds a one-column row holding `index`.
    fn test_row(index: i64) -> OwnedRow {
        let datum = Some(ScalarImpl::Int64(index));
        OwnedRow::new(vec![datum])
    }

    impl<St: super::ChangeLogRowStream> VnodeStream<St> {
        /// Runs `for_vnode_pk_progress` and checks that exactly the expected
        /// `(vnode, row_count, next_pk)` triples were reported, each vnode
        /// at most once.
        async fn assert_progress(
            &mut self,
            progress: impl IntoIterator<Item = (VirtualNode, usize, Option<OwnedRow>)>,
        ) {
            let expected: HashMap<_, _> = progress
                .into_iter()
                .map(|(vnode, row_count, pk)| (vnode, (row_count, pk)))
                .collect();
            let mut reported = HashMap::new();
            let outcome = self
                .for_vnode_pk_progress(|vnode, row_count, pk| {
                    reported.try_insert(vnode, (row_count, pk)).unwrap();
                })
                .await;
            outcome.unwrap();
            assert_eq!(expected, reported);
        }
    }

    #[tokio::test]
    async fn test_basic() {
        let [vnode1, vnode2] = [1, 2].map(VirtualNode::from_index);
        let (sender1, receiver1) = unbounded_channel();
        let (sender2, receiver2) = unbounded_channel();
        let mut vnode_stream = VnodeStream::<UnboundedReceiverStream<_>>::new(
            [
                (vnode1, UnboundedReceiverStream::new(receiver1), 10),
                (vnode2, UnboundedReceiverStream::new(receiver2), 0),
            ],
            vec![0],
            DataChunkBuilder::new(DATA_TYPES.clone(), 3),
        );

        // Nothing sent yet, and a single row does not make a chunk.
        assert!(vnode_stream.next().now_or_never().is_none());
        sender1.send(Ok(ChangeLogRow::Insert(test_row(0)))).unwrap();
        assert!(vnode_stream.next().now_or_never().is_none());

        // Two more rows reach the batch size of 3.
        sender2.send(Ok(ChangeLogRow::Insert(test_row(0)))).unwrap();
        sender2.send(Ok(ChangeLogRow::Insert(test_row(1)))).unwrap();
        assert_eq!(3, vnode_stream.next().await.unwrap().unwrap().cardinality());

        let next_row = test_row(1);
        {
            let progress_fut = vnode_stream
                .assert_progress([(vnode1, 11, Some(next_row.clone())), (vnode2, 2, None)]);
            pin_mut!(progress_fut);
            // Neither stream has a pending item yet.
            assert!((&mut progress_fut).now_or_never().is_none());
            sender1
                .send(Ok(ChangeLogRow::Insert(next_row.clone())))
                .unwrap();
            // The second stream is still neither ready nor closed.
            assert!((&mut progress_fut).now_or_never().is_none());
            drop(sender2);
            progress_fut.await;
        }

        // The peeked row is consumed but only buffered in the builder.
        assert!(vnode_stream.next().now_or_never().is_none());
        assert_eq!(1, vnode_stream.consume_builder().unwrap().cardinality());
        {
            let progress_fut = vnode_stream.assert_progress([(vnode1, 12, None), (vnode2, 2, None)]);
            pin_mut!(progress_fut);
            assert!((&mut progress_fut).now_or_never().is_none());
            drop(sender1);
            progress_fut.await;
        }
        assert!(vnode_stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_update() {
        let (sender, receiver) = unbounded_channel();
        let mut vnode_stream = VnodeStream::new(
            [(VirtualNode::ZERO, UnboundedReceiverStream::new(receiver), 0)],
            vec![0],
            DataChunkBuilder::new(DATA_TYPES.clone(), 3),
        );
        assert!(vnode_stream.next().now_or_never().is_none());
        sender.send(Ok(ChangeLogRow::Insert(test_row(0)))).unwrap();
        sender.send(Ok(ChangeLogRow::Insert(test_row(1)))).unwrap();
        assert!(vnode_stream.next().now_or_never().is_none());

        // The update is not split: the two buffered inserts are emitted
        // first, the update pair follows in its own chunk.
        sender
            .send(Ok(ChangeLogRow::Update {
                new_value: test_row(2),
                old_value: test_row(3),
            }))
            .unwrap();
        assert_eq!(2, vnode_stream.next().await.unwrap().unwrap().cardinality());
        drop(sender);
        assert_eq!(2, vnode_stream.next().await.unwrap().unwrap().cardinality());
        assert!(vnode_stream.next().await.is_none());
        vnode_stream
            .assert_progress([(VirtualNode::ZERO, 4, None)])
            .await;
    }

    #[tokio::test]
    async fn test_empty() {
        let mut vnode_stream = VnodeStream::<UnboundedReceiverStream<_>>::new(
            [].into_iter(),
            vec![0],
            DataChunkBuilder::new(DATA_TYPES.clone(), 1024),
        );
        assert!(vnode_stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_err() {
        // Error surfaced while polling the stream.
        {
            let (sender, receiver) = unbounded_channel();
            let mut vnode_stream = VnodeStream::new(
                [(VirtualNode::ZERO, UnboundedReceiverStream::new(receiver), 0)],
                vec![0],
                DataChunkBuilder::new(DATA_TYPES.clone(), 3),
            );
            assert!(vnode_stream.next().now_or_never().is_none());
            sender.send(Err(anyhow!("err").into())).unwrap();
            assert!(vnode_stream.next().await.unwrap().is_err());
        }
        // Error surfaced while peeking for progress.
        {
            let (sender, receiver) = unbounded_channel();
            let mut vnode_stream = VnodeStream::new(
                [(VirtualNode::ZERO, UnboundedReceiverStream::new(receiver), 0)],
                vec![0],
                DataChunkBuilder::new(DATA_TYPES.clone(), 3),
            );
            assert!(vnode_stream.next().now_or_never().is_none());
            let progress_fut = vnode_stream.for_vnode_pk_progress(|_, _, _| unreachable!());
            pin_mut!(progress_fut);
            assert!((&mut progress_fut).now_or_never().is_none());
            sender.send(Err(anyhow!("err").into())).unwrap();
            assert!(progress_fut.await.is_err());
        }
    }

    /// Checks that a row which arrives while `for_vnode_pk_progress` is
    /// peeking can still be obtained by polling the stream afterwards.
    #[tokio::test]
    async fn test_futures_unordered_peek() {
        let (sender, receiver) = unbounded_channel();
        let mut vnode_stream = VnodeStream::new(
            [(VirtualNode::ZERO, UnboundedReceiverStream::new(receiver), 0)],
            vec![0],
            DataChunkBuilder::new(DATA_TYPES.clone(), 1024),
        );
        assert!(vnode_stream.next().now_or_never().is_none());
        let row = test_row(1);
        {
            let progress_fut =
                vnode_stream.assert_progress([(VirtualNode::ZERO, 0, Some(row.clone()))]);
            pin_mut!(progress_fut);
            // Poll exactly once with the task's real context; no row is
            // available yet, so the peek must be pending.
            assert!(
                poll_fn(|cx| Poll::Ready(progress_fut.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            sender.send(Ok(ChangeLogRow::Insert(row.clone()))).unwrap();
            drop(sender);
            progress_fut.await;
        }
        let chunk = vnode_stream.next().await.unwrap().unwrap();
        assert_eq!(chunk.capacity(), 1);
    }
}