risingwave_connector/sink/log_store.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::future::{Future, pending, poll_fn};
use std::pin::pin;
use std::sync::Arc;
use std::task::Poll;
use std::time::Instant;

use await_tree::InstrumentAwait;
use futures::future::BoxFuture;
use futures::{TryFuture, TryFutureExt};
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::Field;
use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};
use risingwave_common::util::epoch::{EpochPair, INVALID_EPOCH};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_common_rate_limit::{RateLimit, RateLimiter};
use tokio::select;
use tokio::sync::mpsc::UnboundedReceiver;

pub type LogStoreResult<T> = Result<T, anyhow::Error>;
pub type ChunkId = usize;

#[derive(Debug, PartialEq, Copy, Clone)]
pub enum TruncateOffset {
    Chunk { epoch: u64, chunk_id: ChunkId },
    Barrier { epoch: u64 },
}

impl PartialOrd for TruncateOffset {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let extract = |offset: &TruncateOffset| match offset {
            TruncateOffset::Chunk { epoch, chunk_id } => (*epoch, *chunk_id),
            TruncateOffset::Barrier { epoch } => (*epoch, usize::MAX),
        };
        let this = extract(self);
        let other = extract(other);
        this.partial_cmp(&other)
    }
}

impl TruncateOffset {
    pub fn next_chunk_id(&self) -> ChunkId {
        match self {
            TruncateOffset::Chunk { chunk_id, .. } => chunk_id + 1,
            TruncateOffset::Barrier { .. } => 0,
        }
    }

    pub fn epoch(&self) -> u64 {
        match self {
            TruncateOffset::Chunk { epoch, .. } | TruncateOffset::Barrier { epoch } => *epoch,
        }
    }

    pub fn check_next_offset(&self, next_offset: TruncateOffset) -> LogStoreResult<()> {
        if *self >= next_offset {
            bail!(
                "next offset {:?} should be later than current offset {:?}",
                next_offset,
                self
            )
        } else {
            Ok(())
        }
    }

    pub fn check_next_item_epoch(&self, epoch: u64) -> LogStoreResult<()> {
        match self {
            TruncateOffset::Chunk {
                epoch: offset_epoch,
                ..
            } => {
                if epoch != *offset_epoch {
                    bail!(
                        "new item epoch {} does not match current chunk offset epoch {}",
                        epoch,
                        offset_epoch
                    );
                }
            }
            TruncateOffset::Barrier {
                epoch: offset_epoch,
            } => {
                if epoch <= *offset_epoch {
                    bail!(
                        "new item epoch {} does not exceed barrier offset epoch {}",
                        epoch,
                        offset_epoch
                    );
                }
            }
        }
        Ok(())
    }
}
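
// A minimal compile-only sketch (not used elsewhere in this module) of how offsets are
// expected to advance: chunk ids grow within an epoch, restart from 0 after a barrier,
// and `check_next_offset` rejects any non-increasing transition.
#[allow(dead_code)]
fn example_offset_progression() -> LogStoreResult<()> {
    let chunk = TruncateOffset::Chunk { epoch: 1, chunk_id: 0 };
    let barrier = TruncateOffset::Barrier { epoch: 1 };
    // A barrier sorts after every chunk of its epoch...
    chunk.check_next_offset(barrier)?;
    // ...and chunk ids restart from 0 in the next epoch.
    assert_eq!(barrier.next_chunk_id(), 0);
    barrier.check_next_offset(TruncateOffset::Chunk { epoch: 2, chunk_id: 0 })
}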

#[derive(Debug)]
pub enum LogStoreReadItem {
    StreamChunk {
        chunk: StreamChunk,
        chunk_id: ChunkId,
    },
    Barrier {
        is_checkpoint: bool,
        new_vnode_bitmap: Option<Arc<Bitmap>>,
        is_stop: bool,
        add_columns: Option<Vec<Field>>,
    },
}

pub trait LogWriterPostFlushCurrentEpochFn<'a> = FnOnce() -> BoxFuture<'a, LogStoreResult<()>>;

#[must_use]
pub struct LogWriterPostFlushCurrentEpoch<'a>(
    Box<dyn LogWriterPostFlushCurrentEpochFn<'a> + Send + 'a>,
);

impl<'a> LogWriterPostFlushCurrentEpoch<'a> {
    pub fn new(f: impl LogWriterPostFlushCurrentEpochFn<'a> + Send + 'a) -> Self {
        Self(Box::new(f))
    }

    pub async fn post_yield_barrier(self) -> LogStoreResult<()> {
        self.0().await
    }
}

pub struct FlushCurrentEpochOptions {
    pub is_checkpoint: bool,
    pub new_vnode_bitmap: Option<Arc<Bitmap>>,
    pub is_stop: bool,
    pub add_columns: Option<Vec<Field>>,
}

pub trait LogWriter: Send {
    /// Initialize the log writer with an epoch
    fn init(
        &mut self,
        epoch: EpochPair,
        pause_read_on_bootstrap: bool,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Write a stream chunk to the log writer
    fn write_chunk(
        &mut self,
        chunk: StreamChunk,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Mark current epoch as finished and sealed, and flush the unconsumed log data.
    fn flush_current_epoch(
        &mut self,
        next_epoch: u64,
        options: FlushCurrentEpochOptions,
    ) -> impl Future<Output = LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>>> + Send + '_;

    fn pause(&mut self) -> LogStoreResult<()>;

    fn resume(&mut self) -> LogStoreResult<()>;
}
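
// A minimal sketch (hypothetical driver covering a single epoch with one chunk) of the
// expected call sequence on a `LogWriter`: `init` once, `write_chunk` within the epoch,
// then `flush_current_epoch` at the barrier, awaiting the returned post-flush callback
// after the barrier has been yielded.
#[allow(dead_code)]
async fn example_drive_writer<W: LogWriter>(
    mut writer: W,
    epoch: EpochPair,
    chunk: StreamChunk,
) -> LogStoreResult<()> {
    writer.init(epoch, false).await?;
    writer.write_chunk(chunk).await?;
    let post_flush = writer
        .flush_current_epoch(
            epoch.curr + 1, // the next epoch; illustrative only
            FlushCurrentEpochOptions {
                is_checkpoint: true,
                new_vnode_bitmap: None,
                is_stop: false,
                add_columns: None,
            },
        )
        .await?;
    post_flush.post_yield_barrier().await
}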

pub trait LogReader: Send + Sized + 'static {
    /// Initialize the log reader. Usually functions as waiting for the log writer to be
    /// initialized.
    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Consume the log store from the given `start_offset`, or from the previously recorded
    /// aligned start offset when `start_offset` is `None`.
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Emit the next item.
    ///
    /// The implementation should ensure that the future is cancellation safe.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Mark that all items emitted so far have been consumed and it is safe to truncate the log
    /// from the current offset.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;

    /// Reset the log reader to after the latest truncate offset.
    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
}
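
// A minimal sketch (hypothetical consumer; a real sink delivers the chunk to an external
// system before truncating) of how a `LogReader` is intended to be driven: read items in
// a loop and `truncate` once an item no longer needs to be replayed on recovery.
#[allow(dead_code)]
async fn example_consume_log<R: LogReader>(mut reader: R) -> LogStoreResult<()> {
    reader.init().await?;
    reader.start_from(None).await?;
    loop {
        let (epoch, item) = reader.next_item().await?;
        match item {
            LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
                // Deliver `chunk` downstream here; truncation of a chunk offset may be
                // deferred until the delivery is actually acknowledged.
                let _ = chunk;
                reader.truncate(TruncateOffset::Chunk { epoch, chunk_id })?;
            }
            LogStoreReadItem::Barrier { is_stop, .. } => {
                reader.truncate(TruncateOffset::Barrier { epoch })?;
                if is_stop {
                    return Ok(());
                }
            }
        }
    }
}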

pub trait LogStoreFactory: Send + 'static {
    const ALLOW_REWIND: bool;
    const REBUILD_SINK_ON_UPDATE_VNODE_BITMAP: bool;
    type Reader: LogReader;
    type Writer: LogWriter;

    fn build(self) -> impl Future<Output = (Self::Reader, Self::Writer)> + Send;
}
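
// Sketch: a log store factory builds the paired reader and writer halves, which are then
// driven concurrently (the writer fed by the sink executor, the reader by the sink).
#[allow(dead_code)]
async fn example_build_log_store<F: LogStoreFactory>(factory: F) -> (F::Reader, F::Writer) {
    factory.build().await
}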

pub struct TransformChunkLogReader<F: Fn(StreamChunk) -> StreamChunk, R: LogReader> {
    f: F,
    inner: R,
}

impl<F: Fn(StreamChunk) -> StreamChunk + Send + 'static, R: LogReader> LogReader
    for TransformChunkLogReader<F, R>
{
    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.init()
    }

    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        let (epoch, item) = self.inner.next_item().await?;
        let item = match item {
            LogStoreReadItem::StreamChunk { chunk, chunk_id } => LogStoreReadItem::StreamChunk {
                chunk: (self.f)(chunk),
                chunk_id,
            },
            other => other,
        };
        Ok((epoch, item))
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        self.inner.truncate(offset)
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.rewind()
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.start_from(start_offset)
    }
}

pub struct BackpressureMonitoredLogReader<R: LogReader> {
    inner: R,
    /// Time when the last `next_item` future became ready. Used to measure how long we wait
    /// before being polled for a new item, i.e. the backpressure from the consumer.
    wait_new_future_start_time: Option<Instant>,
    wait_new_future_duration_ns: LabelGuardedIntCounter,
}

impl<R: LogReader> BackpressureMonitoredLogReader<R> {
    fn new(inner: R, wait_new_future_duration_ns: LabelGuardedIntCounter) -> Self {
        Self {
            inner,
            wait_new_future_start_time: None,
            wait_new_future_duration_ns,
        }
    }
}

impl<R: LogReader> LogReader for BackpressureMonitoredLogReader<R> {
    fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.wait_new_future_start_time = None;
        self.inner.init()
    }

    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
        if let Some(start_time) = self.wait_new_future_start_time.take() {
            self.wait_new_future_duration_ns
                .inc_by(start_time.elapsed().as_nanos() as _);
        }
        self.inner.next_item().inspect_ok(|_| {
            // Record the ready time so that the wait before the next poll can be measured.
            self.wait_new_future_start_time = Some(Instant::now());
        })
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        self.inner.truncate(offset)
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.rewind().inspect_ok(|_| {
            self.wait_new_future_start_time = None;
        })
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.start_from(start_offset)
    }
}

pub struct MonitoredLogReader<R: LogReader> {
    inner: R,
    read_epoch: u64,
    metrics: LogReaderMetrics,
}

pub struct LogReaderMetrics {
    pub log_store_latest_read_epoch: LabelGuardedIntGauge,
    pub log_store_read_rows: LabelGuardedIntCounter,
    pub log_store_read_bytes: LabelGuardedIntCounter,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter,
}

impl<R: LogReader> MonitoredLogReader<R> {
    pub fn new(inner: R, metrics: LogReaderMetrics) -> Self {
        Self {
            inner,
            read_epoch: INVALID_EPOCH,
            metrics,
        }
    }
}

impl<R: LogReader> LogReader for MonitoredLogReader<R> {
    async fn init(&mut self) -> LogStoreResult<()> {
        self.inner.init().instrument_await("log_reader_init").await
    }

    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        self.inner
            .next_item()
            .instrument_await("log_reader_next_item")
            .await
            .inspect(|(epoch, item)| {
                if self.read_epoch != *epoch {
                    self.read_epoch = *epoch;
                    self.metrics.log_store_latest_read_epoch.set(*epoch as _);
                }
                if let LogStoreReadItem::StreamChunk { chunk, .. } = item {
                    self.metrics
                        .log_store_read_rows
                        .inc_by(chunk.cardinality() as _);
                    self.metrics
                        .log_store_read_bytes
                        .inc_by(chunk.estimated_size() as u64);
                }
            })
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        self.inner.truncate(offset)
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.rewind().instrument_await("log_reader_rewind")
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.inner.start_from(start_offset)
    }
}

#[derive(Copy, Clone, PartialOrd, PartialEq, Debug)]
struct UpstreamChunkOffset(TruncateOffset);
#[derive(Copy, Clone, PartialOrd, PartialEq)]
struct DownstreamChunkOffset(TruncateOffset);

struct RateLimitedLogReaderCore<R: LogReader> {
    inner: R,
    consuming_chunk: Option<(
        UpstreamChunkOffset,
        // Newer items at the front: push_front, pop_back
        VecDeque<DownstreamChunkOffset>,
        Vec<StreamChunk>, // split chunks
    )>,
    // Newer items at the front: push_front, pop_back
    consumed_offset_queue: VecDeque<(UpstreamChunkOffset, VecDeque<DownstreamChunkOffset>)>,
    next_chunk_id: usize,
    rate_limiter: RateLimiter,
}

pub struct RateLimitedLogReader<R: LogReader> {
    core: RateLimitedLogReaderCore<R>,
    control_rx: UnboundedReceiver<RateLimit>,
}

impl<R: LogReader> RateLimitedLogReader<R> {
    pub fn new(inner: R, control_rx: UnboundedReceiver<RateLimit>) -> Self {
        Self {
            core: RateLimitedLogReaderCore {
                inner,
                consuming_chunk: None,
                consumed_offset_queue: VecDeque::new(),
                next_chunk_id: 0,
                rate_limiter: RateLimiter::new(RateLimit::Disabled),
            },
            control_rx,
        }
    }
}

impl<R: LogReader> RateLimitedLogReaderCore<R> {
    fn peek_next_pending_chunk(&self) -> Option<&StreamChunk> {
        self.consuming_chunk
            .as_ref()
            .and_then(|(_, _, chunk)| chunk.last())
    }

    fn consume_next_pending_chunk(&mut self) -> Option<(u64, StreamChunk, ChunkId)> {
        let Some((upstream_offset, consumed_offsets, pending_chunk)) = &mut self.consuming_chunk
        else {
            return None;
        };
        let epoch = upstream_offset.0.epoch();

        let item = pending_chunk.pop().map(|chunk| {
            let chunk_id = self.next_chunk_id;
            self.next_chunk_id += 1;
            consumed_offsets.push_front(DownstreamChunkOffset(TruncateOffset::Chunk {
                epoch,
                chunk_id,
            }));
            (epoch, chunk, chunk_id)
        });
        if pending_chunk.is_empty() {
            let (upstream_offset, consumed_offsets, _) =
                self.consuming_chunk.take().expect("checked some");
            self.consumed_offset_queue
                .push_front((upstream_offset, consumed_offsets));
        }
        item
    }

    fn consume_single_upstream_item(
        &mut self,
        epoch: u64,
        mut item: LogStoreReadItem,
    ) -> (u64, LogStoreReadItem) {
        assert!(self.consuming_chunk.is_none());
        let (upstream_offset, downstream_offset) = match &mut item {
            LogStoreReadItem::StreamChunk { chunk_id, .. } => {
                let upstream_chunk_id = *chunk_id;
                let downstream_chunk_id = self.next_chunk_id;
                self.next_chunk_id += 1;
                *chunk_id = downstream_chunk_id;
                (
                    UpstreamChunkOffset(TruncateOffset::Chunk {
                        epoch,
                        chunk_id: upstream_chunk_id,
                    }),
                    DownstreamChunkOffset(TruncateOffset::Chunk {
                        epoch,
                        chunk_id: downstream_chunk_id,
                    }),
                )
            }
            LogStoreReadItem::Barrier { .. } => (
                UpstreamChunkOffset(TruncateOffset::Barrier { epoch }),
                DownstreamChunkOffset(TruncateOffset::Barrier { epoch }),
            ),
        };
        self.consumed_offset_queue
            .push_front((upstream_offset, VecDeque::from_iter([downstream_offset])));
        (epoch, item)
    }

    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        match self.rate_limiter.rate_limit() {
            RateLimit::Pause => pending().await,
            RateLimit::Disabled => {
                if let Some((epoch, chunk, chunk_id)) = self.consume_next_pending_chunk() {
                    Ok((epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }))
                } else {
                    let (epoch, item) = self.inner.next_item().await?;
                    Ok(self.consume_single_upstream_item(epoch, item))
                }
            }
            RateLimit::Fixed(limit) => {
                if self.peek_next_pending_chunk().is_none() {
                    let (epoch, item) = self.inner.next_item().await?;
                    match item {
                        LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
                            let chunks = if chunk.rate_limit_permits() < limit.get() {
                                vec![chunk]
                            } else {
                                let mut chunks = chunk.split(limit.get() as _);
                                // Reverse so that the first split chunk is popped first.
                                chunks.reverse();
                                chunks
                            };
                            assert!(!chunks.is_empty());

                            assert!(
                                self.consuming_chunk
                                    .replace((
                                        UpstreamChunkOffset(TruncateOffset::Chunk {
                                            epoch,
                                            chunk_id
                                        }),
                                        VecDeque::new(),
                                        chunks,
                                    ))
                                    .is_none()
                            );
                        }
                        item @ LogStoreReadItem::Barrier { .. } => {
                            return Ok(self.consume_single_upstream_item(epoch, item));
                        }
                    };
                }
                let chunk = self.peek_next_pending_chunk().expect("must be Some");
                self.rate_limiter.wait_chunk(chunk).await;
                let (epoch, chunk, chunk_id) =
                    self.consume_next_pending_chunk().expect("must be Some");
                Ok((epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }))
            }
        }
    }
}
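
// Worked example (illustrative numbers): under `RateLimit::Fixed(100)`, an upstream chunk
// of 250 rows at upstream offset `Chunk { epoch, chunk_id: 7 }` is split into three
// downstream chunks of at most 100 rows, emitted under fresh downstream chunk ids, e.g.
// 12, 13 and 14. Only after the downstream has truncated all of 12, 13 and 14 does the
// `truncate` implementation below forward `Chunk { epoch, chunk_id: 7 }` to the inner
// reader.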

impl<R: LogReader> LogReader for RateLimitedLogReader<R> {
    async fn init(&mut self) -> LogStoreResult<()> {
        self.core.inner.init().await
    }

    async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
        loop {
            select! {
                biased;
                recv = pin!(self.control_rx.recv()) => {
                    let new_rate_limit = match recv {
                        Some(limit) => limit,
                        None => bail!("rate limit control channel closed"),
                    };
                    let old_rate_limit = self.core.rate_limiter.update(new_rate_limit);
                    let paused = matches!(new_rate_limit, RateLimit::Pause);
                    tracing::info!("rate limit changed from {:?} to {:?}, paused = {paused}", old_rate_limit, new_rate_limit);
                },
                item = self.core.next_item() => {
                    return item;
                }
            }
        }
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        let downstream_offset = DownstreamChunkOffset(offset);
        let mut truncate_offset = None;
        let mut stop = false;
        'outer: while let Some((upstream_offset, downstream_offsets)) =
            self.core.consumed_offset_queue.back_mut()
        {
            while let Some(prev_downstream_offset) = downstream_offsets.back() {
                if *prev_downstream_offset <= downstream_offset {
                    downstream_offsets.pop_back();
                } else {
                    stop = true;
                    break 'outer;
                }
            }
            truncate_offset = Some(*upstream_offset);
            self.core.consumed_offset_queue.pop_back();
        }
        if !stop && let Some((_, downstream_offsets, _)) = &mut self.core.consuming_chunk {
            while let Some(prev_downstream_offset) = downstream_offsets.back() {
                if *prev_downstream_offset <= downstream_offset {
                    downstream_offsets.pop_back();
                } else {
                    break;
                }
            }
        }
        tracing::trace!(
            "rate limited log store reader truncate offset {:?}, downstream offset {:?}",
            truncate_offset,
            offset
        );
        if let Some(offset) = truncate_offset {
            self.core.inner.truncate(offset.0)
        } else {
            Ok(())
        }
    }

    fn rewind(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.core.consuming_chunk = None;
        self.core.consumed_offset_queue.clear();
        self.core.next_chunk_id = 0;
        self.core.inner.rewind()
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        self.core.inner.start_from(start_offset)
    }
}

#[easy_ext::ext(LogReaderExt)]
impl<T> T
where
    T: LogReader,
{
    pub fn transform_chunk<F: Fn(StreamChunk) -> StreamChunk + Sized>(
        self,
        f: F,
    ) -> TransformChunkLogReader<F, Self> {
        TransformChunkLogReader { f, inner: self }
    }

    pub fn monitored(self, metrics: LogReaderMetrics) -> impl LogReader {
        // TODO: the `clone()` can be avoided if backpressure monitoring is moved inside
        // `MonitoredLogReader`.
        let wait_new_future_duration = metrics.log_store_reader_wait_new_future_duration_ns.clone();
        BackpressureMonitoredLogReader::new(
            MonitoredLogReader::new(self, metrics),
            wait_new_future_duration,
        )
    }

    pub fn rate_limited(self, control_rx: UnboundedReceiver<RateLimit>) -> impl LogReader {
        RateLimitedLogReader::new(self, control_rx)
    }
}
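
// A minimal sketch of composing the reader adapters in this module (the metrics and the
// rate-limit control channel are assumed to be provided by the surrounding executor):
#[allow(dead_code)]
fn example_compose_reader(
    reader: impl LogReader,
    metrics: LogReaderMetrics,
    control_rx: UnboundedReceiver<RateLimit>,
) -> impl LogReader {
    reader
        .transform_chunk(|chunk| chunk) // identity transform, for illustration only
        .rate_limited(control_rx)
        .monitored(metrics)
}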

pub struct MonitoredLogWriter<W: LogWriter> {
    inner: W,
    metrics: LogWriterMetrics,
}

pub struct LogWriterMetrics {
    // Labels: [actor_id, sink_id, sink_name]
    pub log_store_first_write_epoch: LabelGuardedIntGauge,
    pub log_store_latest_write_epoch: LabelGuardedIntGauge,
    pub log_store_write_rows: LabelGuardedIntCounter,
}

impl<W: LogWriter> LogWriter for MonitoredLogWriter<W> {
    async fn init(
        &mut self,
        epoch: EpochPair,
        pause_read_on_bootstrap: bool,
    ) -> LogStoreResult<()> {
        self.metrics
            .log_store_first_write_epoch
            .set(epoch.curr as _);
        self.metrics
            .log_store_latest_write_epoch
            .set(epoch.curr as _);
        self.inner.init(epoch, pause_read_on_bootstrap).await
    }

    async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> {
        self.metrics
            .log_store_write_rows
            .inc_by(chunk.cardinality() as _);
        self.inner.write_chunk(chunk).await
    }

    async fn flush_current_epoch(
        &mut self,
        next_epoch: u64,
        options: FlushCurrentEpochOptions,
    ) -> LogStoreResult<LogWriterPostFlushCurrentEpoch<'_>> {
        let post_flush = self.inner.flush_current_epoch(next_epoch, options).await?;
        self.metrics
            .log_store_latest_write_epoch
            .set(next_epoch as _);
        Ok(post_flush)
    }

    fn pause(&mut self) -> LogStoreResult<()> {
        self.inner.pause()
    }

    fn resume(&mut self) -> LogStoreResult<()> {
        self.inner.resume()
    }
}

#[easy_ext::ext(LogWriterExt)]
impl<T> T
where
    T: LogWriter + Sized,
{
    pub fn monitored(self, metrics: LogWriterMetrics) -> MonitoredLogWriter<T> {
        MonitoredLogWriter {
            inner: self,
            metrics,
        }
    }
}

enum DeliveryFutureManagerItem<F> {
    Chunk {
        chunk_id: ChunkId,
        // earlier futures at the front
        futures: VecDeque<F>,
    },
    Barrier,
}

pub struct DeliveryFutureManager<F> {
    future_count: usize,
    max_future_count: usize,
    // earlier items at the front
    items: VecDeque<(u64, DeliveryFutureManagerItem<F>)>,
}

impl<F> DeliveryFutureManager<F> {
    pub fn new(max_future_count: usize) -> Self {
        Self {
            future_count: 0,
            max_future_count,
            items: Default::default(),
        }
    }

    pub fn add_barrier(&mut self, epoch: u64) {
        if let Some((item_epoch, last_item)) = self.items.back() {
            match last_item {
                DeliveryFutureManagerItem::Chunk { .. } => {
                    assert_eq!(*item_epoch, epoch)
                }
                DeliveryFutureManagerItem::Barrier => {
                    assert!(
                        epoch > *item_epoch,
                        "new barrier epoch {} should be greater than prev barrier {}",
                        epoch,
                        item_epoch
                    );
                }
            }
        }
        self.items
            .push_back((epoch, DeliveryFutureManagerItem::Barrier));
    }

    pub fn start_write_chunk(
        &mut self,
        epoch: u64,
        chunk_id: ChunkId,
    ) -> DeliveryFutureManagerAddFuture<'_, F> {
        if let Some((item_epoch, item)) = self.items.back() {
            match item {
                DeliveryFutureManagerItem::Chunk {
                    chunk_id: item_chunk_id,
                    ..
                } => {
                    assert_eq!(epoch, *item_epoch);
                    assert!(
                        chunk_id > *item_chunk_id,
                        "new chunk id {} should be greater than prev chunk id {}",
                        chunk_id,
                        item_chunk_id
                    );
                }
                DeliveryFutureManagerItem::Barrier => {
                    assert!(
                        epoch > *item_epoch,
                        "new chunk epoch {} should be greater than prev barrier: {}",
                        epoch,
                        item_epoch
                    );
                }
            }
        }
        self.items.push_back((
            epoch,
            DeliveryFutureManagerItem::Chunk {
                chunk_id,
                futures: VecDeque::new(),
            },
        ));
        DeliveryFutureManagerAddFuture(self)
    }
}

pub struct DeliveryFutureManagerAddFuture<'a, F>(&'a mut DeliveryFutureManager<F>);

impl<F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManagerAddFuture<'_, F> {
    /// Add a new future to the most recently started chunk write.
    /// The returned bool value indicates whether we have awaited on any previous futures.
    pub async fn add_future_may_await(&mut self, future: F) -> Result<bool, F::Error> {
        let mut has_await = false;
        while self.0.future_count >= self.0.max_future_count {
            self.await_one_delivery().await?;
            has_await = true;
        }
        match self.0.items.back_mut() {
            Some((_, DeliveryFutureManagerItem::Chunk { futures, .. })) => {
                futures.push_back(future);
                self.0.future_count += 1;
                Ok(has_await)
            }
            _ => unreachable!("should add future only after adding a new chunk"),
        }
    }

    pub async fn await_one_delivery(&mut self) -> Result<(), F::Error> {
        for (_, item) in &mut self.0.items {
            if let DeliveryFutureManagerItem::Chunk { futures, .. } = item
                && let Some(mut delivery_future) = futures.pop_front()
            {
                self.0.future_count -= 1;
                return poll_fn(|cx| delivery_future.try_poll_unpin(cx)).await;
            } else {
                continue;
            }
        }
        Ok(())
    }

    pub fn future_count(&self) -> usize {
        self.0.future_count
    }

    pub fn max_future_count(&self) -> usize {
        self.0.max_future_count
    }
}

impl<F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManager<F> {
    pub fn next_truncate_offset(
        &mut self,
    ) -> impl Future<Output = Result<TruncateOffset, F::Error>> + '_ {
        poll_fn(move |cx| {
            let mut latest_offset: Option<TruncateOffset> = None;
            'outer: while let Some((epoch, item)) = self.items.front_mut() {
                match item {
                    DeliveryFutureManagerItem::Chunk { chunk_id, futures } => {
                        while let Some(future) = futures.front_mut() {
                            match future.try_poll_unpin(cx) {
                                Poll::Ready(result) => match result {
                                    Ok(()) => {
                                        self.future_count -= 1;
                                        futures.pop_front();
                                    }
                                    Err(e) => {
                                        return Poll::Ready(Err(e));
                                    }
                                },
                                Poll::Pending => {
                                    break 'outer;
                                }
                            }
                        }

                        // When we reach here, there is no pending or failed future left,
                        // which means all futures of this stream chunk have finished.
                        assert!(futures.is_empty());
                        latest_offset = Some(TruncateOffset::Chunk {
                            epoch: *epoch,
                            chunk_id: *chunk_id,
                        });
                        self.items.pop_front().expect("items not empty");
                    }
                    DeliveryFutureManagerItem::Barrier => {
                        latest_offset = Some(TruncateOffset::Barrier { epoch: *epoch });
                        self.items.pop_front().expect("items not empty");
                        // Barrier will be yielded anyway
                        break 'outer;
                    }
                }
            }
            if let Some(offset) = latest_offset {
                Poll::Ready(Ok(offset))
            } else {
                Poll::Pending
            }
        })
    }
}
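
// A minimal sketch (hypothetical async sink) of the intended pattern: register a delivery
// future for each written chunk, mark barriers, and poll `next_truncate_offset` to learn
// how far the log store may safely be truncated.
#[allow(dead_code)]
async fn example_track_delivery<F: TryFuture<Ok = ()> + Unpin + 'static>(
    manager: &mut DeliveryFutureManager<F>,
    epoch: u64,
    chunk_id: ChunkId,
    delivery: F,
) -> Result<TruncateOffset, F::Error> {
    manager
        .start_write_chunk(epoch, chunk_id)
        .add_future_may_await(delivery)
        .await?;
    manager.add_barrier(epoch);
    // Once the chunk's delivery future resolves, the returned offset is the barrier
    // offset, which subsumes the chunk offset.
    manager.next_truncate_offset().await
}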

#[cfg(test)]
mod tests {
    use std::future::{Future, poll_fn};
    use std::pin::pin;
    use std::task::Poll;

    use futures::{FutureExt, TryFuture};
    use risingwave_common::util::epoch::test_epoch;
    use tokio::sync::oneshot;
    use tokio::sync::oneshot::Receiver;

    use super::LogStoreResult;
    use crate::sink::log_store::{DeliveryFutureManager, TruncateOffset};

    #[test]
    fn test_truncate_offset_cmp() {
        assert!(
            TruncateOffset::Barrier { epoch: 232 }
                < TruncateOffset::Chunk {
                    epoch: 233,
                    chunk_id: 1
                }
        );
        assert_eq!(
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 1
            },
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 1
            }
        );
        assert!(
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 1
            } < TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 2
            }
        );
        assert!(
            TruncateOffset::Barrier { epoch: 1 }
                > TruncateOffset::Chunk {
                    epoch: 1,
                    chunk_id: 2
                }
        );
        assert!(
            TruncateOffset::Chunk {
                epoch: 1,
                chunk_id: 2
            } < TruncateOffset::Barrier { epoch: 1 }
        );
        assert!(
            TruncateOffset::Chunk {
                epoch: 2,
                chunk_id: 2
            } > TruncateOffset::Barrier { epoch: 1 }
        );
        assert!(TruncateOffset::Barrier { epoch: 2 } > TruncateOffset::Barrier { epoch: 1 });
    }

    type TestFuture = impl TryFuture<Ok = (), Error = anyhow::Error> + Unpin + 'static;

    #[define_opaque(TestFuture)]
    fn to_test_future(rx: Receiver<LogStoreResult<()>>) -> TestFuture {
        async move { rx.await.unwrap() }.boxed()
    }

    #[tokio::test]
    async fn test_empty() {
        let mut manager = DeliveryFutureManager::<TestFuture>::new(2);
        let mut future = pin!(manager.next_truncate_offset());
        assert!(
            poll_fn(|cx| Poll::Ready(future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
    }

    #[tokio::test]
    async fn test_future_delivery_manager_basic() {
        let mut manager = DeliveryFutureManager::new(2);
        let epoch1 = 233;
        let chunk_id1 = 1;
        let (tx1_1, rx1_1) = oneshot::channel();
        let mut write_chunk = manager.start_write_chunk(epoch1, chunk_id1);
        assert!(
            !write_chunk
                .add_future_may_await(to_test_future(rx1_1))
                .await
                .unwrap()
        );
        assert_eq!(manager.future_count, 1);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_1.send(Ok(())).unwrap();
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch: epoch1,
                    chunk_id: chunk_id1
                }
            );
        }
        assert_eq!(manager.future_count, 0);
        manager.add_barrier(epoch1);
        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Barrier { epoch: epoch1 }
        );
    }

    #[tokio::test]
    async fn test_future_delivery_manager_compress_chunk() {
        let mut manager = DeliveryFutureManager::new(10);
        let epoch1 = test_epoch(233);
        let chunk_id1 = 1;
        let chunk_id2 = chunk_id1 + 1;
        let chunk_id3 = chunk_id2 + 1;
        let (tx1_1, rx1_1) = oneshot::channel();
        let (tx1_2, rx1_2) = oneshot::channel();
        let (tx1_3, rx1_3) = oneshot::channel();
        let epoch2 = test_epoch(234);
        let (tx2_1, rx2_1) = oneshot::channel();
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id1)
                .add_future_may_await(to_test_future(rx1_1))
                .await
                .unwrap()
        );
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id2)
                .add_future_may_await(to_test_future(rx1_2))
                .await
                .unwrap()
        );
        assert!(
            !manager
                .start_write_chunk(epoch1, chunk_id3)
                .add_future_may_await(to_test_future(rx1_3))
                .await
                .unwrap()
        );
        manager.add_barrier(epoch1);
        assert!(
            !manager
                .start_write_chunk(epoch2, chunk_id1)
                .add_future_may_await(to_test_future(rx2_1))
                .await
                .unwrap()
        );
        assert_eq!(manager.future_count, 4);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_2.send(Ok(())).unwrap();
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_1.send(Ok(())).unwrap();
            // The offsets of chunk1 and chunk2 are compressed.
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch: epoch1,
                    chunk_id: chunk_id2
                }
            );
        }
        assert_eq!(manager.future_count, 2);
        {
            let mut next_truncate_offset = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx1_3.send(Ok(())).unwrap();
            tx2_1.send(Ok(())).unwrap();
            // Emit the barrier even though a later chunk has finished.
            assert_eq!(
                next_truncate_offset.await.unwrap(),
                TruncateOffset::Barrier { epoch: epoch1 }
            );
        }
        assert_eq!(manager.future_count, 1);
        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Chunk {
                epoch: epoch2,
                chunk_id: chunk_id1
            }
        );
    }

    #[tokio::test]
    async fn test_future_delivery_manager_await_future() {
        let mut manager = DeliveryFutureManager::new(2);
        let epoch = 233;
        let chunk_id1 = 1;
        let chunk_id2 = chunk_id1 + 1;
        let (tx1_1, rx1_1) = oneshot::channel();
        let (tx1_2, rx1_2) = oneshot::channel();
        let (tx2_1, rx2_1) = oneshot::channel();
        let (tx2_2, rx2_2) = oneshot::channel();

        {
            let mut write_chunk = manager.start_write_chunk(epoch, chunk_id1);
            assert!(
                !write_chunk
                    .add_future_may_await(to_test_future(rx1_1))
                    .await
                    .unwrap()
            );
            assert!(
                !write_chunk
                    .add_future_may_await(to_test_future(rx1_2))
                    .await
                    .unwrap()
            );
            assert_eq!(manager.future_count, 2);
        }

        {
            let mut write_chunk = manager.start_write_chunk(epoch, chunk_id2);
            {
                let mut future1 = pin!(write_chunk.add_future_may_await(to_test_future(rx2_1)));
                assert!(
                    poll_fn(|cx| Poll::Ready(future1.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx1_1.send(Ok(())).unwrap();
                assert!(future1.await.unwrap());
            }
            assert_eq!(2, write_chunk.future_count());
            {
                let mut future2 = pin!(write_chunk.add_future_may_await(to_test_future(rx2_2)));
                assert!(
                    poll_fn(|cx| Poll::Ready(future2.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx1_2.send(Ok(())).unwrap();
                assert!(future2.await.unwrap());
            }
            assert_eq!(2, write_chunk.future_count());
            {
                let mut future3 = pin!(write_chunk.await_one_delivery());
                assert!(
                    poll_fn(|cx| Poll::Ready(future3.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                tx2_1.send(Ok(())).unwrap();
                future3.await.unwrap();
            }
            assert_eq!(1, write_chunk.future_count());
        }

        assert_eq!(
            manager.next_truncate_offset().await.unwrap(),
            TruncateOffset::Chunk {
                epoch,
                chunk_id: chunk_id1
            }
        );

        assert_eq!(1, manager.future_count);

        {
            let mut future = pin!(manager.next_truncate_offset());
            assert!(
                poll_fn(|cx| Poll::Ready(future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            tx2_2.send(Ok(())).unwrap();
            assert_eq!(
                future.await.unwrap(),
                TruncateOffset::Chunk {
                    epoch,
                    chunk_id: chunk_id2
                }
            );
        }

        assert_eq!(0, manager.future_count);
    }
}