1use risingwave_common::array::Op;
16use risingwave_common::row::{RowDeserializer, RowExt};
17use risingwave_common::util::epoch::EpochPair;
18use risingwave_common::util::sort_util::ColumnOrder;
19
20use super::top_n_cache::TopNStaging;
21use super::utils::*;
22use super::{ManagedTopNState, TopNCache, TopNCacheTrait};
23use crate::common::table::state_table::StateTablePostCommit;
24use crate::executor::prelude::*;
25
/// Plain (non-grouped) top-n executor: a [`TopNExecutorWrapper`] around an
/// [`InnerTopNExecutor`].
///
/// `WITH_TIES` is forwarded to the inner executor, where it selects the
/// `TopNCache<WITH_TIES>` variant.
pub type TopNExecutor<S, const WITH_TIES: bool> =
    TopNExecutorWrapper<InnerTopNExecutor<S, WITH_TIES>>;
30
31impl<S: StateStore, const WITH_TIES: bool> TopNExecutor<S, WITH_TIES> {
32 #[allow(clippy::too_many_arguments)]
33 pub fn new(
34 input: Executor,
35 ctx: ActorContextRef,
36 schema: Schema,
37 storage_key: Vec<ColumnOrder>,
38 offset_and_limit: (usize, usize),
39 order_by: Vec<ColumnOrder>,
40 state_table: StateTable<S>,
41 ) -> StreamResult<Self> {
42 Ok(TopNExecutorWrapper {
43 input,
44 ctx,
45 inner: InnerTopNExecutor::new(
46 schema,
47 storage_key,
48 offset_and_limit,
49 order_by,
50 state_table,
51 )?,
52 })
53 }
54}
55
56impl<S: StateStore> TopNExecutor<S, true> {
57 #[allow(clippy::too_many_arguments)]
60 #[cfg(test)]
61 pub fn new_with_ties_for_test(
62 input: Executor,
63 ctx: ActorContextRef,
64 schema: Schema,
65 storage_key: Vec<ColumnOrder>,
66 offset_and_limit: (usize, usize),
67 order_by: Vec<ColumnOrder>,
68 state_table: StateTable<S>,
69 ) -> StreamResult<Self> {
70 let mut inner =
71 InnerTopNExecutor::new(schema, storage_key, offset_and_limit, order_by, state_table)?;
72
73 inner.cache.high_cache_capacity = 2;
74
75 Ok(TopNExecutorWrapper { input, ctx, inner })
76 }
77}
78
/// State and logic of the plain top-n executor, driven through its `TopNExecutorBase` impl.
pub struct InnerTopNExecutor<S: StateStore, const WITH_TIES: bool> {
    /// Schema of the rows; its data types are used to deserialize staged changes and to build
    /// output chunks.
    schema: Schema,

    /// Column indices of the storage key, taken from the `storage_key` column orders passed to
    /// `new`. Each input row is projected onto these columns to derive its cache key.
    storage_key_indices: PkIndices,

    /// State-table-backed top-n state; every inserted and deleted input row is applied to it.
    managed_state: ManagedTopNState<S>,

    /// Top-n cache, updated alongside `managed_state`; its updates record the output changes
    /// into a `TopNStaging`.
    cache: TopNCache<WITH_TIES>,

