risingwave_stream/executor/top_n/utils.rs

use std::future::Future;

use risingwave_common::bitmap::Bitmap;
use risingwave_common::row;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::ColumnOrder;

use super::top_n_cache::CacheKey;
use crate::executor::prelude::*;
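
/// The interface shared by the top-n executor variants in this module.
/// [`TopNExecutorWrapper`] drives the common message loop and delegates
/// per-message handling to these methods.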
pub trait TopNExecutorBase: Send + 'static {
    /// Apply an input chunk to the top-n state and return the resulting output chunk, if any.
    fn apply_chunk(
        &mut self,
        chunk: StreamChunk,
    ) -> impl Future<Output = StreamExecutorResult<Option<StreamChunk>>> + Send;

    /// Flush buffered state changes on a barrier, committing with the given epoch.
    fn flush_data(
        &mut self,
        epoch: EpochPair,
    ) -> impl Future<Output = StreamExecutorResult<()>> + Send;

    /// Opportunistically flush buffered state changes between barriers; called by the
    /// wrapper after each chunk.
    fn try_flush_data(&mut self) -> impl Future<Output = StreamExecutorResult<()>> + Send;

    /// Update the vnode bitmap. The default implementation is unreachable and must be
    /// overridden by executors whose state is distributed by vnode.
    fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc<Bitmap>) {
        unreachable!()
    }

    /// Evict entries from any in-memory cache; a no-op by default. Called by the wrapper
    /// before handling each message.
    fn evict(&mut self) {}

    /// Initialize the executor state with the epoch of the first barrier.
    fn init(&mut self, epoch: EpochPair) -> impl Future<Output = StreamExecutorResult<()>> + Send;

    /// Handle an input watermark and return the watermark to propagate downstream, if any.
    fn handle_watermark(
        &mut self,
        watermark: Watermark,
    ) -> impl Future<Output = Option<Watermark>> + Send;
}
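
/// Couples a concrete [`TopNExecutorBase`] implementation with its input and actor
/// context, and provides the shared [`Execute`] implementation.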
pub struct TopNExecutorWrapper<E> {
    pub(super) input: Executor,
    pub(super) ctx: ActorContextRef,
    pub(super) inner: E,
}
impl<E> Execute for TopNExecutorWrapper<E>
where
    E: TopNExecutorBase,
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.top_n_executor_execute().boxed()
    }
}
impl<E> TopNExecutorWrapper<E>
where
    E: TopNExecutorBase,
{
    /// The shared message loop: forward the first barrier, initialize the inner executor,
    /// then dispatch watermarks, chunks, and barriers to the corresponding
    /// [`TopNExecutorBase`] methods.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    pub(crate) async fn top_n_executor_execute(mut self: Box<Self>) {
        let mut input = self.input.execute();
        let barrier = expect_first_barrier(&mut input).await?;
        let barrier_epoch = barrier.epoch;
        yield Message::Barrier(barrier);
        self.inner.init(barrier_epoch).await?;

        #[for_await]
        for msg in input {
            self.inner.evict();
            let msg = msg?;
            match msg {
                Message::Watermark(watermark) => {
                    if let Some(output_watermark) = self.inner.handle_watermark(watermark).await {
                        yield Message::Watermark(output_watermark);
                    }
                }
                Message::Chunk(chunk) => {
                    if let Some(output_chunk) = self.inner.apply_chunk(chunk).await? {
                        yield Message::Chunk(output_chunk);
                    }
                    self.inner.try_flush_data().await?;
                }
                Message::Barrier(barrier) => {
                    self.inner.flush_data(barrier.epoch).await?;
                    // Apply the new vnode bitmap if this barrier carries one for this actor.
                    if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.ctx.id) {
                        self.inner.update_vnode_bitmap(vnode_bitmap);
                    }
                    yield Message::Barrier(barrier)
                }
            };
        }
    }
}
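
/// Serialize key columns of a row into a [`CacheKey`]: the first `order_by_len` columns
/// and the remaining columns are memcomparable-encoded separately, using the two serdes
/// in the given [`CacheKeySerde`].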
pub fn serialize_pk_to_cache_key(pk: impl Row, cache_key_serde: &CacheKeySerde) -> CacheKey {
    let pk = pk.into_owned_row().into_inner();
    let (cache_key_first, cache_key_second) = pk.split_at(cache_key_serde.2);
    (
        cache_key_first.memcmp_serialize(&cache_key_serde.0),
        cache_key_second.memcmp_serialize(&cache_key_serde.1),
    )
}
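
/// Serdes for the two halves of a [`CacheKey`], plus the number of `order_by` columns at
/// which the key is split: `(order_by serde, remaining-pk serde, order_by_len)`.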
pub type CacheKeySerde = (OrderedRowSerde, OrderedRowSerde, usize);
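
/// Build the [`CacheKeySerde`] from the storage key layout: the columns after the group
/// key are split into the `order_by` prefix and the remaining pk suffix, each getting its
/// own [`OrderedRowSerde`].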
pub fn create_cache_key_serde(
    storage_key: &[ColumnOrder],
    schema: &Schema,
    order_by: &[ColumnOrder],
    group_by: &[usize],
) -> CacheKeySerde {
    {
        // Sanity check: the storage key must be laid out as
        // `group_by ++ order_by ++ additional pk columns`.
        for i in 0..group_by.len() {
            assert_eq!(storage_key[i].column_index, group_by[i]);
        }
        for i in group_by.len()..(group_by.len() + order_by.len()) {
            assert_eq!(storage_key[i], order_by[i - group_by.len()]);
        }
    }

    let (cache_key_data_types, cache_key_order_types): (Vec<_>, Vec<_>) = storage_key
        [group_by.len()..]
        .iter()
        .map(|o| (schema[o.column_index].data_type(), o.order_type))
        .unzip();

    let order_by_len = order_by.len();
    let (first_key_data_types, second_key_data_types) = cache_key_data_types.split_at(order_by_len);
    let (first_key_order_types, second_key_order_types) =
        cache_key_order_types.split_at(order_by_len);
    let first_key_serde = OrderedRowSerde::new(
        first_key_data_types.to_vec(),
        first_key_order_types.to_vec(),
    );
    let second_key_serde = OrderedRowSerde::new(
        second_key_data_types.to_vec(),
        second_key_order_types.to_vec(),
    );
    (first_key_serde, second_key_serde, order_by_len)
}
/// The group key type used by the group top-n executors; non-grouped variants pass
/// [`NO_GROUP_KEY`].
pub trait GroupKey = row::Row + Send + Sync;
pub const NO_GROUP_KEY: Option<row::Empty> = None;