risingwave_stream/executor/top_n/
top_n_appendonly.rs1use risingwave_common::array::Op;
16use risingwave_common::row::{RowDeserializer, RowExt};
17use risingwave_common::util::epoch::EpochPair;
18use risingwave_common::util::sort_util::ColumnOrder;
19
20use super::top_n_cache::{AppendOnlyTopNCacheTrait, TopNStaging};
21use super::utils::*;
22use super::{ManagedTopNState, TopNCache};
23use crate::common::table::state_table::StateTablePostCommit;
24use crate::executor::prelude::*;
25
/// Top-N executor for append-only input.
///
/// This is the shared [`TopNExecutorWrapper`] driving an
/// [`InnerAppendOnlyTopNExecutor`], which holds the append-only specific logic.
///
/// * `S` - the state store backing the executor's state table.
/// * `WITH_TIES` - forwarded unchanged to the inner executor and its `TopNCache`
///   (presumably selects `WITH TIES` semantics; confirm against `TopNCache`).
pub type AppendOnlyTopNExecutor<S, const WITH_TIES: bool> =
    TopNExecutorWrapper<InnerAppendOnlyTopNExecutor<S, WITH_TIES>>;
34
35impl<S: StateStore, const WITH_TIES: bool> AppendOnlyTopNExecutor<S, WITH_TIES> {
36 #[allow(clippy::too_many_arguments)]
37 pub fn new(
38 input: Executor,
39 ctx: ActorContextRef,
40 schema: Schema,
41 storage_key: Vec<ColumnOrder>,
42 offset_and_limit: (usize, usize),
43 order_by: Vec<ColumnOrder>,
44 state_table: StateTable<S>,
45 ) -> StreamResult<Self> {
46 Ok(TopNExecutorWrapper {
47 input,
48 ctx,
49 inner: InnerAppendOnlyTopNExecutor::new(
50 schema,
51 storage_key,
52 offset_and_limit,
53 order_by,
54 state_table,
55 )?,
56 })
57 }
58}
59
/// Append-only Top-N logic plugged into `TopNExecutorWrapper`.
///
/// Holds the persisted state, the in-memory cache and the key-serialization helpers
/// needed to process insert-only chunks.
pub struct InnerAppendOnlyTopNExecutor<S: StateStore, const WITH_TIES: bool> {
    /// Schema of the rows handled by this executor; its data types drive row
    /// deserialization and output chunk building.
    schema: Schema,

    /// Column indices of the storage key, taken from the `storage_key` column orders.
    /// Each incoming row is projected onto these columns to form its cache key.
    storage_key_indices: PkIndices,

    /// Top-N state backed by the state table; flushed on `flush_data` / `try_flush_data`.
    managed_state: ManagedTopNState<S>,

    /// In-memory Top-N cache, created from the offset, limit and schema data types.
    cache: TopNCache<WITH_TIES>,

