risingwave_stream/executor/top_n/
top_n_appendonly.rs1use risingwave_common::array::Op;
16use risingwave_common::row::{RowDeserializer, RowExt};
17use risingwave_common::util::epoch::EpochPair;
18use risingwave_common::util::sort_util::ColumnOrder;
19
20use super::top_n_cache::{AppendOnlyTopNCacheTrait, TopNStaging};
21use super::utils::*;
22use super::{ManagedTopNState, TopNCache};
23use crate::common::table::state_table::StateTablePostCommit;
24use crate::executor::prelude::*;
25
26pub type AppendOnlyTopNExecutor<S, const WITH_TIES: bool> =
33 TopNExecutorWrapper<InnerAppendOnlyTopNExecutor<S, WITH_TIES>>;
34
35impl<S: StateStore, const WITH_TIES: bool> AppendOnlyTopNExecutor<S, WITH_TIES> {
36 pub fn new(
37 input: Executor,
38 ctx: ActorContextRef,
39 schema: Schema,
40 storage_key: Vec<ColumnOrder>,
41 offset_and_limit: (usize, usize),
42 order_by: Vec<ColumnOrder>,
43 state_table: StateTable<S>,
44 ) -> StreamResult<Self> {
45 Ok(TopNExecutorWrapper {
46 input,
47 ctx,
48 inner: InnerAppendOnlyTopNExecutor::new(
49 schema,
50 storage_key,
51 offset_and_limit,
52 order_by,
53 state_table,
54 )?,
55 })
56 }
57}
58
59pub struct InnerAppendOnlyTopNExecutor<S: StateStore, const WITH_TIES: bool> {
60 schema: Schema,
61
62 storage_key_indices: Vec<usize>,
64
65 managed_state: ManagedTopNState<S>,
67
68 cache: TopNCache<WITH_TIES>,
71
72 cache_key_serde: CacheKeySerde,
74}
75
76impl<S: StateStore, const WITH_TIES: bool> InnerAppendOnlyTopNExecutor<S, WITH_TIES> {
77 pub fn new(
78 schema: Schema,
79 storage_key: Vec<ColumnOrder>,
80 offset_and_limit: (usize, usize),
81 order_by: Vec<ColumnOrder>,
82 state_table: StateTable<S>,
83 ) -> StreamResult<Self> {
84 let num_offset = offset_and_limit.0;
85 let num_limit = offset_and_limit.1;
86
87 let cache_key_serde = create_cache_key_serde(&storage_key, &schema, &order_by, &[]);
88 let managed_state = ManagedTopNState::<S>::new(state_table, cache_key_serde.clone());
89 let data_types = schema.data_types();
90
91 Ok(Self {
92 schema,
93 managed_state,
94 storage_key_indices: storage_key.into_iter().map(|op| op.column_index).collect(),
95 cache: TopNCache::new(num_offset, num_limit, data_types),
96 cache_key_serde,
97 })
98 }
99}
100
101impl<S: StateStore, const WITH_TIES: bool> TopNExecutorBase
102 for InnerAppendOnlyTopNExecutor<S, WITH_TIES>
103where
104 TopNCache<WITH_TIES>: AppendOnlyTopNCacheTrait,
105{
106 type State = S;
107
108 async fn apply_chunk(
109 &mut self,
110 chunk: StreamChunk,
111 ) -> StreamExecutorResult<Option<StreamChunk>> {
112 let mut staging = TopNStaging::new();
113 let data_types = self.schema.data_types();
114 let deserializer = RowDeserializer::new(data_types.clone());
115 for (op, row_ref) in chunk.rows() {
117 debug_assert_eq!(op, Op::Insert);
118 let pk_row = row_ref.project(&self.storage_key_indices);
119 let cache_key = serialize_pk_to_cache_key(pk_row, &self.cache_key_serde);
120 self.cache.insert(
121 cache_key,
122 row_ref,
123 &mut staging,
124 &mut self.managed_state,
125 &deserializer,
126 )?;
127 }
128
129 if staging.is_empty() {
130 return Ok(None);
131 }
132 let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(staging.len()));
133 for res in staging.into_deserialized_changes(&deserializer) {
134 let record = res?;
135 let _none = chunk_builder.append_record(record);
136 }
137 Ok(chunk_builder.take())
138 }
139
140 async fn flush_data(
141 &mut self,
142 epoch: EpochPair,
143 ) -> StreamExecutorResult<StateTablePostCommit<'_, S>> {
144 self.managed_state.flush(epoch).await
145 }
146
147 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
148 self.managed_state.try_flush().await
149 }
150
151 async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
152 self.managed_state.init_epoch(epoch).await?;
153 self.managed_state
154 .init_topn_cache(NO_GROUP_KEY, &mut self.cache)
155 .await
156 }
157
158 async fn handle_watermark(&mut self, _: Watermark) -> Option<Watermark> {
159 None
161 }
162}
163
#[cfg(test)]
mod tests {

    use risingwave_common::array::StreamChunk;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::{ColumnOrder, OrderType};

