1use std::cmp::Ordering;
16use std::collections::VecDeque;
17use std::fmt::Debug;
18use std::future::{Future, poll_fn};
19use std::pin::pin;
20use std::sync::Arc;
21use std::task::Poll;
22use std::time::Instant;
23
24use await_tree::InstrumentAwait;
25use either::Either;
26use futures::future::BoxFuture;
27use futures::{TryFuture, TryFutureExt};
28use risingwave_common::array::StreamChunk;
29use risingwave_common::bail;
30use risingwave_common::bitmap::Bitmap;
31use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};
32use risingwave_common::util::epoch::{EpochPair, INVALID_EPOCH};
33use risingwave_common_estimate_size::EstimateSize;
34use risingwave_common_rate_limit::{RateLimit, RateLimiter};
35use tokio::select;
36use tokio::sync::mpsc::UnboundedReceiver;
37
/// Convenience result alias used throughout the log store implementations.
pub type LogStoreResult<T> = Result<T, anyhow::Error>;
/// Identifier of a chunk within one epoch; restarts from 0 after each barrier.
pub type ChunkId = usize;
40
/// Offset up to which log store data has been consumed and may be truncated.
///
/// Ordering (see the `PartialOrd` impl): all chunks of an epoch sort before the
/// barrier of the same epoch, and smaller epochs sort before larger ones.
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum TruncateOffset {
    /// Offset of a stream chunk, identified by `(epoch, chunk_id)`.
    Chunk { epoch: u64, chunk_id: ChunkId },
    /// Offset of the barrier terminating `epoch`.
    Barrier { epoch: u64 },
}
46
47impl PartialOrd for TruncateOffset {
48 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
49 let extract = |offset: &TruncateOffset| match offset {
50 TruncateOffset::Chunk { epoch, chunk_id } => (*epoch, *chunk_id),
51 TruncateOffset::Barrier { epoch } => (*epoch, usize::MAX),
52 };
53 let this = extract(self);
54 let other = extract(other);
55 this.partial_cmp(&other)
56 }
57}
58
59impl TruncateOffset {
60 pub fn next_chunk_id(&self) -> ChunkId {
61 match self {
62 TruncateOffset::Chunk { chunk_id, .. } => chunk_id + 1,
63 TruncateOffset::Barrier { .. } => 0,
64 }
65 }
66
67 pub fn epoch(&self) -> u64 {
68 match self {
69 TruncateOffset::Chunk { epoch, .. } | TruncateOffset::Barrier { epoch } => *epoch,
70 }
71 }
72
73 pub fn check_next_offset(&self, next_offset: TruncateOffset) -> LogStoreResult<()> {
74 if *self >= next_offset {
75 bail!(
76 "next offset {:?} should be later than current offset {:?}",
77 next_offset,
78 self
79 )
80 } else {
81 Ok(())
82 }
83 }
84
85 pub fn check_next_item_epoch(&self, epoch: u64) -> LogStoreResult<()> {
86 match self {
87 TruncateOffset::Chunk {
88 epoch: offset_epoch,
89 ..
90 } => {
91 if epoch != *offset_epoch {
92 bail!(
93 "new item epoch {} does not match current chunk offset epoch {}",
94 epoch,
95 offset_epoch
96 );
97 }
98 }
99 TruncateOffset::Barrier {
100 epoch: offset_epoch,
101 } => {
102 if epoch <= *offset_epoch {
103 bail!(
104 "new item epoch {} does not exceed barrier offset epoch {}",
105 epoch,
106 offset_epoch
107 );
108 }
109 }
110 }
111 Ok(())
112 }
113}
114
/// An item produced by a [`LogReader`].
#[derive(Debug)]
pub enum LogStoreReadItem {
    StreamChunk {
        chunk: StreamChunk,
        /// Chunk id within the current epoch.
        chunk_id: ChunkId,
    },
    Barrier {
        is_checkpoint: bool,
        // NOTE(review): presumably set when the writer's vnode bitmap changed
        // at this barrier (mirrors `FlushCurrentEpochOptions`) — confirm.
        new_vnode_bitmap: Option<Arc<Bitmap>>,
        // NOTE(review): presumably true on the final barrier before the writer
        // stops — confirm with the writer side.
        is_stop: bool,
    },
}
127
/// Callback invoked after the barrier of a flushed epoch has been yielded
/// downstream; it produces a future resolving to the post-flush result.
pub trait LogWriterPostFlushCurrentEpochFn<'a> = FnOnce() -> BoxFuture<'a, LogStoreResult<()>>;
129
/// Boxed post-flush callback returned by [`LogWriter::flush_current_epoch`].
/// Marked `#[must_use]` so callers cannot silently drop the callback.
#[must_use]
pub struct LogWriterPostFlushCurrentEpoch<'a>(
    Box<dyn LogWriterPostFlushCurrentEpochFn<'a> + Send + 'a>,
);
134
135impl<'a> LogWriterPostFlushCurrentEpoch<'a> {
136 pub fn new(f: impl LogWriterPostFlushCurrentEpochFn<'a> + Send + 'a) -> Self {
137 Self(Box::new(f))
138 }
139
140 pub async fn post_yield_barrier(self) -> LogStoreResult<()> {
141 self.0().await
142 }
143}
144
/// Options describing the barrier that ends the epoch being flushed.
pub struct FlushCurrentEpochOptions {
    pub is_checkpoint: bool,
    // NOTE(review): presumably the writer's new vnode bitmap when it changes at
    // this barrier — confirm with callers.
    pub new_vnode_bitmap: Option<Arc<Bitmap>>,
    // NOTE(review): presumably whether the writer stops after this barrier.
    pub is_stop: bool,
}
150
/// Writer side of a log store.
pub trait LogWriter: Send {
    /// Initialize the writer with its starting epoch pair.
    /// `pause_read_on_bootstrap` asks the paired reader to start paused.
    fn init(
        &mut self,
        epoch: EpochPair,
        pause_read_on_bootstrap: bool,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Write a stream chunk into the current epoch.
    fn write_chunk(
        &mut self,
        chunk: StreamChunk,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Seal the current epoch (ending with the barrier described by `options`)
    /// and switch to `next_epoch`. The returned `#[must_use]` callback is to be
    /// invoked after the barrier has been yielded downstream.
    fn flush_current_epoch(
        &mut self,
        next_epoch: u64,
        options: FlushCurrentEpochOptions,
    ) -> impl Future<Output = LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>>> + Send + '_;

    // NOTE(review): presumably pauses/resumes consumption on the paired reader
    // (cf. `pause_read_on_bootstrap`) — confirm with implementations.
    fn pause(&mut self) -> LogStoreResult<()>;

    fn resume(&mut self) -> LogStoreResult<()>;
}
176
/// Reader side of a log store.
pub trait LogReader: Send + Sized + 'static {
    /// Initialize the reader; called before any other method.
    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Start (or restart) reading from `start_offset`.
    // NOTE(review): semantics of `None` (presumably "from the earliest
    // available entry") are implementation-defined — confirm.
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Fetch the next item together with the epoch it belongs to.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Mark everything up to and including `offset` as consumed, allowing the
    /// underlying storage to reclaim it.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;

    /// Reset the reader so it replays from the first untruncated item.
    /// Support is gated by [`LogStoreFactory::ALLOW_REWIND`].
    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
}
203
/// Factory producing a paired reader and writer over the same log store.
pub trait LogStoreFactory: Send + 'static {
    /// Whether the produced reader supports [`LogReader::rewind`].
    const ALLOW_REWIND: bool;
    // NOTE(review): presumably whether the sink must be rebuilt when the vnode
    // bitmap is updated — confirm with users of this constant.
    const REBUILD_SINK_ON_UPDATE_VNODE_BITMAP: bool;
    type Reader: LogReader;
    type Writer: LogWriter;

