risingwave_stream/executor/top_n/
top_n_plain.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::Op;
16use risingwave_common::row::{RowDeserializer, RowExt};
17use risingwave_common::util::epoch::EpochPair;
18use risingwave_common::util::sort_util::ColumnOrder;
19
20use super::top_n_cache::TopNStaging;
21use super::utils::*;
22use super::{ManagedTopNState, TopNCache, TopNCacheTrait};
23use crate::common::table::state_table::StateTablePostCommit;
24use crate::executor::prelude::*;
25
/// `TopNExecutor` works on input that may contain modifications (inserts, deletes and
/// updates). It keeps all the data records/rows that have been seen, and returns the
/// overall top-N records.
///
/// This is an alias for `TopNExecutorWrapper` around [`InnerTopNExecutor`]; the `WITH_TIES`
/// const parameter is forwarded to the inner executor.
pub type TopNExecutor<S, const WITH_TIES: bool> =
    TopNExecutorWrapper<InnerTopNExecutor<S, WITH_TIES>>;
30
31impl<S: StateStore, const WITH_TIES: bool> TopNExecutor<S, WITH_TIES> {
32    #[allow(clippy::too_many_arguments)]
33    pub fn new(
34        input: Executor,
35        ctx: ActorContextRef,
36        schema: Schema,
37        storage_key: Vec<ColumnOrder>,
38        offset_and_limit: (usize, usize),
39        order_by: Vec<ColumnOrder>,
40        state_table: StateTable<S>,
41    ) -> StreamResult<Self> {
42        Ok(TopNExecutorWrapper {
43            input,
44            ctx,
45            inner: InnerTopNExecutor::new(
46                schema,
47                storage_key,
48                offset_and_limit,
49                order_by,
50                state_table,
51            )?,
52        })
53    }
54}
55
56impl<S: StateStore> TopNExecutor<S, true> {
57    /// It only has 1 capacity for high cache. Used to test the case where the last element in high
58    /// has ties.
59    #[allow(clippy::too_many_arguments)]
60    #[cfg(test)]
61    pub fn new_with_ties_for_test(
62        input: Executor,
63        ctx: ActorContextRef,
64        schema: Schema,
65        storage_key: Vec<ColumnOrder>,
66        offset_and_limit: (usize, usize),
67        order_by: Vec<ColumnOrder>,
68        state_table: StateTable<S>,
69    ) -> StreamResult<Self> {
70        let mut inner =
71            InnerTopNExecutor::new(schema, storage_key, offset_and_limit, order_by, state_table)?;
72
73        inner.cache.high_cache_capacity = 2;
74
75        Ok(TopNExecutorWrapper { input, ctx, inner })
76    }
77}
78
/// State held by the plain (non-grouped) top-N executor. It implements `TopNExecutorBase`
/// and is wrapped by `TopNExecutorWrapper` in the [`TopNExecutor`] alias.
pub struct InnerTopNExecutor<S: StateStore, const WITH_TIES: bool> {
    /// Schema of the rows; its data types are used to build the cache and the output chunks.
    schema: Schema,

    /// The storage key indices of the `TopNExecutor`
    storage_key_indices: PkIndices,

    /// State-table-backed storage. Every inserted/deleted input row is applied here before
    /// the cache is touched, and the cache is initialised from it in `init`.
    managed_state: ManagedTopNState<S>,

    /// In-memory cache of top (N + N * `TOPN_CACHE_HIGH_CAPACITY_FACTOR`) rows
    cache: TopNCache<WITH_TIES>,

