risingwave_stream/executor/top_n/
utils.rs1use std::future::Future;
16
17use risingwave_common::util::epoch::EpochPair;
18use risingwave_common::util::row_serde::OrderedRowSerde;
19use risingwave_common::util::sort_util::ColumnOrder;
20
21use super::top_n_cache::CacheKey;
22use crate::executor::prelude::*;
23
24pub trait TopNExecutorBase: Send + 'static {
25 type State: StateStore;
26 fn apply_chunk(
30 &mut self,
31 chunk: StreamChunk,
32 ) -> impl Future<Output = StreamExecutorResult<Option<StreamChunk>>> + Send;
33
34 fn flush_data(
36 &mut self,
37 epoch: EpochPair,
38 ) -> impl Future<Output = StreamExecutorResult<StateTablePostCommit<'_, Self::State>>> + Send;
39
40 fn try_flush_data(&mut self) -> impl Future<Output = StreamExecutorResult<()>> + Send;
42
43 fn clear_cache(&mut self) {
44 unreachable!()
45 }
46
47 fn evict(&mut self) {}
48
49 fn init(&mut self, epoch: EpochPair) -> impl Future<Output = StreamExecutorResult<()>> + Send;
50
51 fn handle_watermark(
53 &mut self,
54 watermark: Watermark,
55 ) -> impl Future<Output = Option<Watermark>> + Send;
56}
57
58pub struct TopNExecutorWrapper<E> {
60 pub(super) input: Executor,
61 pub(super) ctx: ActorContextRef,
62 pub(super) inner: E,
63}
64
65impl<E> Execute for TopNExecutorWrapper<E>
66where
67 E: TopNExecutorBase,
68{
69 fn execute(self: Box<Self>) -> BoxedMessageStream {
70 self.top_n_executor_execute().boxed()
71 }
72}
73
74impl<E> TopNExecutorWrapper<E>
75where
76 E: TopNExecutorBase,
77{
78 #[try_stream(ok = Message, error = StreamExecutorError)]
82 pub(crate) async fn top_n_executor_execute(mut self: Box<Self>) {
83 let mut input = self.input.execute();
84
85 let barrier = expect_first_barrier(&mut input).await?;
86 let barrier_epoch = barrier.epoch;
87 yield Message::Barrier(barrier);
88 self.inner.init(barrier_epoch).await?;
89
90 #[for_await]
91 for msg in input {
92 self.inner.evict();
93 let msg = msg?;
94 match msg {
95 Message::Watermark(watermark) => {
96 if let Some(output_watermark) = self.inner.handle_watermark(watermark).await {
97 yield Message::Watermark(output_watermark);
98 }
99 }
100 Message::Chunk(chunk) => {
101 if let Some(output_chunk) = self.inner.apply_chunk(chunk).await? {
102 yield Message::Chunk(output_chunk);
103 }
104 self.inner.try_flush_data().await?;
105 }
106 Message::Barrier(barrier) => {
107 let post_commit = self.inner.flush_data(barrier.epoch).await?;
108 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
109 yield Message::Barrier(barrier);
110
111 if let Some((_, cache_may_stale)) =
113 post_commit.post_yield_barrier(update_vnode_bitmap).await?
114 {
115 if cache_may_stale {
118 self.inner.clear_cache();
119 }
120 }
121 }
122 };
123 }
124 }
125}
126
127pub fn serialize_pk_to_cache_key(pk: impl Row, cache_key_serde: &CacheKeySerde) -> CacheKey {
130 let pk = pk.into_owned_row().into_inner();
132 let (cache_key_first, cache_key_second) = pk.split_at(cache_key_serde.2);
133 (
134 cache_key_first.memcmp_serialize(&cache_key_serde.0),
135 cache_key_second.memcmp_serialize(&cache_key_serde.1),
136 )
137}
138
139pub type CacheKeySerde = (OrderedRowSerde, OrderedRowSerde, usize);
143
144pub fn create_cache_key_serde(
145 storage_key: &[ColumnOrder],
146 schema: &Schema,
147 order_by: &[ColumnOrder],
148 group_by: &[usize],
149) -> CacheKeySerde {
150 {
151 for i in 0..group_by.len() {
153 assert_eq!(storage_key[i].column_index, group_by[i]);
154 }
155 for i in group_by.len()..(group_by.len() + order_by.len()) {
156 assert_eq!(storage_key[i], order_by[i - group_by.len()]);
157 }
158 }
159
160 let (cache_key_data_types, cache_key_order_types): (Vec<_>, Vec<_>) = storage_key
161 [group_by.len()..]
162 .iter()
163 .map(|o| (schema[o.column_index].data_type(), o.order_type))
164 .unzip();
165
166 let order_by_len = order_by.len();
167 let (first_key_data_types, second_key_data_types) = cache_key_data_types.split_at(order_by_len);
168 let (first_key_order_types, second_key_order_types) =
169 cache_key_order_types.split_at(order_by_len);
170 let first_key_serde = OrderedRowSerde::new(
171 first_key_data_types.to_vec(),
172 first_key_order_types.to_vec(),
173 );
174 let second_key_serde = OrderedRowSerde::new(
175 second_key_data_types.to_vec(),
176 second_key_order_types.to_vec(),
177 );
178 (first_key_serde, second_key_serde, order_by_len)
179}
180
181use risingwave_common::row;
182
183use crate::common::table::state_table::StateTablePostCommit;
184
185pub trait GroupKey = row::Row + Send + Sync;
186pub const NO_GROUP_KEY: Option<row::Empty> = None;