risingwave_connector/sink/
log_store.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::VecDeque;
17use std::fmt::Debug;
18use std::future::{Future, poll_fn};
19use std::pin::pin;
20use std::sync::Arc;
21use std::task::Poll;
22use std::time::Instant;
23
24use await_tree::InstrumentAwait;
25use either::Either;
26use futures::future::BoxFuture;
27use futures::{TryFuture, TryFutureExt};
28use risingwave_common::array::StreamChunk;
29use risingwave_common::bail;
30use risingwave_common::bitmap::Bitmap;
31use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};
32use risingwave_common::util::epoch::{EpochPair, INVALID_EPOCH};
33use risingwave_common_estimate_size::EstimateSize;
34use risingwave_common_rate_limit::{RateLimit, RateLimiter};
35use tokio::select;
36use tokio::sync::mpsc::UnboundedReceiver;
37
/// All log store operations surface errors as `anyhow::Error`.
pub type LogStoreResult<T> = Result<T, anyhow::Error>;
/// Index of a chunk within an epoch; reset to 0 after each barrier.
pub type ChunkId = usize;

/// Offset up to which a log store may be truncated.
///
/// Ordering: within one epoch, chunks are ordered by `chunk_id`, and the
/// epoch's `Barrier` sorts after every chunk of that epoch (see the
/// `PartialOrd` impl below).
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum TruncateOffset {
    Chunk { epoch: u64, chunk_id: ChunkId },
    Barrier { epoch: u64 },
}
46
47impl PartialOrd for TruncateOffset {
48    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
49        let extract = |offset: &TruncateOffset| match offset {
50            TruncateOffset::Chunk { epoch, chunk_id } => (*epoch, *chunk_id),
51            TruncateOffset::Barrier { epoch } => (*epoch, usize::MAX),
52        };
53        let this = extract(self);
54        let other = extract(other);
55        this.partial_cmp(&other)
56    }
57}
58
59impl TruncateOffset {
60    pub fn next_chunk_id(&self) -> ChunkId {
61        match self {
62            TruncateOffset::Chunk { chunk_id, .. } => chunk_id + 1,
63            TruncateOffset::Barrier { .. } => 0,
64        }
65    }
66
67    pub fn epoch(&self) -> u64 {
68        match self {
69            TruncateOffset::Chunk { epoch, .. } | TruncateOffset::Barrier { epoch } => *epoch,
70        }
71    }
72
73    pub fn check_next_offset(&self, next_offset: TruncateOffset) -> LogStoreResult<()> {
74        if *self >= next_offset {
75            bail!(
76                "next offset {:?} should be later than current offset {:?}",
77                next_offset,
78                self
79            )
80        } else {
81            Ok(())
82        }
83    }
84
85    pub fn check_next_item_epoch(&self, epoch: u64) -> LogStoreResult<()> {
86        match self {
87            TruncateOffset::Chunk {
88                epoch: offset_epoch,
89                ..
90            } => {
91                if epoch != *offset_epoch {
92                    bail!(
93                        "new item epoch {} does not match current chunk offset epoch {}",
94                        epoch,
95                        offset_epoch
96                    );
97                }
98            }
99            TruncateOffset::Barrier {
100                epoch: offset_epoch,
101            } => {
102                if epoch <= *offset_epoch {
103                    bail!(
104                        "new item epoch {} does not exceed barrier offset epoch {}",
105                        epoch,
106                        offset_epoch
107                    );
108                }
109            }
110        }
111        Ok(())
112    }
113}
114
/// An item read back from the log store.
#[derive(Debug)]
pub enum LogStoreReadItem {
    StreamChunk {
        chunk: StreamChunk,
        // Id of the chunk within its epoch.
        chunk_id: ChunkId,
    },
    Barrier {
        is_checkpoint: bool,
        // Present when the writer's vnode bitmap changed at this barrier.
        new_vnode_bitmap: Option<Arc<Bitmap>>,
        // Whether the writer is stopping after this barrier.
        is_stop: bool,
    },
}
127
/// One-shot callback type producing the deferred work of an epoch flush.
pub trait LogWriterPostFlushCurrentEpochFn<'a> = FnOnce() -> BoxFuture<'a, LogStoreResult<()>>;