    /// Serde used to turn a projected storage-key row into a cache key.
    cache_key_serde: CacheKeySerde,
}
76
77impl<S: StateStore, const WITH_TIES: bool> InnerAppendOnlyTopNExecutor<S, WITH_TIES> {
78 #[allow(clippy::too_many_arguments)]
79 pub fn new(
80 schema: Schema,
81 storage_key: Vec<ColumnOrder>,
82 offset_and_limit: (usize, usize),
83 order_by: Vec<ColumnOrder>,
84 state_table: StateTable<S>,
85 ) -> StreamResult<Self> {
86 let num_offset = offset_and_limit.0;
87 let num_limit = offset_and_limit.1;
88
89 let cache_key_serde = create_cache_key_serde(&storage_key, &schema, &order_by, &[]);
90 let managed_state = ManagedTopNState::<S>::new(state_table, cache_key_serde.clone());
91 let data_types = schema.data_types();
92
93 Ok(Self {
94 schema,
95 managed_state,
96 storage_key_indices: storage_key.into_iter().map(|op| op.column_index).collect(),
97 cache: TopNCache::new(num_offset, num_limit, data_types),
98 cache_key_serde,
99 })
100 }
101}
102
103impl<S: StateStore, const WITH_TIES: bool> TopNExecutorBase
104 for InnerAppendOnlyTopNExecutor<S, WITH_TIES>
105where
106 TopNCache<WITH_TIES>: AppendOnlyTopNCacheTrait,
107{
108 type State = S;
109
110 async fn apply_chunk(
111 &mut self,
112 chunk: StreamChunk,
113 ) -> StreamExecutorResult<Option<StreamChunk>> {
114 let mut staging = TopNStaging::new();
115 let data_types = self.schema.data_types();
116 let deserializer = RowDeserializer::new(data_types.clone());
117 for (op, row_ref) in chunk.rows() {
119 debug_assert_eq!(op, Op::Insert);
120 let pk_row = row_ref.project(&self.storage_key_indices);
121 let cache_key = serialize_pk_to_cache_key(pk_row, &self.cache_key_serde);
122 self.cache.insert(
123 cache_key,
124 row_ref,
125 &mut staging,
126 &mut self.managed_state,
127 &deserializer,
128 )?;
129 }
130
131 if staging.is_empty() {
132 return Ok(None);
133 }
134 let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(staging.len()));
135 for res in staging.into_deserialized_changes(&deserializer) {
136 let (op, row) = res?;
137 let _none = chunk_builder.append_row(op, row);
138 }
139 Ok(chunk_builder.take())
140 }
141
142 async fn flush_data(
143 &mut self,
144 epoch: EpochPair,
145 ) -> StreamExecutorResult<StateTablePostCommit<'_, S>> {
146 self.managed_state.flush(epoch).await
147 }
148
149 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
150 self.managed_state.try_flush().await
151 }
152
153 async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
154 self.managed_state.init_epoch(epoch).await?;
155 self.managed_state
156 .init_topn_cache(NO_GROUP_KEY, &mut self.cache)
157 .await
158 }
159
160 async fn handle_watermark(&mut self, _: Watermark) -> Option<Watermark> {
161 None
163 }
164}
165
#[cfg(test)]
mod tests {

    use risingwave_common::array::StreamChunk;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::{ColumnOrder, OrderType};

    use super::AppendOnlyTopNExecutor;
    use crate::executor::test_utils::top_n_executor::create_in_memory_state_table;
    use crate::executor::test_utils::{MockSource, StreamExecutorTestExt};
    use crate::executor::{ActorContext, Barrier, Execute, Executor, Message, PkIndices};

    /// The three insert-only input chunks shared by every test, in arrival order.
    fn create_stream_chunks() -> Vec<StreamChunk> {
        [
            " I  I
            +  1  0
            +  2  1
            +  3  2
            + 10  3
            +  9  4
            +  8  5",
            " I  I
            +  7  6
            +  3  7
            +  1  8
            +  9  9",
            " I  I
            +  1 12
            +  1 13
            +  2 14
            +  3 15",
        ]
        .into_iter()
        .map(|pretty| StreamChunk::from_pretty(pretty))
        .collect()
    }

    /// Two unnamed `Int64` columns.
    fn create_schema() -> Schema {
        let fields = [DataType::Int64, DataType::Int64]
            .into_iter()
            .map(Field::unnamed)
            .collect();
        Schema { fields }
    }

    /// The storage key is the same as the order-by key in these tests.
    fn storage_key() -> Vec<ColumnOrder> {
        order_by()
    }

    /// Ascending order on column 0, then column 1.
    fn order_by() -> Vec<ColumnOrder> {
        (0..2)
            .map(|column| ColumnOrder::new(column, OrderType::ascending()))
            .collect()
    }

    /// Both columns form the primary key.
    fn pk_indices() -> PkIndices {
        Vec::from([0, 1])
    }