    /// Build the (reader, writer) pair.
    fn build(self) -> impl Future<Output = (Self::Reader, Self::Writer)> + Send;
}
212
/// [`LogReader`] adapter that applies `f` to every stream chunk read from `inner`.
pub struct TransformChunkLogReader<F: Fn(StreamChunk) -> StreamChunk, R: LogReader> {
    f: F,
    inner: R,
}
217
218impl<F: Fn(StreamChunk) -> StreamChunk + Send + 'static, R: LogReader> LogReader
219 for TransformChunkLogReader<F, R>
220{
221 fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
222 self.inner.init()
223 }
224
225 async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
226 let (epoch, item) = self.inner.next_item().await?;
227 let item = match item {
228 LogStoreReadItem::StreamChunk { chunk, chunk_id } => LogStoreReadItem::StreamChunk {
229 chunk: (self.f)(chunk),
230 chunk_id,
231 },
232 other => other,
233 };
234 Ok((epoch, item))
235 }
236
237 fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
238 self.inner.truncate(offset)
239 }
240
241 fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
242 self.inner.rewind()
243 }
244
245 fn start_from(
246 &mut self,
247 start_offset: Option<u64>,
248 ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
249 self.inner.start_from(start_offset)
250 }
251}
252
/// Log reader wrapper that measures downstream backpressure: the time between
/// one `next_item` yielding and the next `next_item` call arriving is
/// accumulated into a counter.
pub struct BackpressureMonitoredLogReader<R: LogReader> {
    inner: R,
    /// Set when the previous `next_item` yielded an item; `None` while a
    /// measurement is not in progress (e.g. right after init/rewind).
    wait_new_future_start_time: Option<Instant>,
    wait_new_future_duration_ns: LabelGuardedIntCounter<4>,
}
259
260impl<R: LogReader> BackpressureMonitoredLogReader<R> {
261 fn new(inner: R, wait_new_future_duration_ns: LabelGuardedIntCounter<4>) -> Self {
262 Self {
263 inner,
264 wait_new_future_start_time: None,
265 wait_new_future_duration_ns,
266 }
267 }
268}
269
impl<R: LogReader> LogReader for BackpressureMonitoredLogReader<R> {
    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        // Drop any in-progress measurement when (re-)initializing.
        self.wait_new_future_start_time = None;
        self.inner.init()
    }

    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
        // The stopwatch was started when the previous `next_item` yielded an
        // item; the elapsed time is how long the consumer took before asking
        // for the next one, i.e. the backpressure exerted on this reader.
        // This runs eagerly when `next_item` is called, before the returned
        // future is first polled.
        if let Some(start_time) = self.wait_new_future_start_time.take() {
            self.wait_new_future_duration_ns
                .inc_by(start_time.elapsed().as_nanos() as _);
        }
        self.inner.next_item().inspect_ok(|_| {
            // Restart the stopwatch once an item is successfully produced.
            self.wait_new_future_start_time = Some(Instant::now());
        })
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        self.inner.truncate(offset)
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.rewind().inspect_ok(|_| {
            // A successful rewind invalidates the current measurement.
            self.wait_new_future_start_time = None;
        })
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.start_from(start_offset)
    }
}
306
/// Log reader wrapper reporting read metrics (epoch gauge, row/byte counters).
pub struct MonitoredLogReader<R: LogReader> {
    inner: R,
    /// Last epoch observed from `next_item`; used to update the epoch gauge
    /// only when the epoch changes.
    read_epoch: u64,
    metrics: LogReaderMetrics,
}
312
/// Metrics recorded by [`MonitoredLogReader`] and
/// [`BackpressureMonitoredLogReader`].
pub struct LogReaderMetrics {
    pub log_store_latest_read_epoch: LabelGuardedIntGauge<4>,
    pub log_store_read_rows: LabelGuardedIntCounter<4>,
    pub log_store_read_bytes: LabelGuardedIntCounter<4>,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter<4>,
}
319
320impl<R: LogReader> MonitoredLogReader<R> {
321 pub fn new(inner: R, metrics: LogReaderMetrics) -> Self {
322 Self {
323 inner,
324 read_epoch: INVALID_EPOCH,
325 metrics,
326 }
327 }
328}
329
impl<R: LogReader> LogReader for MonitoredLogReader<R> {
    async fn init(&mut self) -> LogStoreResult<()> {
        self.inner.init().instrument_await("log_reader_init").await
    }

    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        self.inner
            .next_item()
            .instrument_await("log_reader_next_item")
            .await
            .inspect(|(epoch, item)| {
                // Update the epoch gauge only on epoch change.
                if self.read_epoch != *epoch {
                    self.read_epoch = *epoch;
                    self.metrics.log_store_latest_read_epoch.set(*epoch as _);
                }
                // Count rows and (estimated) bytes of every chunk read.
                if let LogStoreReadItem::StreamChunk { chunk, .. } = item {
                    self.metrics
                        .log_store_read_rows
                        .inc_by(chunk.cardinality() as _);
                    self.metrics
                        .log_store_read_bytes
                        .inc_by(chunk.estimated_size() as u64);
                }
            })
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        self.inner.truncate(offset)
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.rewind().instrument_await("log_reader_rewind")
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.start_from(start_offset)
    }
}
371
/// Offset in the upstream (inner reader's) stream.
type UpstreamChunkOffset = TruncateOffset;
/// Offset in the downstream (rate-limited, possibly re-chunked) stream.
type DownstreamChunkOffset = TruncateOffset;
374
/// A piece of an upstream chunk produced by rate-limit splitting.
struct SplitChunk {
    chunk: StreamChunk,
    /// Offset of the upstream chunk this piece was split from.
    upstream_chunk_offset: UpstreamChunkOffset,
    /// Whether this is the final piece of the upstream chunk; only then may
    /// the upstream offset be recorded as fully consumed.
    is_last: bool,
}
381
struct RateLimitedLogReaderCore<R: LogReader> {
    inner: R,
    /// (downstream offset, upstream offset) pairs already emitted downstream.
    /// Pushed at the front, popped from the back (oldest first) on truncate.
    consumed_offset_queue: VecDeque<(DownstreamChunkOffset, UpstreamChunkOffset)>,
    /// Remaining pieces of a split chunk, used as a LIFO stack; pieces are
    /// pushed in reverse so they pop in original order.
    unconsumed_chunk_queue: VecDeque<SplitChunk>,
    /// Chunk id assigned to the next downstream chunk; reset on barrier.
    next_chunk_id: usize,
}
390
391impl<R: LogReader> RateLimitedLogReaderCore<R> {
392 async fn next_item(&mut self) -> LogStoreResult<Either<SplitChunk, (u64, LogStoreReadItem)>> {
394 match self.unconsumed_chunk_queue.pop_back() {
397 Some(split_chunk) => Ok(Either::Left(split_chunk)),
398 None => {
399 let (epoch, item) = self.inner.next_item().await?;
400 match item {
401 LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
402 Ok(Either::Left(SplitChunk {
403 chunk,
404 upstream_chunk_offset: UpstreamChunkOffset::Chunk { epoch, chunk_id },
405 is_last: true,
406 }))
407 }
408 LogStoreReadItem::Barrier { .. } => {
409 self.consumed_offset_queue.push_front((
410 TruncateOffset::Barrier { epoch },
411 TruncateOffset::Barrier { epoch },
412 ));
413 self.next_chunk_id = 0;
414 Ok(Either::Right((epoch, item)))
415 }
416 }
417 }
418 }
419 }
420}
421
/// Log reader wrapper that throttles chunk emission according to a rate limit
/// that can be changed at runtime through `control_rx`.
pub struct RateLimitedLogReader<R: LogReader> {
    core: RateLimitedLogReaderCore<R>,
    rate_limiter: RateLimiter,
    /// Channel delivering rate-limit updates (including pause/resume).
    control_rx: UnboundedReceiver<RateLimit>,
}
427
428impl<R: LogReader> RateLimitedLogReader<R> {
429 pub fn new(inner: R, control_rx: UnboundedReceiver<RateLimit>) -> Self {
430 Self {
431 core: RateLimitedLogReaderCore {
432 inner,
433 consumed_offset_queue: VecDeque::new(),
434 unconsumed_chunk_queue: VecDeque::new(),
435 next_chunk_id: 0,
436 },
437 rate_limiter: RateLimiter::new(RateLimit::Disabled),
438 control_rx,
439 }
440 }
441}
442
impl<R: LogReader> RateLimitedLogReader<R> {
    /// Apply the current rate limit to `split_chunk` and turn (part of) it
    /// into a downstream `StreamChunk` item.
    ///
    /// Under a fixed limit, a chunk needing more permits than the limit is
    /// split; only the first piece is emitted now and the rest are stacked on
    /// `unconsumed_chunk_queue` for later calls. The upstream offset is
    /// recorded as consumed only when the emitted piece is the last one.
    async fn apply_rate_limit(
        &mut self,
        split_chunk: SplitChunk,
    ) -> LogStoreResult<(u64, LogStoreReadItem)> {
        let split_chunk = match self.rate_limiter.rate_limit() {
            // `next_item` never polls the core while paused, so a paused
            // limiter cannot reach this point.
            RateLimit::Pause => unreachable!(
                "apply_rate_limit is not supposed to be called while the stream is paused"
            ),
            RateLimit::Disabled => split_chunk,
            RateLimit::Fixed(limit) => {
                let limit = limit.get();
                let required_permits = split_chunk.chunk.compute_rate_limit_chunk_permits();
                if required_permits <= limit {
                    // Chunk fits within the limit: wait for permits, emit whole.
                    self.rate_limiter.wait(required_permits).await;
                    split_chunk
                } else {
                    // Chunk exceeds the limit: split into pieces of at most
                    // `limit` permits each.
                    let mut chunks = split_chunk.chunk.split(limit as _).into_iter();
                    let mut is_last = split_chunk.is_last;
                    let upstream_chunk_offset = split_chunk.upstream_chunk_offset;

                    let first_chunk = chunks.next().unwrap();

                    // Push the remaining pieces in reverse so that popping from
                    // the back of the queue yields them in original order; only
                    // the final piece inherits `is_last`.
                    for chunk in chunks.rev() {
                        self.core.unconsumed_chunk_queue.push_back(SplitChunk {
                            chunk,
                            upstream_chunk_offset,
                            is_last,
                        });
                        is_last = false;
                    }

                    self.rate_limiter
                        .wait(first_chunk.compute_rate_limit_chunk_permits())
                        .await;
                    SplitChunk {
                        chunk: first_chunk,
                        upstream_chunk_offset,
                        is_last,
                    }
                }
            }
        };

        let epoch = split_chunk.upstream_chunk_offset.epoch();
        // Downstream chunk ids are assigned independently of upstream ids
        // because splitting changes the chunk count.
        let downstream_chunk_id = self.core.next_chunk_id;
        self.core.next_chunk_id += 1;
        if split_chunk.is_last {
            self.core.consumed_offset_queue.push_front((
                TruncateOffset::Chunk {
                    epoch,
                    chunk_id: downstream_chunk_id,
                },
                split_chunk.upstream_chunk_offset,
            ));
        }

        Ok((
            epoch,
            LogStoreReadItem::StreamChunk {
                chunk: split_chunk.chunk,
                chunk_id: downstream_chunk_id,
            },
        ))
    }
}
515
impl<R: LogReader> LogReader for RateLimitedLogReader<R> {
    async fn init(&mut self) -> LogStoreResult<()> {
        self.core.inner.init().await
    }

    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        // NOTE(review): `paused` is re-initialized to `false` on every call,
        // even if the limiter was last set to `RateLimit::Pause`. If a paused
        // `next_item` future is dropped and re-created, the core could be
        // polled once while the limiter is paused, which would trip the
        // `unreachable!` in `apply_rate_limit` — confirm whether callers can
        // cancel this future while paused.
        let mut paused = false;
        loop {
            select! {
                // Prefer rate-limit updates over data so pause takes effect
                // promptly.
                biased;
                recv = pin!(self.control_rx.recv()) => {
                    let new_rate_limit = match recv {
                        Some(limit) => limit,
                        None => bail!("rate limit control channel closed"),
                    };
                    let old_rate_limit = self.rate_limiter.update(new_rate_limit);
                    paused = matches!(new_rate_limit, RateLimit::Pause);
                    tracing::info!("rate limit changed from {:?} to {:?}, paused = {paused}", old_rate_limit, new_rate_limit);
                },
                // While paused, stop pulling items from the core entirely.
                item = self.core.next_item(), if !paused => {
                    let item = item?;
                    match item {
                        Either::Left(split_chunk) => {
                            return self.apply_rate_limit(split_chunk).await;
                        },
                        Either::Right(item) => {
                            // The core only returns `Right` for barriers.
                            assert!(matches!(item.1, LogStoreReadItem::Barrier{..}));
                            return Ok(item);
                        },
                    }
                }
            }
        }
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        // Translate the downstream offset back to the newest fully-consumed
        // upstream offset by draining the queue from its oldest end.
        let mut truncate_offset = None;
        while let Some((downstream_offset, upstream_offset)) =
            self.core.consumed_offset_queue.back()
        {
            if *downstream_offset <= offset {
                truncate_offset = Some(*upstream_offset);
                self.core.consumed_offset_queue.pop_back();
            } else {
                break;
            }
        }
        tracing::trace!(
            "rate limited log store reader truncate offset {:?}, downstream offset {:?}",
            truncate_offset,
            offset
        );
        if let Some(offset) = truncate_offset {
            self.core.inner.truncate(offset)
        } else {
            Ok(())
        }
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        // Clear all split/consumed bookkeeping eagerly before delegating; the
        // inner reader will replay from its first untruncated item.
        self.core.unconsumed_chunk_queue.clear();
        self.core.consumed_offset_queue.clear();
        self.core.next_chunk_id = 0;
        self.core.inner.rewind()
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.core.inner.start_from(start_offset)
    }
}
589
// Convenience combinators available on every `LogReader`.
#[easy_ext::ext(LogReaderExt)]
impl<T> T
where
    T: LogReader,
{
    /// Wrap `self` so that every stream chunk is transformed by `f`.
    pub fn transform_chunk<F: Fn(StreamChunk) -> StreamChunk + Sized>(
        self,
        f: F,
    ) -> TransformChunkLogReader<F, Self> {
        TransformChunkLogReader { f, inner: self }
    }

    /// Wrap `self` with read metrics and backpressure measurement.
    pub fn monitored(self, metrics: LogReaderMetrics) -> impl LogReader {
        // The backpressure counter is shared out of the metrics bundle before
        // the bundle is moved into the inner monitored reader.
        let wait_new_future_duration = metrics.log_store_reader_wait_new_future_duration_ns.clone();
        BackpressureMonitoredLogReader::new(
            MonitoredLogReader::new(self, metrics),
            wait_new_future_duration,
        )
    }

    /// Wrap `self` with a dynamically controlled rate limiter.
    pub fn rate_limited(self, control_rx: UnboundedReceiver<RateLimit>) -> impl LogReader {
        RateLimitedLogReader::new(self, control_rx)
    }
}
615
/// Log writer wrapper reporting write metrics (epoch gauges, row counter).
pub struct MonitoredLogWriter<W: LogWriter> {
    inner: W,
    metrics: LogWriterMetrics,
}
620
/// Metrics recorded by [`MonitoredLogWriter`].
pub struct LogWriterMetrics {
    pub log_store_first_write_epoch: LabelGuardedIntGauge<3>,
    pub log_store_latest_write_epoch: LabelGuardedIntGauge<3>,
    pub log_store_write_rows: LabelGuardedIntCounter<3>,
}
627
628impl<W: LogWriter> LogWriter for MonitoredLogWriter<W> {
629 async fn init(
630 &mut self,
631 epoch: EpochPair,
632 pause_read_on_bootstrap: bool,
633 ) -> LogStoreResult<()> {
634 self.metrics
635 .log_store_first_write_epoch
636 .set(epoch.curr as _);
637 self.metrics
638 .log_store_latest_write_epoch
639 .set(epoch.curr as _);
640 self.inner.init(epoch, pause_read_on_bootstrap).await
641 }
642
643 async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> {
644 self.metrics
645 .log_store_write_rows
646 .inc_by(chunk.cardinality() as _);
647 self.inner.write_chunk(chunk).await
648 }
649
650 async fn flush_current_epoch(
651 &mut self,
652 next_epoch: u64,
653 options: FlushCurrentEpochOptions,
654 ) -> LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>> {
655 let post_flush = self.inner.flush_current_epoch(next_epoch, options).await?;
656 self.metrics
657 .log_store_latest_write_epoch
658 .set(next_epoch as _);
659 Ok(post_flush)
660 }
661
662 fn pause(&mut self) -> LogStoreResult<()> {
663 self.inner.pause()
664 }
665
666 fn resume(&mut self) -> LogStoreResult<()> {
667 self.inner.resume()
668 }
669}
670
// Convenience combinators available on every `LogWriter`.
#[easy_ext::ext(LogWriterExt)]
impl<T> T
where
    T: LogWriter + Sized,
{
    /// Wrap `self` with write metrics.
    pub fn monitored(self, metrics: LogWriterMetrics) -> MonitoredLogWriter<T> {
        MonitoredLogWriter {
            inner: self,
            metrics,
        }
    }
}
683
/// Bookkeeping entry of [`DeliveryFutureManager`], one per written chunk or
/// barrier, kept in write order.
enum DeliveryFutureManagerItem<F> {
    Chunk {
        chunk_id: ChunkId,
        /// Delivery futures of this chunk, in submission order.
        futures: VecDeque<F>,
    },
    Barrier,
}
692
/// Tracks in-flight delivery futures of written chunks, bounded by
/// `max_future_count`, and derives truncate offsets as deliveries complete.
pub struct DeliveryFutureManager<F> {
    /// Total number of pending futures across all items.
    future_count: usize,
    max_future_count: usize,
    /// (epoch, item) entries in write order; oldest at the front.
    items: VecDeque<(u64, DeliveryFutureManagerItem<F>)>,
}
699
impl<F> DeliveryFutureManager<F> {
    /// Create a manager allowing at most `max_future_count` in-flight futures.
    pub fn new(max_future_count: usize) -> Self {
        Self {
            future_count: 0,
            max_future_count,
            items: Default::default(),
        }
    }