/// Deferred action returned by [`LogWriter::flush_current_epoch`], intended to
/// run after the barrier has been yielded (see `post_yield_barrier`).
#[must_use]
pub struct LogWriterPostFlushCurrentEpoch<'a>(
    Box<dyn LogWriterPostFlushCurrentEpochFn<'a> + Send + 'a>,
);
134
135impl<'a> LogWriterPostFlushCurrentEpoch<'a> {
136    pub fn new(f: impl LogWriterPostFlushCurrentEpochFn<'a> + Send + 'a) -> Self {
137        Self(Box::new(f))
138    }
139
140    pub async fn post_yield_barrier(self) -> LogStoreResult<()> {
141        self.0().await
142    }
143}
144
/// Options passed to [`LogWriter::flush_current_epoch`] when sealing an epoch.
pub struct FlushCurrentEpochOptions {
    // Whether the sealing barrier is a checkpoint barrier.
    pub is_checkpoint: bool,
    // New vnode bitmap taking effect at this barrier, if it changed.
    pub new_vnode_bitmap: Option<Arc<Bitmap>>,
    // Whether the writer stops after this epoch.
    pub is_stop: bool,
}
150
/// Write side of a log store: chunks are appended within the current epoch,
/// and each epoch is sealed via [`Self::flush_current_epoch`].
pub trait LogWriter: Send {
    /// Initialize the log writer with an epoch
    fn init(
        &mut self,
        epoch: EpochPair,
        pause_read_on_bootstrap: bool,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Write a stream chunk to the log writer
    fn write_chunk(
        &mut self,
        chunk: StreamChunk,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Mark current epoch as finished and sealed, and flush the unconsumed log data.
    fn flush_current_epoch(
        &mut self,
        next_epoch: u64,
        options: FlushCurrentEpochOptions,
    ) -> impl Future<Output = LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>>> + Send + '_;

    /// Pause the log store; counterpart of [`Self::resume`].
    fn pause(&mut self) -> LogStoreResult<()>;

    /// Resume a previously paused log store.
    fn resume(&mut self) -> LogStoreResult<()>;
}
176
/// Read side of a log store.
pub trait LogReader: Send + Sized + 'static {
    /// Initialize the log reader. Usually function as waiting for log writer to be initialized.
    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Consume log store from given `start_offset` or aligned start offset recorded previously.
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Emit the next item.
    ///
    /// The implementation should ensure that the future is cancellation safe.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Mark that all items emitted so far have been consumed and it is safe to truncate the log
    /// from the current offset.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;

    /// Reset the log reader to after the latest truncate offset
    ///
    /// The return flag means whether the log store support rewind
    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
}
203
/// Factory that builds a paired reader and writer for one log store instance.
pub trait LogStoreFactory: Send + 'static {
    /// Whether the produced reader supports [`LogReader::rewind`].
    const ALLOW_REWIND: bool;
    /// Whether the sink should be rebuilt when the vnode bitmap is updated.
    const REBUILD_SINK_ON_UPDATE_VNODE_BITMAP: bool;
    type Reader: LogReader;
    type Writer: LogWriter;

    fn build(self) -> impl Future<Output = (Self::Reader, Self::Writer)> + Send;
}
212
/// Log reader decorator that applies `f` to every stream chunk it yields;
/// barriers pass through unchanged.
pub struct TransformChunkLogReader<F: Fn(StreamChunk) -> StreamChunk, R: LogReader> {
    f: F,
    inner: R,
}
217
218impl<F: Fn(StreamChunk) -> StreamChunk + Send + 'static, R: LogReader> LogReader
219    for TransformChunkLogReader<F, R>
220{
221    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
222        self.inner.init()
223    }
224
225    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
226        let (epoch, item) = self.inner.next_item().await?;
227        let item = match item {
228            LogStoreReadItem::StreamChunk { chunk, chunk_id } => LogStoreReadItem::StreamChunk {
229                chunk: (self.f)(chunk),
230                chunk_id,
231            },
232            other => other,
233        };
234        Ok((epoch, item))
235    }
236
237    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
238        self.inner.truncate(offset)
239    }
240
241    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
242        self.inner.rewind()
243    }
244
245    fn start_from(
246        &mut self,
247        start_offset: Option<u64>,
248    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
249        self.inner.start_from(start_offset)
250    }
251}
252
/// Log reader decorator measuring downstream backpressure: the time between a
/// `next_item` resolving and the next `next_item` call is accumulated into a
/// counter.
pub struct BackpressureMonitoredLogReader<R: LogReader> {
    inner: R,
    /// Start time to wait for new future after poll ready
    wait_new_future_start_time: Option<Instant>,
    wait_new_future_duration_ns: LabelGuardedIntCounter<4>,
}
259
260impl<R: LogReader> BackpressureMonitoredLogReader<R> {
261    fn new(inner: R, wait_new_future_duration_ns: LabelGuardedIntCounter<4>) -> Self {
262        Self {
263            inner,
264            wait_new_future_start_time: None,
265            wait_new_future_duration_ns,
266        }
267    }
268}
269
impl<R: LogReader> LogReader for BackpressureMonitoredLogReader<R> {
    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        // Reset measurement state on (re-)initialization.
        self.wait_new_future_start_time = None;
        self.inner.init()
    }

    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
        // The interval between the previous `next_item` resolving and this call
        // approximates how long the consumer spent on the last item, i.e. the
        // backpressure from downstream.
        if let Some(start_time) = self.wait_new_future_start_time.take() {
            self.wait_new_future_duration_ns
                .inc_by(start_time.elapsed().as_nanos() as _);
        }
        // NOTE: the closure captures only the `wait_new_future_start_time`
        // field (edition 2021 disjoint capture), so it can coexist with the
        // mutable borrow of `self.inner`.
        self.inner.next_item().inspect_ok(|_| {
            // Set start time when return ready
            self.wait_new_future_start_time = Some(Instant::now());
        })
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        self.inner.truncate(offset)
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.rewind().inspect_ok(|_| {
            // A successful rewind restarts consumption; drop the pending measurement.
            self.wait_new_future_start_time = None;
        })
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.start_from(start_offset)
    }
}
306
/// Log reader decorator that reports read epochs, rows and bytes to metrics.
pub struct MonitoredLogReader<R: LogReader> {
    inner: R,
    // Last epoch observed from `next_item`, used to detect epoch switches.
    read_epoch: u64,
    metrics: LogReaderMetrics,
}

