risingwave_stream/executor/backfill/snapshot_backfill/
vnode_stream.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::mem::{replace, take};
17use std::pin::Pin;
18use std::task::{Context, Poll, ready};
19
20use futures::stream::{FuturesUnordered, Peekable, StreamFuture};
21use futures::{Stream, StreamExt, TryStreamExt};
22use pin_project::pin_project;
23use risingwave_common::array::{Op, StreamChunk};
24use risingwave_common::hash::VirtualNode;
25use risingwave_common::row::{OwnedRow, Row, RowExt};
26use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
27use risingwave_storage::table::ChangeLogRow;
28
29use crate::executor::StreamExecutorResult;
30
31pub(super) trait ChangeLogRowStream =
32    Stream<Item = StreamExecutorResult<ChangeLogRow>> + Sized + 'static;
33
34#[pin_project]
35struct StreamWithVnode<St: ChangeLogRowStream> {
36    #[pin]
37    stream: Peekable<St>,
38    vnode: VirtualNode,
39    row_count: usize,
40}
41
42impl<St: ChangeLogRowStream> Stream for StreamWithVnode<St> {
43    type Item = St::Item;
44
45    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
46        let this = self.project();
47        let poll_result = this.stream.poll_next(cx);
48        if let Poll::Ready(Some(Ok(change_log_row))) = &poll_result {
49            match change_log_row {
50                ChangeLogRow::Insert(_) | ChangeLogRow::Delete(_) => {
51                    *this.row_count += 1;
52                }
53                ChangeLogRow::Update { .. } => {
54                    *this.row_count += 2;
55                }
56            }
57        }
58        poll_result
59    }
60}
61
62type ChangeLogRowVnodeStream<St> = Pin<Box<StreamWithVnode<St>>>;
63
64pub(super) struct VnodeStream<St: ChangeLogRowStream> {
65    streams: FuturesUnordered<StreamFuture<ChangeLogRowVnodeStream<St>>>,
66    pk_indices: Vec<usize>,
67    finished_vnode: HashMap<VirtualNode, usize>,
68    data_chunk_builder: DataChunkBuilder,
69    ops: Vec<Op>,
70}
71
72impl<St: ChangeLogRowStream> VnodeStream<St> {
73    pub(super) fn new(
74        vnode_streams: impl IntoIterator<Item = (VirtualNode, St, usize)>,
75        pk_indices: Vec<usize>,
76        data_chunk_builder: DataChunkBuilder,
77    ) -> Self {
78        assert!(data_chunk_builder.is_empty());
79        assert!(data_chunk_builder.batch_size() > 2);
80        let streams = FuturesUnordered::from_iter(vnode_streams.into_iter().map(
81            |(vnode, stream, row_count)| {
82                let stream = stream.peekable();
83                Box::pin(StreamWithVnode {
84                    stream,
85                    vnode,
86                    row_count,
87                })
88                .into_future()
89            },
90        ));
91        let ops = Vec::with_capacity(data_chunk_builder.batch_size());
92        Self {
93            streams,
94            pk_indices,
95            finished_vnode: HashMap::new(),
96            data_chunk_builder,
97            ops,
98        }
99    }
100
101    pub(super) fn take_finished_vnodes(&mut self) -> HashMap<VirtualNode, usize> {
102        assert!(self.streams.is_empty());
103        assert!(self.data_chunk_builder.is_empty());
104        take(&mut self.finished_vnode)
105    }
106}
107
108impl<St: ChangeLogRowStream> VnodeStream<St> {
109    fn poll_next_row(
110        &mut self,
111        cx: &mut Context<'_>,
112    ) -> Poll<StreamExecutorResult<Option<ChangeLogRow>>> {
113        loop {
114            let ready_item = match ready!(self.streams.poll_next_unpin(cx)) {
115                None => Ok(None),
116                Some((None, stream)) => {
117                    self.finished_vnode
118                        .try_insert(stream.vnode, stream.row_count)
119                        .expect("non-duplicate");
120                    continue;
121                }
122                Some((Some(Ok(item)), stream)) => {
123                    // TODO: may avoid generating a `StreamFuture` for each row, because
124                    // `FuturesUnordered::push` involve memory allocation of `Arc`, and may
125                    // incur some unnecessary costs.
126                    self.streams.push(stream.into_future());
127                    Ok(Some(item))
128                }
129                Some((Some(Err(e)), _stream)) => Err(e),
130            };
131            break Poll::Ready(ready_item);
132        }
133    }
134
135    pub(super) fn consume_builder(&mut self) -> Option<StreamChunk> {
136        self.data_chunk_builder.consume_all().map(|chunk| {
137            let ops = replace(
138                &mut self.ops,
139                Vec::with_capacity(self.data_chunk_builder.batch_size()),
140            );
141            StreamChunk::from_parts(ops, chunk)
142        })
143    }
144
145    pub(super) async fn for_vnode_pk_progress(
146        &mut self,
147        mut on_vnode_progress: impl FnMut(VirtualNode, usize, Option<OwnedRow>),
148    ) -> StreamExecutorResult<()> {
149        assert!(self.data_chunk_builder.is_empty());
150        for (vnode, row_count) in &self.finished_vnode {
151            on_vnode_progress(*vnode, *row_count, None);
152        }
153        let streams = take(&mut self.streams);
154        // When the `VnodeStream` is polled, the `FuturesUnordered` will generate a special cx to poll the futures.
155        // The pending futures will be stored in a separate linked list and will not be polled until the special cx is awakened
156        // and move the awakened future from the linked list to a ready queue.
157        //
158        // However, here if we use `FuturesUnordered::iter_mut` to in place access the pending futures and call `peek` directly,
159        // the cx specially generated in `FuturedUnordered::poll_next` will be overwritten, and then even the peeked future is ready,
160        // it will not be moved to the ready queue, and the `FuturesUnordered` will be stuck forever.
161        //
162        // Therefore, to avoid this, we will take all stream futures out and push them back again, so that all futures will be in the ready queue.
163        for vnode_stream_future in streams {
164            let mut vnode_stream = vnode_stream_future.into_inner().expect("should exist");
165            match vnode_stream.as_mut().project().stream.peek().await {
166                Some(Ok(change_log_row)) => {
167                    let row = match change_log_row {
168                        ChangeLogRow::Insert(row) | ChangeLogRow::Delete(row) => row,
169                        ChangeLogRow::Update {
170                            new_value,
171                            old_value,
172                        } => {
173                            if cfg!(debug_assertions) {
174                                assert_eq!(
175                                    old_value.project(&self.pk_indices),
176                                    new_value.project(&self.pk_indices)
177                                );
178                            }
179                            new_value
180                        }
181                    };
182                    let pk = row.project(&self.pk_indices).to_owned_row();
183                    on_vnode_progress(vnode_stream.vnode, vnode_stream.row_count, Some(pk));
184                    self.streams.push(vnode_stream.into_future());
185                }
186                Some(Err(_)) => {
187                    return Err(vnode_stream.try_next().await.expect_err("checked Err"));
188                }
189                None => {
190                    self.finished_vnode
191                        .try_insert(vnode_stream.vnode, vnode_stream.row_count)
192                        .expect("non-duplicate");
193                    on_vnode_progress(vnode_stream.vnode, vnode_stream.row_count, None);
194                }
195            }
196        }
197        Ok(())
198    }
199}
200
201impl<St: ChangeLogRowStream> Stream for VnodeStream<St> {
202    type Item = StreamExecutorResult<StreamChunk>;
203
204    // Here we implement the stream on our own instead of generating the stream with
205    // `try_stream` macro, because we want to access the state of the streams on the flight.
206    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
207        let this = self.get_mut();
208        let capacity = this.data_chunk_builder.batch_size();
209        loop {
210            match ready!(this.poll_next_row(cx)) {
211                Ok(Some(change_log_row)) => {
212                    let may_chunk = match change_log_row {
213                        ChangeLogRow::Insert(row) => {
214                            this.ops.push(Op::Insert);
215                            this.data_chunk_builder.append_one_row(row)
216                        }
217                        ChangeLogRow::Delete(row) => {
218                            this.ops.push(Op::Delete);
219                            this.data_chunk_builder.append_one_row(row)
220                        }
221                        ChangeLogRow::Update {
222                            new_value,
223                            old_value,
224                        } => {
225                            if this.data_chunk_builder.can_append_update() {
226                                this.ops.extend([Op::UpdateDelete, Op::UpdateInsert]);
227                                assert!(
228                                    this.data_chunk_builder.append_one_row(old_value).is_none()
229                                );
230                                this.data_chunk_builder.append_one_row(new_value)
231                            } else {
232                                let chunk = this
233                                    .data_chunk_builder
234                                    .consume_all()
235                                    .expect("should be Some when not can_append");
236                                let ops = replace(&mut this.ops, Vec::with_capacity(capacity));
237                                this.ops.extend([Op::UpdateDelete, Op::UpdateInsert]);
238                                assert!(
239                                    this.data_chunk_builder.append_one_row(old_value).is_none()
240                                );
241                                assert!(
242                                    this.data_chunk_builder.append_one_row(new_value).is_none()
243                                );
244                                break Poll::Ready(Some(Ok(StreamChunk::from_parts(ops, chunk))));
245                            }
246                        }
247                    };
248                    if let Some(chunk) = may_chunk {
249                        let ops = replace(&mut this.ops, Vec::with_capacity(capacity));
250                        break Poll::Ready(Some(Ok(StreamChunk::from_parts(ops, chunk))));
251                    }
252                }
253                Ok(None) => {
254                    break if let Some(chunk) = this.data_chunk_builder.consume_all() {
255                        let ops = take(&mut this.ops);
256                        Poll::Ready(Some(Ok(StreamChunk::from_parts(ops, chunk))))
257                    } else {
258                        Poll::Ready(None)
259                    };
260                }
261                Err(e) => {
262                    break Poll::Ready(Some(Err(e)));
263                }
264            }
265        }
266    }
267}
268
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::future::poll_fn;
    use std::sync::LazyLock;
    use std::task::Poll;

    use anyhow::anyhow;
    use futures::{Future, FutureExt, pin_mut};
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
    use risingwave_storage::table::ChangeLogRow;
    use tokio::sync::mpsc::unbounded_channel;
    use tokio_stream::StreamExt;
    use tokio_stream::wrappers::UnboundedReceiverStream;

    use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;

    /// Schema shared by all tests: a single `Int64` column.
    static DATA_TYPES: LazyLock<Vec<DataType>> = LazyLock::new(|| vec![DataType::Int64]);

    /// Builds a one-column row holding `index`.
    fn test_row(index: i64) -> OwnedRow {
        let datum = Some(ScalarImpl::Int64(index));
        OwnedRow::new(vec![datum])
    }

    /// Builds an insert change-log row holding `index`.
    fn insert(index: i64) -> ChangeLogRow {
        ChangeLogRow::Insert(test_row(index))
    }

    /// Builds an empty chunk builder over `DATA_TYPES` with the given batch size.
    fn new_builder(batch_size: usize) -> DataChunkBuilder {
        DataChunkBuilder::new(DATA_TYPES.clone(), batch_size)
    }

    impl<St: super::ChangeLogRowStream> VnodeStream<St> {
        /// Runs `for_vnode_pk_progress` and checks that it reports exactly the expected
        /// `(vnode, row_count, next_pk)` entries, each vnode at most once.
        async fn assert_progress(
            &mut self,
            progress: impl IntoIterator<Item = (VirtualNode, usize, Option<OwnedRow>)>,
        ) {
            let mut expected = HashMap::new();
            for (vnode, row_count, pk) in progress {
                expected.insert(vnode, (row_count, pk));
            }
            let mut reported = HashMap::new();
            let result = self
                .for_vnode_pk_progress(|vnode, row_count, pk| {
                    reported.try_insert(vnode, (row_count, pk)).unwrap();
                })
                .await;
            result.unwrap();
            assert_eq!(expected, reported);
        }
    }

    #[tokio::test]
    async fn test_basic() {
        let [vnode1, vnode2] = [1, 2].map(VirtualNode::from_index);
        let (vnode1_tx, vnode1_rx) = unbounded_channel();
        let (vnode2_tx, vnode2_rx) = unbounded_channel();
        let mut stream = VnodeStream::<UnboundedReceiverStream<_>>::new(
            [
                (vnode1, UnboundedReceiverStream::new(vnode1_rx), 10),
                (vnode2, UnboundedReceiverStream::new(vnode2_rx), 0),
            ],
            vec![0],
            new_builder(3),
        );
        // Nothing is emitted until the builder holds a full batch of three rows.
        assert!(stream.next().now_or_never().is_none());
        vnode1_tx.send(Ok(insert(0))).unwrap();
        assert!(stream.next().now_or_never().is_none());
        vnode2_tx.send(Ok(insert(0))).unwrap();
        vnode2_tx.send(Ok(insert(1))).unwrap();
        let first_chunk = stream.next().await.unwrap().unwrap();
        assert_eq!(3, first_chunk.cardinality());

        let next_row = test_row(1);
        {
            let future =
                stream.assert_progress([(vnode1, 11, Some(next_row.clone())), (vnode2, 2, None)]);
            pin_mut!(future);
            // Progress cannot be reported before every stream has a peekable item or ends.
            assert!((&mut future).now_or_never().is_none());
            vnode1_tx
                .send(Ok(ChangeLogRow::Insert(next_row.clone())))
                .unwrap();
            assert!((&mut future).now_or_never().is_none());
            drop(vnode2_tx);
            future.await;
        }
        assert!(stream.next().now_or_never().is_none());
        let buffered_chunk = stream.consume_builder().unwrap();
        assert_eq!(1, buffered_chunk.cardinality());
        {
            let future = stream.assert_progress([(vnode1, 12, None), (vnode2, 2, None)]);
            pin_mut!(future);
            assert!((&mut future).now_or_never().is_none());
            drop(vnode1_tx);
            future.await;
        }
        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_update() {
        let (row_tx, row_rx) = unbounded_channel();
        let mut stream = VnodeStream::new(
            [(VirtualNode::ZERO, UnboundedReceiverStream::new(row_rx), 0)],
            vec![0],
            new_builder(3),
        );
        assert!(stream.next().now_or_never().is_none());
        row_tx.send(Ok(insert(0))).unwrap();
        row_tx.send(Ok(insert(1))).unwrap();
        assert!(stream.next().now_or_never().is_none());
        // Two rows are buffered and the batch size is three, so the update does not fit and
        // the two inserts are flushed first.
        let update = ChangeLogRow::Update {
            new_value: test_row(2),
            old_value: test_row(3),
        };
        row_tx.send(Ok(update)).unwrap();
        let inserts_chunk = stream.next().await.unwrap().unwrap();
        assert_eq!(2, inserts_chunk.cardinality());
        drop(row_tx);
        let update_chunk = stream.next().await.unwrap().unwrap();
        assert_eq!(2, update_chunk.cardinality());
        assert!(stream.next().await.is_none());
        stream.assert_progress([(VirtualNode::ZERO, 4, None)]).await;
    }

    #[tokio::test]
    async fn test_empty() {
        let mut stream = VnodeStream::<UnboundedReceiverStream<_>>::new(
            [].into_iter(),
            vec![0],
            new_builder(1024),
        );
        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_err() {
        // An error surfaces when polling the stream.
        {
            let (row_tx, row_rx) = unbounded_channel();
            let mut stream = VnodeStream::new(
                [(VirtualNode::ZERO, UnboundedReceiverStream::new(row_rx), 0)],
                vec![0],
                new_builder(3),
            );
            assert!(stream.next().now_or_never().is_none());
            row_tx.send(Err(anyhow!("err").into())).unwrap();
            assert!(stream.next().await.unwrap().is_err());
        }
        // An error surfaces when collecting progress.
        {
            let (row_tx, row_rx) = unbounded_channel();
            let mut stream = VnodeStream::new(
                [(VirtualNode::ZERO, UnboundedReceiverStream::new(row_rx), 0)],
                vec![0],
                new_builder(3),
            );
            assert!(stream.next().now_or_never().is_none());
            let future = stream.for_vnode_pk_progress(|_, _, _| unreachable!());
            pin_mut!(future);
            assert!((&mut future).now_or_never().is_none());
            row_tx.send(Err(anyhow!("err").into())).unwrap();
            assert!(future.await.is_err());
        }
    }

    #[tokio::test]
    async fn test_futures_unordered_peek() {
        let (row_tx, row_rx) = unbounded_channel();
        let mut stream = VnodeStream::new(
            [(VirtualNode::ZERO, UnboundedReceiverStream::new(row_rx), 0)],
            vec![0],
            new_builder(1024),
        );
        // poll the stream for once, and then the stream future inside it will be stored in the pending list.
        assert!(stream.next().now_or_never().is_none());
        let row = test_row(1);
        {
            let fut = stream.assert_progress([(VirtualNode::ZERO, 0, Some(row.clone()))]);
            pin_mut!(fut);
            assert!(
                poll_fn(|cx| Poll::Ready(fut.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            row_tx.send(Ok(ChangeLogRow::Insert(row.clone()))).unwrap();
            drop(row_tx);
            fut.await;
        }
        // The peeked row must still be delivered once the stream is polled again.
        let chunk = stream.next().await.unwrap().unwrap();
        assert_eq!(chunk.capacity(), 1);
    }
}