risingwave_stream/common/table/
test_utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::catalog::{ColumnDesc, TableId};
17use risingwave_common::util::iter_util::ZipEqFast;
18use risingwave_common::util::sort_util::OrderType;
19use risingwave_pb::catalog::PbTable;
20use risingwave_pb::common::PbColumnOrder;
21use risingwave_pb::plan_common::ColumnCatalog;
22
23pub fn gen_pbtable(
24    table_id: TableId,
25    column_descs: Vec<ColumnDesc>,
26    order_types: Vec<OrderType>,
27    pk_indices: Vec<usize>,
28    read_prefix_len_hint: usize,
29) -> PbTable {
30    let value_indices = (0..column_descs.len()).collect_vec();
31    gen_pbtable_with_value_indices(
32        table_id,
33        column_descs,
34        order_types,
35        pk_indices,
36        read_prefix_len_hint,
37        value_indices,
38    )
39}
40
41pub fn gen_pbtable_with_dist_key(
42    table_id: TableId,
43    column_descs: Vec<ColumnDesc>,
44    order_types: Vec<OrderType>,
45    pk_indices: Vec<usize>,
46    read_prefix_len_hint: usize,
47    distribution_key: Vec<usize>,
48) -> PbTable {
49    let value_indices = (0..column_descs.len()).collect_vec();
50    gen_pbtable_inner(
51        table_id,
52        column_descs,
53        order_types,
54        pk_indices,
55        read_prefix_len_hint,
56        value_indices,
57        distribution_key,
58    )
59}
60
61pub fn gen_pbtable_with_value_indices(
62    table_id: TableId,
63    column_descs: Vec<ColumnDesc>,
64    order_types: Vec<OrderType>,
65    pk_indices: Vec<usize>,
66    read_prefix_len_hint: usize,
67    value_indices: Vec<usize>,
68) -> PbTable {
69    gen_pbtable_inner(
70        table_id,
71        column_descs,
72        order_types,
73        pk_indices,
74        read_prefix_len_hint,
75        value_indices,
76        Vec::default(),
77    )
78}
79
80pub fn gen_pbtable_inner(
81    table_id: TableId,
82    column_descs: Vec<ColumnDesc>,
83    order_types: Vec<OrderType>,
84    pk_indices: Vec<usize>,
85    read_prefix_len_hint: usize,
86    value_indices: Vec<usize>,
87    distribution_key: Vec<usize>,
88) -> PbTable {
89    let prost_pk = pk_indices
90        .iter()
91        .zip_eq_fast(order_types.iter())
92        .map(|(idx, order)| PbColumnOrder {
93            column_index: *idx as _,
94            order_type: Some(order.to_protobuf()),
95        })
96        .collect();
97    let prost_columns = column_descs
98        .iter()
99        .map(|col| ColumnCatalog {
100            column_desc: Some(col.to_protobuf()),
101            is_hidden: false,
102        })
103        .collect();
104
105    let value_indices = value_indices.into_iter().map(|i| i as i32).collect_vec();
106    let distribution_key = distribution_key.into_iter().map(|i| i as i32).collect_vec();
107
108    PbTable {
109        id: table_id.table_id(),
110        columns: prost_columns,
111        pk: prost_pk,
112        read_prefix_len_hint: read_prefix_len_hint as u32,
113        value_indices,
114        distribution_key,
115        ..Default::default()
116    }
117}