/// Metrics for the read side of a log store.
pub struct LogReaderMetrics {
    pub log_store_latest_read_epoch: LabelGuardedIntGauge<4>,
    pub log_store_read_rows: LabelGuardedIntCounter<4>,
    pub log_store_read_bytes: LabelGuardedIntCounter<4>,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter<4>,
}
319
320impl<R: LogReader> MonitoredLogReader<R> {
321    pub fn new(inner: R, metrics: LogReaderMetrics) -> Self {
322        Self {
323            inner,
324            read_epoch: INVALID_EPOCH,
325            metrics,
326        }
327    }
328}
329
330impl<R: LogReader> LogReader for MonitoredLogReader<R> {
331    async fn init(&mut self) -> LogStoreResult<()> {
332        self.inner.init().instrument_await("log_reader_init").await
333    }
334
335    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
336        self.inner
337            .next_item()
338            .instrument_await("log_reader_next_item")
339            .await
340            .inspect(|(epoch, item)| {
341                if self.read_epoch != *epoch {
342                    self.read_epoch = *epoch;
343                    self.metrics.log_store_latest_read_epoch.set(*epoch as _);
344                }
345                if let LogStoreReadItem::StreamChunk { chunk, .. } = item {
346                    self.metrics
347                        .log_store_read_rows
348                        .inc_by(chunk.cardinality() as _);
349                    self.metrics
350                        .log_store_read_bytes
351                        .inc_by(chunk.estimated_size() as u64);
352                }
353            })
354    }
355
356    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
357        self.inner.truncate(offset)
358    }
359
360    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
361        self.inner.rewind().instrument_await("log_reader_rewind")
362    }
363
364    fn start_from(
365        &mut self,
366        start_offset: Option<u64>,
367    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
368        self.inner.start_from(start_offset)
369    }
370}
371
// Offsets in the upstream (inner reader) and downstream (rate-limited) views
// of the log. They share a representation but must not be mixed up.
type UpstreamChunkOffset = TruncateOffset;
type DownstreamChunkOffset = TruncateOffset;

/// A (possibly partial) chunk produced by splitting an upstream chunk to fit
/// under the rate limit.
struct SplitChunk {
    chunk: StreamChunk,
    upstream_chunk_offset: UpstreamChunkOffset,
    // Indicate whether this is the last split chunk from the same `upstream_chunk_offset`
    is_last: bool,
}