    /// Mock upstream emitting `barrier(1), chunk1, barrier(2), chunk2, barrier(3), chunk3`.
    fn create_source() -> Executor {
        let messages: Vec<Message> = create_stream_chunks()
            .into_iter()
            .zip(1u64..)
            .flat_map(|(chunk, epoch)| {
                [
                    Message::Barrier(Barrier::new_test_barrier(test_epoch(epoch))),
                    Message::Chunk(chunk),
                ]
            })
            .collect();

        MockSource::with_messages(messages).into_executor(create_schema(), pk_indices())
    }

    /// Asserts that `actual` holds the same rows as the pretty-printed `expected`,
    /// ignoring row order.
    #[track_caller]
    fn assert_same_rows(actual: StreamChunk, expected: &str) {
        assert_eq!(
            actual.sort_rows(),
            StreamChunk::from_pretty(expected).sort_rows(),
        );
    }

    #[tokio::test]
    async fn test_append_only_top_n_executor_with_limit() {
        let storage_key = storage_key();
        let source = create_source();

        let column_types = [DataType::Int64, DataType::Int64];
        let order_types = [OrderType::ascending(), OrderType::ascending()];
        let state_table =
            create_in_memory_state_table(&column_types, &order_types, &pk_indices()).await;

        let schema = source.schema().clone();
        // offset 0, limit 5
        let mut top_n = AppendOnlyTopNExecutor::<_, false>::new(
            source,
            ActorContext::for_test(0),
            schema,
            storage_key,
            (0, 5),
            order_by(),
            state_table,
        )
        .unwrap()
        .boxed()
        .execute();

        // Epoch 1: the five smallest rows of the first chunk enter the result.
        top_n.expect_barrier().await;
        assert_same_rows(
            top_n.expect_chunk().await,
            " I I
            + 1 0
            + 2 1
            + 3 2
            + 9 4
            + 8 5",
        );

        // Epoch 2: (3, 7) and (1, 8) push out the two largest rows.
        top_n.expect_barrier().await;
        assert_same_rows(
            top_n.expect_chunk().await,
            " I I
            - 9 4
            - 8 5
            + 3 7
            + 1 8",
        );

        // Epoch 3: two more rows with key 1 replace both rows with key 3.
        top_n.expect_barrier().await;
        assert_same_rows(
            top_n.expect_chunk().await,
            " I  I
            - 3  7
            + 1 12
            - 3  2
            + 1 13",
        );
    }

    #[tokio::test]
    async fn test_append_only_top_n_executor_with_offset_and_limit() {
        let source = create_source();

        let column_types = [DataType::Int64, DataType::Int64];
        let order_types = [OrderType::ascending(), OrderType::ascending()];
        let state_table =
            create_in_memory_state_table(&column_types, &order_types, &pk_indices()).await;

        let schema = source.schema().clone();
        // offset 3, limit 4
        let mut top_n = AppendOnlyTopNExecutor::<_, false>::new(
            source,
            ActorContext::for_test(0),
            schema,
            storage_key(),
            (3, 4),
            order_by(),
            state_table,
        )
        .unwrap()
        .boxed()
        .execute();

        // Epoch 1: the three smallest rows are skipped by the offset.
        top_n.expect_barrier().await;
        assert_same_rows(
            top_n.expect_chunk().await,
            "  I I
            + 10 3
            +  9 4
            +  8 5",
        );

        // Epoch 2: smaller rows shift the window downwards.
        top_n.expect_barrier().await;
        assert_same_rows(
            top_n.expect_chunk().await,
            "  I I
            +  7 6
            - 10 3
            +  3 7
            -  9 4
            +  3 2",
        );

        // Epoch 3: the window keeps sliding towards the smallest keys.
        top_n.expect_barrier().await;
        assert_same_rows(
            top_n.expect_chunk().await,
            " I  I
            - 8  5
            + 2  1
            - 7  6
            + 1 13
            - 3  7
            + 2 14",
        );
    }
}