    /// Serde used to turn a projected storage-key row into a cache key.
    cache_key_serde: CacheKeySerde,
}
93
94impl<S: StateStore, const WITH_TIES: bool> InnerTopNExecutor<S, WITH_TIES> {
95 #[allow(clippy::too_many_arguments)]
103 pub fn new(
104 schema: Schema,
105 storage_key: Vec<ColumnOrder>,
106 offset_and_limit: (usize, usize),
107 order_by: Vec<ColumnOrder>,
108 state_table: StateTable<S>,
109 ) -> StreamResult<Self> {
110 let num_offset = offset_and_limit.0;
111 let num_limit = offset_and_limit.1;
112
113 let cache_key_serde = create_cache_key_serde(&storage_key, &schema, &order_by, &[]);
114 let managed_state = ManagedTopNState::<S>::new(state_table, cache_key_serde.clone());
115 let data_types = schema.data_types();
116
117 Ok(Self {
118 schema,
119 managed_state,
120 storage_key_indices: storage_key.into_iter().map(|op| op.column_index).collect(),
121 cache: TopNCache::new(num_offset, num_limit, data_types),
122 cache_key_serde,
123 })
124 }
125}
126
127impl<S: StateStore, const WITH_TIES: bool> TopNExecutorBase for InnerTopNExecutor<S, WITH_TIES>
128where
129 TopNCache<WITH_TIES>: TopNCacheTrait,
130{
131 type State = S;
132
133 async fn apply_chunk(
134 &mut self,
135 chunk: StreamChunk,
136 ) -> StreamExecutorResult<Option<StreamChunk>> {
137 let mut staging = TopNStaging::new();
138
139 for (op, row_ref) in chunk.rows() {
141 let pk_row = row_ref.project(&self.storage_key_indices);
142 let cache_key = serialize_pk_to_cache_key(pk_row, &self.cache_key_serde);
143 match op {
144 Op::Insert | Op::UpdateInsert => {
145 self.managed_state.insert(row_ref);
147 self.cache.insert(cache_key, row_ref, &mut staging)
148 }
149
150 Op::Delete | Op::UpdateDelete => {
151 self.managed_state.delete(row_ref);
153 self.cache
154 .delete(
155 NO_GROUP_KEY,
156 &mut self.managed_state,
157 cache_key,
158 row_ref,
159 &mut staging,
160 )
161 .await?
162 }
163 }
164 }
165
166 let data_types = self.schema.data_types();
167 let deserializer = RowDeserializer::new(data_types.clone());
168 if staging.is_empty() {
169 return Ok(None);
170 }
171 let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(staging.len()));
172 for res in staging.into_deserialized_changes(&deserializer) {
173 let (op, row) = res?;
174 let _none = chunk_builder.append_row(op, row);
175 }
176 Ok(chunk_builder.take())
177 }
178
179 async fn flush_data(
180 &mut self,
181 epoch: EpochPair,
182 ) -> StreamExecutorResult<StateTablePostCommit<'_, S>> {
183 self.managed_state.flush(epoch).await
184 }
185
186 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
187 self.managed_state.try_flush().await
188 }
189
190 async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
191 self.managed_state.init_epoch(epoch).await?;
192 self.managed_state
193 .init_topn_cache(NO_GROUP_KEY, &mut self.cache)
194 .await
195 }
196
197 async fn handle_watermark(&mut self, _: Watermark) -> Option<Watermark> {
198 None
200 }
201}
202
203#[cfg(test)]
204mod tests {
205 use risingwave_common::array::stream_chunk::StreamChunkTestExt;
206 use risingwave_common::catalog::{Field, Schema};
207 use risingwave_common::types::DataType;
208 use risingwave_common::util::sort_util::OrderType;
209
210 use super::*;
211 use crate::executor::test_utils::MockSource;
212 use crate::executor::test_utils::top_n_executor::create_in_memory_state_table;
213 use crate::executor::{Barrier, Message};
214
215 mod test1 {
216
217 use risingwave_common::util::epoch::test_epoch;
218
219 use super::*;
220 use crate::executor::test_utils::StreamExecutorTestExt;
221
222 fn create_stream_chunks() -> Vec<StreamChunk> {
223 let chunk1 = StreamChunk::from_pretty(
224 " I I
225 + 1 0
226 + 2 1
227 + 3 2
228 + 10 3
229 + 9 4
230 + 8 5",
231 );
232 let chunk2 = StreamChunk::from_pretty(
233 " I I
234 + 7 6
235 - 3 2
236 - 1 0
237 + 5 7
238 - 2 1
239 + 11 8",
240 );
241 let chunk3 = StreamChunk::from_pretty(
242 " I I
243 + 6 9
244 + 12 10
245 + 13 11
246 + 14 12",
247 );
248 let chunk4 = StreamChunk::from_pretty(
249 " I I
250 - 5 7
251 - 6 9
252 - 11 8",
253 );
254 vec![chunk1, chunk2, chunk3, chunk4]
255 }
256
257 fn create_schema() -> Schema {
258 Schema {
259 fields: vec![
260 Field::unnamed(DataType::Int64),
261 Field::unnamed(DataType::Int64),
262 ],
263 }
264 }
265
266 fn storage_key() -> Vec<ColumnOrder> {
267 let mut v = order_by();
268 v.extend([ColumnOrder::new(1, OrderType::ascending())]);
269 v
270 }
271
272 fn order_by() -> Vec<ColumnOrder> {
273 vec![ColumnOrder::new(0, OrderType::ascending())]
274 }
275
276 fn pk_indices() -> PkIndices {
277 vec![0, 1]
278 }
279
280 fn create_source() -> Executor {
281 let mut chunks = create_stream_chunks();
282 let schema = create_schema();
283 MockSource::with_messages(vec![
284 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
285 Message::Chunk(std::mem::take(&mut chunks[0])),
286 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
287 Message::Chunk(std::mem::take(&mut chunks[1])),
288 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
289 Message::Chunk(std::mem::take(&mut chunks[2])),
290 Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
291 Message::Chunk(std::mem::take(&mut chunks[3])),
292 Message::Barrier(Barrier::new_test_barrier(test_epoch(5))),
293 ])
294 .into_executor(schema, pk_indices())
295 }
296
297 #[tokio::test]
298 async fn test_top_n_executor_with_offset() {
299 let source = create_source();
300 let state_table = create_in_memory_state_table(
301 &[DataType::Int64, DataType::Int64],
302 &[OrderType::ascending(), OrderType::ascending()],
303 &pk_indices(),
304 )
305 .await;
306
307 let schema = source.schema().clone();
308 let top_n = TopNExecutor::<_, false>::new(
309 source,
310 ActorContext::for_test(0),
311 schema,
312 storage_key(),
313 (3, 1000),
314 order_by(),
315 state_table,
316 )
317 .unwrap();
318 let mut top_n = top_n.boxed().execute();
319
320 top_n.expect_barrier().await;
322 assert_eq!(
323 top_n.expect_chunk().await.sort_rows(),
324 StreamChunk::from_pretty(
325 " I I
326 + 10 3
327 + 9 4
328 + 8 5"
329 )
330 .sort_rows(),
331 );
332 top_n.expect_barrier().await;
334 assert_eq!(
335 top_n.expect_chunk().await.sort_rows(),
336 StreamChunk::from_pretty(
337 " I I
338 - 8 5
339 + 11 8"
340 )
341 .sort_rows(),
342 );
343
344 top_n.expect_barrier().await;
346
347 assert_eq!(
349 top_n.expect_chunk().await.sort_rows(),
350 StreamChunk::from_pretty(
351 " I I
352 + 8 5
353 + 12 10
354 + 13 11
355 + 14 12"
356 )
357 .sort_rows(),
358 );
359 top_n.expect_barrier().await;
361
362 assert_eq!(
364 top_n.expect_chunk().await.sort_rows(),
365 StreamChunk::from_pretty(
366 " I I
367 - 8 5
368 - 9 4
369 - 11 8"
370 )
371 .sort_rows(),
372 );
373 top_n.expect_barrier().await;
375 }
376
377 #[tokio::test]
378 async fn test_top_n_executor_with_limit() {
379 let source = create_source();
380 let state_table = create_in_memory_state_table(
381 &[DataType::Int64, DataType::Int64],
382 &[OrderType::ascending(), OrderType::ascending()],
383 &pk_indices(),
384 )
385 .await;
386 let schema = source.schema().clone();
387 let top_n = TopNExecutor::<_, false>::new(
388 source,
389 ActorContext::for_test(0),
390 schema,
391 storage_key(),
392 (0, 4),
393 order_by(),
394 state_table,
395 )
396 .unwrap();
397 let mut top_n = top_n.boxed().execute();
398
399 top_n.expect_barrier().await;
401 assert_eq!(
402 top_n.expect_chunk().await.sort_rows(),
403 StreamChunk::from_pretty(
404 " I I
405 + 1 0
406 + 2 1
407 + 3 2
408 + 8 5"
409 )
410 .sort_rows(),
411 );
412 top_n.expect_barrier().await;
416 assert_eq!(
417 top_n.expect_chunk().await.sort_rows(),
418 StreamChunk::from_pretty(
419 " I I
420 + 7 6
421 - 3 2
422 - 1 0
423 + 5 7
424 - 2 1
425 + 9 4"
426 )
427 .sort_rows(),
428 );
429
430 top_n.expect_barrier().await;
433
434 assert_eq!(
435 top_n.expect_chunk().await.sort_rows(),
436 StreamChunk::from_pretty(
437 " I I
438 - 9 4
439 + 6 9"
440 )
441 .sort_rows(),
442 );
443 top_n.expect_barrier().await;
446
447 assert_eq!(
448 top_n.expect_chunk().await.sort_rows(),
449 StreamChunk::from_pretty(
450 " I I
451 - 5 7
452 + 9 4
453 - 6 9
454 + 10 3"
455 )
456 .sort_rows(),
457 );
458 top_n.expect_barrier().await;
461 }
462
463 #[tokio::test]
465 async fn test_top_n_executor_with_limit_with_ties() {
466 let source = create_source();
467 let state_table = create_in_memory_state_table(
468 &[DataType::Int64, DataType::Int64],
469 &[OrderType::ascending(), OrderType::ascending()],
470 &pk_indices(),
471 )
472 .await;
473 let schema = source.schema().clone();
474 let top_n = TopNExecutor::<_, true>::new(
475 source,
476 ActorContext::for_test(0),
477 schema,
478 storage_key(),
479 (0, 4),
480 order_by(),
481 state_table,
482 )
483 .unwrap();
484 let mut top_n = top_n.boxed().execute();
485
486 top_n.expect_barrier().await;
488 assert_eq!(
489 top_n.expect_chunk().await.sort_rows(),
490 StreamChunk::from_pretty(
491 " I I
492 + 1 0
493 + 2 1
494 + 3 2
495 + 8 5"
496 )
497 .sort_rows(),
498 );
499 top_n.expect_barrier().await;
503 assert_eq!(
504 top_n.expect_chunk().await.sort_rows(),
505 StreamChunk::from_pretty(
506 " I I
507 + 7 6
508 - 3 2
509 - 1 0
510 + 5 7
511 - 2 1
512 + 9 4"
513 )
514 .sort_rows(),
515 );
516
517 top_n.expect_barrier().await;
520
521 assert_eq!(
522 top_n.expect_chunk().await.sort_rows(),
523 StreamChunk::from_pretty(
524 " I I
525 - 9 4
526 + 6 9"
527 )
528 .sort_rows(),
529 );
530 top_n.expect_barrier().await;
533
534 assert_eq!(
535 top_n.expect_chunk().await.sort_rows(),
536 StreamChunk::from_pretty(
537 " I I
538 - 5 7
539 + 9 4
540 - 6 9
541 + 10 3"
542 )
543 .sort_rows(),
544 );
545 top_n.expect_barrier().await;
548 }
549
550 #[tokio::test]
551 async fn test_top_n_executor_with_offset_and_limit() {
552 let source = create_source();
553 let state_table = create_in_memory_state_table(
554 &[DataType::Int64, DataType::Int64],
555 &[OrderType::ascending(), OrderType::ascending()],
556 &pk_indices(),
557 )
558 .await;
559 let schema = source.schema().clone();
560 let top_n = TopNExecutor::<_, false>::new(
561 source,
562 ActorContext::for_test(0),
563 schema,
564 storage_key(),
565 (3, 4),
566 order_by(),
567 state_table,
568 )
569 .unwrap();
570 let mut top_n = top_n.boxed().execute();
571
572 top_n.expect_barrier().await;
574 assert_eq!(
575 top_n.expect_chunk().await.sort_rows(),
576 StreamChunk::from_pretty(
577 " I I
578 + 10 3
579 + 9 4
580 + 8 5"
581 )
582 .sort_rows(),
583 );
584 top_n.expect_barrier().await;
586 assert_eq!(
587 top_n.expect_chunk().await.sort_rows(),
588 StreamChunk::from_pretty(
589 " I I
590 - 8 5
591 + 11 8"
592 )
593 .sort_rows(),
594 );
595 top_n.expect_barrier().await;
597
598 assert_eq!(
599 top_n.expect_chunk().await.sort_rows(),
600 StreamChunk::from_pretty(
601 " I I
602 + 8 5"
603 )
604 .sort_rows(),
605 );
606 top_n.expect_barrier().await;
608 assert_eq!(
609 top_n.expect_chunk().await.sort_rows(),
610 StreamChunk::from_pretty(
611 " I I
612 - 8 5
613 + 12 10
614 - 9 4
615 + 13 11
616 - 11 8
617 + 14 12"
618 )
619 .sort_rows(),
620 );
621 top_n.expect_barrier().await;
623 }
624 }
625
626 mod test2 {
627
628 use risingwave_common::util::epoch::test_epoch;
629 use risingwave_storage::memory::MemoryStateStore;
630
631 use super::*;
632 use crate::executor::test_utils::StreamExecutorTestExt;
633 use crate::executor::test_utils::top_n_executor::create_in_memory_state_table_from_state_store;
634 fn create_source_new() -> Executor {
635 let mut chunks = vec![
636 StreamChunk::from_pretty(
637 " I I I I
638 + 1 1 4 1001",
639 ),
640 StreamChunk::from_pretty(
641 " I I I I
642 + 5 1 4 1002 ",
643 ),
644 StreamChunk::from_pretty(
645 " I I I I
646 + 1 9 1 1003
647 + 9 8 1 1004
648 + 0 2 3 1005",
649 ),
650 StreamChunk::from_pretty(
651 " I I I I
652 + 1 0 2 1006",
653 ),
654 ];
655 let schema = Schema {
656 fields: vec![
657 Field::unnamed(DataType::Int64),
658 Field::unnamed(DataType::Int64),
659 Field::unnamed(DataType::Int64),
660 Field::unnamed(DataType::Int64),
661 ],
662 };
663 MockSource::with_messages(vec![
664 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
665 Message::Chunk(std::mem::take(&mut chunks[0])),
666 Message::Chunk(std::mem::take(&mut chunks[1])),
667 Message::Chunk(std::mem::take(&mut chunks[2])),
668 Message::Chunk(std::mem::take(&mut chunks[3])),
669 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
670 ])
671 .into_executor(schema, pk_indices())
672 }
673
674 fn create_source_new_before_recovery() -> Executor {
675 let mut chunks = [
676 StreamChunk::from_pretty(
677 " I I I I
678 + 1 1 4 1001",
679 ),
680 StreamChunk::from_pretty(
681 " I I I I
682 + 5 1 4 1002 ",
683 ),
684 ];
685 let schema = Schema {
686 fields: vec![
687 Field::unnamed(DataType::Int64),
688 Field::unnamed(DataType::Int64),
689 Field::unnamed(DataType::Int64),
690 Field::unnamed(DataType::Int64),
691 ],
692 };
693 MockSource::with_messages(vec![
694 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
695 Message::Chunk(std::mem::take(&mut chunks[0])),
696 Message::Chunk(std::mem::take(&mut chunks[1])),
697 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
698 ])
699 .into_executor(schema, pk_indices())
700 }
701
702 fn create_source_new_after_recovery() -> Executor {
703 let mut chunks = [
704 StreamChunk::from_pretty(
705 " I I I I
706 + 1 9 1 1003
707 + 9 8 1 1004
708 + 0 2 3 1005",
709 ),
710 StreamChunk::from_pretty(
711 " I I I I
712 + 1 0 2 1006",
713 ),
714 ];
715 let schema = Schema {
716 fields: vec![
717 Field::unnamed(DataType::Int64),
718 Field::unnamed(DataType::Int64),
719 Field::unnamed(DataType::Int64),
720 Field::unnamed(DataType::Int64),
721 ],
722 };
723 MockSource::with_messages(vec![
724 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
725 Message::Chunk(std::mem::take(&mut chunks[0])),
726 Message::Chunk(std::mem::take(&mut chunks[1])),
727 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
728 ])
729 .into_executor(schema, pk_indices())
730 }
731
732 fn storage_key() -> Vec<ColumnOrder> {
733 order_by()
734 }
735
736 fn order_by() -> Vec<ColumnOrder> {
737 vec![
738 ColumnOrder::new(0, OrderType::ascending()),
739 ColumnOrder::new(3, OrderType::ascending()),
740 ]
741 }
742
743 fn pk_indices() -> PkIndices {
744 vec![0, 3]
745 }
746
747 #[tokio::test]
748 async fn test_top_n_executor_with_offset_and_limit_new() {
749 let source = create_source_new();
750 let state_table = create_in_memory_state_table(
751 &[
752 DataType::Int64,
753 DataType::Int64,
754 DataType::Int64,
755 DataType::Int64,
756 ],
757 &[OrderType::ascending(), OrderType::ascending()],
758 &pk_indices(),
759 )
760 .await;
761 let schema = source.schema().clone();
762 let top_n = TopNExecutor::<_, false>::new(
763 source,
764 ActorContext::for_test(0),
765 schema,
766 storage_key(),
767 (1, 3),
768 order_by(),
769 state_table,
770 )
771 .unwrap();
772 let mut top_n = top_n.boxed().execute();
773
774 top_n.expect_barrier().await;
776
777 assert_eq!(
778 top_n.expect_chunk().await.sort_rows(),
779 StreamChunk::from_pretty(
780 " I I I I
781 + 5 1 4 1002"
782 )
783 .sort_rows(),
784 );
785
786 assert_eq!(
787 top_n.expect_chunk().await.sort_rows(),
788 StreamChunk::from_pretty(
789 " I I I I
790 + 1 9 1 1003
791 + 1 1 4 1001",
792 )
793 .sort_rows(),
794 );
795
796 assert_eq!(
797 top_n.expect_chunk().await.sort_rows(),
798 StreamChunk::from_pretty(
799 " I I I I
800 - 5 1 4 1002
801 + 1 0 2 1006",
802 )
803 .sort_rows(),
804 );
805
806 top_n.expect_barrier().await;
808 }
809
810 #[tokio::test]
811 async fn test_top_n_executor_with_offset_and_limit_new_after_recovery() {
812 let state_store = MemoryStateStore::new();
813 let state_table = create_in_memory_state_table_from_state_store(
814 &[
815 DataType::Int64,
816 DataType::Int64,
817 DataType::Int64,
818 DataType::Int64,
819 ],
820 &[OrderType::ascending(), OrderType::ascending()],
821 &pk_indices(),
822 state_store.clone(),
823 )
824 .await;
825 let source = create_source_new_before_recovery();
826 let schema = source.schema().clone();
827 let top_n = TopNExecutor::<_, false>::new(
828 source,
829 ActorContext::for_test(0),
830 schema,
831 storage_key(),
832 (1, 3),
833 order_by(),
834 state_table,
835 )
836 .unwrap();
837 let mut top_n = top_n.boxed().execute();
838
839 top_n.expect_barrier().await;
841
842 assert_eq!(
843 top_n.expect_chunk().await.sort_rows(),
844 StreamChunk::from_pretty(
845 " I I I I
846 + 5 1 4 1002"
847 )
848 .sort_rows(),
849 );
850
851 top_n.expect_barrier().await;
853
854 let state_table = create_in_memory_state_table_from_state_store(
855 &[
856 DataType::Int64,
857 DataType::Int64,
858 DataType::Int64,
859 DataType::Int64,
860 ],
861 &[OrderType::ascending(), OrderType::ascending()],
862 &pk_indices(),
863 state_store,
864 )
865 .await;
866
867 let source = create_source_new_after_recovery();
869 let schema = source.schema().clone();
870 let top_n_after_recovery = TopNExecutor::<_, false>::new(
871 source,
872 ActorContext::for_test(0),
873 schema,
874 storage_key(),
875 (1, 3),
876 order_by(),
877 state_table,
878 )
879 .unwrap();
880 let mut top_n = top_n_after_recovery.boxed().execute();
881
882 top_n.expect_barrier().await;
884
885 assert_eq!(
886 top_n.expect_chunk().await.sort_rows(),
887 StreamChunk::from_pretty(
888 " I I I I
889 + 1 9 1 1003
890 + 1 1 4 1001",
891 )
892 .sort_rows(),
893 );
894
895 assert_eq!(
896 top_n.expect_chunk().await.sort_rows(),
897 StreamChunk::from_pretty(
898 " I I I I
899 - 5 1 4 1002
900 + 1 0 2 1006",
901 )
902 .sort_rows(),
903 );
904
905 top_n.expect_barrier().await;
907 }
908 }
909
910 mod test_with_ties {
911
912 use risingwave_common::util::epoch::test_epoch;
913 use risingwave_storage::memory::MemoryStateStore;
914
915 use super::*;
916 use crate::executor::test_utils::StreamExecutorTestExt;
917 use crate::executor::test_utils::top_n_executor::create_in_memory_state_table_from_state_store;
918
919 fn create_source() -> Executor {
920 let mut chunks = vec![
921 StreamChunk::from_pretty(
922 " I I
923 + 1 0
924 + 2 1
925 + 3 2
926 + 10 3
927 + 9 4
928 + 8 5
929 ",
930 ),
931 StreamChunk::from_pretty(
932 " I I
933 + 3 6
934 + 3 7
935 + 1 8
936 + 2 9
937 + 10 10",
938 ),
939 StreamChunk::from_pretty(
940 " I I
941 - 1 0",
942 ),
943 StreamChunk::from_pretty(
944 " I I
945 - 1 8",
946 ),
947 ];
948 let schema = Schema {
949 fields: vec![
950 Field::unnamed(DataType::Int64),
951 Field::unnamed(DataType::Int64),
952 ],
953 };
954 MockSource::with_messages(vec![
955 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
956 Message::Chunk(std::mem::take(&mut chunks[0])),
957 Message::Chunk(std::mem::take(&mut chunks[1])),
958 Message::Chunk(std::mem::take(&mut chunks[2])),
959 Message::Chunk(std::mem::take(&mut chunks[3])),
960 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
961 ])
962 .into_executor(schema, pk_indices())
963 }
964
965 fn storage_key() -> Vec<ColumnOrder> {
966 let mut v = order_by();
967 v.push(ColumnOrder::new(1, OrderType::ascending()));
968 v
969 }
970
971 fn order_by() -> Vec<ColumnOrder> {
972 vec![ColumnOrder::new(0, OrderType::ascending())]
973 }
974
975 fn pk_indices() -> PkIndices {
976 vec![0, 1]
977 }
978
979 #[tokio::test]
980 async fn test_with_ties() {
981 let source = create_source();
982 let state_table = create_in_memory_state_table(
983 &[DataType::Int64, DataType::Int64],
984 &[OrderType::ascending(), OrderType::ascending()],
985 &pk_indices(),
986 )
987 .await;
988 let schema = source.schema().clone();
989 let top_n = TopNExecutor::new_with_ties_for_test(
990 source,
991 ActorContext::for_test(0),
992 schema,
993 storage_key(),
994 (0, 3),
995 order_by(),
996 state_table,
997 )
998 .unwrap();
999 let mut top_n = top_n.boxed().execute();
1000
1001 top_n.expect_barrier().await;
1003 assert_eq!(
1004 top_n.expect_chunk().await.sort_rows(),
1005 StreamChunk::from_pretty(
1006 " I I
1007 + 1 0
1008 + 2 1
1009 + 3 2"
1010 )
1011 .sort_rows(),
1012 );
1013
1014 assert_eq!(
1015 top_n.expect_chunk().await.sort_rows(),
1016 StreamChunk::from_pretty(
1017 " I I
1018 - 3 2
1019 + 1 8
1020 + 2 9"
1021 )
1022 .sort_rows(),
1023 );
1024
1025 assert_eq!(
1026 top_n.expect_chunk().await.sort_rows(),
1027 StreamChunk::from_pretty(
1028 " I I
1029 - 1 0"
1030 )
1031 .sort_rows(),
1032 );
1033
1034 assert_eq!(
1036 top_n.expect_chunk().await.sort_rows(),
1037 StreamChunk::from_pretty(
1038 " I I
1039 - 1 8
1040 + 3 2
1041 + 3 6
1042 + 3 7"
1043 )
1044 .sort_rows(),
1045 );
1046
1047 top_n.expect_barrier().await;
1049 }
1050
1051 fn create_source_before_recovery() -> Executor {
1052 let mut chunks = [
1053 StreamChunk::from_pretty(
1054 " I I
1055 + 1 0
1056 + 2 1
1057 + 3 2
1058 + 10 3
1059 + 9 4
1060 + 8 5",
1061 ),
1062 StreamChunk::from_pretty(
1063 " I I
1064 + 3 6
1065 + 3 7
1066 + 1 8
1067 + 2 9
1068 + 10 10",
1069 ),
1070 ];
1071 let schema = Schema {
1072 fields: vec![
1073 Field::unnamed(DataType::Int64),
1074 Field::unnamed(DataType::Int64),
1075 ],
1076 };
1077 MockSource::with_messages(vec![
1078 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1079 Message::Chunk(std::mem::take(&mut chunks[0])),
1080 Message::Chunk(std::mem::take(&mut chunks[1])),
1081 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1082 ])
1083 .into_executor(schema, pk_indices())
1084 }
1085
1086 fn create_source_after_recovery() -> Executor {
1087 let mut chunks = [
1088 StreamChunk::from_pretty(
1089 " I I
1090 - 1 0",
1091 ),
1092 StreamChunk::from_pretty(
1093 " I I
1094 - 1 8",
1095 ),
1096 ];
1097 let schema = Schema {
1098 fields: vec![
1099 Field::unnamed(DataType::Int64),
1100 Field::unnamed(DataType::Int64),
1101 ],
1102 };
1103 MockSource::with_messages(vec![
1104 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1105 Message::Chunk(std::mem::take(&mut chunks[0])),
1106 Message::Chunk(std::mem::take(&mut chunks[1])),
1107 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1108 ])
1109 .into_executor(schema, pk_indices())
1110 }
1111
1112 #[tokio::test]
1113 async fn test_with_ties_recovery() {
1114 let state_store = MemoryStateStore::new();
1115 let state_table = create_in_memory_state_table_from_state_store(
1116 &[DataType::Int64, DataType::Int64],
1117 &[OrderType::ascending(), OrderType::ascending()],
1118 &pk_indices(),
1119 state_store.clone(),
1120 )
1121 .await;
1122 let source = create_source_before_recovery();
1123 let schema = source.schema().clone();
1124 let top_n = TopNExecutor::new_with_ties_for_test(
1125 source,
1126 ActorContext::for_test(0),
1127 schema,
1128 storage_key(),
1129 (0, 3),
1130 order_by(),
1131 state_table,
1132 )
1133 .unwrap();
1134 let mut top_n = top_n.boxed().execute();
1135
1136 top_n.expect_barrier().await;
1138 assert_eq!(
1139 top_n.expect_chunk().await.sort_rows(),
1140 StreamChunk::from_pretty(
1141 " I I
1142 + 1 0
1143 + 2 1
1144 + 3 2"
1145 )
1146 .sort_rows(),
1147 );
1148
1149 assert_eq!(
1150 top_n.expect_chunk().await.sort_rows(),
1151 StreamChunk::from_pretty(
1152 " I I
1153 - 3 2
1154 + 1 8
1155 + 2 9"
1156 )
1157 .sort_rows(),
1158 );
1159
1160 top_n.expect_barrier().await;
1162
1163 let state_table = create_in_memory_state_table_from_state_store(
1164 &[DataType::Int64, DataType::Int64],
1165 &[OrderType::ascending(), OrderType::ascending()],
1166 &pk_indices(),
1167 state_store,
1168 )
1169 .await;
1170
1171 let source = create_source_after_recovery();
1173 let schema = source.schema().clone();
1174 let top_n_after_recovery = TopNExecutor::new_with_ties_for_test(
1175 source,
1176 ActorContext::for_test(0),
1177 schema,
1178 storage_key(),
1179 (0, 3),
1180 order_by(),
1181 state_table,
1182 )
1183 .unwrap();
1184 let mut top_n = top_n_after_recovery.boxed().execute();
1185
1186 top_n.expect_barrier().await;
1188
1189 assert_eq!(
1190 top_n.expect_chunk().await.sort_rows(),
1191 StreamChunk::from_pretty(
1192 " I I
1193 - 1 0"
1194 )
1195 .sort_rows(),
1196 );
1197
1198 assert_eq!(
1200 top_n.expect_chunk().await.sort_rows(),
1201 StreamChunk::from_pretty(
1202 " I I
1203 - 1 8
1204 + 3 2
1205 + 3 6
1206 + 3 7"
1207 )
1208 .sort_rows(),
1209 );
1210 top_n.expect_barrier().await;
1212 }
1213 }
1214}