/// State of [`RateLimitedLogReader`]: the inner reader plus the bookkeeping
/// needed to map downstream offsets back to upstream ones.
struct RateLimitedLogReaderCore<R: LogReader> {
    inner: R,
    // Newer items at the front
    consumed_offset_queue: VecDeque<(DownstreamChunkOffset, UpstreamChunkOffset)>,
    // Newer items at the front
    unconsumed_chunk_queue: VecDeque<SplitChunk>,
    // Downstream chunk id to assign next; reset to 0 at each barrier.
    next_chunk_id: usize,
}
390
391impl<R: LogReader> RateLimitedLogReaderCore<R> {
392    // Returns: Left - chunk, Right - Barrier/UpdateVnodeBitmap
393    async fn next_item(&mut self) -> LogStoreResult<Either<SplitChunk, (u64, LogStoreReadItem)>> {
394        // Get upstream chunk from unconsumed_chunk_queue first.
395        // If unconsumed_chunk_queue is empty, get the chunk from the inner log reader.
396        match self.unconsumed_chunk_queue.pop_back() {
397            Some(split_chunk) => Ok(Either::Left(split_chunk)),
398            None => {
399                let (epoch, item) = self.inner.next_item().await?;
400                match item {
401                    LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
402                        Ok(Either::Left(SplitChunk {
403                            chunk,
404                            upstream_chunk_offset: UpstreamChunkOffset::Chunk { epoch, chunk_id },
405                            is_last: true,
406                        }))
407                    }
408                    LogStoreReadItem::Barrier { .. } => {
409                        self.consumed_offset_queue.push_front((
410                            TruncateOffset::Barrier { epoch },
411                            TruncateOffset::Barrier { epoch },
412                        ));
413                        self.next_chunk_id = 0;
414                        Ok(Either::Right((epoch, item)))
415                    }
416                }
417            }
418        }
419    }
420}
421
/// Log reader decorator that throttles chunk throughput according to
/// `RateLimit` commands received on `control_rx`.
pub struct RateLimitedLogReader<R: LogReader> {
    core: RateLimitedLogReaderCore<R>,
    rate_limiter: RateLimiter,
    control_rx: UnboundedReceiver<RateLimit>,
}
427
428impl<R: LogReader> RateLimitedLogReader<R> {
429    pub fn new(inner: R, control_rx: UnboundedReceiver<RateLimit>) -> Self {
430        Self {
431            core: RateLimitedLogReaderCore {
432                inner,
433                consumed_offset_queue: VecDeque::new(),
434                unconsumed_chunk_queue: VecDeque::new(),
435                next_chunk_id: 0,
436            },
437            rate_limiter: RateLimiter::new(RateLimit::Disabled),
438            control_rx,
439        }
440    }
441}
442
impl<R: LogReader> RateLimitedLogReader<R> {
    /// Emit `split_chunk` downstream, waiting on the rate limiter and further
    /// splitting the chunk when it exceeds the current permit budget.
    ///
    /// Records the downstream->upstream offset mapping once the last split of
    /// an upstream chunk is emitted, so `truncate` can later translate offsets.
    async fn apply_rate_limit(
        &mut self,
        split_chunk: SplitChunk,
    ) -> LogStoreResult<(u64, LogStoreReadItem)> {
        let split_chunk = match self.rate_limiter.rate_limit() {
            // `next_item` disables the read branch while paused, so this
            // method is never reached in the `Pause` state.
            RateLimit::Pause => unreachable!(
                "apply_rate_limit is not supposed to be called while the stream is paused"
            ),
            RateLimit::Disabled => split_chunk,
            RateLimit::Fixed(limit) => {
                let limit = limit.get();
                let required_permits = split_chunk.chunk.compute_rate_limit_chunk_permits();
                if required_permits <= limit {
                    self.rate_limiter.wait(required_permits).await;
                    split_chunk
                } else {
                    // Cut the chunk into smaller chunks
                    let mut chunks = split_chunk.chunk.split(limit as _).into_iter();
                    let mut is_last = split_chunk.is_last;
                    let upstream_chunk_offset = split_chunk.upstream_chunk_offset;

                    // The first chunk after splitting will be returned
                    let first_chunk = chunks.next().unwrap();

                    // The remaining chunks will be pushed to the queue
                    // (reverse iteration keeps the oldest chunk at the back,
                    // where `next_item` pops from).
                    for chunk in chunks.rev() {
                        // The last chunk after splitting inherits the `is_last` from the original chunk
                        self.core.unconsumed_chunk_queue.push_back(SplitChunk {
                            chunk,
                            upstream_chunk_offset,
                            is_last,
                        });
                        is_last = false;
                    }

                    // Trigger rate limit and return the first chunk
                    self.rate_limiter
                        .wait(first_chunk.compute_rate_limit_chunk_permits())
                        .await;
                    SplitChunk {
                        chunk: first_chunk,
                        upstream_chunk_offset,
                        is_last,
                    }
                }
            }
        };

        // Update the consumed_offset_queue if the `split_chunk` is the last chunk of the upstream chunk
        let epoch = split_chunk.upstream_chunk_offset.epoch();
        let downstream_chunk_id = self.core.next_chunk_id;
        self.core.next_chunk_id += 1;
        if split_chunk.is_last {
            self.core.consumed_offset_queue.push_front((
                TruncateOffset::Chunk {
                    epoch,
                    chunk_id: downstream_chunk_id,
                },
                split_chunk.upstream_chunk_offset,
            ));
        }

        Ok((
            epoch,
            LogStoreReadItem::StreamChunk {
                chunk: split_chunk.chunk,
                chunk_id: downstream_chunk_id,
            },
        ))
    }
}
515
impl<R: LogReader> LogReader for RateLimitedLogReader<R> {
    async fn init(&mut self) -> LogStoreResult<()> {
        self.core.inner.init().await
    }

    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        let mut paused = false;
        loop {
            select! {
                // `biased` gives rate-limit updates priority over reading, so a
                // `Pause` command takes effect before the next item is pulled.
                biased;
                recv = pin!(self.control_rx.recv()) => {
                    let new_rate_limit = match recv {
                        Some(limit) => limit,
                        None => bail!("rate limit control channel closed"),
                    };
                    let old_rate_limit = self.rate_limiter.update(new_rate_limit);
                    paused = matches!(new_rate_limit, RateLimit::Pause);
                    tracing::info!("rate limit changed from {:?} to {:?}, paused = {paused}", old_rate_limit, new_rate_limit);
                },
                // While paused this branch is disabled; we only wait for
                // further control messages.
                item = self.core.next_item(), if !paused => {
                    let item = item?;
                    match item {
                        Either::Left(split_chunk) => {
                            return self.apply_rate_limit(split_chunk).await;
                        },
                        Either::Right(item) => {
                            // The core only yields barriers on the Right branch.
                            assert!(matches!(item.1, LogStoreReadItem::Barrier{..}));
                            return Ok(item);
                        },
                    }
                }
            }
        }
    }

    /// Translate a downstream truncate offset back to the corresponding
    /// upstream offset and forward the truncation to the inner reader.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        let mut truncate_offset = None;
        // Pop (oldest-first, from the back) every entry whose downstream
        // offset is covered by `offset`, remembering the latest matching
        // upstream offset.
        while let Some((downstream_offset, upstream_offset)) =
            self.core.consumed_offset_queue.back()
        {
            if *downstream_offset <= offset {
                truncate_offset = Some(*upstream_offset);
                self.core.consumed_offset_queue.pop_back();
            } else {
                break;
            }
        }
        tracing::trace!(
            "rate limited log store reader truncate offset {:?}, downstream offset {:?}",
            truncate_offset,
            offset
        );
        if let Some(offset) = truncate_offset {
            self.core.inner.truncate(offset)
        } else {
            Ok(())
        }
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        // Drop all split/offset bookkeeping before delegating: after a rewind
        // the inner reader replays from the last truncated offset.
        self.core.unconsumed_chunk_queue.clear();
        self.core.consumed_offset_queue.clear();
        self.core.next_chunk_id = 0;
        self.core.inner.rewind()
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.core.inner.start_from(start_offset)
    }
}
589
#[easy_ext::ext(LogReaderExt)]
impl<T> T
where
    T: LogReader,
{
    /// Map every stream chunk this reader yields through `f`.
    pub fn transform_chunk<F: Fn(StreamChunk) -> StreamChunk + Sized>(
        self,
        f: F,
    ) -> TransformChunkLogReader<F, Self> {
        TransformChunkLogReader { f, inner: self }
    }

    /// Attach read metrics and backpressure measurement to this reader.
    pub fn monitored(self, metrics: LogReaderMetrics) -> impl LogReader {
        // TODO: The `clone()` can be avoided if move backpressure inside `MonitoredLogReader`
        let wait_new_future_duration = metrics.log_store_reader_wait_new_future_duration_ns.clone();
        BackpressureMonitoredLogReader::new(
            MonitoredLogReader::new(self, metrics),
            wait_new_future_duration,
        )
    }

    /// Throttle this reader according to commands received on `control_rx`.
    pub fn rate_limited(self, control_rx: UnboundedReceiver<RateLimit>) -> impl LogReader {
        RateLimitedLogReader::new(self, control_rx)
    }
}
615
/// Log writer decorator that reports write metrics.
pub struct MonitoredLogWriter<W: LogWriter> {
    inner: W,
    metrics: LogWriterMetrics,
}

