1use std::cmp::Ordering;
16use std::collections::VecDeque;
17use std::fmt::Debug;
18use std::future::{Future, poll_fn};
19use std::pin::pin;
20use std::sync::Arc;
21use std::task::Poll;
22use std::time::Instant;
23
24use await_tree::InstrumentAwait;
25use either::Either;
26use futures::future::BoxFuture;
27use futures::{TryFuture, TryFutureExt};
28use risingwave_common::array::StreamChunk;
29use risingwave_common::bail;
30use risingwave_common::bitmap::Bitmap;
31use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};
32use risingwave_common::util::epoch::{EpochPair, INVALID_EPOCH};
33use risingwave_common_estimate_size::EstimateSize;
34use risingwave_common_rate_limit::{RateLimit, RateLimiter};
35use tokio::select;
36use tokio::sync::mpsc::UnboundedReceiver;
37
/// All log-store operations report failures as `anyhow::Error`.
pub type LogStoreResult<T> = Result<T, anyhow::Error>;
/// Identifier of a chunk within an epoch.
pub type ChunkId = usize;
40
/// A position in the log up to which data has been consumed and may be
/// reclaimed. Ordered by epoch first; within an epoch, a barrier sorts after
/// every chunk (see the `PartialOrd` impl).
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum TruncateOffset {
    /// A stream chunk, identified by its epoch and per-epoch chunk id.
    Chunk { epoch: u64, chunk_id: ChunkId },
    /// The barrier that closes `epoch`.
    Barrier { epoch: u64 },
}
46
47impl PartialOrd for TruncateOffset {
48 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
49 let extract = |offset: &TruncateOffset| match offset {
50 TruncateOffset::Chunk { epoch, chunk_id } => (*epoch, *chunk_id),
51 TruncateOffset::Barrier { epoch } => (*epoch, usize::MAX),
52 };
53 let this = extract(self);
54 let other = extract(other);
55 this.partial_cmp(&other)
56 }
57}
58
59impl TruncateOffset {
60 pub fn next_chunk_id(&self) -> ChunkId {
61 match self {
62 TruncateOffset::Chunk { chunk_id, .. } => chunk_id + 1,
63 TruncateOffset::Barrier { .. } => 0,
64 }
65 }
66
67 pub fn epoch(&self) -> u64 {
68 match self {
69 TruncateOffset::Chunk { epoch, .. } | TruncateOffset::Barrier { epoch } => *epoch,
70 }
71 }
72
73 pub fn check_next_offset(&self, next_offset: TruncateOffset) -> LogStoreResult<()> {
74 if *self >= next_offset {
75 bail!(
76 "next offset {:?} should be later than current offset {:?}",
77 next_offset,
78 self
79 )
80 } else {
81 Ok(())
82 }
83 }
84
85 pub fn check_next_item_epoch(&self, epoch: u64) -> LogStoreResult<()> {
86 match self {
87 TruncateOffset::Chunk {
88 epoch: offset_epoch,
89 ..
90 } => {
91 if epoch != *offset_epoch {
92 bail!(
93 "new item epoch {} does not match current chunk offset epoch {}",
94 epoch,
95 offset_epoch
96 );
97 }
98 }
99 TruncateOffset::Barrier {
100 epoch: offset_epoch,
101 } => {
102 if epoch <= *offset_epoch {
103 bail!(
104 "new item epoch {} does not exceed barrier offset epoch {}",
105 epoch,
106 offset_epoch
107 );
108 }
109 }
110 }
111 Ok(())
112 }
113}
114
/// An item read back from the log store.
#[derive(Debug)]
pub enum LogStoreReadItem {
    /// A data chunk together with its per-epoch chunk id.
    StreamChunk {
        chunk: StreamChunk,
        chunk_id: ChunkId,
    },
    /// An epoch boundary.
    Barrier {
        /// Whether this barrier is a checkpoint barrier.
        is_checkpoint: bool,
        /// New vnode bitmap carried by the barrier, if any.
        new_vnode_bitmap: Option<Arc<Bitmap>>,
        /// Whether the stream stops after this barrier.
        is_stop: bool,
    },
}
127
/// Deferred action returned by [`LogWriter::flush_current_epoch`], to be run
/// after the barrier has been yielded downstream.
pub trait LogWriterPostFlushCurrentEpochFn<'a> = FnOnce() -> BoxFuture<'a, LogStoreResult<()>>;

/// Wrapper around the post-flush callback. Marked `#[must_use]` because
/// dropping it silently skips the post-flush step.
#[must_use]
pub struct LogWriterPostFlushCurrentEpoch<'a>(
    Box<dyn LogWriterPostFlushCurrentEpochFn<'a> + Send + 'a>,
);

impl<'a> LogWriterPostFlushCurrentEpoch<'a> {
    pub fn new(f: impl LogWriterPostFlushCurrentEpochFn<'a> + Send + 'a) -> Self {
        Self(Box::new(f))
    }

