risingwave_stream/executor/backfill/snapshot_backfill/vnode_stream.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::mem::{replace, take};
use std::pin::Pin;
use std::task::{Context, Poll, ready};

use futures::stream::{FuturesUnordered, Peekable, StreamFuture};
use futures::{Stream, StreamExt, TryStreamExt};
use pin_project::pin_project;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common_rate_limit::RateLimit;
use risingwave_storage::table::ChangeLogRow;

use crate::executor::StreamExecutorResult;
use crate::executor::backfill::utils::create_builder;

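/// A static stream of [`ChangeLogRow`]s. One such stream is created per [`VirtualNode`] when
/// reading the change log of the upstream table during snapshot backfill.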
pub(super) trait ChangeLogRowStream =
    Stream<Item = StreamExecutorResult<ChangeLogRow>> + Sized + 'static;

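/// A single vnode's change log stream, tagged with its vnode and the number of change log rows
/// yielded so far. The inner stream is [`Peekable`] so that [`VnodeStream::for_vnode_pk_progress`]
/// can look at the next row without consuming it.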
#[pin_project]
struct StreamWithVnode<St: ChangeLogRowStream> {
    #[pin]
    stream: Peekable<St>,
    vnode: VirtualNode,
    row_count: usize,
}

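// Forwards the inner stream while maintaining `row_count`: an `Update` counts as two rows,
// matching the `UpdateDelete`/`UpdateInsert` op pair it will be expanded into downstream.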
impl<St: ChangeLogRowStream> Stream for StreamWithVnode<St> {
    type Item = St::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        let poll_result = this.stream.poll_next(cx);
        if let Poll::Ready(Some(Ok(change_log_row))) = &poll_result {
            match change_log_row {
                ChangeLogRow::Insert(_) | ChangeLogRow::Delete(_) => {
                    *this.row_count += 1;
                }
                ChangeLogRow::Update { .. } => {
                    *this.row_count += 2;
                }
            }
        }
        poll_result
    }
}

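/// A pinned, boxed per-vnode stream, in the form required to be driven as a [`StreamFuture`]
/// inside [`FuturesUnordered`].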
type ChangeLogRowVnodeStream<St> = Pin<Box<StreamWithVnode<St>>>;

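/// Merges the per-vnode change log streams of a backfilled table into a single stream of
/// [`StreamChunk`]s. Rows are taken from whichever vnode stream is ready and batched through a
/// [`DataChunkBuilder`], while per-vnode row counts are maintained so that backfill progress can
/// be reported via [`Self::for_vnode_pk_progress`].
///
/// A minimal usage sketch, mirroring the tests below (marked `ignore` because the types involved
/// are crate-internal and the snippet is illustrative only):
///
/// ```ignore
/// let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
/// let mut stream = VnodeStream::new(
///     // one `(vnode, change log stream, initial row count)` entry per vnode
///     [(VirtualNode::ZERO, UnboundedReceiverStream::new(rx), 0)],
///     vec![0], // pk indices
///     DataChunkBuilder::new(vec![DataType::Int64], 256),
/// );
/// tx.send(Ok(ChangeLogRow::Insert(OwnedRow::new(vec![Some(ScalarImpl::Int64(0))]))))
///     .unwrap();
/// drop(tx);
/// while let Some(chunk) = stream.try_next().await? {
///     // each `chunk` is a `StreamChunk` whose ops mirror the consumed change log rows
/// }
/// ```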
pub(super) struct VnodeStream<St: ChangeLogRowStream> {
    streams: FuturesUnordered<StreamFuture<ChangeLogRowVnodeStream<St>>>,
    pk_indices: Vec<usize>,
    finished_vnode: HashMap<VirtualNode, usize>,
    data_chunk_builder: DataChunkBuilder,
    ops: Vec<Op>,
}

impl<St: ChangeLogRowStream> VnodeStream<St> {
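    /// Creates a `VnodeStream` from `(vnode, change log stream, initial row count)` triples.
    ///
    /// The given `data_chunk_builder` must be empty, and its batch size must be at least 2 so
    /// that the two rows produced by an `Update` always fit into one chunk.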
    pub(super) fn new(
        vnode_streams: impl IntoIterator<Item = (VirtualNode, St, usize)>,
        pk_indices: Vec<usize>,
        data_chunk_builder: DataChunkBuilder,
    ) -> Self {
        assert!(data_chunk_builder.is_empty());
        assert!(data_chunk_builder.batch_size() >= 2);
        let streams = FuturesUnordered::from_iter(vnode_streams.into_iter().map(
            |(vnode, stream, row_count)| {
                let stream = stream.peekable();
                Box::pin(StreamWithVnode {
                    stream,
                    vnode,
                    row_count,
                })
                .into_future()
            },
        ));
        let ops = Vec::with_capacity(data_chunk_builder.batch_size());
        Self {
            streams,
            pk_indices,
            finished_vnode: HashMap::new(),
            data_chunk_builder,
            ops,
        }
    }

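    /// Takes the row counts of all finished vnodes, leaving the map empty. Must only be called
    /// after every vnode stream has finished and the chunk builder has been fully drained.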
    pub(super) fn take_finished_vnodes(&mut self) -> HashMap<VirtualNode, usize> {
        assert!(self.streams.is_empty());
        assert!(self.data_chunk_builder.is_empty());
        take(&mut self.finished_vnode)
    }
}

impl<St: ChangeLogRowStream> VnodeStream<St> {
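    /// Polls the next change log row from whichever vnode stream is ready. Streams that finish
    /// are recorded in `finished_vnode`; `Ready(Ok(None))` means all vnode streams are exhausted.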
    fn poll_next_row(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<StreamExecutorResult<Option<ChangeLogRow>>> {
        loop {
            let ready_item = match ready!(self.streams.poll_next_unpin(cx)) {
                None => Ok(None),
                Some((None, stream)) => {
                    self.finished_vnode
                        .try_insert(stream.vnode, stream.row_count)
                        .expect("non-duplicate");
                    continue;
                }
                Some((Some(Ok(item)), stream)) => {
                    // TODO: avoid generating a `StreamFuture` for each row if possible, because
                    // `FuturesUnordered::push` involves an `Arc` allocation and may incur
                    // unnecessary cost.
                    self.streams.push(stream.into_future());
                    Ok(Some(item))
                }
                Some((Some(Err(e)), _stream)) => Err(e),
            };
            break Poll::Ready(ready_item);
        }
    }

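    /// Flushes the rows currently buffered in the chunk builder into a [`StreamChunk`], if any.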
    pub(super) fn consume_builder(&mut self) -> Option<StreamChunk> {
        self.data_chunk_builder.consume_all().map(|chunk| {
            let ops = replace(
                &mut self.ops,
                Vec::with_capacity(self.data_chunk_builder.batch_size()),
            );
            StreamChunk::from_parts(ops, chunk)
        })
    }

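    /// Rebuilds the chunk builder for a new rate limit and chunk size. The current builder must
    /// be empty, since it is replaced wholesale.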
    pub(super) fn update_rate_limiter(&mut self, new_rate_limit: RateLimit, chunk_size: usize) {
        assert!(self.data_chunk_builder.is_empty());
        self.data_chunk_builder = create_builder(
            new_rate_limit,
            chunk_size,
            self.data_chunk_builder.data_types(),
        );
    }

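    /// Reports the current progress of every vnode via `on_vnode_progress`: the number of rows
    /// yielded so far, and the primary key of the next row to be yielded (`None` once the vnode
    /// has finished). The next row is only peeked, not consumed, so the main output stream is
    /// unaffected.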
    pub(super) async fn for_vnode_pk_progress(
        &mut self,
        mut on_vnode_progress: impl FnMut(VirtualNode, usize, Option<OwnedRow>),
    ) -> StreamExecutorResult<()> {
        assert!(self.data_chunk_builder.is_empty());
        for (vnode, row_count) in &self.finished_vnode {
            on_vnode_progress(*vnode, *row_count, None);
        }
        let streams = take(&mut self.streams);
        // When the `VnodeStream` is polled, `FuturesUnordered` generates a special cx to poll its futures.
        // Pending futures are kept in a separate linked list and are not polled again until that special cx
        // is awakened, which moves the awakened future from the linked list to the ready queue.
        //
        // If we instead used `FuturesUnordered::iter_mut` to access the pending futures in place and called
        // `peek` directly, the cx generated in `FuturesUnordered::poll_next` would be overwritten. Even if the
        // peeked future became ready, it would never be moved to the ready queue, and the `FuturesUnordered`
        // would be stuck forever.
        //
        // Therefore, to avoid this, we take all stream futures out and push them back again, so that all
        // futures end up in the ready queue.
        for vnode_stream_future in streams {
            let mut vnode_stream = vnode_stream_future.into_inner().expect("should exist");
            match vnode_stream.as_mut().project().stream.peek().await {
                Some(Ok(change_log_row)) => {
                    let row = match change_log_row {
                        ChangeLogRow::Insert(row) | ChangeLogRow::Delete(row) => row,
                        ChangeLogRow::Update {
                            new_value,
                            old_value,
                        } => {
                            if cfg!(debug_assertions) {
                                assert_eq!(
                                    old_value.project(&self.pk_indices),
                                    new_value.project(&self.pk_indices)
                                );
                            }
                            new_value
                        }
                    };
                    let pk = row.project(&self.pk_indices).to_owned_row();
                    on_vnode_progress(vnode_stream.vnode, vnode_stream.row_count, Some(pk));
                    self.streams.push(vnode_stream.into_future());
                }
                Some(Err(_)) => {
                    return Err(vnode_stream.try_next().await.expect_err("checked Err"));
                }
                None => {
                    self.finished_vnode
                        .try_insert(vnode_stream.vnode, vnode_stream.row_count)
                        .expect("non-duplicate");
                    on_vnode_progress(vnode_stream.vnode, vnode_stream.row_count, None);
                }
            }
        }
        Ok(())
    }
}

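// The merged output stream. Rows are appended to the chunk builder, and a `StreamChunk` is
// emitted whenever the builder fills up. An `Update` that does not fit into the remaining
// capacity first flushes the buffered rows, so its op pair is never split across chunks.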
impl<St: ChangeLogRowStream> Stream for VnodeStream<St> {
    type Item = StreamExecutorResult<StreamChunk>;

    // Here we implement the stream by hand instead of generating it with the `try_stream`
    // macro, because we want to access the state of the in-flight streams.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        let capacity = this.data_chunk_builder.batch_size();
        loop {
            match ready!(this.poll_next_row(cx)) {
                Ok(Some(change_log_row)) => {
                    let may_chunk = match change_log_row {
                        ChangeLogRow::Insert(row) => {
                            this.ops.push(Op::Insert);
                            this.data_chunk_builder.append_one_row(row)
                        }
                        ChangeLogRow::Delete(row) => {
                            this.ops.push(Op::Delete);
                            this.data_chunk_builder.append_one_row(row)
                        }
                        ChangeLogRow::Update {
                            new_value,
                            old_value,
                        } => {
                            if this.data_chunk_builder.can_append_update() {
                                this.ops.extend([Op::UpdateDelete, Op::UpdateInsert]);
                                assert!(
                                    this.data_chunk_builder.append_one_row(old_value).is_none()
                                );
                                this.data_chunk_builder.append_one_row(new_value)
                            } else {
                                let chunk = this
                                    .data_chunk_builder
                                    .consume_all()
                                    .expect("should be Some when not can_append");
                                let ops = replace(&mut this.ops, Vec::with_capacity(capacity));
                                this.ops.extend([Op::UpdateDelete, Op::UpdateInsert]);
                                assert!(
                                    this.data_chunk_builder.append_one_row(old_value).is_none()
                                );
                                assert!(
                                    this.data_chunk_builder.append_one_row(new_value).is_none()
                                );
                                break Poll::Ready(Some(Ok(StreamChunk::from_parts(ops, chunk))));
                            }
                        }
                    };
                    if let Some(chunk) = may_chunk {
                        let ops = replace(&mut this.ops, Vec::with_capacity(capacity));
                        break Poll::Ready(Some(Ok(StreamChunk::from_parts(ops, chunk))));
                    }
                }
                Ok(None) => {
                    break if let Some(chunk) = this.data_chunk_builder.consume_all() {
                        let ops = take(&mut this.ops);
                        Poll::Ready(Some(Ok(StreamChunk::from_parts(ops, chunk))))
                    } else {
                        Poll::Ready(None)
                    };
                }
                Err(e) => {
                    break Poll::Ready(Some(Err(e)));
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::future::poll_fn;
    use std::sync::LazyLock;
    use std::task::Poll;

    use anyhow::anyhow;
    use futures::{Future, FutureExt, pin_mut};
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
    use risingwave_storage::table::ChangeLogRow;
    use tokio::sync::mpsc::unbounded_channel;
    use tokio_stream::StreamExt;
    use tokio_stream::wrappers::UnboundedReceiverStream;

    use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;

    static DATA_TYPES: LazyLock<Vec<DataType>> = LazyLock::new(|| vec![DataType::Int64]);

    fn test_row(index: i64) -> OwnedRow {
        OwnedRow::new(vec![Some(ScalarImpl::Int64(index))])
    }

    impl<St: super::ChangeLogRowStream> VnodeStream<St> {
        async fn assert_progress(
            &mut self,
            progress: impl IntoIterator<Item = (VirtualNode, usize, Option<OwnedRow>)>,
        ) {
            let expected_progress_map: HashMap<_, _> = progress
                .into_iter()
                .map(|(vnode, row_count, progress)| (vnode, (row_count, progress)))
                .collect();
            let mut progress_map = HashMap::new();
            self.for_vnode_pk_progress(|vnode, row_count, progress| {
                progress_map
                    .try_insert(vnode, (row_count, progress))
                    .unwrap();
            })
            .await
            .unwrap();
            assert_eq!(expected_progress_map, progress_map);
        }
    }

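    // Two vnode streams with chunk size 3: rows from both vnodes are merged into chunks, and
    // `assert_progress` checks the per-vnode row count plus the pk of the next pending row.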
    #[tokio::test]
    async fn test_basic() {
        let [vnode1, vnode2] = [1, 2].map(VirtualNode::from_index);
        let (tx1, rx1) = unbounded_channel();
        let (tx2, rx2) = unbounded_channel();
        let mut stream = VnodeStream::<UnboundedReceiverStream<_>>::new(
            [
                (vnode1, UnboundedReceiverStream::new(rx1), 10),
                (vnode2, UnboundedReceiverStream::new(rx2), 0),
            ]
            .into_iter(),
            vec![0],
            DataChunkBuilder::new(DATA_TYPES.clone(), 3),
        );
        assert!(stream.next().now_or_never().is_none());
        tx1.send(Ok(ChangeLogRow::Insert(test_row(0)))).unwrap();
        assert!(stream.next().now_or_never().is_none());
        tx2.send(Ok(ChangeLogRow::Insert(test_row(0)))).unwrap();
        tx2.send(Ok(ChangeLogRow::Insert(test_row(1)))).unwrap();
        assert_eq!(3, stream.next().await.unwrap().unwrap().cardinality());

        let next_row = test_row(1);
        {
            let future =
                stream.assert_progress([(vnode1, 11, Some(next_row.clone())), (vnode2, 2, None)]);
            pin_mut!(future);
            assert!((&mut future).now_or_never().is_none());
            tx1.send(Ok(ChangeLogRow::Insert(next_row.clone())))
                .unwrap();
            assert!((&mut future).now_or_never().is_none());
            drop(tx2);
            future.await;
        }
        assert!(stream.next().now_or_never().is_none());
        assert_eq!(1, stream.consume_builder().unwrap().cardinality());
        {
            let future = stream.assert_progress([(vnode1, 12, None), (vnode2, 2, None)]);
            pin_mut!(future);
            assert!((&mut future).now_or_never().is_none());
            drop(tx1);
            future.await;
        }
        assert!(stream.next().await.is_none());
    }

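    // With chunk size 3 and two rows already buffered, an incoming `Update` flushes the buffered
    // rows first, so its `UpdateDelete`/`UpdateInsert` pair stays within a single chunk.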
    #[tokio::test]
    async fn test_update() {
        let (tx, rx) = unbounded_channel();
        let mut stream = VnodeStream::new(
            [(VirtualNode::ZERO, UnboundedReceiverStream::new(rx), 0)].into_iter(),
            vec![0],
            DataChunkBuilder::new(DATA_TYPES.clone(), 3),
        );
        assert!(stream.next().now_or_never().is_none());
        tx.send(Ok(ChangeLogRow::Insert(test_row(0)))).unwrap();
        tx.send(Ok(ChangeLogRow::Insert(test_row(1)))).unwrap();
        assert!(stream.next().now_or_never().is_none());
        tx.send(Ok(ChangeLogRow::Update {
            new_value: test_row(2),
            old_value: test_row(3),
        }))
        .unwrap();
        assert_eq!(2, stream.next().await.unwrap().unwrap().cardinality());
        drop(tx);
        assert_eq!(2, stream.next().await.unwrap().unwrap().cardinality());
        assert!(stream.next().await.is_none());
        stream.assert_progress([(VirtualNode::ZERO, 4, None)]).await;
    }

    #[tokio::test]
    async fn test_empty() {
        let mut stream = VnodeStream::<UnboundedReceiverStream<_>>::new(
            [].into_iter(),
            vec![0],
            DataChunkBuilder::new(DATA_TYPES.clone(), 1024),
        );
        assert!(stream.next().await.is_none());
    }

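    // An error from a vnode stream surfaces both from the merged stream and from
    // `for_vnode_pk_progress`.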
    #[tokio::test]
    async fn test_err() {
        {
            let (tx, rx) = unbounded_channel();
            let mut stream = VnodeStream::new(
                [(VirtualNode::ZERO, UnboundedReceiverStream::new(rx), 0)].into_iter(),
                vec![0],
                DataChunkBuilder::new(DATA_TYPES.clone(), 3),
            );
            assert!(stream.next().now_or_never().is_none());
            tx.send(Err(anyhow!("err").into())).unwrap();
            assert!(stream.next().await.unwrap().is_err());
        }
        {
            let (tx, rx) = unbounded_channel();
            let mut stream = VnodeStream::new(
                [(VirtualNode::ZERO, UnboundedReceiverStream::new(rx), 0)].into_iter(),
                vec![0],
                DataChunkBuilder::new(DATA_TYPES.clone(), 3),
            );
            assert!(stream.next().now_or_never().is_none());
            let future = stream.for_vnode_pk_progress(|_, _, _| unreachable!());
            pin_mut!(future);
            assert!((&mut future).now_or_never().is_none());
            tx.send(Err(anyhow!("err").into())).unwrap();
            assert!(future.await.is_err());
        }
    }

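    // Exercises the `FuturesUnordered` pitfall documented in `for_vnode_pk_progress`: progress
    // can be peeked while the stream future still sits in the pending list, without losing its
    // wakeup.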
    #[tokio::test]
    async fn test_futures_unordered_peek() {
        let (tx, rx) = unbounded_channel();
        let mut stream = VnodeStream::new(
            [(VirtualNode::ZERO, UnboundedReceiverStream::new(rx), 0)].into_iter(),
            vec![0],
            DataChunkBuilder::new(DATA_TYPES.clone(), 1024),
        );
        // Poll the stream once, so that the stream future inside it gets stored in the pending list.
        assert!(stream.next().now_or_never().is_none());
        let row = test_row(1);
        {
            let fut = stream.assert_progress([(VirtualNode::ZERO, 0, Some(row.clone()))]);
            pin_mut!(fut);
            assert!(
                poll_fn(|cx| Poll::Ready(fut.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx.send(Ok(ChangeLogRow::Insert(row.clone()))).unwrap();
            drop(tx);
            fut.await;
        }
        let chunk = stream.next().await.unwrap().unwrap();
        assert_eq!(chunk.capacity(), 1);
    }
}
461}