/// Metrics for the write side of a log store.
pub struct LogWriterMetrics {
    // Labels: [actor_id, sink_id, sink_name]
    pub log_store_first_write_epoch: LabelGuardedIntGauge<3>,
    pub log_store_latest_write_epoch: LabelGuardedIntGauge<3>,
    pub log_store_write_rows: LabelGuardedIntCounter<3>,
}
627
628impl<W: LogWriter> LogWriter for MonitoredLogWriter<W> {
629    async fn init(
630        &mut self,
631        epoch: EpochPair,
632        pause_read_on_bootstrap: bool,
633    ) -> LogStoreResult<()> {
634        self.metrics
635            .log_store_first_write_epoch
636            .set(epoch.curr as _);
637        self.metrics
638            .log_store_latest_write_epoch
639            .set(epoch.curr as _);
640        self.inner.init(epoch, pause_read_on_bootstrap).await
641    }
642
643    async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> {
644        self.metrics
645            .log_store_write_rows
646            .inc_by(chunk.cardinality() as _);
647        self.inner.write_chunk(chunk).await
648    }
649
650    async fn flush_current_epoch(
651        &mut self,
652        next_epoch: u64,
653        options: FlushCurrentEpochOptions,
654    ) -> LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>> {
655        let post_flush = self.inner.flush_current_epoch(next_epoch, options).await?;
656        self.metrics
657            .log_store_latest_write_epoch
658            .set(next_epoch as _);
659        Ok(post_flush)
660    }
661
662    fn pause(&mut self) -> LogStoreResult<()> {
663        self.inner.pause()
664    }
665
666    fn resume(&mut self) -> LogStoreResult<()> {
667        self.inner.resume()
668    }
669}
670
671#[easy_ext::ext(LogWriterExt)]
672impl<T> T
673where
674    T: LogWriter + Sized,
675{
676    pub fn monitored(self, metrics: LogWriterMetrics) -> MonitoredLogWriter<T> {
677        MonitoredLogWriter {
678            inner: self,
679            metrics,
680        }
681    }
682}
683
/// One pending item tracked by [`DeliveryFutureManager`].
enum DeliveryFutureManagerItem<F> {
    Chunk {
        chunk_id: ChunkId,
        // earlier future at the front
        futures: VecDeque<F>,
    },
    Barrier,
}