    use super::AppendOnlyTopNExecutor;
    use crate::executor::test_utils::top_n_executor::create_in_memory_state_table;
    use crate::executor::test_utils::{MockSource, StreamExecutorTestExt};
    use crate::executor::{ActorContext, Barrier, Execute, Executor, Message, StreamKey};

    /// The three insert-only input chunks shared by all tests.
    fn create_stream_chunks() -> Vec<StreamChunk> {
        [
            "  I I
            +  1 0
            +  2 1
            +  3 2
            + 10 3
            +  9 4
            +  8 5",
            " I I
            + 7 6
            + 3 7
            + 1 8
            + 9 9",
            " I  I
            + 1 12
            + 1 13
            + 2 14
            + 3 15",
        ]
        .into_iter()
        .map(|pretty| StreamChunk::from_pretty(pretty))
        .collect()
    }

    /// Two unnamed `Int64` columns.
    fn create_schema() -> Schema {
        let fields = vec![
            Field::unnamed(DataType::Int64),
            Field::unnamed(DataType::Int64),
        ];
        Schema { fields }
    }

    /// The storage key is the same as the order-by columns.
    fn storage_key() -> Vec<ColumnOrder> {
        order_by()
    }

    /// Ascending order on column 0, then column 1.
    fn order_by() -> Vec<ColumnOrder> {
        (0..2)
            .map(|column_index| ColumnOrder::new(column_index, OrderType::ascending()))
            .collect()
    }

    fn stream_key() -> StreamKey {
        vec![0, 1]
    }

    /// Mock source emitting, for epochs 1, 2 and 3, a barrier followed by one input chunk.
    fn create_source() -> Executor {
        let messages: Vec<Message> = create_stream_chunks()
            .into_iter()
            .zip(1..)
            .flat_map(|(chunk, epoch)| {
                [
                    Message::Barrier(Barrier::new_test_barrier(test_epoch(epoch))),
                    Message::Chunk(chunk),
                ]
            })
            .collect();

        MockSource::with_messages(messages).into_executor(create_schema(), stream_key())
    }

    #[tokio::test]
    async fn test_append_only_top_n_executor_with_limit() {
        let source = create_source();
        let state_table = create_in_memory_state_table(
            &[DataType::Int64, DataType::Int64],
            &[OrderType::ascending(), OrderType::ascending()],
            &stream_key(),
        )
        .await;

        let schema = source.schema().clone();
        let mut top_n = AppendOnlyTopNExecutor::<_, false>::new(
            source,
            ActorContext::for_test(0),
            schema,
            storage_key(),
            (0, 5),
            order_by(),
            state_table,
        )
        .unwrap()
        .boxed()
        .execute();

        // Expected output after each input chunk; every chunk is preceded by a barrier.
        let expected_outputs = [
            // Inserted 1, 2, 3, 10, 9, 8 -> top 5 is 1, 2, 3, 8, 9.
            " I I
            + 1 0
            + 2 1
            + 3 2
            + 9 4
            + 8 5",
            // Inserted 7, 3, 1, 9 -> top 5 is 1, 1, 2, 3, 3.
            " I I
            - 9 4
            - 8 5
            + 3 7
            + 1 8",
            // Inserted 1, 1, 2, 3 -> top 5 is 1, 1, 1, 1, 2.
            " I  I
            - 3  7
            + 1 12
            - 3  2
            + 1 13",
        ];

        for expected in expected_outputs {
            top_n.expect_barrier().await;
            assert_eq!(
                top_n.expect_chunk().await.sort_rows(),
                StreamChunk::from_pretty(expected).sort_rows(),
            );
        }
    }

    #[tokio::test]
    async fn test_append_only_top_n_executor_with_offset_and_limit() {
        let source = create_source();
        let state_table = create_in_memory_state_table(
            &[DataType::Int64, DataType::Int64],
            &[OrderType::ascending(), OrderType::ascending()],
            &stream_key(),
        )
        .await;

        let schema = source.schema().clone();
        let mut top_n = AppendOnlyTopNExecutor::<_, false>::new(
            source,
            ActorContext::for_test(0),
            schema,
            storage_key(),
            (3, 4),
            order_by(),
            state_table,
        )
        .unwrap()
        .boxed()
        .execute();

        // Expected output after each input chunk; every chunk is preceded by a barrier.
        let expected_outputs = [
            // Inserted 1, 2, 3, 10, 9, 8 -> skip (1, 2, 3), keep 8, 9, 10.
            "  I I
            + 10 3
            +  9 4
            +  8 5",
            // Inserted 7, 3, 1, 9 -> skip (1, 1, 2), keep 3, 3, 7, 8.
            "  I I
            +  7 6
            - 10 3
            +  3 7
            -  9 4
            +  3 2",
            // Inserted 1, 1, 2, 3 -> skip (1, 1, 1), keep 1, 2, 2, 3.
            " I  I
            - 8  5
            + 2  1
            - 7  6
            + 1 13
            - 3  7
            + 2 14",
        ];

        for expected in expected_outputs {
            top_n.expect_barrier().await;
            assert_eq!(
                top_n.expect_chunk().await.sort_rows(),
                StreamChunk::from_pretty(expected).sort_rows(),
            );
        }
    }
}