    /// Run the deferred callback. Call this after the barrier has been
    /// yielded to downstream.
    pub async fn post_yield_barrier(self) -> LogStoreResult<()> {
        self.0().await
    }
}
144
/// Options passed to [`LogWriter::flush_current_epoch`].
pub struct FlushCurrentEpochOptions {
    /// Whether the epoch is sealed by a checkpoint barrier.
    pub is_checkpoint: bool,
    /// New vnode bitmap carried by the barrier, if any.
    pub new_vnode_bitmap: Option<Arc<Bitmap>>,
    /// Whether the writer stops after this epoch.
    pub is_stop: bool,
}
150
/// The write side of a log store: chunks are appended to the current epoch,
/// which is then sealed by `flush_current_epoch`.
pub trait LogWriter: Send {
    /// Initialize the writer at `epoch`.
    /// `pause_read_on_bootstrap`: presumably starts the paired reader in a
    /// paused state during bootstrap — NOTE(review): inferred from the name,
    /// confirm against implementations.
    fn init(
        &mut self,
        epoch: EpochPair,
        pause_read_on_bootstrap: bool,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Write a stream chunk into the current epoch.
    fn write_chunk(
        &mut self,
        chunk: StreamChunk,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Seal the current epoch and advance to `next_epoch`. The returned
    /// callback must be invoked after the barrier is yielded downstream.
    fn flush_current_epoch(
        &mut self,
        next_epoch: u64,
        options: FlushCurrentEpochOptions,
    ) -> impl Future<Output = LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>>> + Send + '_;

    /// Pause the paired log reader.
    fn pause(&mut self) -> LogStoreResult<()>;

    /// Resume the paired log reader.
    fn resume(&mut self) -> LogStoreResult<()>;
}
176
/// The read side of a log store.
pub trait LogReader: Send + Sized + 'static {
    /// Initialize the log reader.
    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Start reading from the given offset; `None` presumably means from the
    /// beginning — TODO(review): confirm with implementations.
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Emit the next item, paired with the epoch it belongs to.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Mark everything up to and including `offset` as consumed, allowing the
    /// backing storage to reclaim it.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;

    /// Reset the reader so consumption restarts from an earlier position —
    /// NOTE(review): the exact restart point depends on the implementation.
    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
}
203
/// Factory producing a paired reader/writer for a log store.
pub trait LogStoreFactory: Send + 'static {
    /// Whether the produced reader supports `rewind`.
    const ALLOW_REWIND: bool;
    /// Whether the sink should be rebuilt when the vnode bitmap is updated —
    /// NOTE(review): inferred from the name; confirm against usages.
    const REBUILD_SINK_ON_UPDATE_VNODE_BITMAP: bool;
    type Reader: LogReader;
    type Writer: LogWriter;

    /// Build the reader/writer pair.
    fn build(self) -> impl Future<Output = (Self::Reader, Self::Writer)> + Send;
}
212
/// Log reader adapter that applies `f` to every chunk read from `inner`.
pub struct TransformChunkLogReader<F: Fn(StreamChunk) -> StreamChunk, R: LogReader> {
    f: F,
    inner: R,
}
217
218impl<F: Fn(StreamChunk) -> StreamChunk + Send + 'static, R: LogReader> LogReader
219 for TransformChunkLogReader<F, R>
220{
221 fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
222 self.inner.init()
223 }
224
225 async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
226 let (epoch, item) = self.inner.next_item().await?;
227 let item = match item {
228 LogStoreReadItem::StreamChunk { chunk, chunk_id } => LogStoreReadItem::StreamChunk {
229 chunk: (self.f)(chunk),
230 chunk_id,
231 },
232 other => other,
233 };
234 Ok((epoch, item))
235 }
236
237 fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
238 self.inner.truncate(offset)
239 }
240
241 fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
242 self.inner.rewind()
243 }
244
245 fn start_from(
246 &mut self,
247 start_offset: Option<u64>,
248 ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
249 self.inner.start_from(start_offset)
250 }
251}
252
/// Log reader wrapper that measures how long the consumer keeps the reader
/// idle between items (i.e. downstream backpressure).
pub struct BackpressureMonitoredLogReader<R: LogReader> {
    inner: R,
    // Set when an item is handed out; taken on the next `next_item` call.
    wait_new_future_start_time: Option<Instant>,
    // Accumulated idle time, in nanoseconds.
    wait_new_future_duration_ns: LabelGuardedIntCounter,
}