    /// Register a barrier of `epoch`.
    ///
    /// Panics if ordering is violated: a barrier must share the epoch of the
    /// chunks preceding it, and successive barriers must have increasing epochs.
    pub fn add_barrier(&mut self, epoch: u64) {
        if let Some((item_epoch, last_item)) = self.items.back() {
            match last_item {
                DeliveryFutureManagerItem::Chunk { .. } => {
                    // A barrier terminates the epoch its chunks belong to.
                    assert_eq!(*item_epoch, epoch)
                }
                DeliveryFutureManagerItem::Barrier => {
                    assert!(
                        epoch > *item_epoch,
                        "new barrier epoch {} should be greater than prev barrier {}",
                        epoch,
                        item_epoch
                    );
                }
            }
        }
        self.items
            .push_back((epoch, DeliveryFutureManagerItem::Barrier));
    }

    /// Register a new chunk write and return a handle for attaching its
    /// delivery futures.
    ///
    /// Panics if `(epoch, chunk_id)` does not advance monotonically with
    /// respect to the previously registered item.
    pub fn start_write_chunk(
        &mut self,
        epoch: u64,
        chunk_id: ChunkId,
    ) -> DeliveryFutureManagerAddFuture<'_, F> {
        if let Some((item_epoch, item)) = self.items.back() {
            match item {
                DeliveryFutureManagerItem::Chunk {
                    chunk_id: item_chunk_id,
                    ..
                } => {
                    // Within an epoch, chunk ids must strictly increase.
                    assert_eq!(epoch, *item_epoch);
                    assert!(
                        chunk_id > *item_chunk_id,
                        "new chunk id {} should be greater than prev chunk id {}",
                        chunk_id,
                        item_chunk_id
                    );
                }
                DeliveryFutureManagerItem::Barrier => {
                    // After a barrier, chunks must belong to a newer epoch.
                    assert!(
                        epoch > *item_epoch,
                        "new chunk epoch {} should be greater than prev barrier: {}",
                        epoch,
                        item_epoch
                    );
                }
            }
        }
        self.items.push_back((
            epoch,
            DeliveryFutureManagerItem::Chunk {
                chunk_id,
                futures: VecDeque::new(),
            },
        ));
        DeliveryFutureManagerAddFuture(self)
    }
}
768
/// Handle for adding delivery futures to the chunk most recently registered
/// via [`DeliveryFutureManager::start_write_chunk`].
pub struct DeliveryFutureManagerAddFuture<'a, F>(&'a mut DeliveryFutureManager<F>);
770
771impl<F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManagerAddFuture<'_, F> {
772 pub async fn add_future_may_await(&mut self, future: F) -> Result<bool, F::Error> {
775 let mut has_await = false;
776 while self.0.future_count >= self.0.max_future_count {
777 self.await_one_delivery().await?;
778 has_await = true;
779 }
780 match self.0.items.back_mut() {
781 Some((_, DeliveryFutureManagerItem::Chunk { futures, .. })) => {
782 futures.push_back(future);
783 self.0.future_count += 1;
784 Ok(has_await)
785 }
786 _ => unreachable!("should add future only after add a new chunk"),
787 }
788 }
789
790 pub async fn await_one_delivery(&mut self) -> Result<(), F::Error> {
791 for (_, item) in &mut self.0.items {
792 if let DeliveryFutureManagerItem::Chunk { futures, .. } = item
793 && let Some(mut delivery_future) = futures.pop_front()
794 {
795 self.0.future_count -= 1;
796 return poll_fn(|cx| delivery_future.try_poll_unpin(cx)).await;
797 } else {
798 continue;
799 }
800 }
801 Ok(())
802 }
803
804 pub fn future_count(&self) -> usize {
805 self.0.future_count
806 }
807
808 pub fn max_future_count(&self) -> usize {
809 self.0.max_future_count
810 }
811}
812
impl<F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManager<F> {
    /// Wait until some prefix of registered items has fully delivered and
    /// return the greatest truncate offset of that prefix.
    ///
    /// Consecutive fully-delivered chunks are compressed into a single offset.
    /// Polling stops right after a barrier so each barrier offset is reported
    /// on its own. Stays pending while the front item is incomplete.
    pub fn next_truncate_offset(
        &mut self,
    ) -> impl Future<Output = Result<TruncateOffset, F::Error>> + '_ {
        poll_fn(move |cx| {
            // Greatest offset whose item (and every earlier item) completed in
            // this poll.
            let mut latest_offset: Option<TruncateOffset> = None;
            'outer: while let Some((epoch, item)) = self.items.front_mut() {
                match item {
                    DeliveryFutureManagerItem::Chunk { chunk_id, futures } => {
                        // Drain this chunk's futures front-to-back; the first
                        // pending one stops the whole scan.
                        while let Some(future) = futures.front_mut() {
                            match future.try_poll_unpin(cx) {
                                Poll::Ready(result) => match result {
                                    Ok(()) => {
                                        self.future_count -= 1;
                                        futures.pop_front();
                                    }
                                    Err(e) => {
                                        return Poll::Ready(Err(e));
                                    }
                                },
                                Poll::Pending => {
                                    break 'outer;
                                }
                            }
                        }

                        assert!(futures.is_empty());
                        latest_offset = Some(TruncateOffset::Chunk {
                            epoch: *epoch,
                            chunk_id: *chunk_id,
                        });
                        self.items.pop_front().expect("items not empty");
                    }
                    DeliveryFutureManagerItem::Barrier => {
                        latest_offset = Some(TruncateOffset::Barrier { epoch: *epoch });
                        self.items.pop_front().expect("items not empty");
                        // Stop at the barrier so it is reported individually.
                        break 'outer;
                    }
                }
            }
            if let Some(offset) = latest_offset {
                Poll::Ready(Ok(offset))
            } else {
                Poll::Pending
            }
        })
    }
}
864
// Unit tests for `TruncateOffset` ordering and `DeliveryFutureManager`.
#[cfg(test)]
mod tests {
    use std::future::{Future, poll_fn};
    use std::pin::pin;
    use std::task::Poll;

