1use std::collections::HashMap;
16use std::mem::{replace, take};
17use std::pin::Pin;
18use std::task::{Context, Poll, ready};
19
20use futures::stream::{FuturesUnordered, Peekable, StreamFuture};
21use futures::{Stream, StreamExt, TryStreamExt};
22use pin_project::pin_project;
23use risingwave_common::array::{Op, StreamChunk};
24use risingwave_common::hash::VirtualNode;
25use risingwave_common::row::{OwnedRow, Row, RowExt};
26use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
27use risingwave_common_rate_limit::RateLimit;
28use risingwave_storage::table::ChangeLogRow;
29
30use crate::executor::StreamExecutorResult;
31use crate::executor::backfill::utils::create_builder;
32
33pub(super) trait ChangeLogRowStream =
34 Stream<Item = StreamExecutorResult<ChangeLogRow>> + Sized + 'static;
35
36#[pin_project]
37struct StreamWithVnode<St: ChangeLogRowStream> {
38 #[pin]
39 stream: Peekable<St>,
40 vnode: VirtualNode,
41 row_count: usize,
42}
43
44impl<St: ChangeLogRowStream> Stream for StreamWithVnode<St> {
45 type Item = St::Item;
46
47 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
48 let this = self.project();
49 let poll_result = this.stream.poll_next(cx);
50 if let Poll::Ready(Some(Ok(change_log_row))) = &poll_result {
51 match change_log_row {
52 ChangeLogRow::Insert(_) | ChangeLogRow::Delete(_) => {
53 *this.row_count += 1;
54 }
55 ChangeLogRow::Update { .. } => {
56 *this.row_count += 2;
57 }
58 }
59 }
60 poll_result
61 }
62}
63
64type ChangeLogRowVnodeStream<St> = Pin<Box<StreamWithVnode<St>>>;
65
66pub(super) struct VnodeStream<St: ChangeLogRowStream> {
67 streams: FuturesUnordered<StreamFuture<ChangeLogRowVnodeStream<St>>>,
68 pk_indices: Vec<usize>,
69 finished_vnode: HashMap<VirtualNode, usize>,
70 data_chunk_builder: DataChunkBuilder,
71 ops: Vec<Op>,
72}
73
74impl<St: ChangeLogRowStream> VnodeStream<St> {
75 pub(super) fn new(
76 vnode_streams: impl IntoIterator<Item = (VirtualNode, St, usize)>,
77 pk_indices: Vec<usize>,
78 data_chunk_builder: DataChunkBuilder,
79 ) -> Self {
80 assert!(data_chunk_builder.is_empty());
81 assert!(data_chunk_builder.batch_size() >= 2);
82 let streams = FuturesUnordered::from_iter(vnode_streams.into_iter().map(
83 |(vnode, stream, row_count)| {
84 let stream = stream.peekable();
85 Box::pin(StreamWithVnode {
86 stream,
87 vnode,
88 row_count,
89 })
90 .into_future()
91 },
92 ));
93 let ops = Vec::with_capacity(data_chunk_builder.batch_size());
94 Self {
95 streams,
96 pk_indices,
97 finished_vnode: HashMap::new(),
98 data_chunk_builder,
99 ops,
100 }
101 }
102
103 pub(super) fn take_finished_vnodes(&mut self) -> HashMap<VirtualNode, usize> {
104 assert!(self.streams.is_empty());
105 assert!(self.data_chunk_builder.is_empty());
106 take(&mut self.finished_vnode)
107 }
108}
109
110impl<St: ChangeLogRowStream> VnodeStream<St> {
111 fn poll_next_row(
112 &mut self,
113 cx: &mut Context<'_>,
114 ) -> Poll<StreamExecutorResult<Option<ChangeLogRow>>> {
115 loop {
116 let ready_item = match ready!(self.streams.poll_next_unpin(cx)) {
117 None => Ok(None),
118 Some((None, stream)) => {
119 self.finished_vnode
120 .try_insert(stream.vnode, stream.row_count)
121 .expect("non-duplicate");
122 continue;
123 }
124 Some((Some(Ok(item)), stream)) => {
125 self.streams.push(stream.into_future());
129 Ok(Some(item))
130 }
131 Some((Some(Err(e)), _stream)) => Err(e),
132 };
133 break Poll::Ready(ready_item);
134 }
135 }
136
137 pub(super) fn consume_builder(&mut self) -> Option<StreamChunk> {
138 self.data_chunk_builder.consume_all().map(|chunk| {
139 let ops = replace(
140 &mut self.ops,
141 Vec::with_capacity(self.data_chunk_builder.batch_size()),
142 );
143 StreamChunk::from_parts(ops, chunk)
144 })
145 }
146
147 pub(super) fn update_rate_limiter(&mut self, new_rate_limit: RateLimit, chunk_size: usize) {
148 assert!(self.data_chunk_builder.is_empty());
149 self.data_chunk_builder = create_builder(
150 new_rate_limit,
151 chunk_size,
152 self.data_chunk_builder.data_types(),
153 );
154 }
155
156 pub(super) async fn for_vnode_pk_progress(
157 &mut self,
158 mut on_vnode_progress: impl FnMut(VirtualNode, usize, Option<OwnedRow>),
159 ) -> StreamExecutorResult<()> {
160 assert!(self.data_chunk_builder.is_empty());
161 for (vnode, row_count) in &self.finished_vnode {
162 on_vnode_progress(*vnode, *row_count, None);
163 }
164 let streams = take(&mut self.streams);
165 for vnode_stream_future in streams {
175 let mut vnode_stream = vnode_stream_future.into_inner().expect("should exist");
176 match vnode_stream.as_mut().project().stream.peek().await {
177 Some(Ok(change_log_row)) => {
178 let row = match change_log_row {
179 ChangeLogRow::Insert(row) | ChangeLogRow::Delete(row) => row,
180 ChangeLogRow::Update {
181 new_value,
182 old_value,
183 } => {
184 if cfg!(debug_assertions) {
185 assert_eq!(
186 old_value.project(&self.pk_indices),
187 new_value.project(&self.pk_indices)
188 );
189 }
190 new_value
191 }
192 };
193 let pk = row.project(&self.pk_indices).to_owned_row();
194 on_vnode_progress(vnode_stream.vnode, vnode_stream.row_count, Some(pk));
195 self.streams.push(vnode_stream.into_future());
196 }
197 Some(Err(_)) => {
198 return Err(vnode_stream.try_next().await.expect_err("checked Err"));
199 }
200 None => {
201 self.finished_vnode
202 .try_insert(vnode_stream.vnode, vnode_stream.row_count)
203 .expect("non-duplicate");
204 on_vnode_progress(vnode_stream.vnode, vnode_stream.row_count, None);
205 }
206 }
207 }
208 Ok(())
209 }
210}
211
212impl<St: ChangeLogRowStream> Stream for VnodeStream<St> {
213 type Item = StreamExecutorResult<StreamChunk>;
214
215 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
218 let this = self.get_mut();
219 let capacity = this.data_chunk_builder.batch_size();
220 loop {
221 match ready!(this.poll_next_row(cx)) {
222 Ok(Some(change_log_row)) => {
223 let may_chunk = match change_log_row {
224 ChangeLogRow::Insert(row) => {
225 this.ops.push(Op::Insert);
226 this.data_chunk_builder.append_one_row(row)
227 }
228 ChangeLogRow::Delete(row) => {
229 this.ops.push(Op::Delete);
230 this.data_chunk_builder.append_one_row(row)
231 }
232 ChangeLogRow::Update {
233 new_value,
234 old_value,
235 } => {
236 if this.data_chunk_builder.can_append_update() {
237 this.ops.extend([Op::UpdateDelete, Op::UpdateInsert]);
238 assert!(
239 this.data_chunk_builder.append_one_row(old_value).is_none()
240 );
241 this.data_chunk_builder.append_one_row(new_value)
242 } else {
243 let chunk = this
244 .data_chunk_builder
245 .consume_all()
246 .expect("should be Some when not can_append");
247 let ops = replace(&mut this.ops, Vec::with_capacity(capacity));
248 this.ops.extend([Op::UpdateDelete, Op::UpdateInsert]);
249 assert!(
250 this.data_chunk_builder.append_one_row(old_value).is_none()
251 );
252 assert!(
253 this.data_chunk_builder.append_one_row(new_value).is_none()
254 );
255 break Poll::Ready(Some(Ok(StreamChunk::from_parts(ops, chunk))));
256 }
257 }
258 };
259 if let Some(chunk) = may_chunk {
260 let ops = replace(&mut this.ops, Vec::with_capacity(capacity));
261 break Poll::Ready(Some(Ok(StreamChunk::from_parts(ops, chunk))));
262 }
263 }
264 Ok(None) => {
265 break if let Some(chunk) = this.data_chunk_builder.consume_all() {
266 let ops = take(&mut this.ops);
267 Poll::Ready(Some(Ok(StreamChunk::from_parts(ops, chunk))))
268 } else {
269 Poll::Ready(None)
270 };
271 }
272 Err(e) => {
273 break Poll::Ready(Some(Err(e)));
274 }
275 }
276 }
277 }
278}
279
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::future::poll_fn;
    use std::sync::LazyLock;
    use std::task::Poll;

    use anyhow::anyhow;
    use futures::{Future, FutureExt, pin_mut};
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::row::OwnedRow;
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
    use risingwave_storage::table::ChangeLogRow;
    use tokio::sync::mpsc::unbounded_channel;
    use tokio_stream::StreamExt;
    use tokio_stream::wrappers::UnboundedReceiverStream;

    use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream;

    /// Schema of every test row: a single `Int64` column (which is also the pk, via `vec![0]`).
    static DATA_TYPES: LazyLock<Vec<DataType>> = LazyLock::new(|| vec![DataType::Int64]);

    /// Builds a single-column row holding `index`.
    fn test_row(index: i64) -> OwnedRow {
        OwnedRow::new(vec![Some(ScalarImpl::Int64(index))])
    }

    impl<St: super::ChangeLogRowStream> VnodeStream<St> {
        /// Runs `for_vnode_pk_progress` and asserts that the reported progress equals `progress`,
        /// given as `(vnode, row_count, next pk)` entries. Panics if a vnode is reported twice.
        async fn assert_progress(
            &mut self,
            progress: impl IntoIterator<Item = (VirtualNode, usize, Option<OwnedRow>)>,
        ) {
            let expected_progress_map: HashMap<_, _> = progress
                .into_iter()
                .map(|(vnode, row_count, progress)| (vnode, (row_count, progress)))
                .collect();
            let mut progress_map = HashMap::new();
            self.for_vnode_pk_progress(|vnode, row_count, progress| {
                progress_map
                    .try_insert(vnode, (row_count, progress))
                    .unwrap();
            })
            .await
            .unwrap();
            assert_eq!(expected_progress_map, progress_map);
        }
    }

    #[tokio::test]
    async fn test_basic() {
        let [vnode1, vnode2] = [1, 2].map(VirtualNode::from_index);
        let (tx1, rx1) = unbounded_channel();
        let (tx2, rx2) = unbounded_channel();
        // vnode1 starts with a row count of 10 and vnode2 with 0; chunks hold at most 3 rows.
        let mut stream = VnodeStream::<UnboundedReceiverStream<_>>::new(
            [
                (vnode1, UnboundedReceiverStream::new(rx1), 10),
                (vnode2, UnboundedReceiverStream::new(rx2), 0),
            ]
            .into_iter(),
            vec![0],
            DataChunkBuilder::new(DATA_TYPES.clone(), 3),
        );
        // Nothing has been sent yet.
        assert!(stream.next().now_or_never().is_none());
        tx1.send(Ok(ChangeLogRow::Insert(test_row(0)))).unwrap();
        // A single buffered row does not complete a chunk of 3.
        assert!(stream.next().now_or_never().is_none());
        tx2.send(Ok(ChangeLogRow::Insert(test_row(0)))).unwrap();
        tx2.send(Ok(ChangeLogRow::Insert(test_row(1)))).unwrap();
        // The third buffered row completes the chunk.
        assert_eq!(3, stream.next().await.unwrap().unwrap().cardinality());

        let next_row = test_row(1);
        {
            // vnode1: 10 + 1 rows read, next row has pk 1. vnode2: 2 rows read, then ended.
            let future =
                stream.assert_progress([(vnode1, 11, Some(next_row.clone())), (vnode2, 2, None)]);
            pin_mut!(future);
            // Blocked: neither vnode has a next row yet, nor has either ended.
            assert!((&mut future).now_or_never().is_none());
            tx1.send(Ok(ChangeLogRow::Insert(next_row.clone())))
                .unwrap();
            // Still blocked: vnode2 has neither a next row nor ended.
            assert!((&mut future).now_or_never().is_none());
            drop(tx2);
            future.await;
        }
        // The row peeked while reporting progress is still read by the stream; it only
        // partially fills a chunk, so it has to be flushed explicitly.
        assert!(stream.next().now_or_never().is_none());
        assert_eq!(1, stream.consume_builder().unwrap().cardinality());
        {
            let future = stream.assert_progress([(vnode1, 12, None), (vnode2, 2, None)]);
            pin_mut!(future);
            // Blocked until vnode1's stream ends.
            assert!((&mut future).now_or_never().is_none());
            drop(tx1);
            future.await;
        }
        // Every vnode has finished and nothing is buffered.
        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_update() {
        let (tx, rx) = unbounded_channel();
        let mut stream = VnodeStream::new(
            [(VirtualNode::ZERO, UnboundedReceiverStream::new(rx), 0)].into_iter(),
            vec![0],
            DataChunkBuilder::new(DATA_TYPES.clone(), 3),
        );
        assert!(stream.next().now_or_never().is_none());
        tx.send(Ok(ChangeLogRow::Insert(test_row(0)))).unwrap();
        tx.send(Ok(ChangeLogRow::Insert(test_row(1)))).unwrap();
        // Two buffered rows do not complete a chunk of 3.
        assert!(stream.next().now_or_never().is_none());
        tx.send(Ok(ChangeLogRow::Update {
            new_value: test_row(2),
            old_value: test_row(3),
        }))
        .unwrap();
        // The update pair is not split across chunks: the two inserts are flushed on their own.
        assert_eq!(2, stream.next().await.unwrap().unwrap().cardinality());
        drop(tx);
        // Once the stream ends, the buffered update pair is flushed.
        assert_eq!(2, stream.next().await.unwrap().unwrap().cardinality());
        assert!(stream.next().await.is_none());
        // Two inserts plus one update, which counts as two rows.
        stream.assert_progress([(VirtualNode::ZERO, 4, None)]).await;
    }

    #[tokio::test]
    async fn test_empty() {
        // Without any vnode stream, the merged stream ends immediately.
        let mut stream = VnodeStream::<UnboundedReceiverStream<_>>::new(
            [].into_iter(),
            vec![0],
            DataChunkBuilder::new(DATA_TYPES.clone(), 1024),
        );
        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_err() {
        // An error from a vnode stream is surfaced by `next`.
        {
            let (tx, rx) = unbounded_channel();
            let mut stream = VnodeStream::new(
                [(VirtualNode::ZERO, UnboundedReceiverStream::new(rx), 0)].into_iter(),
                vec![0],
                DataChunkBuilder::new(DATA_TYPES.clone(), 3),
            );
            assert!(stream.next().now_or_never().is_none());
            tx.send(Err(anyhow!("err").into())).unwrap();
            assert!(stream.next().await.unwrap().is_err());
        }
        // An error from a vnode stream is surfaced by `for_vnode_pk_progress`, without
        // reporting any progress.
        {
            let (tx, rx) = unbounded_channel();
            let mut stream = VnodeStream::new(
                [(VirtualNode::ZERO, UnboundedReceiverStream::new(rx), 0)].into_iter(),
                vec![0],
                DataChunkBuilder::new(DATA_TYPES.clone(), 3),
            );
            assert!(stream.next().now_or_never().is_none());
            let future = stream.for_vnode_pk_progress(|_, _, _| unreachable!());
            pin_mut!(future);
            assert!((&mut future).now_or_never().is_none());
            tx.send(Err(anyhow!("err").into())).unwrap();
            assert!(future.await.is_err());
        }
    }

    #[tokio::test]
    async fn test_futures_unordered_peek() {
        let (tx, rx) = unbounded_channel();
        let mut stream = VnodeStream::new(
            [(VirtualNode::ZERO, UnboundedReceiverStream::new(rx), 0)].into_iter(),
            vec![0],
            DataChunkBuilder::new(DATA_TYPES.clone(), 1024),
        );
        // Poll once so that the vnode's future is pending inside `FuturesUnordered` before
        // progress is requested.
        assert!(stream.next().now_or_never().is_none());
        let row = test_row(1);
        {
            let fut = stream.assert_progress([(VirtualNode::ZERO, 0, Some(row.clone()))]);
            pin_mut!(fut);
            assert!(
                poll_fn(|cx| Poll::Ready(fut.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx.send(Ok(ChangeLogRow::Insert(row.clone()))).unwrap();
            drop(tx);
            fut.await;
        }
        // The row that was peeked for progress is still yielded afterwards.
        let chunk = stream.next().await.unwrap().unwrap();
        assert_eq!(chunk.capacity(), 1);
    }
}