impl<R: LogReader> BackpressureMonitoredLogReader<R> {
    fn new(inner: R, wait_new_future_duration_ns: LabelGuardedIntCounter) -> Self {
        Self {
            inner,
            wait_new_future_start_time: None,
            wait_new_future_duration_ns,
        }
    }
}
269
impl<R: LogReader> LogReader for BackpressureMonitoredLogReader<R> {
    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        // Reset the timer: nothing has been handed out yet.
        self.wait_new_future_start_time = None;
        self.inner.init()
    }

    /// Accounts the gap between one `next_item` completing and the next call
    /// arriving, which reflects how long downstream held back the reader.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
        if let Some(start_time) = self.wait_new_future_start_time.take() {
            self.wait_new_future_duration_ns
                .inc_by(start_time.elapsed().as_nanos() as _);
        }
        self.inner.next_item().inspect_ok(|_| {
            // Start timing once an item has been delivered to the consumer.
            self.wait_new_future_start_time = Some(Instant::now());
        })
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        self.inner.truncate(offset)
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.rewind().inspect_ok(|_| {
            // After a rewind the previous timing is meaningless.
            self.wait_new_future_start_time = None;
        })
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.start_from(start_offset)
    }
}
306
/// Log reader wrapper that reports read metrics (epoch, rows, bytes).
pub struct MonitoredLogReader<R: LogReader> {
    inner: R,
    // Last epoch observed, to avoid redundant gauge updates.
    read_epoch: u64,
    metrics: LogReaderMetrics,
}
312
/// Metrics reported by [`MonitoredLogReader`] and
/// [`BackpressureMonitoredLogReader`].
pub struct LogReaderMetrics {
    pub log_store_latest_read_epoch: LabelGuardedIntGauge,
    pub log_store_read_rows: LabelGuardedIntCounter,
    pub log_store_read_bytes: LabelGuardedIntCounter,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter,
}
319
impl<R: LogReader> MonitoredLogReader<R> {
    /// Wrap `inner` with metrics reporting; the read epoch starts invalid
    /// until the first item is observed.
    pub fn new(inner: R, metrics: LogReaderMetrics) -> Self {
        Self {
            inner,
            read_epoch: INVALID_EPOCH,
            metrics,
        }
    }
}
329
330impl<R: LogReader> LogReader for MonitoredLogReader<R> {
331 async fn init(&mut self) -> LogStoreResult<()> {
332 self.inner.init().instrument_await("log_reader_init").await
333 }
334
335 async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
336 self.inner
337 .next_item()
338 .instrument_await("log_reader_next_item")
339 .await
340 .inspect(|(epoch, item)| {
341 if self.read_epoch != *epoch {
342 self.read_epoch = *epoch;
343 self.metrics.log_store_latest_read_epoch.set(*epoch as _);
344 }
345 if let LogStoreReadItem::StreamChunk { chunk, .. } = item {
346 self.metrics
347 .log_store_read_rows
348 .inc_by(chunk.cardinality() as _);
349 self.metrics
350 .log_store_read_bytes
351 .inc_by(chunk.estimated_size() as u64);
352 }
353 })
354 }
355
356 fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
357 self.inner.truncate(offset)
358 }
359
360 fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
361 self.inner.rewind().instrument_await("log_reader_rewind")
362 }
363
364 fn start_from(
365 &mut self,
366 start_offset: Option<u64>,
367 ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
368 self.inner.start_from(start_offset)
369 }
370}
371
// Offsets as seen by the inner (upstream) reader vs. the offsets handed to
// the downstream consumer after rate-limit splitting renumbers chunks.
type UpstreamChunkOffset = TruncateOffset;
type DownstreamChunkOffset = TruncateOffset;
374
/// A piece of an upstream chunk produced by rate-limit splitting.
struct SplitChunk {
    chunk: StreamChunk,
    // Offset of the upstream chunk this piece came from.
    upstream_chunk_offset: UpstreamChunkOffset,
    // Whether this is the final piece of that upstream chunk.
    is_last: bool,
}
381
/// Shared state of [`RateLimitedLogReader`].
struct RateLimitedLogReaderCore<R: LogReader> {
    inner: R,
    // Mapping from downstream offsets already emitted to their upstream
    // offsets; pushed at the front, so the oldest entry sits at the back.
    consumed_offset_queue: VecDeque<(DownstreamChunkOffset, UpstreamChunkOffset)>,
    // Pieces of split chunks not yet emitted; popped from the back.
    unconsumed_chunk_queue: VecDeque<SplitChunk>,
    // Next downstream chunk id; reset to 0 at each barrier.
    next_chunk_id: usize,
}
390
impl<R: LogReader> RateLimitedLogReaderCore<R> {
    /// Return either a (possibly split) chunk or a pass-through barrier item.
    ///
    /// Pending split pieces are served before pulling from the inner reader.
    async fn next_item(&mut self) -> LogStoreResult<Either<SplitChunk, (u64, LogStoreReadItem)>> {
        match self.unconsumed_chunk_queue.pop_back() {
            Some(split_chunk) => Ok(Either::Left(split_chunk)),
            None => {
                let (epoch, item) = self.inner.next_item().await?;
                match item {
                    LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
                        // A fresh upstream chunk is one whole piece so far.
                        Ok(Either::Left(SplitChunk {
                            chunk,
                            upstream_chunk_offset: UpstreamChunkOffset::Chunk { epoch, chunk_id },
                            is_last: true,
                        }))
                    }
                    LogStoreReadItem::Barrier { .. } => {
                        // Barriers are forwarded immediately, so the
                        // downstream->upstream mapping is recorded right away.
                        self.consumed_offset_queue.push_front((
                            TruncateOffset::Barrier { epoch },
                            TruncateOffset::Barrier { epoch },
                        ));
                        // Downstream chunk ids restart in the next epoch.
                        self.next_chunk_id = 0;
                        Ok(Either::Right((epoch, item)))
                    }
                }
            }
        }
    }
}
421
/// Log reader wrapper that throttles chunk emission according to a
/// dynamically updatable [`RateLimit`].
pub struct RateLimitedLogReader<R: LogReader> {
    core: RateLimitedLogReaderCore<R>,
    rate_limiter: RateLimiter,
    // Receives rate-limit updates at runtime.
    control_rx: UnboundedReceiver<RateLimit>,
}
427
impl<R: LogReader> RateLimitedLogReader<R> {
    /// Wrap `inner` with rate limiting. The limiter starts `Disabled` until
    /// an update arrives on `control_rx`.
    pub fn new(inner: R, control_rx: UnboundedReceiver<RateLimit>) -> Self {
        Self {
            core: RateLimitedLogReaderCore {
                inner,
                consumed_offset_queue: VecDeque::new(),
                unconsumed_chunk_queue: VecDeque::new(),
                next_chunk_id: 0,
            },
            rate_limiter: RateLimiter::new(RateLimit::Disabled),
            control_rx,
        }
    }
}
442
impl<R: LogReader> RateLimitedLogReader<R> {
    /// Wait for rate-limit permits for `split_chunk` — splitting it further
    /// when it exceeds the fixed limit — and convert it into a downstream
    /// item with a freshly assigned chunk id.
    ///
    /// Must not be called while the limiter is `RateLimit::Pause`.
    async fn apply_rate_limit(
        &mut self,
        split_chunk: SplitChunk,
    ) -> LogStoreResult<(u64, LogStoreReadItem)> {
        let split_chunk = match self.rate_limiter.rate_limit() {
            RateLimit::Pause => unreachable!(
                "apply_rate_limit is not supposed to be called while the stream is paused"
            ),
            RateLimit::Disabled => split_chunk,
            RateLimit::Fixed(limit) => {
                let limit = limit.get();
                let required_permits = split_chunk.chunk.compute_rate_limit_chunk_permits();
                if required_permits <= limit {
                    self.rate_limiter.wait(required_permits).await;
                    split_chunk
                } else {
                    // The chunk needs more permits than a single wait may
                    // request: split it and emit only the first piece now.
                    let mut chunks = split_chunk.chunk.split(limit as _).into_iter();
                    let mut is_last = split_chunk.is_last;
                    let upstream_chunk_offset = split_chunk.upstream_chunk_offset;

                    let first_chunk = chunks.next().unwrap();

                    // Queue the remaining pieces in reverse so that `pop_back`
                    // yields them in original order; only the final piece
                    // keeps the incoming `is_last` flag.
                    for chunk in chunks.rev() {
                        self.core.unconsumed_chunk_queue.push_back(SplitChunk {
                            chunk,
                            upstream_chunk_offset,
                            is_last,
                        });
                        is_last = false;
                    }

                    self.rate_limiter
                        .wait(first_chunk.compute_rate_limit_chunk_permits())
                        .await;
                    SplitChunk {
                        chunk: first_chunk,
                        upstream_chunk_offset,
                        is_last,
                    }
                }
            }
        };

        // Assign the downstream chunk id; record the offset mapping only once
        // the last piece of the upstream chunk is emitted, since truncation
        // must cover whole upstream chunks.
        let epoch = split_chunk.upstream_chunk_offset.epoch();
        let downstream_chunk_id = self.core.next_chunk_id;
        self.core.next_chunk_id += 1;
        if split_chunk.is_last {
            self.core.consumed_offset_queue.push_front((
                TruncateOffset::Chunk {
                    epoch,
                    chunk_id: downstream_chunk_id,
                },
                split_chunk.upstream_chunk_offset,
            ));
        }

        Ok((
            epoch,
            LogStoreReadItem::StreamChunk {
                chunk: split_chunk.chunk,
                chunk_id: downstream_chunk_id,
            },
        ))
    }
}
515
impl<R: LogReader> LogReader for RateLimitedLogReader<R> {
    async fn init(&mut self) -> LogStoreResult<()> {
        self.core.inner.init().await
    }