/// Tracks in-flight delivery futures per chunk/barrier so that a truncate
/// offset can be emitted once all earlier deliveries have completed.
pub struct DeliveryFutureManager<F> {
    // Total number of pending futures across all items.
    future_count: usize,
    // Upper bound on `future_count`; exceeding it forces awaiting deliveries.
    max_future_count: usize,
    // earlier items at the front
    items: VecDeque<(u64, DeliveryFutureManagerItem<F>)>,
}
699
700impl<F> DeliveryFutureManager<F> {
701    pub fn new(max_future_count: usize) -> Self {
702        Self {
703            future_count: 0,
704            max_future_count,
705            items: Default::default(),
706        }
707    }
708
709    pub fn add_barrier(&mut self, epoch: u64) {
710        if let Some((item_epoch, last_item)) = self.items.back() {
711            match last_item {
712                DeliveryFutureManagerItem::Chunk { .. } => {
713                    assert_eq!(*item_epoch, epoch)
714                }
715                DeliveryFutureManagerItem::Barrier => {
716                    assert!(
717                        epoch > *item_epoch,
718                        "new barrier epoch {} should be greater than prev barrier {}",
719                        epoch,
720                        item_epoch
721                    );
722                }
723            }
724        }
725        self.items
726            .push_back((epoch, DeliveryFutureManagerItem::Barrier));
727    }
728
729    pub fn start_write_chunk(
730        &mut self,
731        epoch: u64,
732        chunk_id: ChunkId,
733    ) -> DeliveryFutureManagerAddFuture<'_, F> {
734        if let Some((item_epoch, item)) = self.items.back() {
735            match item {
736                DeliveryFutureManagerItem::Chunk {
737                    chunk_id: item_chunk_id,
738                    ..
739                } => {
740                    assert_eq!(epoch, *item_epoch);
741                    assert!(
742                        chunk_id > *item_chunk_id,
743                        "new chunk id {} should be greater than prev chunk id {}",
744                        chunk_id,
745                        item_chunk_id
746                    );
747                }
748                DeliveryFutureManagerItem::Barrier => {
749                    assert!(
750                        epoch > *item_epoch,
751                        "new chunk epoch {} should be greater than prev barrier: {}",
752                        epoch,
753                        item_epoch
754                    );
755                }
756            }
757        }
758        self.items.push_back((
759            epoch,
760            DeliveryFutureManagerItem::Chunk {
761                chunk_id,
762                futures: VecDeque::new(),
763            },
764        ));
765        DeliveryFutureManagerAddFuture(self)
766    }
767}
768
769pub struct DeliveryFutureManagerAddFuture<'a, F>(&'a mut DeliveryFutureManager<F>);
770
771impl<F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManagerAddFuture<'_, F> {
772    /// Add a new future to the latest started written chunk.
773    /// The returned bool value indicate whether we have awaited on any previous futures.
774    pub async fn add_future_may_await(&mut self, future: F) -> Result<bool, F::Error> {
775        let mut has_await = false;
776        while self.0.future_count >= self.0.max_future_count {
777            self.await_one_delivery().await?;
778            has_await = true;
779        }
780        match self.0.items.back_mut() {
781            Some((_, DeliveryFutureManagerItem::Chunk { futures, .. })) => {
782                futures.push_back(future);
783                self.0.future_count += 1;
784                Ok(has_await)
785            }
786            _ => unreachable!("should add future only after add a new chunk"),
787        }
788    }
789
790    pub async fn await_one_delivery(&mut self) -> Result<(), F::Error> {
791        for (_, item) in &mut self.0.items {
792            if let DeliveryFutureManagerItem::Chunk { futures, .. } = item
793                && let Some(mut delivery_future) = futures.pop_front()
794            {
795                self.0.future_count -= 1;
796                return poll_fn(|cx| delivery_future.try_poll_unpin(cx)).await;
797            } else {
798                continue;
799            }
800        }
801        Ok(())
802    }
803
804    pub fn future_count(&self) -> usize {
805        self.0.future_count
806    }
807
808    pub fn max_future_count(&self) -> usize {
809        self.0.max_future_count
810    }
811}
812
impl<F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManager<F> {
    /// Resolve once some prefix of the pending items has fully completed,
    /// yielding the latest offset that is now safe to truncate.
    ///
    /// Chunks complete when all their delivery futures resolve `Ok`; a barrier
    /// completes unconditionally and is always yielded on its own.
    pub fn next_truncate_offset(
        &mut self,
    ) -> impl Future<Output = Result<TruncateOffset, F::Error>> + '_ {
        poll_fn(move |cx| {
            // Latest offset whose deliveries are fully finished in this poll.
            let mut latest_offset: Option<TruncateOffset> = None;
            'outer: while let Some((epoch, item)) = self.items.front_mut() {
                match item {
                    DeliveryFutureManagerItem::Chunk { chunk_id, futures } => {
                        while let Some(future) = futures.front_mut() {
                            match future.try_poll_unpin(cx) {
                                Poll::Ready(result) => match result {
                                    Ok(()) => {
                                        self.future_count -= 1;
                                        futures.pop_front();
                                    }
                                    Err(e) => {
                                        // Fail fast on the first delivery error.
                                        return Poll::Ready(Err(e));
                                    }
                                },
                                Poll::Pending => {
                                    break 'outer;
                                }
                            }
                        }

                        // when we reach here, there must not be any pending or error future.
                        // Which means all futures of this stream chunk have been finished
                        assert!(futures.is_empty());
                        latest_offset = Some(TruncateOffset::Chunk {
                            epoch: *epoch,
                            chunk_id: *chunk_id,
                        });
                        self.items.pop_front().expect("items not empty");
                    }
                    DeliveryFutureManagerItem::Barrier => {
                        latest_offset = Some(TruncateOffset::Barrier { epoch: *epoch });
                        self.items.pop_front().expect("items not empty");
                        // Barrier will be yielded anyway
                        break 'outer;
                    }
                }
            }
            if let Some(offset) = latest_offset {
                Poll::Ready(Ok(offset))
            } else {
                Poll::Pending
            }
        })
    }
}
864
#[cfg(test)]
mod tests {
    use std::future::{Future, poll_fn};
    use std::pin::{Pin, pin};
    use std::task::Poll;

