risingwave_stream/executor/test_utils/
top_n_executor.rs1use itertools::Itertools;
16use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId};
17use risingwave_common::types::DataType;
18use risingwave_common::util::sort_util::OrderType;
19use risingwave_storage::memory::MemoryStateStore;
20
21use crate::common::table::state_table::StateTable;
22use crate::common::table::test_utils::gen_pbtable;
23
24pub async fn create_in_memory_state_table(
25 data_types: &[DataType],
26 order_types: &[OrderType],
27 pk_indices: &[usize],
28) -> StateTable<MemoryStateStore> {
29 create_in_memory_state_table_from_state_store(
30 data_types,
31 order_types,
32 pk_indices,
33 MemoryStateStore::new(),
34 )
35 .await
36}
37
38pub async fn create_in_memory_state_table_from_state_store(
39 data_types: &[DataType],
40 order_types: &[OrderType],
41 pk_indices: &[usize],
42 state_store: MemoryStateStore,
43) -> StateTable<MemoryStateStore> {
44 let column_descs = data_types
45 .iter()
46 .enumerate()
47 .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
48 .collect_vec();
49 StateTable::from_table_catalog(
50 &gen_pbtable(
51 TableId::new(0),
52 column_descs,
53 order_types.to_vec(),
54 pk_indices.to_vec(),
55 0,
56 ),
57 state_store,
58 None,
59 )
60 .await
61}