    /// Used for serializing pk into `CacheKey`.
    cache_key_serde: CacheKeySerde,
}
93
94impl<S: StateStore, const WITH_TIES: bool> InnerTopNExecutor<S, WITH_TIES> {
95    /// # Arguments
96    ///
97    /// `storage_key` -- the storage pk. It's composed of the ORDER BY columns and the missing
98    /// columns of pk.
99    ///
100    /// `order_by_len` -- The number of fields of the ORDER BY clause, and will be used to split key
101    /// into `CacheKey`.
102    #[allow(clippy::too_many_arguments)]
103    pub fn new(
104        schema: Schema,
105        storage_key: Vec<ColumnOrder>,
106        offset_and_limit: (usize, usize),
107        order_by: Vec<ColumnOrder>,
108        state_table: StateTable<S>,
109    ) -> StreamResult<Self> {
110        let num_offset = offset_and_limit.0;
111        let num_limit = offset_and_limit.1;
112
113        let cache_key_serde = create_cache_key_serde(&storage_key, &schema, &order_by, &[]);
114        let managed_state = ManagedTopNState::<S>::new(state_table, cache_key_serde.clone());
115        let data_types = schema.data_types();
116
117        Ok(Self {
118            schema,
119            managed_state,
120            storage_key_indices: storage_key.into_iter().map(|op| op.column_index).collect(),
121            cache: TopNCache::new(num_offset, num_limit, data_types),
122            cache_key_serde,
123        })
124    }
125}
126
127impl<S: StateStore, const WITH_TIES: bool> TopNExecutorBase for InnerTopNExecutor<S, WITH_TIES>
128where
129    TopNCache<WITH_TIES>: TopNCacheTrait,
130{
131    type State = S;
132
133    async fn apply_chunk(
134        &mut self,
135        chunk: StreamChunk,
136    ) -> StreamExecutorResult<Option<StreamChunk>> {
137        let mut staging = TopNStaging::new();
138
139        // apply the chunk to state table
140        for (op, row_ref) in chunk.rows() {
141            let pk_row = row_ref.project(&self.storage_key_indices);
142            let cache_key = serialize_pk_to_cache_key(pk_row, &self.cache_key_serde);
143            match op {
144                Op::Insert | Op::UpdateInsert => {
145                    // First insert input row to state store
146                    self.managed_state.insert(row_ref);
147                    self.cache.insert(cache_key, row_ref, &mut staging)
148                }
149
150                Op::Delete | Op::UpdateDelete => {
151                    // First remove the row from state store
152                    self.managed_state.delete(row_ref);
153                    self.cache
154                        .delete(
155                            NO_GROUP_KEY,
156                            &mut self.managed_state,
157                            cache_key,
158                            row_ref,
159                            &mut staging,
160                        )
161                        .await?
162                }
163            }
164        }
165
166        let data_types = self.schema.data_types();
167        let deserializer = RowDeserializer::new(data_types.clone());
168        if staging.is_empty() {
169            return Ok(None);
170        }
171        let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(staging.len()));
172        for res in staging.into_deserialized_changes(&deserializer) {
173            let (op, row) = res?;
174            let _none = chunk_builder.append_row(op, row);
175        }
176        Ok(chunk_builder.take())
177    }
178
179    async fn flush_data(
180        &mut self,
181        epoch: EpochPair,
182    ) -> StreamExecutorResult<StateTablePostCommit<'_, S>> {
183        self.managed_state.flush(epoch).await
184    }
185
186    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
187        self.managed_state.try_flush().await
188    }
189
190    async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
191        self.managed_state.init_epoch(epoch).await?;
192        self.managed_state
193            .init_topn_cache(NO_GROUP_KEY, &mut self.cache)
194            .await
195    }
196
197    async fn handle_watermark(&mut self, _: Watermark) -> Option<Watermark> {
198        // TODO(yuhao): handle watermark
199        None
200    }
201}
202
203#[cfg(test)]
204mod tests {
205    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
206    use risingwave_common::catalog::{Field, Schema};
207    use risingwave_common::types::DataType;
208    use risingwave_common::util::sort_util::OrderType;
209
210    use super::*;
211    use crate::executor::test_utils::MockSource;
212    use crate::executor::test_utils::top_n_executor::create_in_memory_state_table;
213    use crate::executor::{Barrier, Message};
214
215    mod test1 {
216
217        use risingwave_common::util::epoch::test_epoch;
218
219        use super::*;
220        use crate::executor::test_utils::StreamExecutorTestExt;
221
222        fn create_stream_chunks() -> Vec<StreamChunk> {
223            let chunk1 = StreamChunk::from_pretty(
224                "  I I
225                +  1 0
226                +  2 1
227                +  3 2
228                + 10 3
229                +  9 4
230                +  8 5",
231            );
232            let chunk2 = StreamChunk::from_pretty(
233                "  I I
234                +  7 6
235                -  3 2
236                -  1 0
237                +  5 7
238                -  2 1
239                + 11 8",
240            );
241            let chunk3 = StreamChunk::from_pretty(
242                "  I  I
243                +  6  9
244                + 12 10
245                + 13 11
246                + 14 12",
247            );
248            let chunk4 = StreamChunk::from_pretty(
249                "  I  I
250                -  5  7
251                -  6  9
252                - 11  8",
253            );
254            vec![chunk1, chunk2, chunk3, chunk4]
255        }
256
257        fn create_schema() -> Schema {
258            Schema {
259                fields: vec![
260                    Field::unnamed(DataType::Int64),
261                    Field::unnamed(DataType::Int64),
262                ],
263            }
264        }
265
266        fn storage_key() -> Vec<ColumnOrder> {
267            let mut v = order_by();
268            v.extend([ColumnOrder::new(1, OrderType::ascending())]);
269            v
270        }
271
272        fn order_by() -> Vec<ColumnOrder> {
273            vec![ColumnOrder::new(0, OrderType::ascending())]
274        }
275
276        fn pk_indices() -> PkIndices {
277            vec![0, 1]
278        }
279
280        fn create_source() -> Executor {
281            let mut chunks = create_stream_chunks();
282            let schema = create_schema();
283            MockSource::with_messages(vec![
284                Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
285                Message::Chunk(std::mem::take(&mut chunks[0])),
286                Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
287                Message::Chunk(std::mem::take(&mut chunks[1])),
288                Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
289                Message::Chunk(std::mem::take(&mut chunks[2])),
290                Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
291                Message::Chunk(std::mem::take(&mut chunks[3])),
292                Message::Barrier(Barrier::new_test_barrier(test_epoch(5))),
293            ])
294            .into_executor(schema, pk_indices())
295        }
296
297        #[tokio::test]
298        async fn test_top_n_executor_with_offset() {
299            let source = create_source();
300            let state_table = create_in_memory_state_table(
301                &[DataType::Int64, DataType::Int64],
302                &[OrderType::ascending(), OrderType::ascending()],
303                &pk_indices(),
304            )
305            .await;
306
307            let schema = source.schema().clone();
308            let top_n = TopNExecutor::<_, false>::new(
309                source,
310                ActorContext::for_test(0),
311                schema,
312                storage_key(),
313                (3, 1000),
314                order_by(),
315                state_table,
316            )
317            .unwrap();
318            let mut top_n = top_n.boxed().execute();
319
320            // consume the init barrier
321            top_n.expect_barrier().await;
322            assert_eq!(
323                top_n.expect_chunk().await.sort_rows(),
324                StreamChunk::from_pretty(
325                    "  I I
326                    + 10 3
327                    +  9 4
328                    +  8 5"
329                )
330                .sort_rows(),
331            );
332            // Barrier
333            top_n.expect_barrier().await;
334            assert_eq!(
335                top_n.expect_chunk().await.sort_rows(),
336                StreamChunk::from_pretty(
337                    "  I I
338                    -  8 5
339                    + 11 8"
340                )
341                .sort_rows(),
342            );
343
344            // barrier
345            top_n.expect_barrier().await;
346
347            // (8, 9, 10, 11, 12, 13, 14)
348            assert_eq!(
349                top_n.expect_chunk().await.sort_rows(),
350                StreamChunk::from_pretty(
351                    "  I  I
352                    +  8  5
353                    + 12 10
354                    + 13 11
355                    + 14 12"
356                )
357                .sort_rows(),
358            );
359            // barrier
360            top_n.expect_barrier().await;
361
362            // (10, 12, 13, 14)
363            assert_eq!(
364                top_n.expect_chunk().await.sort_rows(),
365                StreamChunk::from_pretty(
366                    "  I I
367                    -  8 5
368                    -  9 4
369                    - 11 8"
370                )
371                .sort_rows(),
372            );
373            // barrier
374            top_n.expect_barrier().await;
375        }
376
377        #[tokio::test]
378        async fn test_top_n_executor_with_limit() {
379            let source = create_source();
380            let state_table = create_in_memory_state_table(
381                &[DataType::Int64, DataType::Int64],
382                &[OrderType::ascending(), OrderType::ascending()],
383                &pk_indices(),
384            )
385            .await;
386            let schema = source.schema().clone();
387            let top_n = TopNExecutor::<_, false>::new(
388                source,
389                ActorContext::for_test(0),
390                schema,
391                storage_key(),
392                (0, 4),
393                order_by(),
394                state_table,
395            )
396            .unwrap();
397            let mut top_n = top_n.boxed().execute();
398
399            // consume the init barrier
400            top_n.expect_barrier().await;
401            assert_eq!(
402                top_n.expect_chunk().await.sort_rows(),
403                StreamChunk::from_pretty(
404                    "  I I
405                    +  1 0
406                    +  2 1
407                    +  3 2
408                    +  8 5"
409                )
410                .sort_rows(),
411            );
412            // now () -> (1, 2, 3, 8)
413
414            // barrier
415            top_n.expect_barrier().await;
416            assert_eq!(
417                top_n.expect_chunk().await.sort_rows(),
418                StreamChunk::from_pretty(
419                    "  I I
420                    +  7 6
421                    -  3 2
422                    -  1 0
423                    +  5 7
424                    -  2 1
425                    +  9 4"
426                )
427                .sort_rows(),
428            );
429
430            // (5, 7, 8, 9)
431            // barrier
432            top_n.expect_barrier().await;
433
434            assert_eq!(
435                top_n.expect_chunk().await.sort_rows(),
436                StreamChunk::from_pretty(
437                    "  I I
438                    -  9 4
439                    +  6 9"
440                )
441                .sort_rows(),
442            );
443            // (5, 6, 7, 8)
444            // barrier
445            top_n.expect_barrier().await;
446
447            assert_eq!(
448                top_n.expect_chunk().await.sort_rows(),
449                StreamChunk::from_pretty(
450                    "  I I
451                    -  5 7
452                    +  9 4
453                    -  6 9
454                    + 10 3"
455                )
456                .sort_rows(),
457            );
458            // (7, 8, 9, 10)
459            // barrier
460            top_n.expect_barrier().await;
461        }
462
463        // Should have the same result as above, since there are no duplicate sort keys.
464        #[tokio::test]
465        async fn test_top_n_executor_with_limit_with_ties() {
466            let source = create_source();
467            let state_table = create_in_memory_state_table(
468                &[DataType::Int64, DataType::Int64],
469                &[OrderType::ascending(), OrderType::ascending()],
470                &pk_indices(),
471            )
472            .await;
473            let schema = source.schema().clone();
474            let top_n = TopNExecutor::<_, true>::new(
475                source,
476                ActorContext::for_test(0),
477                schema,
478                storage_key(),
479                (0, 4),
480                order_by(),
481                state_table,
482            )
483            .unwrap();
484            let mut top_n = top_n.boxed().execute();
485
486            // consume the init barrier
487            top_n.expect_barrier().await;
488            assert_eq!(
489                top_n.expect_chunk().await.sort_rows(),
490                StreamChunk::from_pretty(
491                    "  I I
492                    +  1 0
493                    +  2 1
494                    +  3 2
495                    +  8 5"
496                )
497                .sort_rows(),
498            );
499            // now () -> (1, 2, 3, 8)
500
501            // barrier
502            top_n.expect_barrier().await;
503            assert_eq!(
504                top_n.expect_chunk().await.sort_rows(),
505                StreamChunk::from_pretty(
506                    " I I
507                    + 7 6
508                    - 3 2
509                    - 1 0
510                    + 5 7
511                    - 2 1
512                    + 9 4"
513                )
514                .sort_rows(),
515            );
516
517            // (5, 7, 8, 9)
518            // barrier
519            top_n.expect_barrier().await;
520
521            assert_eq!(
522                top_n.expect_chunk().await.sort_rows(),
523                StreamChunk::from_pretty(
524                    "  I I
525                    -  9 4
526                    +  6 9"
527                )
528                .sort_rows(),
529            );
530            // (5, 6, 7, 8)
531            // barrier
532            top_n.expect_barrier().await;
533
534            assert_eq!(
535                top_n.expect_chunk().await.sort_rows(),
536                StreamChunk::from_pretty(
537                    "  I I
538                    -  5 7
539                    +  9 4
540                    -  6 9
541                    + 10 3"
542                )
543                .sort_rows(),
544            );
545            // (7, 8, 9, 10)
546            // barrier
547            top_n.expect_barrier().await;
548        }
549
550        #[tokio::test]
551        async fn test_top_n_executor_with_offset_and_limit() {
552            let source = create_source();
553            let state_table = create_in_memory_state_table(
554                &[DataType::Int64, DataType::Int64],
555                &[OrderType::ascending(), OrderType::ascending()],
556                &pk_indices(),
557            )
558            .await;
559            let schema = source.schema().clone();
560            let top_n = TopNExecutor::<_, false>::new(
561                source,
562                ActorContext::for_test(0),
563                schema,
564                storage_key(),
565                (3, 4),
566                order_by(),
567                state_table,
568            )
569            .unwrap();
570            let mut top_n = top_n.boxed().execute();
571
572            // consume the init barrier
573            top_n.expect_barrier().await;
574            assert_eq!(
575                top_n.expect_chunk().await.sort_rows(),
576                StreamChunk::from_pretty(
577                    "  I I
578                    + 10 3
579                    +  9 4
580                    +  8 5"
581                )
582                .sort_rows(),
583            );
584            // barrier
585            top_n.expect_barrier().await;
586            assert_eq!(
587                top_n.expect_chunk().await.sort_rows(),
588                StreamChunk::from_pretty(
589                    "  I I
590                    -  8 5
591                    + 11 8"
592                )
593                .sort_rows(),
594            );
595            // barrier
596            top_n.expect_barrier().await;
597
598            assert_eq!(
599                top_n.expect_chunk().await.sort_rows(),
600                StreamChunk::from_pretty(
601                    "  I I
602                    +  8 5"
603                )
604                .sort_rows(),
605            );
606            // barrier
607            top_n.expect_barrier().await;
608            assert_eq!(
609                top_n.expect_chunk().await.sort_rows(),
610                StreamChunk::from_pretty(
611                    "  I  I
612                    -  8  5
613                    + 12 10
614                    -  9  4
615                    + 13 11
616                    - 11  8
617                    + 14 12"
618                )
619                .sort_rows(),
620            );
621            // barrier
622            top_n.expect_barrier().await;
623        }
624    }
625
626    mod test2 {
627
628        use risingwave_common::util::epoch::test_epoch;
629        use risingwave_storage::memory::MemoryStateStore;
630
631        use super::*;
632        use crate::executor::test_utils::StreamExecutorTestExt;
633        use crate::executor::test_utils::top_n_executor::create_in_memory_state_table_from_state_store;
634        fn create_source_new() -> Executor {
635            let mut chunks = vec![
636                StreamChunk::from_pretty(
637                    " I I I I
638                +  1 1 4 1001",
639                ),
640                StreamChunk::from_pretty(
641                    " I I I I
642                +  5 1 4 1002 ",
643                ),
644                StreamChunk::from_pretty(
645                    " I I I I
646                +  1 9 1 1003
647                +  9 8 1 1004
648                +  0 2 3 1005",
649                ),
650                StreamChunk::from_pretty(
651                    " I I I I
652                +  1 0 2 1006",
653                ),
654            ];
655            let schema = Schema {
656                fields: vec![
657                    Field::unnamed(DataType::Int64),
658                    Field::unnamed(DataType::Int64),
659                    Field::unnamed(DataType::Int64),
660                    Field::unnamed(DataType::Int64),
661                ],
662            };
663            MockSource::with_messages(vec![
664                Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
665                Message::Chunk(std::mem::take(&mut chunks[0])),
666                Message::Chunk(std::mem::take(&mut chunks[1])),
667                Message::Chunk(std::mem::take(&mut chunks[2])),
668                Message::Chunk(std::mem::take(&mut chunks[3])),
669                Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
670            ])
671            .into_executor(schema, pk_indices())
672        }
673
674        fn create_source_new_before_recovery() -> Executor {
675            let mut chunks = [
676                StreamChunk::from_pretty(
677                    " I I I I
678                +  1 1 4 1001",
679                ),
680                StreamChunk::from_pretty(
681                    " I I I I
682                +  5 1 4 1002 ",
683                ),
684            ];
685            let schema = Schema {
686                fields: vec![
687                    Field::unnamed(DataType::Int64),
688                    Field::unnamed(DataType::Int64),
689                    Field::unnamed(DataType::Int64),
690                    Field::unnamed(DataType::Int64),
691                ],
692            };
693            MockSource::with_messages(vec![
694                Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
695                Message::Chunk(std::mem::take(&mut chunks[0])),
696                Message::Chunk(std::mem::take(&mut chunks[1])),
697                Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
698            ])
699            .into_executor(schema, pk_indices())
700        }
701
702        fn create_source_new_after_recovery() -> Executor {
703            let mut chunks = [
704                StreamChunk::from_pretty(
705                    " I I I I
706                +  1 9 1 1003
707                +  9 8 1 1004
708                +  0 2 3 1005",
709                ),
710                StreamChunk::from_pretty(
711                    " I I I I
712                +  1 0 2 1006",
713                ),
714            ];
715            let schema = Schema {
716                fields: vec![
717                    Field::unnamed(DataType::Int64),
718                    Field::unnamed(DataType::Int64),
719                    Field::unnamed(DataType::Int64),
720                    Field::unnamed(DataType::Int64),
721                ],
722            };
723            MockSource::with_messages(vec![
724                Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
725                Message::Chunk(std::mem::take(&mut chunks[0])),
726                Message::Chunk(std::mem::take(&mut chunks[1])),
727                Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
728            ])
729            .into_executor(schema, pk_indices())
730        }
731
        /// The storage key is exactly the ORDER BY columns; nothing is appended to them.
        fn storage_key() -> Vec<ColumnOrder> {
            order_by()
        }
735
736        fn order_by() -> Vec<ColumnOrder> {
737            vec![
738                ColumnOrder::new(0, OrderType::ascending()),
739                ColumnOrder::new(3, OrderType::ascending()),
740            ]
741        }
742
743        fn pk_indices() -> PkIndices {
744            vec![0, 3]
745        }
746
747        #[tokio::test]
748        async fn test_top_n_executor_with_offset_and_limit_new() {
749            let source = create_source_new();
750            let state_table = create_in_memory_state_table(
751                &[
752                    DataType::Int64,
753                    DataType::Int64,
754                    DataType::Int64,
755                    DataType::Int64,
756                ],
757                &[OrderType::ascending(), OrderType::ascending()],
758                &pk_indices(),
759            )
760            .await;
761            let schema = source.schema().clone();
762            let top_n = TopNExecutor::<_, false>::new(
763                source,
764                ActorContext::for_test(0),
765                schema,
766                storage_key(),
767                (1, 3),
768                order_by(),
769                state_table,
770            )
771            .unwrap();
772            let mut top_n = top_n.boxed().execute();
773
774            // consume the init barrier
775            top_n.expect_barrier().await;
776
777            assert_eq!(
778                top_n.expect_chunk().await.sort_rows(),
779                StreamChunk::from_pretty(
780                    "  I I I I
781                    +  5 1 4 1002"
782                )
783                .sort_rows(),
784            );
785
786            assert_eq!(
787                top_n.expect_chunk().await.sort_rows(),
788                StreamChunk::from_pretty(
789                    "  I I I I
790                    +  1 9 1 1003
791                    +  1 1 4 1001",
792                )
793                .sort_rows(),
794            );
795
796            assert_eq!(
797                top_n.expect_chunk().await.sort_rows(),
798                StreamChunk::from_pretty(
799                    "  I I I I
800                    -  5 1 4 1002
801                    +  1 0 2 1006",
802                )
803                .sort_rows(),
804            );
805
806            // barrier
807            top_n.expect_barrier().await;
808        }
809
810        #[tokio::test]
811        async fn test_top_n_executor_with_offset_and_limit_new_after_recovery() {
812            let state_store = MemoryStateStore::new();
813            let state_table = create_in_memory_state_table_from_state_store(
814                &[
815                    DataType::Int64,
816                    DataType::Int64,
817                    DataType::Int64,
818                    DataType::Int64,
819                ],
820                &[OrderType::ascending(), OrderType::ascending()],
821                &pk_indices(),
822                state_store.clone(),
823            )
824            .await;
825            let source = create_source_new_before_recovery();
826            let schema = source.schema().clone();
827            let top_n = TopNExecutor::<_, false>::new(
828                source,
829                ActorContext::for_test(0),
830                schema,
831                storage_key(),
832                (1, 3),
833                order_by(),
834                state_table,
835            )
836            .unwrap();
837            let mut top_n = top_n.boxed().execute();
838
839            // consume the init barrier
840            top_n.expect_barrier().await;
841
842            assert_eq!(
843                top_n.expect_chunk().await.sort_rows(),
844                StreamChunk::from_pretty(
845                    "  I I I I
846                    +  5 1 4 1002"
847                )
848                .sort_rows(),
849            );
850
851            // barrier
852            top_n.expect_barrier().await;
853
854            let state_table = create_in_memory_state_table_from_state_store(
855                &[
856                    DataType::Int64,
857                    DataType::Int64,
858                    DataType::Int64,
859                    DataType::Int64,
860                ],
861                &[OrderType::ascending(), OrderType::ascending()],
862                &pk_indices(),
863                state_store,
864            )
865            .await;
866
867            // recovery
868            let source = create_source_new_after_recovery();
869            let schema = source.schema().clone();
870            let top_n_after_recovery = TopNExecutor::<_, false>::new(
871                source,
872                ActorContext::for_test(0),
873                schema,
874                storage_key(),
875                (1, 3),
876                order_by(),
877                state_table,
878            )
879            .unwrap();
880            let mut top_n = top_n_after_recovery.boxed().execute();
881
882            // barrier
883            top_n.expect_barrier().await;
884
885            assert_eq!(
886                top_n.expect_chunk().await.sort_rows(),
887                StreamChunk::from_pretty(
888                    "  I I I I
889                    +  1 9 1 1003
890                    +  1 1 4 1001",
891                )
892                .sort_rows(),
893            );
894
895            assert_eq!(
896                top_n.expect_chunk().await.sort_rows(),
897                StreamChunk::from_pretty(
898                    "  I I I I
899                    -  5 1 4 1002
900                    +  1 0 2 1006",
901                )
902                .sort_rows(),
903            );
904
905            // barrier
906            top_n.expect_barrier().await;
907        }
908    }
909
910    mod test_with_ties {
911
912        use risingwave_common::util::epoch::test_epoch;
913        use risingwave_storage::memory::MemoryStateStore;
914
915        use super::*;
916        use crate::executor::test_utils::StreamExecutorTestExt;
917        use crate::executor::test_utils::top_n_executor::create_in_memory_state_table_from_state_store;
918
919        fn create_source() -> Executor {
920            let mut chunks = vec![
921                StreamChunk::from_pretty(
922                    "  I I
923                    +  1 0
924                    +  2 1
925                    +  3 2
926                    + 10 3
927                    +  9 4
928                    +  8 5
929                    ",
930                ),
931                StreamChunk::from_pretty(
932                    "  I I
933                    +  3 6
934                    +  3 7
935                    +  1 8
936                    +  2 9
937                    + 10 10",
938                ),
939                StreamChunk::from_pretty(
940                    " I I
941                    - 1 0",
942                ),
943                StreamChunk::from_pretty(
944                    " I I
945                    - 1 8",
946                ),
947            ];
948            let schema = Schema {
949                fields: vec![
950                    Field::unnamed(DataType::Int64),
951                    Field::unnamed(DataType::Int64),
952                ],
953            };
954            MockSource::with_messages(vec![
955                Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
956                Message::Chunk(std::mem::take(&mut chunks[0])),
957                Message::Chunk(std::mem::take(&mut chunks[1])),
958                Message::Chunk(std::mem::take(&mut chunks[2])),
959                Message::Chunk(std::mem::take(&mut chunks[3])),
960                Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
961            ])
962            .into_executor(schema, pk_indices())
963        }
964
965        fn storage_key() -> Vec<ColumnOrder> {
966            let mut v = order_by();
967            v.push(ColumnOrder::new(1, OrderType::ascending()));
968            v
969        }
970
971        fn order_by() -> Vec<ColumnOrder> {
972            vec![ColumnOrder::new(0, OrderType::ascending())]
973        }
974
975        fn pk_indices() -> PkIndices {
976            vec![0, 1]
977        }
978
979        #[tokio::test]
980        async fn test_with_ties() {
981            let source = create_source();
982            let state_table = create_in_memory_state_table(
983                &[DataType::Int64, DataType::Int64],
984                &[OrderType::ascending(), OrderType::ascending()],
985                &pk_indices(),
986            )
987            .await;
988            let schema = source.schema().clone();
989            let top_n = TopNExecutor::new_with_ties_for_test(
990                source,
991                ActorContext::for_test(0),
992                schema,
993                storage_key(),
994                (0, 3),
995                order_by(),
996                state_table,
997            )
998            .unwrap();
999            let mut top_n = top_n.boxed().execute();
1000
1001            // consume the init barrier
1002            top_n.expect_barrier().await;
1003            assert_eq!(
1004                top_n.expect_chunk().await.sort_rows(),
1005                StreamChunk::from_pretty(
1006                    " I I
1007                    + 1 0
1008                    + 2 1
1009                    + 3 2"
1010                )
1011                .sort_rows(),
1012            );
1013
1014            assert_eq!(
1015                top_n.expect_chunk().await.sort_rows(),
1016                StreamChunk::from_pretty(
1017                    " I I
1018                    - 3 2
1019                    + 1 8
1020                    + 2 9"
1021                )
1022                .sort_rows(),
1023            );
1024
1025            assert_eq!(
1026                top_n.expect_chunk().await.sort_rows(),
1027                StreamChunk::from_pretty(
1028                    " I I
1029                    - 1 0"
1030                )
1031                .sort_rows(),
1032            );
1033
1034            // High cache has only 2 capacity, but we need to trigger 3 inserts here!
1035            assert_eq!(
1036                top_n.expect_chunk().await.sort_rows(),
1037                StreamChunk::from_pretty(
1038                    " I I
1039                    - 1 8
1040                    + 3 2
1041                    + 3 6
1042                    + 3 7"
1043                )
1044                .sort_rows(),
1045            );
1046
1047            // barrier
1048            top_n.expect_barrier().await;
1049        }
1050
1051        fn create_source_before_recovery() -> Executor {
1052            let mut chunks = [
1053                StreamChunk::from_pretty(
1054                    "  I I
1055                    +  1 0
1056                    +  2 1
1057                    +  3 2
1058                    + 10 3
1059                    +  9 4
1060                    +  8 5",
1061                ),
1062                StreamChunk::from_pretty(
1063                    "  I I
1064                    +  3 6
1065                    +  3 7
1066                    +  1 8
1067                    +  2 9
1068                    + 10 10",
1069                ),
1070            ];
1071            let schema = Schema {
1072                fields: vec![
1073                    Field::unnamed(DataType::Int64),
1074                    Field::unnamed(DataType::Int64),
1075                ],
1076            };
1077            MockSource::with_messages(vec![
1078                Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1079                Message::Chunk(std::mem::take(&mut chunks[0])),
1080                Message::Chunk(std::mem::take(&mut chunks[1])),
1081                Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1082            ])
1083            .into_executor(schema, pk_indices())
1084        }
1085
1086        fn create_source_after_recovery() -> Executor {
1087            let mut chunks = [
1088                StreamChunk::from_pretty(
1089                    " I I
1090                    - 1 0",
1091                ),
1092                StreamChunk::from_pretty(
1093                    " I I
1094                    - 1 8",
1095                ),
1096            ];
1097            let schema = Schema {
1098                fields: vec![
1099                    Field::unnamed(DataType::Int64),
1100                    Field::unnamed(DataType::Int64),
1101                ],
1102            };
1103            MockSource::with_messages(vec![
1104                Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1105                Message::Chunk(std::mem::take(&mut chunks[0])),
1106                Message::Chunk(std::mem::take(&mut chunks[1])),
1107                Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1108            ])
1109            .into_executor(schema, pk_indices())
1110        }
1111
1112        #[tokio::test]
1113        async fn test_with_ties_recovery() {
1114            let state_store = MemoryStateStore::new();
1115            let state_table = create_in_memory_state_table_from_state_store(
1116                &[DataType::Int64, DataType::Int64],
1117                &[OrderType::ascending(), OrderType::ascending()],
1118                &pk_indices(),
1119                state_store.clone(),
1120            )
1121            .await;
1122            let source = create_source_before_recovery();
1123            let schema = source.schema().clone();
1124            let top_n = TopNExecutor::new_with_ties_for_test(
1125                source,
1126                ActorContext::for_test(0),
1127                schema,
1128                storage_key(),
1129                (0, 3),
1130                order_by(),
1131                state_table,
1132            )
1133            .unwrap();
1134            let mut top_n = top_n.boxed().execute();
1135
1136            // consume the init barrier
1137            top_n.expect_barrier().await;
1138            assert_eq!(
1139                top_n.expect_chunk().await.sort_rows(),
1140                StreamChunk::from_pretty(
1141                    " I I
1142                    + 1 0
1143                    + 2 1
1144                    + 3 2"
1145                )
1146                .sort_rows(),
1147            );
1148
1149            assert_eq!(
1150                top_n.expect_chunk().await.sort_rows(),
1151                StreamChunk::from_pretty(
1152                    " I I
1153                    - 3 2
1154                    + 1 8
1155                    + 2 9"
1156                )
1157                .sort_rows(),
1158            );
1159
1160            // barrier
1161            top_n.expect_barrier().await;
1162
1163            let state_table = create_in_memory_state_table_from_state_store(
1164                &[DataType::Int64, DataType::Int64],
1165                &[OrderType::ascending(), OrderType::ascending()],
1166                &pk_indices(),
1167                state_store,
1168            )
1169            .await;
1170
1171            // recovery
1172            let source = create_source_after_recovery();
1173            let schema = source.schema().clone();
1174            let top_n_after_recovery = TopNExecutor::new_with_ties_for_test(
1175                source,
1176                ActorContext::for_test(0),
1177                schema,
1178                storage_key(),
1179                (0, 3),
1180                order_by(),
1181                state_table,
1182            )
1183            .unwrap();
1184            let mut top_n = top_n_after_recovery.boxed().execute();
1185
1186            // barrier
1187            top_n.expect_barrier().await;
1188
1189            assert_eq!(
1190                top_n.expect_chunk().await.sort_rows(),
1191                StreamChunk::from_pretty(
1192                    " I I
1193                    - 1 0"
1194                )
1195                .sort_rows(),
1196            );
1197
1198            // High cache has only 2 capacity, but we need to trigger 3 inserts here!
1199            assert_eq!(
1200                top_n.expect_chunk().await.sort_rows(),
1201                StreamChunk::from_pretty(
1202                    " I I
1203                    - 1 8
1204                    + 3 2
1205                    + 3 6
1206                    + 3 7"
1207                )
1208                .sort_rows(),
1209            );
1210            // barrier
1211            top_n.expect_barrier().await;
1212        }
1213    }
1214}