    use futures::{FutureExt, TryFuture};
    use risingwave_common::util::epoch::test_epoch;
    use tokio::sync::oneshot;
    use tokio::sync::oneshot::Receiver;

    use super::LogStoreResult;
    use crate::sink::log_store::{DeliveryFutureManager, TruncateOffset};

    /// Polls `fut` exactly once and reports whether it is still pending,
    /// without consuming the future so it can be polled or awaited again later.
    async fn polls_pending<F: Future>(fut: &mut Pin<&mut F>) -> bool {
        poll_fn(|cx| Poll::Ready(fut.as_mut().poll(cx)))
            .await
            .is_pending()
    }

    #[test]
    fn test_truncate_offset_cmp() {
        let chunk = |epoch, chunk_id| TruncateOffset::Chunk { epoch, chunk_id };
        let barrier = |epoch| TruncateOffset::Barrier { epoch };

        // A barrier of an earlier epoch sorts before any chunk of a later epoch.
        assert!(barrier(232) < chunk(233, 1));
        // Identical chunk offsets compare equal.
        assert_eq!(chunk(1, 1), chunk(1, 1));
        // Within one epoch, chunks are ordered by chunk id ...
        assert!(chunk(1, 1) < chunk(1, 2));
        // ... and the barrier sorts after every chunk of its own epoch.
        assert!(barrier(1) > chunk(1, 2));
        assert!(chunk(1, 2) < barrier(1));
        // Epoch dominates: any offset of a later epoch compares greater.
        assert!(chunk(2, 2) > barrier(1));
        assert!(barrier(2) > barrier(1));
    }

    type TestFuture = impl TryFuture<Ok = (), Error = anyhow::Error> + Unpin + 'static;
    /// Adapts a oneshot receiver into the delivery-future type under test.
    /// The receiver itself is assumed never to be dropped unsent (hence `unwrap`).
    fn to_test_future(rx: Receiver<LogStoreResult<()>>) -> TestFuture {
        rx.map(|recv_result| recv_result.unwrap()).boxed()
    }

    #[tokio::test]
    async fn test_empty() {
        // With no buffered items at all, `next_truncate_offset` never resolves.
        let mut manager = DeliveryFutureManager::<TestFuture>::new(2);
        let mut truncate_fut = pin!(manager.next_truncate_offset());
        assert!(polls_pending(&mut truncate_fut).await);
    }