    use futures::{FutureExt, TryFuture};
    use risingwave_common::util::epoch::test_epoch;
    use tokio::sync::oneshot;
    use tokio::sync::oneshot::Receiver;

    use super::LogStoreResult;
    use crate::sink::log_store::{DeliveryFutureManager, TruncateOffset};

    // Offsets order by epoch first; within an epoch, chunks precede the barrier.
    #[test]
    fn test_truncate_offset_cmp() {
        assert!(
            TruncateOffset::Barrier { epoch: 232 }
                < TruncateOffset::Chunk {
                    epoch: 233,
                    chunk_id: 1
                }
        );
        assert_eq!(
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 1
            },
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 1
            }
        );
        assert!(
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 1
            } < TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 2
            }
        );
        assert!(
            TruncateOffset::Barrier { epoch: 1 }
                > TruncateOffset::Chunk {
                    epoch: 1,
                    chunk_id: 2
                }
        );
        assert!(
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 2
            } < TruncateOffset::Barrier { epoch: 1 }
        );
        assert!(
            TruncateOffset::Chunk {
                epoch: 2,
                chunk_id: 2
            } > TruncateOffset::Barrier { epoch: 1 }
        );
        assert!(TruncateOffset::Barrier { epoch: 2 } > TruncateOffset::Barrier { epoch: 1 });
    }

    // Delivery future completed externally via a oneshot channel.
    type TestFuture = impl TryFuture<Ok = (), Error = anyhow::Error> + Unpin + 'static;
    fn to_test_future(rx: Receiver<LogStoreResult<()>>) -> TestFuture {
        async move { rx.await.unwrap() }.boxed()
    }

    // With no registered items, `next_truncate_offset` stays pending.
    #[tokio::test]
    async fn test_empty() {
        let mut manager = DeliveryFutureManager::<TestFuture>::new(2);
        let mut future = pin!(manager.next_truncate_offset());
        assert!(
            poll_fn(|cx| Poll::Ready(future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
    }

    // A chunk offset is reported once its delivery resolves; a barrier offset
    // is reported directly.
    #[tokio::test]
    async fn test_future_delivery_manager_basic() {
        let mut manager = DeliveryFutureManager::new(2);
        let epoch1 = 233;
        let chunk_id1 = 1;
        let (tx1_1, rx1_1) = oneshot::channel();
        let mut write_chunk = manager.start_write_chunk(epoch1, chunk_id1);
        assert!(
            !write_chunk
                .add_future_may_await(to_test_future(rx1_1))
                .await
                .unwrap()
        );
        assert_eq!(manager.future_count, 1);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_1.send(Ok(())).unwrap();
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch: epoch1,
                    chunk_id: chunk_id1
                }
            );
        }
        assert_eq!(manager.future_count, 0);
        manager.add_barrier(epoch1);
        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Barrier { epoch: epoch1 }
        );
    }

    // Consecutive completed chunks compress into one offset; a barrier is
    // still reported separately.
    #[tokio::test]
    async fn test_future_delivery_manager_compress_chunk() {
        let mut manager = DeliveryFutureManager::new(10);
        let epoch1 = test_epoch(233);
        let chunk_id1 = 1;
        let chunk_id2 = chunk_id1 + 1;
        let chunk_id3 = chunk_id2 + 1;
        let (tx1_1, rx1_1) = oneshot::channel();
        let (tx1_2, rx1_2) = oneshot::channel();
        let (tx1_3, rx1_3) = oneshot::channel();
        let epoch2 = test_epoch(234);
        let (tx2_1, rx2_1) = oneshot::channel();
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id1)
                .add_future_may_await(to_test_future(rx1_1))
                .await
                .unwrap()
        );
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id2)
                .add_future_may_await(to_test_future(rx1_2))
                .await
                .unwrap()
        );
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id3)
                .add_future_may_await(to_test_future(rx1_3))
                .await
                .unwrap()
        );
        manager.add_barrier(epoch1);
        assert!(
            !manager
                .start_write_chunk(epoch2, chunk_id1)
                .add_future_may_await(to_test_future(rx2_1))
                .await
                .unwrap()
        );
        assert_eq!(manager.future_count, 4);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            // Completing only the second chunk is not enough: the first chunk
            // is still pending at the queue front.
            tx1_2.send(Ok(())).unwrap();
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_1.send(Ok(())).unwrap();
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch: epoch1,
                    chunk_id: chunk_id2
                }
            );
        }
        assert_eq!(manager.future_count, 2);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_3.send(Ok(())).unwrap();
            tx2_1.send(Ok(())).unwrap();
            // Even though epoch2's chunk already delivered, the scan stops at
            // the epoch1 barrier.
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Barrier { epoch: epoch1 }
            );
        }
        assert_eq!(manager.future_count, 1);
        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Chunk {
                epoch: epoch2,
                chunk_id: chunk_id1
            }
        );
    }

    // When the in-flight bound is hit, adding a future awaits earlier
    // deliveries first and reports that it did so.
    #[tokio::test]
    async fn test_future_delivery_manager_await_future() {
        let mut manager = DeliveryFutureManager::new(2);
        let epoch = 233;
        let chunk_id1 = 1;
        let chunk_id2 = chunk_id1 + 1;
        let (tx1_1, rx1_1) = oneshot::channel();
        let (tx1_2, rx1_2) = oneshot::channel();
        let (tx2_1, rx2_1) = oneshot::channel();
        let (tx2_2, rx2_2) = oneshot::channel();

        {
            let mut write_chunk = manager.start_write_chunk(epoch, chunk_id1);
            assert!(
                !write_chunk
                    .add_future_may_await(to_test_future(rx1_1))
                    .await
                    .unwrap()
            );
            assert!(
                !write_chunk
                    .add_future_may_await(to_test_future(rx1_2))
                    .await
                    .unwrap()
            );
            assert_eq!(manager.future_count, 2);
        }

        {
            let mut write_chunk = manager.start_write_chunk(epoch, chunk_id2);
            {
                let mut future1 = pin!(write_chunk.add_future_may_await(to_test_future(rx2_1)));
                assert!(
                    poll_fn(|cx| Poll::Ready(future1.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx1_1.send(Ok(())).unwrap();
                assert!(future1.await.unwrap());
            }
            assert_eq!(2, write_chunk.future_count());
            {
                let mut future2 = pin!(write_chunk.add_future_may_await(to_test_future(rx2_2)));
                assert!(
                    poll_fn(|cx| Poll::Ready(future2.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx1_2.send(Ok(())).unwrap();
                assert!(future2.await.unwrap());
            }
            assert_eq!(2, write_chunk.future_count());
            {
                let mut future3 = pin!(write_chunk.await_one_delivery());
                assert!(
                    poll_fn(|cx| Poll::Ready(future3.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx2_1.send(Ok(())).unwrap();
                future3.await.unwrap();
            }
            assert_eq!(1, write_chunk.future_count());
        }

        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Chunk {
                epoch,
                chunk_id: chunk_id1
            }
        );

        assert_eq!(1, manager.future_count);

        {
            let mut future = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx2_2.send(Ok(())).unwrap();
            assert_eq!(
                future.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch,
                    chunk_id: chunk_id2
                }
            );
        }

        assert_eq!(0, manager.future_count);
    }
}