    /// Produce the next item, applying the current rate limit.
    ///
    /// Rate-limit updates take priority (`biased`); while paused, only the
    /// control channel is polled, so no data is read.
    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        let mut paused = false;
        loop {
            select! {
                biased;
                recv = pin!(self.control_rx.recv()) => {
                    let new_rate_limit = match recv {
                        Some(limit) => limit,
                        None => bail!("rate limit control channel closed"),
                    };
                    let old_rate_limit = self.rate_limiter.update(new_rate_limit);
                    paused = matches!(new_rate_limit, RateLimit::Pause);
                    tracing::info!("rate limit changed from {:?} to {:?}, paused = {paused}", old_rate_limit, new_rate_limit);
                },
                item = self.core.next_item(), if !paused => {
                    let item = item?;
                    match item {
                        Either::Left(split_chunk) => {
                            return self.apply_rate_limit(split_chunk).await;
                        },
                        Either::Right(item) => {
                            // The core only returns barriers as `Right`.
                            assert!(matches!(item.1, LogStoreReadItem::Barrier{..}));
                            return Ok(item);
                        },
                    }
                }
            }
        }
    }

    /// Translate the downstream truncate offset back to the corresponding
    /// upstream offset before truncating the inner reader.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        let mut truncate_offset = None;
        // Pop every mapping not newer than `offset`; the oldest entry is at
        // the back of the queue.
        while let Some((downstream_offset, upstream_offset)) =
            self.core.consumed_offset_queue.back()
        {
            if *downstream_offset <= offset {
                truncate_offset = Some(*upstream_offset);
                self.core.consumed_offset_queue.pop_back();
            } else {
                break;
            }
        }
        tracing::trace!(
            "rate limited log store reader truncate offset {:?}, downstream offset {:?}",
            truncate_offset,
            offset
        );
        if let Some(offset) = truncate_offset {
            self.core.inner.truncate(offset)
        } else {
            Ok(())
        }
    }

    /// Drop all split/offset bookkeeping before rewinding the inner reader,
    /// since downstream chunk ids will be reassigned after rewind.
    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.core.unconsumed_chunk_queue.clear();
        self.core.consumed_offset_queue.clear();
        self.core.next_chunk_id = 0;
        self.core.inner.rewind()
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.core.inner.start_from(start_offset)
    }
}
589
#[easy_ext::ext(LogReaderExt)]
impl<T> T
where
    T: LogReader,
{
    /// Wrap the reader so every chunk payload is passed through `f`.
    pub fn transform_chunk<F: Fn(StreamChunk) -> StreamChunk + Sized>(
        self,
        f: F,
    ) -> TransformChunkLogReader<F, Self> {
        TransformChunkLogReader { f, inner: self }
    }

    /// Wrap the reader with read metrics and backpressure accounting.
    pub fn monitored(self, metrics: LogReaderMetrics) -> impl LogReader {
        // The backpressure counter is shared with the outer wrapper.
        let wait_new_future_duration = metrics.log_store_reader_wait_new_future_duration_ns.clone();
        BackpressureMonitoredLogReader::new(
            MonitoredLogReader::new(self, metrics),
            wait_new_future_duration,
        )
    }

    /// Wrap the reader with a dynamically controllable rate limiter.
    pub fn rate_limited(self, control_rx: UnboundedReceiver<RateLimit>) -> impl LogReader {
        RateLimitedLogReader::new(self, control_rx)
    }
}
615
/// Log writer wrapper that reports write metrics.
pub struct MonitoredLogWriter<W: LogWriter> {
    inner: W,
    metrics: LogWriterMetrics,
}
620
/// Metrics reported by [`MonitoredLogWriter`].
pub struct LogWriterMetrics {
    // Epoch recorded at `init`.
    pub log_store_first_write_epoch: LabelGuardedIntGauge,
    // Advanced on `init` and after each epoch flush.
    pub log_store_latest_write_epoch: LabelGuardedIntGauge,
    pub log_store_write_rows: LabelGuardedIntCounter,
}
627
628impl<W: LogWriter> LogWriter for MonitoredLogWriter<W> {
629 async fn init(
630 &mut self,
631 epoch: EpochPair,
632 pause_read_on_bootstrap: bool,
633 ) -> LogStoreResult<()> {
634 self.metrics
635 .log_store_first_write_epoch
636 .set(epoch.curr as _);
637 self.metrics
638 .log_store_latest_write_epoch
639 .set(epoch.curr as _);
640 self.inner.init(epoch, pause_read_on_bootstrap).await
641 }
642
643 async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> {
644 self.metrics
645 .log_store_write_rows
646 .inc_by(chunk.cardinality() as _);
647 self.inner.write_chunk(chunk).await
648 }
649
650 async fn flush_current_epoch(
651 &mut self,
652 next_epoch: u64,
653 options: FlushCurrentEpochOptions,
654 ) -> LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>> {
655 let post_flush = self.inner.flush_current_epoch(next_epoch, options).await?;
656 self.metrics
657 .log_store_latest_write_epoch
658 .set(next_epoch as _);
659 Ok(post_flush)
660 }
661
662 fn pause(&mut self) -> LogStoreResult<()> {
663 self.inner.pause()
664 }
665
666 fn resume(&mut self) -> LogStoreResult<()> {
667 self.inner.resume()
668 }
669}
670
#[easy_ext::ext(LogWriterExt)]
impl<T> T
where
    T: LogWriter + Sized,
{
    /// Wrap the writer with write metrics reporting.
    pub fn monitored(self, metrics: LogWriterMetrics) -> MonitoredLogWriter<T> {
        MonitoredLogWriter {
            inner: self,
            metrics,
        }
    }
}
683
/// Internal queue entry of [`DeliveryFutureManager`]: either the pending
/// delivery futures of one chunk, or a barrier marker.
enum DeliveryFutureManagerItem<F> {
    Chunk {
        chunk_id: ChunkId,
        // Delivery futures of this chunk, in submission order.
        futures: VecDeque<F>,
    },
    Barrier,
}
692
/// Tracks in-flight delivery futures per chunk/barrier so that fully
/// delivered prefixes can be reported as truncate offsets.
pub struct DeliveryFutureManager<F> {
    // Total number of pending futures across all items.
    future_count: usize,
    // Threshold above which `add_future_may_await` awaits deliveries.
    max_future_count: usize,
    // (epoch, item) entries in write order; the front is the oldest.
    items: VecDeque<(u64, DeliveryFutureManagerItem<F>)>,
}
699
impl<F> DeliveryFutureManager<F> {
    /// Create a manager that holds at most `max_future_count` in-flight
    /// futures before `add_future_may_await` starts awaiting deliveries.
    pub fn new(max_future_count: usize) -> Self {
        Self {
            future_count: 0,
            max_future_count,
            items: Default::default(),
        }
    }

