// risingwave_stream/common/log_store_impl/in_mem.rs

use std::sync::Arc;
use anyhow::{anyhow, Context};
use await_tree::InstrumentAwait;
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::util::epoch::{EpochExt, EpochPair, INVALID_EPOCH};
use risingwave_connector::sink::log_store::{
LogReader, LogStoreFactory, LogStoreReadItem, LogStoreResult, LogWriter, TruncateOffset,
};
use tokio::sync::mpsc::{
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
};
use tokio::sync::oneshot;
use crate::common::log_store_impl::in_mem::LogReaderEpochProgress::{AwaitingTruncate, Consuming};
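
/// An item passed from the writer to the reader over the in-memory channel:
/// either a stream chunk, a barrier that seals the current epoch, or a vnode
/// bitmap update.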
enum InMemLogStoreItem {
StreamChunk(StreamChunk),
Barrier {
next_epoch: u64,
is_checkpoint: bool,
},
UpdateVnodeBitmap(Arc<Bitmap>),
}
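
/// Writer half of the bounded in-memory log store. It forwards stream chunks,
/// barriers and vnode bitmap updates to the reader over a bounded channel and,
/// on checkpoint barriers, waits until the reader reports the sealed epoch as
/// truncated.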
pub struct BoundedInMemLogStoreWriter {
curr_epoch: Option<u64>,
init_epoch_tx: Option<oneshot::Sender<u64>>,
item_tx: Sender<InMemLogStoreItem>,
truncated_epoch_rx: UnboundedReceiver<u64>,
}
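
/// Reader-side progress: either consuming items of the given epoch, or
/// waiting for `truncate` to be called on the checkpoint barrier of
/// `sealed_epoch` before moving on to `next_epoch`.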
#[derive(Eq, PartialEq, Debug)]
enum LogReaderEpochProgress {
Consuming(u64),
AwaitingTruncate { sealed_epoch: u64, next_epoch: u64 },
}
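
/// Progress value used before `init` delivers the first epoch.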
const UNINITIALIZED: LogReaderEpochProgress = LogReaderEpochProgress::Consuming(INVALID_EPOCH);
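
/// Reader half of the bounded in-memory log store. `latest_offset` tracks the
/// offset of the last item handed out by `next_item`, while `truncate_offset`
/// tracks how far the downstream consumer has acknowledged via `truncate`.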
pub struct BoundedInMemLogStoreReader {
epoch_progress: LogReaderEpochProgress,
init_epoch_rx: Option<oneshot::Receiver<u64>>,
item_rx: Receiver<InMemLogStoreItem>,
truncated_epoch_tx: UnboundedSender<u64>,
latest_offset: TruncateOffset,
truncate_offset: TruncateOffset,
}
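
/// Factory that builds a connected reader/writer pair backed by a bounded
/// in-memory channel of capacity `bound`.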
pub struct BoundedInMemLogStoreFactory {
bound: usize,
}
impl BoundedInMemLogStoreFactory {
pub fn new(bound: usize) -> Self {
Self { bound }
}
}
impl LogStoreFactory for BoundedInMemLogStoreFactory {
type Reader = BoundedInMemLogStoreReader;
type Writer = BoundedInMemLogStoreWriter;
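
    /// Builds the reader/writer pair: a oneshot channel carries the init
    /// epoch from writer to reader, a bounded channel carries log items, and
    /// an unbounded channel reports truncated epochs back to the writer.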
async fn build(self) -> (Self::Reader, Self::Writer) {
let (init_epoch_tx, init_epoch_rx) = oneshot::channel();
let (item_tx, item_rx) = channel(self.bound);
let (truncated_epoch_tx, truncated_epoch_rx) = unbounded_channel();
let reader = BoundedInMemLogStoreReader {
epoch_progress: UNINITIALIZED,
init_epoch_rx: Some(init_epoch_rx),
item_rx,
truncated_epoch_tx,
latest_offset: TruncateOffset::Barrier { epoch: 0 },
truncate_offset: TruncateOffset::Barrier { epoch: 0 },
};
let writer = BoundedInMemLogStoreWriter {
curr_epoch: None,
init_epoch_tx: Some(init_epoch_tx),
item_tx,
truncated_epoch_rx,
};
(reader, writer)
}
}
impl LogReader for BoundedInMemLogStoreReader {
async fn init(&mut self) -> LogStoreResult<()> {
let init_epoch_rx = self
.init_epoch_rx
.take()
.expect("should not init for twice");
let epoch = init_epoch_rx.await.context("unable to get init epoch")?;
assert_eq!(self.epoch_progress, UNINITIALIZED);
self.epoch_progress = LogReaderEpochProgress::Consuming(epoch);
self.latest_offset = TruncateOffset::Barrier {
epoch: epoch.prev_epoch(),
};
self.truncate_offset = TruncateOffset::Barrier {
epoch: epoch.prev_epoch(),
};
Ok(())
}
async fn next_item(&mut self) -> LogStoreResult<(u64, LogStoreReadItem)> {
match self.item_rx.recv().await {
Some(item) => match self.epoch_progress {
Consuming(current_epoch) => match item {
InMemLogStoreItem::StreamChunk(chunk) => {
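                        // Chunk ids restart from 0 at each epoch and increase
                        // monotonically within the epoch.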
let chunk_id = match self.latest_offset {
TruncateOffset::Chunk { epoch, chunk_id } => {
assert_eq!(epoch, current_epoch);
chunk_id + 1
}
TruncateOffset::Barrier { epoch } => {
assert!(
epoch < current_epoch,
"prev offset at barrier {} but current epoch {}",
epoch,
current_epoch
);
0
}
};
self.latest_offset = TruncateOffset::Chunk {
epoch: current_epoch,
chunk_id,
};
Ok((
current_epoch,
LogStoreReadItem::StreamChunk { chunk, chunk_id },
))
}
InMemLogStoreItem::Barrier {
is_checkpoint,
next_epoch,
} => {
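                        // A checkpoint barrier stalls the reader in
                        // `AwaitingTruncate` until `truncate` acknowledges the
                        // sealed epoch; a non-checkpoint barrier just advances
                        // the consuming epoch.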
if is_checkpoint {
self.epoch_progress = AwaitingTruncate {
next_epoch,
sealed_epoch: current_epoch,
};
} else {
self.epoch_progress = Consuming(next_epoch);
}
self.latest_offset = TruncateOffset::Barrier {
epoch: current_epoch,
};
Ok((current_epoch, LogStoreReadItem::Barrier { is_checkpoint }))
}
InMemLogStoreItem::UpdateVnodeBitmap(vnode_bitmap) => Ok((
current_epoch,
LogStoreReadItem::UpdateVnodeBitmap(vnode_bitmap),
)),
},
                AwaitingTruncate { .. } => Err(anyhow!(
                    "should not call next_item while awaiting truncation of a checkpoint barrier in the in-mem log store"
                )),
},
None => Err(anyhow!("end of log stream")),
}
}
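
    /// Acknowledges consumption up to `offset`. The offset must advance
    /// strictly and must not pass the latest item handed out. Truncating the
    /// barrier of the sealed epoch releases the writer blocked in
    /// `flush_current_epoch`.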
fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
if self.truncate_offset >= offset {
return Err(anyhow!(
"truncate offset {:?} but prev truncate offset is {:?}",
offset,
self.truncate_offset
));
}
if offset > self.latest_offset {
return Err(anyhow!(
"truncate at {:?} but latest offset is {:?}",
offset,
self.latest_offset
));
}
if let AwaitingTruncate {
sealed_epoch,
next_epoch,
} = &self.epoch_progress
{
if let TruncateOffset::Barrier { epoch } = offset
&& epoch == *sealed_epoch
{
let sealed_epoch = *sealed_epoch;
self.epoch_progress = Consuming(*next_epoch);
self.truncated_epoch_tx
.send(sealed_epoch)
.map_err(|_| anyhow!("unable to send sealed epoch"))?;
}
}
self.truncate_offset = offset;
Ok(())
}
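
    /// The in-memory log store keeps no history to replay, so `rewind` is a
    /// no-op returning `(false, None)`.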
async fn rewind(&mut self) -> LogStoreResult<(bool, Option<Bitmap>)> {
Ok((false, None))
}
}
impl LogWriter for BoundedInMemLogStoreWriter {
async fn init(
&mut self,
epoch: EpochPair,
_pause_read_on_bootstrap: bool,
) -> LogStoreResult<()> {
        let init_epoch_tx = self.init_epoch_tx.take().expect("should not init twice");
init_epoch_tx
.send(epoch.curr)
.map_err(|_| anyhow!("unable to send init epoch"))?;
self.curr_epoch = Some(epoch.curr);
Ok(())
}
async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> {
self.item_tx
.send(InMemLogStoreItem::StreamChunk(chunk))
.instrument_await("in_mem_send_item_chunk")
.await
.map_err(|_| anyhow!("unable to send stream chunk"))?;
Ok(())
}
async fn flush_current_epoch(
&mut self,
next_epoch: u64,
is_checkpoint: bool,
) -> LogStoreResult<()> {
self.item_tx
.send(InMemLogStoreItem::Barrier {
next_epoch,
is_checkpoint,
})
.instrument_await("in_mem_send_item_barrier")
.await
.map_err(|_| anyhow!("unable to send barrier"))?;
let prev_epoch = self
.curr_epoch
.replace(next_epoch)
.expect("should have epoch");
if is_checkpoint {
let truncated_epoch = self
.truncated_epoch_rx
.recv()
.instrument_await("in_mem_recv_truncated_epoch")
.await
.ok_or_else(|| anyhow!("cannot get truncated epoch"))?;
assert_eq!(truncated_epoch, prev_epoch);
}
Ok(())
}
async fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> LogStoreResult<()> {
self.item_tx
.send(InMemLogStoreItem::UpdateVnodeBitmap(new_vnodes))
.instrument_await("in_mem_send_item_vnode_bitmap")
.await
.map_err(|_| anyhow!("unable to send vnode bitmap"))
}
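
    // Pause/resume are no-ops for the in-memory log store.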
fn pause(&mut self) -> LogStoreResult<()> {
Ok(())
}
fn resume(&mut self) -> LogStoreResult<()> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::future::poll_fn;
use std::task::Poll;
use futures::FutureExt;
use risingwave_common::array::{Op, StreamChunkBuilder};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::epoch::{test_epoch, EpochPair};
use risingwave_connector::sink::log_store::{
LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, TruncateOffset,
};
use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
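
    /// Writes two chunks in the init epoch, a non-checkpoint barrier, one more
    /// chunk, and a checkpoint barrier, then checks that the reader sees the
    /// same sequence and that the writer only finishes once the checkpointed
    /// epoch's barrier is truncated.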
#[tokio::test]
async fn test_in_memory_log_store() {
let factory = BoundedInMemLogStoreFactory::new(4);
let (mut reader, mut writer) = factory.build().await;
let init_epoch = test_epoch(1);
let epoch1 = test_epoch(2);
let epoch2 = test_epoch(3);
let ops = vec![Op::Insert, Op::Delete, Op::UpdateInsert, Op::UpdateDelete];
let mut builder =
StreamChunkBuilder::unlimited(vec![DataType::Int64, DataType::Varchar], None);
for (i, op) in ops.into_iter().enumerate() {
assert!(builder
.append_row(
op,
[
Some(ScalarImpl::Int64(i as i64)),
Some(ScalarImpl::Utf8(format!("name_{}", i).into_boxed_str()))
]
)
.is_none());
}
let stream_chunk = builder.take().unwrap();
let stream_chunk_clone = stream_chunk.clone();
let mut join_handle = tokio::spawn(async move {
writer
.init(EpochPair::new_test_epoch(init_epoch), false)
.await
.unwrap();
writer
.write_chunk(stream_chunk_clone.clone())
.await
.unwrap();
writer
.write_chunk(stream_chunk_clone.clone())
.await
.unwrap();
writer.flush_current_epoch(epoch1, false).await.unwrap();
writer.write_chunk(stream_chunk_clone).await.unwrap();
writer.flush_current_epoch(epoch2, true).await.unwrap();
});
reader.init().await.unwrap();
let _chunk_id1_1 = match reader.next_item().await.unwrap() {
(epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }) => {
assert_eq!(epoch, init_epoch);
assert_eq!(&chunk, &stream_chunk);
chunk_id
}
_ => unreachable!(),
};
let chunk_id1_2 = match reader.next_item().await.unwrap() {
(epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }) => {
assert_eq!(epoch, init_epoch);
assert_eq!(&chunk, &stream_chunk);
chunk_id
}
_ => unreachable!(),
};
match reader.next_item().await.unwrap() {
(epoch, LogStoreReadItem::Barrier { is_checkpoint }) => {
assert!(!is_checkpoint);
assert_eq!(epoch, init_epoch);
}
_ => unreachable!(),
}
let chunk_id2_1 = match reader.next_item().await.unwrap() {
(epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }) => {
assert_eq!(&chunk, &stream_chunk);
assert_eq!(epoch, epoch1);
chunk_id
}
_ => unreachable!(),
};
match reader.next_item().await.unwrap() {
(epoch, LogStoreReadItem::Barrier { is_checkpoint }) => {
assert!(is_checkpoint);
assert_eq!(epoch, epoch1);
}
_ => unreachable!(),
}
reader
.truncate(TruncateOffset::Chunk {
epoch: init_epoch,
chunk_id: chunk_id1_2,
})
.unwrap();
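        // Truncating chunk offsets alone does not acknowledge the sealed
        // epoch, so the writer's checkpoint flush (and thus the spawned task)
        // must still be pending.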
assert!(poll_fn(|cx| Poll::Ready(join_handle.poll_unpin(cx)))
.await
.is_pending());
reader
.truncate(TruncateOffset::Chunk {
epoch: epoch1,
chunk_id: chunk_id2_1,
})
.unwrap();
assert!(poll_fn(|cx| Poll::Ready(join_handle.poll_unpin(cx)))
.await
.is_pending());
reader
.truncate(TruncateOffset::Barrier { epoch: epoch1 })
.unwrap();
join_handle.await.unwrap();
}
}