use std::cmp::Ordering;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::future::{Future, poll_fn};
use std::pin::pin;
use std::sync::Arc;
use std::task::Poll;
use std::time::Instant;

use await_tree::InstrumentAwait;
use either::Either;
use futures::future::BoxFuture;
use futures::{TryFuture, TryFutureExt};
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::Field;
use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};
use risingwave_common::util::epoch::{EpochPair, INVALID_EPOCH};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_common_rate_limit::{RateLimit, RateLimiter};
use tokio::select;
use tokio::sync::mpsc::UnboundedReceiver;

pub type LogStoreResult<T> = Result<T, anyhow::Error>;
pub type ChunkId = usize;

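/// Offset up to which a log store may be truncated: either a chunk within an
/// epoch, or the barrier that closes an epoch. The manual `PartialOrd` below
/// maps a barrier to `usize::MAX`, so within one epoch every chunk sorts
/// before the barrier, e.g. `Chunk { epoch: 1, chunk_id: 2 } < Barrier {
/// epoch: 1 }`.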
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum TruncateOffset {
    Chunk { epoch: u64, chunk_id: ChunkId },
    Barrier { epoch: u64 },
}

impl PartialOrd for TruncateOffset {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let extract = |offset: &TruncateOffset| match offset {
            TruncateOffset::Chunk { epoch, chunk_id } => (*epoch, *chunk_id),
            TruncateOffset::Barrier { epoch } => (*epoch, usize::MAX),
        };
        let this = extract(self);
        let other = extract(other);
        this.partial_cmp(&other)
    }
}

impl TruncateOffset {
    pub fn next_chunk_id(&self) -> ChunkId {
        match self {
            TruncateOffset::Chunk { chunk_id, .. } => chunk_id + 1,
            TruncateOffset::Barrier { .. } => 0,
        }
    }

    pub fn epoch(&self) -> u64 {
        match self {
            TruncateOffset::Chunk { epoch, .. } | TruncateOffset::Barrier { epoch } => *epoch,
        }
    }

    pub fn check_next_offset(&self, next_offset: TruncateOffset) -> LogStoreResult<()> {
        if *self >= next_offset {
            bail!(
                "next offset {:?} should be later than current offset {:?}",
                next_offset,
                self
            )
        } else {
            Ok(())
        }
    }

    pub fn check_next_item_epoch(&self, epoch: u64) -> LogStoreResult<()> {
        match self {
            TruncateOffset::Chunk {
                epoch: offset_epoch,
                ..
            } => {
                if epoch != *offset_epoch {
                    bail!(
                        "new item epoch {} does not match current chunk offset epoch {}",
                        epoch,
                        offset_epoch
                    );
                }
            }
            TruncateOffset::Barrier {
                epoch: offset_epoch,
            } => {
                if epoch <= *offset_epoch {
                    bail!(
                        "new item epoch {} does not exceed barrier offset epoch {}",
                        epoch,
                        offset_epoch
                    );
                }
            }
        }
        Ok(())
    }
}

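/// An item yielded by a [`LogReader`]: either a stream chunk tagged with the
/// chunk id it was written with, or a barrier carrying checkpoint/stop flags
/// and optional vnode-bitmap and added-column payloads.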
#[derive(Debug)]
pub enum LogStoreReadItem {
    StreamChunk {
        chunk: StreamChunk,
        chunk_id: ChunkId,
    },
    Barrier {
        is_checkpoint: bool,
        new_vnode_bitmap: Option<Arc<Bitmap>>,
        is_stop: bool,
        add_columns: Option<Vec<Field>>,
    },
}

pub trait LogWriterPostFlushCurrentEpochFn<'a> = FnOnce() -> BoxFuture<'a, LogStoreResult<()>>;

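/// Callback returned by [`LogWriter::flush_current_epoch`]. Judging from the
/// method name, the caller is expected to await [`Self::post_yield_barrier`]
/// once the corresponding barrier has been yielded downstream; `#[must_use]`
/// guards against silently dropping it.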
#[must_use]
pub struct LogWriterPostFlushCurrentEpoch<'a>(
    Box<dyn LogWriterPostFlushCurrentEpochFn<'a> + Send + 'a>,
);

impl<'a> LogWriterPostFlushCurrentEpoch<'a> {
    pub fn new(f: impl LogWriterPostFlushCurrentEpochFn<'a> + Send + 'a) -> Self {
        Self(Box::new(f))
    }

    pub async fn post_yield_barrier(self) -> LogStoreResult<()> {
        self.0().await
    }
}

pub struct FlushCurrentEpochOptions {
    pub is_checkpoint: bool,
    pub new_vnode_bitmap: Option<Arc<Bitmap>>,
    pub is_stop: bool,
    pub add_columns: Option<Vec<Field>>,
}

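/// Write side of a log store. A writer is initialized with its first epoch,
/// receives the chunks of the current epoch through `write_chunk`, and seals
/// the epoch with `flush_current_epoch`, which hands back a
/// [`LogWriterPostFlushCurrentEpoch`] callback and may carry a new vnode
/// bitmap, a stop flag, or added columns (see [`FlushCurrentEpochOptions`]).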
pub trait LogWriter: Send {
    fn init(
        &mut self,
        epoch: EpochPair,
        pause_read_on_bootstrap: bool,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    fn write_chunk(
        &mut self,
        chunk: StreamChunk,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    fn flush_current_epoch(
        &mut self,
        next_epoch: u64,
        options: FlushCurrentEpochOptions,
    ) -> impl Future<Output = LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>>> + Send + '_;

    fn pause(&mut self) -> LogStoreResult<()>;

    fn resume(&mut self) -> LogStoreResult<()>;
}

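/// Read side of a log store. `next_item` yields `(epoch, item)` pairs in
/// epoch and chunk order; `truncate` reports that everything up to `offset`
/// has been consumed and may be reclaimed; `rewind` restarts reading from the
/// earliest un-truncated item (only meaningful when the factory sets
/// `ALLOW_REWIND`).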
pub trait LogReader: Send + Sized + 'static {
    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
}

pub trait LogStoreFactory: Send + 'static {
    const ALLOW_REWIND: bool;
    const REBUILD_SINK_ON_UPDATE_VNODE_BITMAP: bool;
    type Reader: LogReader;
    type Writer: LogWriter;

    fn build(self) -> impl Future<Output = (Self::Reader, Self::Writer)> + Send;
}

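/// A [`LogReader`] adapter that applies `f` to every stream chunk it yields,
/// leaving barriers and offset bookkeeping untouched.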
pub struct TransformChunkLogReader<F: Fn(StreamChunk) -> StreamChunk, R: LogReader> {
    f: F,
    inner: R,
}

impl<F: Fn(StreamChunk) -> StreamChunk + Send + 'static, R: LogReader> LogReader
    for TransformChunkLogReader<F, R>
{
    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.init()
    }

    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        let (epoch, item) = self.inner.next_item().await?;
        let item = match item {
            LogStoreReadItem::StreamChunk { chunk, chunk_id } => LogStoreReadItem::StreamChunk {
                chunk: (self.f)(chunk),
                chunk_id,
            },
            other => other,
        };
        Ok((epoch, item))
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        self.inner.truncate(offset)
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.rewind()
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.start_from(start_offset)
    }
}

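/// A [`LogReader`] adapter that measures downstream backpressure: a timestamp
/// is taken when `next_item` returns an item, and the time elapsed until
/// `next_item` is called again is added to `wait_new_future_duration_ns`.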
pub struct BackpressureMonitoredLogReader<R: LogReader> {
    inner: R,
    wait_new_future_start_time: Option<Instant>,
    wait_new_future_duration_ns: LabelGuardedIntCounter,
}

impl<R: LogReader> BackpressureMonitoredLogReader<R> {
    fn new(inner: R, wait_new_future_duration_ns: LabelGuardedIntCounter) -> Self {
        Self {
            inner,
            wait_new_future_start_time: None,
            wait_new_future_duration_ns,
        }
    }
}

impl<R: LogReader> LogReader for BackpressureMonitoredLogReader<R> {
    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.wait_new_future_start_time = None;
        self.inner.init()
    }

    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
        if let Some(start_time) = self.wait_new_future_start_time.take() {
            self.wait_new_future_duration_ns
                .inc_by(start_time.elapsed().as_nanos() as _);
        }
        self.inner.next_item().inspect_ok(|_| {
            self.wait_new_future_start_time = Some(Instant::now());
        })
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        self.inner.truncate(offset)
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.rewind().inspect_ok(|_| {
            self.wait_new_future_start_time = None;
        })
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.start_from(start_offset)
    }
}

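/// A [`LogReader`] adapter that records read-side metrics: the latest epoch
/// observed, plus row and byte counters for every chunk read.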
pub struct MonitoredLogReader<R: LogReader> {
    inner: R,
    read_epoch: u64,
    metrics: LogReaderMetrics,
}

pub struct LogReaderMetrics {
    pub log_store_latest_read_epoch: LabelGuardedIntGauge,
    pub log_store_read_rows: LabelGuardedIntCounter,
    pub log_store_read_bytes: LabelGuardedIntCounter,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter,
}

impl<R: LogReader> MonitoredLogReader<R> {
    pub fn new(inner: R, metrics: LogReaderMetrics) -> Self {
        Self {
            inner,
            read_epoch: INVALID_EPOCH,
            metrics,
        }
    }
}

impl<R: LogReader> LogReader for MonitoredLogReader<R> {
    async fn init(&mut self) -> LogStoreResult<()> {
        self.inner.init().instrument_await("log_reader_init").await
    }

    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        self.inner
            .next_item()
            .instrument_await("log_reader_next_item")
            .await
            .inspect(|(epoch, item)| {
                if self.read_epoch != *epoch {
                    self.read_epoch = *epoch;
                    self.metrics.log_store_latest_read_epoch.set(*epoch as _);
                }
                if let LogStoreReadItem::StreamChunk { chunk, .. } = item {
                    self.metrics
                        .log_store_read_rows
                        .inc_by(chunk.cardinality() as _);
                    self.metrics
                        .log_store_read_bytes
                        .inc_by(chunk.estimated_size() as u64);
                }
            })
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        self.inner.truncate(offset)
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.rewind().instrument_await("log_reader_rewind")
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.start_from(start_offset)
    }
}

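// Rate limiting may split one upstream chunk into several smaller downstream
// chunks, so downstream chunk ids diverge from upstream ones. A `SplitChunk`
// carries the upstream offset of each piece (`is_last` marks the final
// piece), and `consumed_offset_queue` maps every downstream offset handed out
// so far back to the upstream offset it completes, which lets `truncate`
// forward the right offset to the inner reader.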
type UpstreamChunkOffset = TruncateOffset;
type DownstreamChunkOffset = TruncateOffset;

struct SplitChunk {
    chunk: StreamChunk,
    upstream_chunk_offset: UpstreamChunkOffset,
    is_last: bool,
}

struct RateLimitedLogReaderCore<R: LogReader> {
    inner: R,
    consumed_offset_queue: VecDeque<(DownstreamChunkOffset, UpstreamChunkOffset)>,
    unconsumed_chunk_queue: VecDeque<SplitChunk>,
    next_chunk_id: usize,
}

impl<R: LogReader> RateLimitedLogReaderCore<R> {
    async fn next_item(&mut self) -> LogStoreResult<Either<SplitChunk, (u64, LogStoreReadItem)>> {
        match self.unconsumed_chunk_queue.pop_back() {
            Some(split_chunk) => Ok(Either::Left(split_chunk)),
            None => {
                let (epoch, item) = self.inner.next_item().await?;
                match item {
                    LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
                        Ok(Either::Left(SplitChunk {
                            chunk,
                            upstream_chunk_offset: UpstreamChunkOffset::Chunk { epoch, chunk_id },
                            is_last: true,
                        }))
                    }
                    LogStoreReadItem::Barrier { .. } => {
                        self.consumed_offset_queue.push_front((
                            TruncateOffset::Barrier { epoch },
                            TruncateOffset::Barrier { epoch },
                        ));
                        self.next_chunk_id = 0;
                        Ok(Either::Right((epoch, item)))
                    }
                }
            }
        }
    }
}

pub struct RateLimitedLogReader<R: LogReader> {
    core: RateLimitedLogReaderCore<R>,
    rate_limiter: RateLimiter,
    control_rx: UnboundedReceiver<RateLimit>,
}

impl<R: LogReader> RateLimitedLogReader<R> {
    pub fn new(inner: R, control_rx: UnboundedReceiver<RateLimit>) -> Self {
        Self {
            core: RateLimitedLogReaderCore {
                inner,
                consumed_offset_queue: VecDeque::new(),
                unconsumed_chunk_queue: VecDeque::new(),
                next_chunk_id: 0,
            },
            rate_limiter: RateLimiter::new(RateLimit::Disabled),
            control_rx,
        }
    }
}

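// Applies the current limit to one `SplitChunk`. When the chunk needs more
// permits than the fixed limit allows at once, it is split: the first piece
// is emitted immediately and the remaining pieces are pushed onto
// `unconsumed_chunk_queue` in reverse order (so `pop_back` yields them in the
// original order). Only the last piece of an upstream chunk records a
// downstream-to-upstream entry in `consumed_offset_queue`.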
impl<R: LogReader> RateLimitedLogReader<R> {
    async fn apply_rate_limit(
        &mut self,
        split_chunk: SplitChunk,
    ) -> LogStoreResult<(u64, LogStoreReadItem)> {
        let split_chunk = match self.rate_limiter.rate_limit() {
            RateLimit::Pause => unreachable!(
                "apply_rate_limit is not supposed to be called while the stream is paused"
            ),
            RateLimit::Disabled => split_chunk,
            RateLimit::Fixed(limit) => {
                let limit = limit.get();
                let required_permits = split_chunk.chunk.compute_rate_limit_chunk_permits();
                if required_permits <= limit {
                    self.rate_limiter.wait(required_permits).await;
                    split_chunk
                } else {
                    let mut chunks = split_chunk.chunk.split(limit as _).into_iter();
                    let mut is_last = split_chunk.is_last;
                    let upstream_chunk_offset = split_chunk.upstream_chunk_offset;

                    let first_chunk = chunks.next().unwrap();

                    for chunk in chunks.rev() {
                        self.core.unconsumed_chunk_queue.push_back(SplitChunk {
                            chunk,
                            upstream_chunk_offset,
                            is_last,
                        });
                        is_last = false;
                    }

                    self.rate_limiter
                        .wait(first_chunk.compute_rate_limit_chunk_permits())
                        .await;
                    SplitChunk {
                        chunk: first_chunk,
                        upstream_chunk_offset,
                        is_last,
                    }
                }
            }
        };

        let epoch = split_chunk.upstream_chunk_offset.epoch();
        let downstream_chunk_id = self.core.next_chunk_id;
        self.core.next_chunk_id += 1;
        if split_chunk.is_last {
            self.core.consumed_offset_queue.push_front((
                TruncateOffset::Chunk {
                    epoch,
                    chunk_id: downstream_chunk_id,
                },
                split_chunk.upstream_chunk_offset,
            ));
        }

        Ok((
            epoch,
            LogStoreReadItem::StreamChunk {
                chunk: split_chunk.chunk,
                chunk_id: downstream_chunk_id,
            },
        ))
    }
}

impl<R: LogReader> LogReader for RateLimitedLogReader<R> {
    async fn init(&mut self) -> LogStoreResult<()> {
        self.core.inner.init().await
    }

    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        let mut paused = false;
        loop {
            select! {
                biased;
                recv = pin!(self.control_rx.recv()) => {
                    let new_rate_limit = match recv {
                        Some(limit) => limit,
                        None => bail!("rate limit control channel closed"),
                    };
                    let old_rate_limit = self.rate_limiter.update(new_rate_limit);
                    paused = matches!(new_rate_limit, RateLimit::Pause);
                    tracing::info!("rate limit changed from {:?} to {:?}, paused = {paused}", old_rate_limit, new_rate_limit);
                },
                item = self.core.next_item(), if !paused => {
                    let item = item?;
                    match item {
                        Either::Left(split_chunk) => {
                            return self.apply_rate_limit(split_chunk).await;
                        },
                        Either::Right(item) => {
                            assert!(matches!(item.1, LogStoreReadItem::Barrier { .. }));
                            return Ok(item);
                        },
                    }
                }
            }
        }
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        let mut truncate_offset = None;
        while let Some((downstream_offset, upstream_offset)) =
            self.core.consumed_offset_queue.back()
        {
            if *downstream_offset <= offset {
                truncate_offset = Some(*upstream_offset);
                self.core.consumed_offset_queue.pop_back();
            } else {
                break;
            }
        }
        tracing::trace!(
            "rate limited log store reader truncate offset {:?}, downstream offset {:?}",
            truncate_offset,
            offset
        );
        if let Some(offset) = truncate_offset {
            self.core.inner.truncate(offset)
        } else {
            Ok(())
        }
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.core.unconsumed_chunk_queue.clear();
        self.core.consumed_offset_queue.clear();
        self.core.next_chunk_id = 0;
        self.core.inner.rewind()
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.core.inner.start_from(start_offset)
    }
}

#[easy_ext::ext(LogReaderExt)]
impl<T> T
where
    T: LogReader,
{
    pub fn transform_chunk<F: Fn(StreamChunk) -> StreamChunk + Sized>(
        self,
        f: F,
    ) -> TransformChunkLogReader<F, Self> {
        TransformChunkLogReader { f, inner: self }
    }

    pub fn monitored(self, metrics: LogReaderMetrics) -> impl LogReader {
        let wait_new_future_duration =
            metrics.log_store_reader_wait_new_future_duration_ns.clone();
        BackpressureMonitoredLogReader::new(
            MonitoredLogReader::new(self, metrics),
            wait_new_future_duration,
        )
    }

    pub fn rate_limited(self, control_rx: UnboundedReceiver<RateLimit>) -> impl LogReader {
        RateLimitedLogReader::new(self, control_rx)
    }
}

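// A minimal composition sketch of the combinators above; `reader`, `metrics`,
// and `control_rx` are hypothetical values supplied by the caller:
//
//     let reader = reader
//         .transform_chunk(|chunk| chunk) // e.g. remap columns
//         .rate_limited(control_rx)
//         .monitored(metrics);
//
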
pub struct MonitoredLogWriter<W: LogWriter> {
    inner: W,
    metrics: LogWriterMetrics,
}

pub struct LogWriterMetrics {
    pub log_store_first_write_epoch: LabelGuardedIntGauge,
    pub log_store_latest_write_epoch: LabelGuardedIntGauge,
    pub log_store_write_rows: LabelGuardedIntCounter,
}

impl<W: LogWriter> LogWriter for MonitoredLogWriter<W> {
    async fn init(
        &mut self,
        epoch: EpochPair,
        pause_read_on_bootstrap: bool,
    ) -> LogStoreResult<()> {
        self.metrics
            .log_store_first_write_epoch
            .set(epoch.curr as _);
        self.metrics
            .log_store_latest_write_epoch
            .set(epoch.curr as _);
        self.inner.init(epoch, pause_read_on_bootstrap).await
    }

    async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> {
        self.metrics
            .log_store_write_rows
            .inc_by(chunk.cardinality() as _);
        self.inner.write_chunk(chunk).await
    }

    async fn flush_current_epoch(
        &mut self,
        next_epoch: u64,
        options: FlushCurrentEpochOptions,
    ) -> LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>> {
        let post_flush = self.inner.flush_current_epoch(next_epoch, options).await?;
        self.metrics
            .log_store_latest_write_epoch
            .set(next_epoch as _);
        Ok(post_flush)
    }

    fn pause(&mut self) -> LogStoreResult<()> {
        self.inner.pause()
    }

    fn resume(&mut self) -> LogStoreResult<()> {
        self.inner.resume()
    }
}

#[easy_ext::ext(LogWriterExt)]
impl<T> T
where
    T: LogWriter + Sized,
{
    pub fn monitored(self, metrics: LogWriterMetrics) -> MonitoredLogWriter<T> {
        MonitoredLogWriter {
            inner: self,
            metrics,
        }
    }
}

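/// Tracks asynchronous sink deliveries. The manager keeps a FIFO of
/// `(epoch, item)` entries, where each chunk entry holds the delivery futures
/// still pending for that chunk, and caps the total number of in-flight
/// futures at `max_future_count`.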
enum DeliveryFutureManagerItem<F> {
    Chunk {
        chunk_id: ChunkId,
        futures: VecDeque<F>,
    },
    Barrier,
}

pub struct DeliveryFutureManager<F> {
    future_count: usize,
    max_future_count: usize,
    items: VecDeque<(u64, DeliveryFutureManagerItem<F>)>,
}

impl<F> DeliveryFutureManager<F> {
    pub fn new(max_future_count: usize) -> Self {
        Self {
            future_count: 0,
            max_future_count,
            items: Default::default(),
        }
    }

    pub fn add_barrier(&mut self, epoch: u64) {
        if let Some((item_epoch, last_item)) = self.items.back() {
            match last_item {
                DeliveryFutureManagerItem::Chunk { .. } => {
                    assert_eq!(*item_epoch, epoch)
                }
                DeliveryFutureManagerItem::Barrier => {
                    assert!(
                        epoch > *item_epoch,
                        "new barrier epoch {} should be greater than prev barrier {}",
                        epoch,
                        item_epoch
                    );
                }
            }
        }
        self.items
            .push_back((epoch, DeliveryFutureManagerItem::Barrier));
    }

    pub fn start_write_chunk(
        &mut self,
        epoch: u64,
        chunk_id: ChunkId,
    ) -> DeliveryFutureManagerAddFuture<'_, F> {
        if let Some((item_epoch, item)) = self.items.back() {
            match item {
                DeliveryFutureManagerItem::Chunk {
                    chunk_id: item_chunk_id,
                    ..
                } => {
                    assert_eq!(epoch, *item_epoch);
                    assert!(
                        chunk_id > *item_chunk_id,
                        "new chunk id {} should be greater than prev chunk id {}",
                        chunk_id,
                        item_chunk_id
                    );
                }
                DeliveryFutureManagerItem::Barrier => {
                    assert!(
                        epoch > *item_epoch,
                        "new chunk epoch {} should be greater than prev barrier: {}",
                        epoch,
                        item_epoch
                    );
                }
            }
        }
        self.items.push_back((
            epoch,
            DeliveryFutureManagerItem::Chunk {
                chunk_id,
                futures: VecDeque::new(),
            },
        ));
        DeliveryFutureManagerAddFuture(self)
    }
}

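/// Write guard returned by [`DeliveryFutureManager::start_write_chunk`].
/// Futures added through it are appended to the most recently started chunk;
/// once the in-flight count reaches `max_future_count`,
/// `add_future_may_await` first awaits the oldest pending delivery (returning
/// `true` if it had to wait) so the count stays bounded.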
pub struct DeliveryFutureManagerAddFuture<'a, F>(&'a mut DeliveryFutureManager<F>);

impl<F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManagerAddFuture<'_, F> {
    pub async fn add_future_may_await(&mut self, future: F) -> Result<bool, F::Error> {
        let mut has_await = false;
        while self.0.future_count >= self.0.max_future_count {
            self.await_one_delivery().await?;
            has_await = true;
        }
        match self.0.items.back_mut() {
            Some((_, DeliveryFutureManagerItem::Chunk { futures, .. })) => {
                futures.push_back(future);
                self.0.future_count += 1;
                Ok(has_await)
            }
            _ => unreachable!("should add future only after add a new chunk"),
        }
    }

    pub async fn await_one_delivery(&mut self) -> Result<(), F::Error> {
        for (_, item) in &mut self.0.items {
            if let DeliveryFutureManagerItem::Chunk { futures, .. } = item
                && let Some(mut delivery_future) = futures.pop_front()
            {
                self.0.future_count -= 1;
                return poll_fn(|cx| delivery_future.try_poll_unpin(cx)).await;
            } else {
                continue;
            }
        }
        Ok(())
    }

    pub fn future_count(&self) -> usize {
        self.0.future_count
    }

    pub fn max_future_count(&self) -> usize {
        self.0.max_future_count
    }
}

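// `next_truncate_offset` polls pending delivery futures strictly in item
// order and resolves to the latest offset whose deliveries have all
// completed: fully delivered chunks are drained and recorded, while a barrier
// is recorded and then stops the scan, so truncation never crosses an
// unfinished epoch.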
impl<F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManager<F> {
    pub fn next_truncate_offset(
        &mut self,
    ) -> impl Future<Output = Result<TruncateOffset, F::Error>> + '_ {
        poll_fn(move |cx| {
            let mut latest_offset: Option<TruncateOffset> = None;
            'outer: while let Some((epoch, item)) = self.items.front_mut() {
                match item {
                    DeliveryFutureManagerItem::Chunk { chunk_id, futures } => {
                        while let Some(future) = futures.front_mut() {
                            match future.try_poll_unpin(cx) {
                                Poll::Ready(result) => match result {
                                    Ok(()) => {
                                        self.future_count -= 1;
                                        futures.pop_front();
                                    }
                                    Err(e) => {
                                        return Poll::Ready(Err(e));
                                    }
                                },
                                Poll::Pending => {
                                    break 'outer;
                                }
                            }
                        }

                        assert!(futures.is_empty());
                        latest_offset = Some(TruncateOffset::Chunk {
                            epoch: *epoch,
                            chunk_id: *chunk_id,
                        });
                        self.items.pop_front().expect("items not empty");
                    }
                    DeliveryFutureManagerItem::Barrier => {
                        latest_offset = Some(TruncateOffset::Barrier { epoch: *epoch });
                        self.items.pop_front().expect("items not empty");
                        break 'outer;
                    }
                }
            }
            if let Some(offset) = latest_offset {
                Poll::Ready(Ok(offset))
            } else {
                Poll::Pending
            }
        })
    }
}

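// A minimal usage sketch for `DeliveryFutureManager`; `epoch`, `chunk_id`,
// and `delivery_future` are hypothetical values from a sink that spawns one
// delivery future per written chunk:
//
//     let mut manager = DeliveryFutureManager::new(16);
//     manager
//         .start_write_chunk(epoch, chunk_id)
//         .add_future_may_await(delivery_future)
//         .await?;
//     manager.add_barrier(epoch);
//     // Everything up to the resolved offset is safe to truncate.
//     let offset = manager.next_truncate_offset().await?;
//
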
#[cfg(test)]
mod tests {
    use std::future::{Future, poll_fn};
    use std::pin::pin;
    use std::task::Poll;

    use futures::{FutureExt, TryFuture};
    use risingwave_common::util::epoch::test_epoch;
    use tokio::sync::oneshot;
    use tokio::sync::oneshot::Receiver;

    use super::LogStoreResult;
    use crate::sink::log_store::{DeliveryFutureManager, TruncateOffset};

    #[test]
    fn test_truncate_offset_cmp() {
        assert!(
            TruncateOffset::Barrier { epoch: 232 }
                < TruncateOffset::Chunk {
                    epoch: 233,
                    chunk_id: 1
                }
        );
        assert_eq!(
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 1
            },
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 1
            }
        );
        assert!(
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 1
            } < TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 2
            }
        );
        assert!(
            TruncateOffset::Barrier { epoch: 1 }
                > TruncateOffset::Chunk {
                    epoch: 1,
                    chunk_id: 2
                }
        );
        assert!(
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 2
            } < TruncateOffset::Barrier { epoch: 1 }
        );
        assert!(
            TruncateOffset::Chunk {
                epoch: 2,
                chunk_id: 2
            } > TruncateOffset::Barrier { epoch: 1 }
        );
        assert!(TruncateOffset::Barrier { epoch: 2 } > TruncateOffset::Barrier { epoch: 1 });
    }

    type TestFuture = impl TryFuture<Ok = (), Error = anyhow::Error> + Unpin + 'static;

    #[define_opaque(TestFuture)]
    fn to_test_future(rx: Receiver<LogStoreResult<()>>) -> TestFuture {
        async move { rx.await.unwrap() }.boxed()
    }

    #[tokio::test]
    async fn test_empty() {
        let mut manager = DeliveryFutureManager::<TestFuture>::new(2);
        let mut future = pin!(manager.next_truncate_offset());
        assert!(
            poll_fn(|cx| Poll::Ready(future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
    }

    #[tokio::test]
    async fn test_future_delivery_manager_basic() {
        let mut manager = DeliveryFutureManager::new(2);
        let epoch1 = 233;
        let chunk_id1 = 1;
        let (tx1_1, rx1_1) = oneshot::channel();
        let mut write_chunk = manager.start_write_chunk(epoch1, chunk_id1);
        assert!(
            !write_chunk
                .add_future_may_await(to_test_future(rx1_1))
                .await
                .unwrap()
        );
        assert_eq!(manager.future_count, 1);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_1.send(Ok(())).unwrap();
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch: epoch1,
                    chunk_id: chunk_id1
                }
            );
        }
        assert_eq!(manager.future_count, 0);
        manager.add_barrier(epoch1);
        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Barrier { epoch: epoch1 }
        );
    }

    #[tokio::test]
    async fn test_future_delivery_manager_compress_chunk() {
        let mut manager = DeliveryFutureManager::new(10);
        let epoch1 = test_epoch(233);
        let chunk_id1 = 1;
        let chunk_id2 = chunk_id1 + 1;
        let chunk_id3 = chunk_id2 + 1;
        let (tx1_1, rx1_1) = oneshot::channel();
        let (tx1_2, rx1_2) = oneshot::channel();
        let (tx1_3, rx1_3) = oneshot::channel();
        let epoch2 = test_epoch(234);
        let (tx2_1, rx2_1) = oneshot::channel();
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id1)
                .add_future_may_await(to_test_future(rx1_1))
                .await
                .unwrap()
        );
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id2)
                .add_future_may_await(to_test_future(rx1_2))
                .await
                .unwrap()
        );
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id3)
                .add_future_may_await(to_test_future(rx1_3))
                .await
                .unwrap()
        );
        manager.add_barrier(epoch1);
        assert!(
            !manager
                .start_write_chunk(epoch2, chunk_id1)
                .add_future_may_await(to_test_future(rx2_1))
                .await
                .unwrap()
        );
        assert_eq!(manager.future_count, 4);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_2.send(Ok(())).unwrap();
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_1.send(Ok(())).unwrap();
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch: epoch1,
                    chunk_id: chunk_id2
                }
            );
        }
        assert_eq!(manager.future_count, 2);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_3.send(Ok(())).unwrap();
            tx2_1.send(Ok(())).unwrap();
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Barrier { epoch: epoch1 }
            );
        }
        assert_eq!(manager.future_count, 1);
        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Chunk {
                epoch: epoch2,
                chunk_id: chunk_id1
            }
        );
    }

    #[tokio::test]
    async fn test_future_delivery_manager_await_future() {
        let mut manager = DeliveryFutureManager::new(2);
        let epoch = 233;
        let chunk_id1 = 1;
        let chunk_id2 = chunk_id1 + 1;
        let (tx1_1, rx1_1) = oneshot::channel();
        let (tx1_2, rx1_2) = oneshot::channel();
        let (tx2_1, rx2_1) = oneshot::channel();
        let (tx2_2, rx2_2) = oneshot::channel();

        {
            let mut write_chunk = manager.start_write_chunk(epoch, chunk_id1);
            assert!(
                !write_chunk
                    .add_future_may_await(to_test_future(rx1_1))
                    .await
                    .unwrap()
            );
            assert!(
                !write_chunk
                    .add_future_may_await(to_test_future(rx1_2))
                    .await
                    .unwrap()
            );
            assert_eq!(manager.future_count, 2);
        }

        {
            let mut write_chunk = manager.start_write_chunk(epoch, chunk_id2);
            {
                let mut future1 = pin!(write_chunk.add_future_may_await(to_test_future(rx2_1)));
                assert!(
                    poll_fn(|cx| Poll::Ready(future1.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx1_1.send(Ok(())).unwrap();
                assert!(future1.await.unwrap());
            }
            assert_eq!(2, write_chunk.future_count());
            {
                let mut future2 = pin!(write_chunk.add_future_may_await(to_test_future(rx2_2)));
                assert!(
                    poll_fn(|cx| Poll::Ready(future2.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx1_2.send(Ok(())).unwrap();
                assert!(future2.await.unwrap());
            }
            assert_eq!(2, write_chunk.future_count());
            {
                let mut future3 = pin!(write_chunk.await_one_delivery());
                assert!(
                    poll_fn(|cx| Poll::Ready(future3.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx2_1.send(Ok(())).unwrap();
                future3.await.unwrap();
            }
            assert_eq!(1, write_chunk.future_count());
        }

        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Chunk {
                epoch,
                chunk_id: chunk_id1
            }
        );

        assert_eq!(1, manager.future_count);

        {
            let mut future = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx2_2.send(Ok(())).unwrap();
            assert_eq!(
                future.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch,
                    chunk_id: chunk_id2
                }
            );
        }

        assert_eq!(0, manager.future_count);
    }
}