    /// Register the barrier of `epoch`.
    ///
    /// Invariants (asserted): a barrier may only follow chunks of the same
    /// epoch, and must be strictly newer than the previous barrier.
    pub fn add_barrier(&mut self, epoch: u64) {
        if let Some((item_epoch, last_item)) = self.items.back() {
            match last_item {
                DeliveryFutureManagerItem::Chunk { .. } => {
                    // Chunks of an epoch are sealed by that epoch's barrier.
                    assert_eq!(*item_epoch, epoch)
                }
                DeliveryFutureManagerItem::Barrier => {
                    assert!(
                        epoch > *item_epoch,
                        "new barrier epoch {} should be greater than prev barrier {}",
                        epoch,
                        item_epoch
                    );
                }
            }
        }
        self.items
            .push_back((epoch, DeliveryFutureManagerItem::Barrier));
    }

    /// Begin a new chunk and return a handle for attaching its delivery
    /// futures.
    ///
    /// Invariants (asserted): chunk ids grow within an epoch, and a chunk
    /// after a barrier must belong to a strictly newer epoch.
    pub fn start_write_chunk(
        &mut self,
        epoch: u64,
        chunk_id: ChunkId,
    ) -> DeliveryFutureManagerAddFuture<'_, F> {
        if let Some((item_epoch, item)) = self.items.back() {
            match item {
                DeliveryFutureManagerItem::Chunk {
                    chunk_id: item_chunk_id,
                    ..
                } => {
                    assert_eq!(epoch, *item_epoch);
                    assert!(
                        chunk_id > *item_chunk_id,
                        "new chunk id {} should be greater than prev chunk id {}",
                        chunk_id,
                        item_chunk_id
                    );
                }
                DeliveryFutureManagerItem::Barrier => {
                    assert!(
                        epoch > *item_epoch,
                        "new chunk epoch {} should be greater than prev barrier: {}",
                        epoch,
                        item_epoch
                    );
                }
            }
        }
        self.items.push_back((
            epoch,
            DeliveryFutureManagerItem::Chunk {
                chunk_id,
                futures: VecDeque::new(),
            },
        ));
        DeliveryFutureManagerAddFuture(self)
    }
}
768
/// Write handle returned by [`DeliveryFutureManager::start_write_chunk`].
pub struct DeliveryFutureManagerAddFuture<'a, F>(&'a mut DeliveryFutureManager<F>);
770
771impl<F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManagerAddFuture<'_, F> {
772 pub async fn add_future_may_await(&mut self, future: F) -> Result<bool, F::Error> {
775 let mut has_await = false;
776 while self.0.future_count >= self.0.max_future_count {
777 self.await_one_delivery().await?;
778 has_await = true;
779 }
780 match self.0.items.back_mut() {
781 Some((_, DeliveryFutureManagerItem::Chunk { futures, .. })) => {
782 futures.push_back(future);
783 self.0.future_count += 1;
784 Ok(has_await)
785 }
786 _ => unreachable!("should add future only after add a new chunk"),
787 }
788 }
789
790 pub async fn await_one_delivery(&mut self) -> Result<(), F::Error> {
791 for (_, item) in &mut self.0.items {
792 if let DeliveryFutureManagerItem::Chunk { futures, .. } = item
793 && let Some(mut delivery_future) = futures.pop_front()
794 {
795 self.0.future_count -= 1;
796 return poll_fn(|cx| delivery_future.try_poll_unpin(cx)).await;
797 } else {
798 continue;
799 }
800 }
801 Ok(())
802 }
803
804 pub fn future_count(&self) -> usize {
805 self.0.future_count
806 }
807
808 pub fn max_future_count(&self) -> usize {
809 self.0.max_future_count
810 }
811}
812
impl<F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManager<F> {
    /// Poll pending delivery futures in order and resolve to the latest
    /// offset whose deliveries have all completed.
    ///
    /// Consecutive fully-delivered chunks are compressed into the last one's
    /// offset; the loop stops at the first still-pending future, and stops
    /// right after a barrier so each barrier is reported individually.
    /// Pends if nothing has completed yet.
    pub fn next_truncate_offset(
        &mut self,
    ) -> impl Future<Output = Result<TruncateOffset, F::Error>> + '_ {
        poll_fn(move |cx| {
            let mut latest_offset: Option<TruncateOffset> = None;
            'outer: while let Some((epoch, item)) = self.items.front_mut() {
                match item {
                    DeliveryFutureManagerItem::Chunk { chunk_id, futures } => {
                        while let Some(future) = futures.front_mut() {
                            match future.try_poll_unpin(cx) {
                                Poll::Ready(result) => match result {
                                    Ok(()) => {
                                        self.future_count -= 1;
                                        futures.pop_front();
                                    }
                                    Err(e) => {
                                        // Propagate the first delivery error.
                                        return Poll::Ready(Err(e));
                                    }
                                },
                                Poll::Pending => {
                                    break 'outer;
                                }
                            }
                        }

                        // All futures of this chunk completed: advance the
                        // candidate offset and drop the item.
                        assert!(futures.is_empty());
                        latest_offset = Some(TruncateOffset::Chunk {
                            epoch: *epoch,
                            chunk_id: *chunk_id,
                        });
                        self.items.pop_front().expect("items not empty");
                    }
                    DeliveryFutureManagerItem::Barrier => {
                        latest_offset = Some(TruncateOffset::Barrier { epoch: *epoch });
                        self.items.pop_front().expect("items not empty");
                        // Report each barrier as its own offset.
                        break 'outer;
                    }
                }
            }
            if let Some(offset) = latest_offset {
                Poll::Ready(Ok(offset))
            } else {
                Poll::Pending
            }
        })
    }
}
864
#[cfg(test)]
mod tests {
    use std::future::{Future, poll_fn};
    use std::pin::pin;
    use std::task::Poll;

