use std::cmp::Ordering;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::future::{poll_fn, Future};
use std::sync::Arc;
use std::task::Poll;
use std::time::Instant;
use await_tree::InstrumentAwait;
use futures::{TryFuture, TryFutureExt};
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};
use risingwave_common::util::epoch::{EpochPair, INVALID_EPOCH};
pub type LogStoreResult<T> = Result<T, anyhow::Error>;
pub type ChunkId = usize;
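/// An offset in the log store up to which consumed data may be truncated.
/// Offsets are ordered first by epoch; within an epoch, chunk offsets are
/// ordered by `chunk_id`, and the barrier offset of the epoch sorts after
/// all of its chunk offsets.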
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum TruncateOffset {
Chunk { epoch: u64, chunk_id: ChunkId },
Barrier { epoch: u64 },
}
impl PartialOrd for TruncateOffset {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
let extract = |offset: &TruncateOffset| match offset {
TruncateOffset::Chunk { epoch, chunk_id } => (*epoch, *chunk_id),
TruncateOffset::Barrier { epoch } => (*epoch, usize::MAX),
};
let this = extract(self);
let other = extract(other);
this.partial_cmp(&other)
}
}
impl TruncateOffset {
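/// The chunk id expected for the next chunk: one past the current chunk id,
/// or 0 right after a barrier.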
pub fn next_chunk_id(&self) -> ChunkId {
match self {
TruncateOffset::Chunk { chunk_id, .. } => chunk_id + 1,
TruncateOffset::Barrier { .. } => 0,
}
}
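/// The epoch this offset belongs to.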
pub fn epoch(&self) -> u64 {
match self {
TruncateOffset::Chunk { epoch, .. } | TruncateOffset::Barrier { epoch } => *epoch,
}
}
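/// Check that `next_offset` is strictly later than the current offset.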
pub fn check_next_offset(&self, next_offset: TruncateOffset) -> LogStoreResult<()> {
if *self >= next_offset {
bail!(
"next offset {:?} should be later than current offset {:?}",
next_offset,
self
)
} else {
Ok(())
}
}
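/// Check that a new item of `epoch` may follow this offset: it must stay in
/// the same epoch as a chunk offset, and strictly exceed the epoch of a
/// barrier offset.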
pub fn check_next_item_epoch(&self, epoch: u64) -> LogStoreResult<()> {
match self {
TruncateOffset::Chunk {
epoch: offset_epoch,
..
} => {
if epoch != *offset_epoch {
bail!(
"new item epoch {} does not match current chunk offset epoch {}",
epoch,
offset_epoch
);
}
}
TruncateOffset::Barrier {
epoch: offset_epoch,
} => {
if epoch <= *offset_epoch {
bail!(
"new item epoch {} does not exceed barrier offset epoch {}",
epoch,
offset_epoch
);
}
}
}
Ok(())
}
}
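/// An item emitted by a [`LogReader`], paired with its epoch in
/// [`LogReader::next_item`].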
#[derive(Debug)]
pub enum LogStoreReadItem {
StreamChunk {
chunk: StreamChunk,
chunk_id: ChunkId,
},
Barrier {
is_checkpoint: bool,
},
UpdateVnodeBitmap(Arc<Bitmap>),
}
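/// The writing end of a log store: appends stream chunks and barriers epoch
/// by epoch.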
pub trait LogWriter: Send {
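/// Initialize the writer with the first epoch pair. `pause_read_on_bootstrap`
/// asks the log store to start with reading paused until resumed.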
fn init(
&mut self,
epoch: EpochPair,
pause_read_on_bootstrap: bool,
) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
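/// Write a stream chunk into the current epoch.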
fn write_chunk(
&mut self,
chunk: StreamChunk,
) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
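/// Seal the current epoch and advance to `next_epoch`. `is_checkpoint`
/// indicates whether the sealed epoch ends with a checkpoint barrier.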
fn flush_current_epoch(
&mut self,
next_epoch: u64,
is_checkpoint: bool,
) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
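/// Update the vnode bitmap owned by this writer, e.g. after scaling.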
fn update_vnode_bitmap(
&mut self,
new_vnodes: Arc<Bitmap>,
) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
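/// Pause the log store until [`Self::resume`] is called.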
fn pause(&mut self) -> LogStoreResult<()>;
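/// Resume a previously paused log store.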
fn resume(&mut self) -> LogStoreResult<()>;
}
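/// The reading end of a log store: replays items written by the paired
/// [`LogWriter`].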
pub trait LogReader: Send + Sized + 'static {
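/// Initialize the reader, typically by waiting for the paired writer to be
/// initialized.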
fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
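/// Emit the next item together with the epoch it belongs to.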
fn next_item(
&mut self,
) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;
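/// Mark that all items up to `offset` have been consumed, so the underlying
/// log may be truncated accordingly.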
fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
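/// Reset the reader to replay from the latest truncate offset. The boolean
/// indicates whether the log store supports rewinding; the optional bitmap
/// carries an updated vnode bitmap, if any.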
fn rewind(
&mut self,
) -> impl Future<Output = LogStoreResult<(bool, Option<Bitmap>)>> + Send + '_;
}
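/// Builds the paired [`LogReader`] and [`LogWriter`] of a log store.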
pub trait LogStoreFactory: Send + 'static {
type Reader: LogReader;
type Writer: LogWriter;
fn build(self) -> impl Future<Output = (Self::Reader, Self::Writer)> + Send;
}
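/// A [`LogReader`] wrapper that applies `f` to every stream chunk it emits.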
pub struct TransformChunkLogReader<F: Fn(StreamChunk) -> StreamChunk, R: LogReader> {
f: F,
inner: R,
}
impl<F: Fn(StreamChunk) -> StreamChunk + Send + 'static, R: LogReader> LogReader
for TransformChunkLogReader<F, R>
{
fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
self.inner.init()
}
async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
let (epoch, item) = self.inner.next_item().await?;
let item = match item {
LogStoreReadItem::StreamChunk { chunk, chunk_id } => LogStoreReadItem::StreamChunk {
chunk: (self.f)(chunk),
chunk_id,
},
other => other,
};
Ok((epoch, item))
}
fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
self.inner.truncate(offset)
}
fn rewind(
&mut self,
) -> impl Future<Output = LogStoreResult<(bool, Option<Bitmap>)>> + Send + '_ {
self.inner.rewind()
}
}
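/// A [`LogReader`] wrapper that accumulates, in nanoseconds, the time spent
/// between one `next_item` resolving and the next `next_item` call, i.e. how
/// long the downstream consumer keeps the reader waiting.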
pub struct BackpressureMonitoredLogReader<R: LogReader> {
inner: R,
wait_new_future_start_time: Option<Instant>,
wait_new_future_duration_ns: LabelGuardedIntCounter<4>,
}
impl<R: LogReader> BackpressureMonitoredLogReader<R> {
fn new(inner: R, wait_new_future_duration_ns: LabelGuardedIntCounter<4>) -> Self {
Self {
inner,
wait_new_future_start_time: None,
wait_new_future_duration_ns,
}
}
}
impl<R: LogReader> LogReader for BackpressureMonitoredLogReader<R> {
fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
self.wait_new_future_start_time = None;
self.inner.init()
}
fn next_item(
&mut self,
) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
if let Some(start_time) = self.wait_new_future_start_time.take() {
self.wait_new_future_duration_ns
.inc_by(start_time.elapsed().as_nanos() as _);
}
self.inner.next_item().inspect_ok(|_| {
self.wait_new_future_start_time = Some(Instant::now());
})
}
fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
self.inner.truncate(offset)
}
fn rewind(
&mut self,
) -> impl Future<Output = LogStoreResult<(bool, Option<Bitmap>)>> + Send + '_ {
self.inner.rewind().inspect_ok(|_| {
self.wait_new_future_start_time = None;
})
}
}
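/// A [`LogReader`] wrapper that reports the latest read epoch and the number
/// of rows read to the metrics in [`LogReaderMetrics`].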
pub struct MonitoredLogReader<R: LogReader> {
inner: R,
read_epoch: u64,
metrics: LogReaderMetrics,
}
pub struct LogReaderMetrics {
pub log_store_latest_read_epoch: LabelGuardedIntGauge<4>,
pub log_store_read_rows: LabelGuardedIntCounter<4>,
pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter<4>,
}
impl<R: LogReader> MonitoredLogReader<R> {
pub fn new(inner: R, metrics: LogReaderMetrics) -> Self {
Self {
inner,
read_epoch: INVALID_EPOCH,
metrics,
}
}
}
impl<R: LogReader> LogReader for MonitoredLogReader<R> {
async fn init(&mut self) -> LogStoreResult<()> {
self.inner.init().instrument_await("log_reader_init").await
}
async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
self.inner
.next_item()
.instrument_await("log_reader_next_item")
.await
.inspect(|(epoch, item)| {
if self.read_epoch != *epoch {
self.read_epoch = *epoch;
self.metrics.log_store_latest_read_epoch.set(*epoch as _);
}
if let LogStoreReadItem::StreamChunk { chunk, .. } = item {
self.metrics
.log_store_read_rows
.inc_by(chunk.cardinality() as _);
}
})
}
fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
self.inner.truncate(offset)
}
fn rewind(
&mut self,
) -> impl Future<Output = LogStoreResult<(bool, Option<Bitmap>)>> + Send + '_ {
self.inner.rewind().instrument_await("log_reader_rewind")
}
}
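/// Combinators available on any [`LogReader`].
///
/// A rough usage sketch (illustrative only; `reader` is any `LogReader` and
/// `metrics` a `LogReaderMetrics`):
///
/// ```ignore
/// let reader = reader
///     .transform_chunk(|chunk| chunk) // e.g. apply a projection here
///     .monitored(metrics);
/// ```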
#[easy_ext::ext(LogReaderExt)]
impl<T> T
where
T: LogReader,
{
pub fn transform_chunk<F: Fn(StreamChunk) -> StreamChunk + Sized>(
self,
f: F,
) -> TransformChunkLogReader<F, Self> {
TransformChunkLogReader { f, inner: self }
}
pub fn monitored(self, metrics: LogReaderMetrics) -> impl LogReader {
let wait_new_future_duration = metrics.log_store_reader_wait_new_future_duration_ns.clone();
BackpressureMonitoredLogReader::new(
MonitoredLogReader::new(self, metrics),
wait_new_future_duration,
)
}
}
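/// A [`LogWriter`] wrapper that reports the first and latest written epochs
/// and the number of rows written to the metrics in [`LogWriterMetrics`].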
pub struct MonitoredLogWriter<W: LogWriter> {
inner: W,
metrics: LogWriterMetrics,
}
pub struct LogWriterMetrics {
pub log_store_first_write_epoch: LabelGuardedIntGauge<3>,
pub log_store_latest_write_epoch: LabelGuardedIntGauge<3>,
pub log_store_write_rows: LabelGuardedIntCounter<3>,
}
impl<W: LogWriter> LogWriter for MonitoredLogWriter<W> {
async fn init(
&mut self,
epoch: EpochPair,
pause_read_on_bootstrap: bool,
) -> LogStoreResult<()> {
self.metrics
.log_store_first_write_epoch
.set(epoch.curr as _);
self.metrics
.log_store_latest_write_epoch
.set(epoch.curr as _);
self.inner.init(epoch, pause_read_on_bootstrap).await
}
async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> {
self.metrics
.log_store_write_rows
.inc_by(chunk.cardinality() as _);
self.inner.write_chunk(chunk).await
}
async fn flush_current_epoch(
&mut self,
next_epoch: u64,
is_checkpoint: bool,
) -> LogStoreResult<()> {
self.inner
.flush_current_epoch(next_epoch, is_checkpoint)
.await?;
self.metrics
.log_store_latest_write_epoch
.set(next_epoch as _);
Ok(())
}
async fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> LogStoreResult<()> {
self.inner.update_vnode_bitmap(new_vnodes).await
}
fn pause(&mut self) -> LogStoreResult<()> {
self.inner.pause()
}
fn resume(&mut self) -> LogStoreResult<()> {
self.inner.resume()
}
}
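/// Combinators available on any [`LogWriter`].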
#[easy_ext::ext(LogWriterExt)]
impl<T> T
where
T: LogWriter + Sized,
{
pub fn monitored(self, metrics: LogWriterMetrics) -> MonitoredLogWriter<T> {
MonitoredLogWriter {
inner: self,
metrics,
}
}
}
enum DeliveryFutureManagerItem<F> {
Chunk {
chunk_id: ChunkId,
futures: VecDeque<F>,
},
Barrier,
}
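/// Tracks in-flight delivery futures per written chunk or barrier, capping the
/// total number of pending futures at `max_future_count` and yielding
/// [`TruncateOffset`]s as deliveries complete.
///
/// A rough sketch of the intended call pattern (`epoch`, `chunk_id` and
/// `delivery_future` are placeholders):
///
/// ```ignore
/// let mut manager = DeliveryFutureManager::new(10);
/// manager
///     .start_write_chunk(epoch, chunk_id)
///     .add_future_may_await(delivery_future)
///     .await?;
/// manager.add_barrier(epoch);
/// let offset = manager.next_truncate_offset().await?;
/// ```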
pub struct DeliveryFutureManager<F> {
future_count: usize,
max_future_count: usize,
items: VecDeque<(u64, DeliveryFutureManagerItem<F>)>,
}
impl<F> DeliveryFutureManager<F> {
pub fn new(max_future_count: usize) -> Self {
Self {
future_count: 0,
max_future_count,
items: Default::default(),
}
}
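/// Record that a barrier of `epoch` has been written. The epoch must match the
/// epoch of the pending chunks, or strictly exceed the previous barrier's
/// epoch.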
pub fn add_barrier(&mut self, epoch: u64) {
if let Some((item_epoch, last_item)) = self.items.back() {
match last_item {
DeliveryFutureManagerItem::Chunk { .. } => {
assert_eq!(*item_epoch, epoch)
}
DeliveryFutureManagerItem::Barrier => {
assert!(
epoch > *item_epoch,
"new barrier epoch {} should be greater than prev barrier {}",
epoch,
item_epoch
);
}
}
}
self.items
.push_back((epoch, DeliveryFutureManagerItem::Barrier));
}
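/// Start tracking delivery futures for a newly written chunk, returning a
/// handle for attaching them. Chunk ids must increase within an epoch and
/// epochs must not regress.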
pub fn start_write_chunk(
&mut self,
epoch: u64,
chunk_id: ChunkId,
) -> DeliveryFutureManagerAddFuture<'_, F> {
if let Some((item_epoch, item)) = self.items.back() {
match item {
DeliveryFutureManagerItem::Chunk {
chunk_id: item_chunk_id,
..
} => {
assert_eq!(epoch, *item_epoch);
assert!(
chunk_id > *item_chunk_id,
"new chunk id {} should be greater than prev chunk id {}",
chunk_id,
item_chunk_id
);
}
DeliveryFutureManagerItem::Barrier => {
assert!(
epoch > *item_epoch,
"new chunk epoch {} should be greater than prev barrier: {}",
epoch,
item_epoch
);
}
}
}
self.items.push_back((
epoch,
DeliveryFutureManagerItem::Chunk {
chunk_id,
futures: VecDeque::new(),
},
));
DeliveryFutureManagerAddFuture(self)
}
}
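/// A handle for attaching delivery futures to the chunk most recently
/// registered via [`DeliveryFutureManager::start_write_chunk`].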
pub struct DeliveryFutureManagerAddFuture<'a, F>(&'a mut DeliveryFutureManager<F>);
impl<'a, F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManagerAddFuture<'a, F> {
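/// Attach a delivery future to the current chunk. If the number of pending
/// futures has reached `max_future_count`, await earlier deliveries first.
/// Returns whether any awaiting happened.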
pub async fn add_future_may_await(&mut self, future: F) -> Result<bool, F::Error> {
let mut has_await = false;
while self.0.future_count >= self.0.max_future_count {
self.await_one_delivery().await?;
has_await = true;
}
match self.0.items.back_mut() {
Some((_, DeliveryFutureManagerItem::Chunk { futures, .. })) => {
futures.push_back(future);
self.0.future_count += 1;
Ok(has_await)
}
_ => unreachable!("should add future only after add a new chunk"),
}
}
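/// Await the oldest pending delivery future, if there is one.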
pub async fn await_one_delivery(&mut self) -> Result<(), F::Error> {
for (_, item) in &mut self.0.items {
if let DeliveryFutureManagerItem::Chunk { futures, .. } = item
&& let Some(mut delivery_future) = futures.pop_front()
{
self.0.future_count -= 1;
return poll_fn(|cx| delivery_future.try_poll_unpin(cx)).await;
}
}
Ok(())
}
pub fn future_count(&self) -> usize {
self.0.future_count
}
pub fn max_future_count(&self) -> usize {
self.0.max_future_count
}
}
impl<F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManager<F> {
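/// Poll pending deliveries in order and resolve with the latest truncatable
/// offset: the newest chunk whose futures have all completed, or the first
/// barrier reached. Stays pending while the front chunk still has unfinished
/// deliveries.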
pub fn next_truncate_offset(
&mut self,
) -> impl Future<Output = Result<TruncateOffset, F::Error>> + '_ {
poll_fn(move |cx| {
let mut latest_offset: Option<TruncateOffset> = None;
'outer: while let Some((epoch, item)) = self.items.front_mut() {
match item {
DeliveryFutureManagerItem::Chunk { chunk_id, futures } => {
while let Some(future) = futures.front_mut() {
match future.try_poll_unpin(cx) {
Poll::Ready(result) => match result {
Ok(()) => {
self.future_count -= 1;
futures.pop_front();
}
Err(e) => {
return Poll::Ready(Err(e));
}
},
Poll::Pending => {
break 'outer;
}
}
}
assert!(futures.is_empty());
latest_offset = Some(TruncateOffset::Chunk {
epoch: *epoch,
chunk_id: *chunk_id,
});
self.items.pop_front().expect("items not empty");
}
DeliveryFutureManagerItem::Barrier => {
latest_offset = Some(TruncateOffset::Barrier { epoch: *epoch });
self.items.pop_front().expect("items not empty");
break 'outer;
}
}
}
if let Some(offset) = latest_offset {
Poll::Ready(Ok(offset))
} else {
Poll::Pending
}
})
}
}
#[cfg(test)]
mod tests {
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::task::Poll;
use futures::{FutureExt, TryFuture};
use risingwave_common::util::epoch::test_epoch;
use tokio::sync::oneshot;
use tokio::sync::oneshot::Receiver;
use super::LogStoreResult;
use crate::sink::log_store::{DeliveryFutureManager, TruncateOffset};
#[test]
fn test_truncate_offset_cmp() {
assert!(
TruncateOffset::Barrier { epoch: 232 }
< TruncateOffset::Chunk {
epoch: 233,
chunk_id: 1
}
);
assert_eq!(
TruncateOffset::Chunk {
epoch: 1,
chunk_id: 1
},
TruncateOffset::Chunk {
epoch: 1,
chunk_id: 1
}
);
assert!(
TruncateOffset::Chunk {
epoch: 1,
chunk_id: 1
} < TruncateOffset::Chunk {
epoch: 1,
chunk_id: 2
}
);
assert!(
TruncateOffset::Barrier { epoch: 1 }
> TruncateOffset::Chunk {
epoch: 1,
chunk_id: 2
}
);
assert!(
TruncateOffset::Chunk {
epoch: 1,
chunk_id: 2
} < TruncateOffset::Barrier { epoch: 1 }
);
assert!(
TruncateOffset::Chunk {
epoch: 2,
chunk_id: 2
} > TruncateOffset::Barrier { epoch: 1 }
);
assert!(TruncateOffset::Barrier { epoch: 2 } > TruncateOffset::Barrier { epoch: 1 });
}
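// `TestFuture` is an opaque type alias; it relies on the `type_alias_impl_trait`
// nightly feature being enabled at the crate root.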
type TestFuture = impl TryFuture<Ok = (), Error = anyhow::Error> + Unpin + 'static;
fn to_test_future(rx: Receiver<LogStoreResult<()>>) -> TestFuture {
async move { rx.await.unwrap() }.boxed()
}
#[tokio::test]
async fn test_empty() {
let mut manager = DeliveryFutureManager::<TestFuture>::new(2);
let mut future = pin!(manager.next_truncate_offset());
assert!(poll_fn(|cx| Poll::Ready(future.as_mut().poll(cx)))
.await
.is_pending());
}
#[tokio::test]
async fn test_future_delivery_manager_basic() {
let mut manager = DeliveryFutureManager::new(2);
let epoch1 = 233;
let chunk_id1 = 1;
let (tx1_1, rx1_1) = oneshot::channel();
let mut write_chunk = manager.start_write_chunk(epoch1, chunk_id1);
assert!(!write_chunk
.add_future_may_await(to_test_future(rx1_1))
.await
.unwrap());
assert_eq!(manager.future_count, 1);
{
let mut next_truncate_offset = pin!(manager.next_truncate_offset());
assert!(
poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
.await
.is_pending()
);
tx1_1.send(Ok(())).unwrap();
assert_eq!(
next_truncate_offset.await.unwrap(),
TruncateOffset::Chunk {
epoch: epoch1,
chunk_id: chunk_id1
}
);
}
assert_eq!(manager.future_count, 0);
manager.add_barrier(epoch1);
assert_eq!(
manager.next_truncate_offset().await.unwrap(),
TruncateOffset::Barrier { epoch: epoch1 }
);
}
#[tokio::test]
async fn test_future_delivery_manager_compress_chunk() {
let mut manager = DeliveryFutureManager::new(10);
let epoch1 = test_epoch(233);
let chunk_id1 = 1;
let chunk_id2 = chunk_id1 + 1;
let chunk_id3 = chunk_id2 + 1;
let (tx1_1, rx1_1) = oneshot::channel();
let (tx1_2, rx1_2) = oneshot::channel();
let (tx1_3, rx1_3) = oneshot::channel();
let epoch2 = test_epoch(234);
let (tx2_1, rx2_1) = oneshot::channel();
assert!(!manager
.start_write_chunk(epoch1, chunk_id1)
.add_future_may_await(to_test_future(rx1_1))
.await
.unwrap());
assert!(!manager
.start_write_chunk(epoch1, chunk_id2)
.add_future_may_await(to_test_future(rx1_2))
.await
.unwrap());
assert!(!manager
.start_write_chunk(epoch1, chunk_id3)
.add_future_may_await(to_test_future(rx1_3))
.await
.unwrap());
manager.add_barrier(epoch1);
assert!(!manager
.start_write_chunk(epoch2, chunk_id1)
.add_future_may_await(to_test_future(rx2_1))
.await
.unwrap());
assert_eq!(manager.future_count, 4);
{
let mut next_truncate_offset = pin!(manager.next_truncate_offset());
assert!(
poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
.await
.is_pending()
);
tx1_2.send(Ok(())).unwrap();
assert!(
poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
.await
.is_pending()
);
tx1_1.send(Ok(())).unwrap();
assert_eq!(
next_truncate_offset.await.unwrap(),
TruncateOffset::Chunk {
epoch: epoch1,
chunk_id: chunk_id2
}
);
}
assert_eq!(manager.future_count, 2);
{
let mut next_truncate_offset = pin!(manager.next_truncate_offset());
assert!(
poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
.await
.is_pending()
);
tx1_3.send(Ok(())).unwrap();
tx2_1.send(Ok(())).unwrap();
assert_eq!(
next_truncate_offset.await.unwrap(),
TruncateOffset::Barrier { epoch: epoch1 }
);
}
assert_eq!(manager.future_count, 1);
assert_eq!(
manager.next_truncate_offset().await.unwrap(),
TruncateOffset::Chunk {
epoch: epoch2,
chunk_id: chunk_id1
}
);
}
#[tokio::test]
async fn test_future_delivery_manager_await_future() {
let mut manager = DeliveryFutureManager::new(2);
let epoch = 233;
let chunk_id1 = 1;
let chunk_id2 = chunk_id1 + 1;
let (tx1_1, rx1_1) = oneshot::channel();
let (tx1_2, rx1_2) = oneshot::channel();
let (tx2_1, rx2_1) = oneshot::channel();
let (tx2_2, rx2_2) = oneshot::channel();
{
let mut write_chunk = manager.start_write_chunk(epoch, chunk_id1);
assert!(!write_chunk
.add_future_may_await(to_test_future(rx1_1))
.await
.unwrap());
assert!(!write_chunk
.add_future_may_await(to_test_future(rx1_2))
.await
.unwrap());
assert_eq!(manager.future_count, 2);
}
{
let mut write_chunk = manager.start_write_chunk(epoch, chunk_id2);
{
let mut future1 = pin!(write_chunk.add_future_may_await(to_test_future(rx2_1)));
assert!(poll_fn(|cx| Poll::Ready(future1.as_mut().poll(cx)))
.await
.is_pending());
tx1_1.send(Ok(())).unwrap();
assert!(future1.await.unwrap());
}
assert_eq!(2, write_chunk.future_count());
{
let mut future2 = pin!(write_chunk.add_future_may_await(to_test_future(rx2_2)));
assert!(poll_fn(|cx| Poll::Ready(future2.as_mut().poll(cx)))
.await
.is_pending());
tx1_2.send(Ok(())).unwrap();
assert!(future2.await.unwrap());
}
assert_eq!(2, write_chunk.future_count());
{
let mut future3 = pin!(write_chunk.await_one_delivery());
assert!(poll_fn(|cx| Poll::Ready(future3.as_mut().poll(cx)))
.await
.is_pending());
tx2_1.send(Ok(())).unwrap();
future3.await.unwrap();
}
assert_eq!(1, write_chunk.future_count());
}
assert_eq!(
manager.next_truncate_offset().await.unwrap(),
TruncateOffset::Chunk {
epoch,
chunk_id: chunk_id1
}
);
assert_eq!(1, manager.future_count);
{
let mut future = pin!(manager.next_truncate_offset());
assert!(poll_fn(|cx| Poll::Ready(future.as_mut().poll(cx)))
.await
.is_pending());
tx2_2.send(Ok(())).unwrap();
assert_eq!(
future.await.unwrap(),
TruncateOffset::Chunk {
epoch,
chunk_id: chunk_id2
}
);
}
assert_eq!(0, manager.future_count);
}
}