    #[tokio::test]
    async fn test_future_delivery_manager_basic() {
        let mut manager = DeliveryFutureManager::new(2);
        let epoch1 = 233;
        let chunk_id1 = 1;
        let (tx1_1, rx1_1) = oneshot::channel();
        let mut writer = manager.start_write_chunk(epoch1, chunk_id1);
        // Below the capacity of 2, so adding must not await (returns `false`).
        assert!(
            !writer
                .add_future_may_await(to_test_future(rx1_1))
                .await
                .unwrap()
        );
        assert_eq!(manager.future_count, 1);
        {
            let mut truncate_fut = pin!(manager.next_truncate_offset());
            // The single delivery future has not completed yet.
            assert!(polls_pending(&mut truncate_fut).await);
            tx1_1.send(Ok(())).unwrap();
            // Once delivered, the chunk's offset becomes truncatable.
            assert_eq!(
                truncate_fut.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch: epoch1,
                    chunk_id: chunk_id1
                }
            );
        }
        assert_eq!(manager.future_count, 0);
        // A barrier is yielded immediately — it carries no delivery future.
        manager.add_barrier(epoch1);
        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Barrier { epoch: epoch1 }
        );
    }

    #[tokio::test]
    async fn test_future_delivery_manager_compress_chunk() {
        let mut manager = DeliveryFutureManager::new(10);
        let epoch1 = test_epoch(233);
        let (chunk_id1, chunk_id2, chunk_id3) = (1, 2, 3);
        let (tx1_1, rx1_1) = oneshot::channel();
        let (tx1_2, rx1_2) = oneshot::channel();
        let (tx1_3, rx1_3) = oneshot::channel();
        let epoch2 = test_epoch(234);
        let (tx2_1, rx2_1) = oneshot::channel();

        // Capacity (10) is never reached in this test, so none of the adds await.
        for (chunk_id, rx) in [(chunk_id1, rx1_1), (chunk_id2, rx1_2), (chunk_id3, rx1_3)] {
            let awaited = manager
                .start_write_chunk(epoch1, chunk_id)
                .add_future_may_await(to_test_future(rx))
                .await
                .unwrap();
            assert!(!awaited);
        }
        manager.add_barrier(epoch1);
        let awaited = manager
            .start_write_chunk(epoch2, chunk_id1)
            .add_future_may_await(to_test_future(rx2_1))
            .await
            .unwrap();
        assert!(!awaited);
        assert_eq!(manager.future_count, 4);
        {
            let mut truncate_fut = pin!(manager.next_truncate_offset());
            assert!(polls_pending(&mut truncate_fut).await);
            tx1_2.send(Ok(())).unwrap();
            // chunk1's future is still pending, so nothing may be truncated yet.
            assert!(polls_pending(&mut truncate_fut).await);
            tx1_1.send(Ok(())).unwrap();
            // chunk1 and chunk2 finish together: their offsets are compressed
            // into the later one.
            assert_eq!(
                truncate_fut.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch: epoch1,
                    chunk_id: chunk_id2
                }
            );
        }
        assert_eq!(manager.future_count, 2);
        {
            let mut truncate_fut = pin!(manager.next_truncate_offset());
            assert!(polls_pending(&mut truncate_fut).await);
            tx1_3.send(Ok(())).unwrap();
            tx2_1.send(Ok(())).unwrap();
            // The barrier is yielded on its own, even though a chunk of the
            // following epoch has already finished.
            assert_eq!(
                truncate_fut.await.unwrap(),
                TruncateOffset::Barrier { epoch: epoch1 }
            );
        }
        assert_eq!(manager.future_count, 1);
        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Chunk {
                epoch: epoch2,
                chunk_id: chunk_id1
            }
        );
    }

    #[tokio::test]
    async fn test_future_delivery_manager_await_future() {
        let mut manager = DeliveryFutureManager::new(2);
        let epoch = 233;
        let (chunk_id1, chunk_id2) = (1, 2);
        let (tx1_1, rx1_1) = oneshot::channel();
        let (tx1_2, rx1_2) = oneshot::channel();
        let (tx2_1, rx2_1) = oneshot::channel();
        let (tx2_2, rx2_2) = oneshot::channel();

        {
            // Fill the manager exactly up to its capacity of 2 with chunk1's futures.
            let mut writer = manager.start_write_chunk(epoch, chunk_id1);
            assert!(
                !writer
                    .add_future_may_await(to_test_future(rx1_1))
                    .await
                    .unwrap()
            );
            assert!(
                !writer
                    .add_future_may_await(to_test_future(rx1_2))
                    .await
                    .unwrap()
            );
            assert_eq!(manager.future_count, 2);
        }

        {
            let mut writer = manager.start_write_chunk(epoch, chunk_id2);
            {
                // Over capacity: the add blocks until some earlier future finishes.
                let mut add_fut = pin!(writer.add_future_may_await(to_test_future(rx2_1)));
                assert!(polls_pending(&mut add_fut).await);
                tx1_1.send(Ok(())).unwrap();
                // `true` indicates the call had to await a delivery.
                assert!(add_fut.await.unwrap());
            }
            assert_eq!(2, writer.future_count());
            {
                let mut add_fut = pin!(writer.add_future_may_await(to_test_future(rx2_2)));
                assert!(polls_pending(&mut add_fut).await);
                tx1_2.send(Ok(())).unwrap();
                assert!(add_fut.await.unwrap());
            }
            assert_eq!(2, writer.future_count());
            {
                // Explicitly drain one in-flight delivery.
                let mut delivery_fut = pin!(writer.await_one_delivery());
                assert!(polls_pending(&mut delivery_fut).await);
                tx2_1.send(Ok(())).unwrap();
                delivery_fut.await.unwrap();
            }
            assert_eq!(1, writer.future_count());
        }

        // All of chunk1's futures already finished above, so its offset is ready.
        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Chunk {
                epoch,
                chunk_id: chunk_id1
            }
        );

        assert_eq!(1, manager.future_count);

        {
            let mut truncate_fut = pin!(manager.next_truncate_offset());
            assert!(polls_pending(&mut truncate_fut).await);
            tx2_2.send(Ok(())).unwrap();
            assert_eq!(
                truncate_fut.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch,
                    chunk_id: chunk_id2
                }
            );
        }

        assert_eq!(0, manager.future_count);
    }
}