    use futures::{FutureExt, TryFuture};
    use risingwave_common::util::epoch::test_epoch;
    use tokio::sync::oneshot;
    use tokio::sync::oneshot::Receiver;

    use super::LogStoreResult;
    use crate::sink::log_store::{DeliveryFutureManager, TruncateOffset};

    // Ordering contract: offsets compare by (epoch, in-epoch position), and a
    // barrier sorts after all chunks of the same epoch.
    #[test]
    fn test_truncate_offset_cmp() {
        assert!(
            TruncateOffset::Barrier { epoch: 232 }
                < TruncateOffset::Chunk {
                    epoch: 233,
                    chunk_id: 1
                }
        );
        assert_eq!(
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 1
            },
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 1
            }
        );
        assert!(
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 1
            } < TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 2
            }
        );
        assert!(
            TruncateOffset::Barrier { epoch: 1 }
                > TruncateOffset::Chunk {
                    epoch: 1,
                    chunk_id: 2
                }
        );
        assert!(
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 2
            } < TruncateOffset::Barrier { epoch: 1 }
        );
        assert!(
            TruncateOffset::Chunk {
                epoch: 2,
                chunk_id: 2
            } > TruncateOffset::Barrier { epoch: 1 }
        );
        assert!(TruncateOffset::Barrier { epoch: 2 } > TruncateOffset::Barrier { epoch: 1 });
    }

    // A delivery future driven by a oneshot channel, so tests can complete
    // deliveries on demand.
    type TestFuture = impl TryFuture<Ok = (), Error = anyhow::Error> + Unpin + 'static;

    #[define_opaque(TestFuture)]
    fn to_test_future(rx: Receiver<LogStoreResult<()>>) -> TestFuture {
        async move { rx.await.unwrap() }.boxed()
    }

    // With no items registered, `next_truncate_offset` must stay pending.
    #[tokio::test]
    async fn test_empty() {
        let mut manager = DeliveryFutureManager::<TestFuture>::new(2);
        let mut future = pin!(manager.next_truncate_offset());
        assert!(
            poll_fn(|cx| Poll::Ready(future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
    }

    // A single chunk's offset is reported once its delivery completes, and a
    // barrier is reported on its own.
    #[tokio::test]
    async fn test_future_delivery_manager_basic() {
        let mut manager = DeliveryFutureManager::new(2);
        let epoch1 = 233;
        let chunk_id1 = 1;
        let (tx1_1, rx1_1) = oneshot::channel();
        let mut write_chunk = manager.start_write_chunk(epoch1, chunk_id1);
        assert!(
            !write_chunk
                .add_future_may_await(to_test_future(rx1_1))
                .await
                .unwrap()
        );
        assert_eq!(manager.future_count, 1);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            // Pending until the delivery future resolves.
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_1.send(Ok(())).unwrap();
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch: epoch1,
                    chunk_id: chunk_id1
                }
            );
        }
        assert_eq!(manager.future_count, 0);
        manager.add_barrier(epoch1);
        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Barrier { epoch: epoch1 }
        );
    }

    // Consecutive completed chunks are compressed into the latest offset, and
    // reporting stops right after a barrier.
    #[tokio::test]
    async fn test_future_delivery_manager_compress_chunk() {
        let mut manager = DeliveryFutureManager::new(10);
        let epoch1 = test_epoch(233);
        let chunk_id1 = 1;
        let chunk_id2 = chunk_id1 + 1;
        let chunk_id3 = chunk_id2 + 1;
        let (tx1_1, rx1_1) = oneshot::channel();
        let (tx1_2, rx1_2) = oneshot::channel();
        let (tx1_3, rx1_3) = oneshot::channel();
        let epoch2 = test_epoch(234);
        let (tx2_1, rx2_1) = oneshot::channel();
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id1)
                .add_future_may_await(to_test_future(rx1_1))
                .await
                .unwrap()
        );
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id2)
                .add_future_may_await(to_test_future(rx1_2))
                .await
                .unwrap()
        );
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id3)
                .add_future_may_await(to_test_future(rx1_3))
                .await
                .unwrap()
        );
        manager.add_barrier(epoch1);
        assert!(
            !manager
                .start_write_chunk(epoch2, chunk_id1)
                .add_future_may_await(to_test_future(rx2_1))
                .await
                .unwrap()
        );
        assert_eq!(manager.future_count, 4);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            // Completing chunk2 alone is not enough: chunk1 at the front is
            // still pending.
            tx1_2.send(Ok(())).unwrap();
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_1.send(Ok(())).unwrap();
            // chunk1 and chunk2 are compressed into chunk2's offset.
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch: epoch1,
                    chunk_id: chunk_id2
                }
            );
        }
        assert_eq!(manager.future_count, 2);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_3.send(Ok(())).unwrap();
            tx2_1.send(Ok(())).unwrap();
            // Even though epoch2's chunk is also done, reporting stops at the
            // epoch1 barrier.
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Barrier { epoch: epoch1 }
            );
        }
        assert_eq!(manager.future_count, 1);
        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Chunk {
                epoch: epoch2,
                chunk_id: chunk_id1
            }
        );
    }

    // With max_future_count = 2, adding a third future must first await an
    // older delivery (backpressure).
    #[tokio::test]
    async fn test_future_delivery_manager_await_future() {
        let mut manager = DeliveryFutureManager::new(2);
        let epoch = 233;
        let chunk_id1 = 1;
        let chunk_id2 = chunk_id1 + 1;
        let (tx1_1, rx1_1) = oneshot::channel();
        let (tx1_2, rx1_2) = oneshot::channel();
        let (tx2_1, rx2_1) = oneshot::channel();
        let (tx2_2, rx2_2) = oneshot::channel();

        {
            let mut write_chunk = manager.start_write_chunk(epoch, chunk_id1);
            assert!(
                !write_chunk
                    .add_future_may_await(to_test_future(rx1_1))
                    .await
                    .unwrap()
            );
            assert!(
                !write_chunk
                    .add_future_may_await(to_test_future(rx1_2))
                    .await
                    .unwrap()
            );
            assert_eq!(manager.future_count, 2);
        }

        {
            let mut write_chunk = manager.start_write_chunk(epoch, chunk_id2);
            {
                // At the cap: adding blocks until an older delivery finishes.
                let mut future1 = pin!(write_chunk.add_future_may_await(to_test_future(rx2_1)));
                assert!(
                    poll_fn(|cx| Poll::Ready(future1.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx1_1.send(Ok(())).unwrap();
                // Returns true: it awaited a delivery before adding.
                assert!(future1.await.unwrap());
            }
            assert_eq!(2, write_chunk.future_count());
            {
                let mut future2 = pin!(write_chunk.add_future_may_await(to_test_future(rx2_2)));
                assert!(
                    poll_fn(|cx| Poll::Ready(future2.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx1_2.send(Ok(())).unwrap();
                assert!(future2.await.unwrap());
            }
            assert_eq!(2, write_chunk.future_count());
            {
                let mut future3 = pin!(write_chunk.await_one_delivery());
                assert!(
                    poll_fn(|cx| Poll::Ready(future3.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx2_1.send(Ok(())).unwrap();
                future3.await.unwrap();
            }
            assert_eq!(1, write_chunk.future_count());
        }

        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Chunk {
                epoch,
                chunk_id: chunk_id1
            }
        );

        assert_eq!(1, manager.future_count);

        {
            let mut future = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx2_2.send(Ok(())).unwrap();
            assert_eq!(
                future.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch,
                    chunk_id: chunk_id2
                }
            );
        }

        assert_eq!(0, manager.future_count);
    }
}