risingwave_stream/executor/test_utils/
top_n_executor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId};
17use risingwave_common::types::DataType;
18use risingwave_common::util::sort_util::OrderType;
19use risingwave_storage::memory::MemoryStateStore;
20
21use crate::common::table::state_table::StateTable;
22use crate::common::table::test_utils::gen_pbtable;
23
24pub async fn create_in_memory_state_table(
25    data_types: &[DataType],
26    order_types: &[OrderType],
27    pk_indices: &[usize],
28) -> StateTable<MemoryStateStore> {
29    create_in_memory_state_table_from_state_store(
30        data_types,
31        order_types,
32        pk_indices,
33        MemoryStateStore::new(),
34    )
35    .await
36}
37
38pub async fn create_in_memory_state_table_from_state_store(
39    data_types: &[DataType],
40    order_types: &[OrderType],
41    pk_indices: &[usize],
42    state_store: MemoryStateStore,
43) -> StateTable<MemoryStateStore> {
44    let column_descs = data_types
45        .iter()
46        .enumerate()
47        .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
48        .collect_vec();
49    StateTable::from_table_catalog(
50        &gen_pbtable(
51            TableId::new(0),
52            column_descs,
53            order_types.to_vec(),
54            pk_indices.to_vec(),
55            0,
56        ),
57        state_store,
58        None,
59    )
60    .await
61}