risingwave_stream/common/table/
test_utils.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use risingwave_common::catalog::{ColumnDesc, TableId};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::catalog::PbTable;
use risingwave_pb::common::PbColumnOrder;
use risingwave_pb::plan_common::ColumnCatalog;

pub fn gen_pbtable(
    table_id: TableId,
    column_descs: Vec<ColumnDesc>,
    order_types: Vec<OrderType>,
    pk_indices: Vec<usize>,
    read_prefix_len_hint: usize,
) -> PbTable {
    let value_indices = (0..column_descs.len()).collect_vec();
    gen_pbtable_with_value_indices(
        table_id,
        column_descs,
        order_types,
        pk_indices,
        read_prefix_len_hint,
        value_indices,
    )
}

pub fn gen_pbtable_with_dist_key(
    table_id: TableId,
    column_descs: Vec<ColumnDesc>,
    order_types: Vec<OrderType>,
    pk_indices: Vec<usize>,
    read_prefix_len_hint: usize,
    distribution_key: Vec<usize>,
) -> PbTable {
    let value_indices = (0..column_descs.len()).collect_vec();
    gen_pbtable_inner(
        table_id,
        column_descs,
        order_types,
        pk_indices,
        read_prefix_len_hint,
        value_indices,
        distribution_key,
    )
}

pub fn gen_pbtable_with_value_indices(
    table_id: TableId,
    column_descs: Vec<ColumnDesc>,
    order_types: Vec<OrderType>,
    pk_indices: Vec<usize>,
    read_prefix_len_hint: usize,
    value_indices: Vec<usize>,
) -> PbTable {
    gen_pbtable_inner(
        table_id,
        column_descs,
        order_types,
        pk_indices,
        read_prefix_len_hint,
        value_indices,
        Vec::default(),
    )
}

pub fn gen_pbtable_inner(
    table_id: TableId,
    column_descs: Vec<ColumnDesc>,
    order_types: Vec<OrderType>,
    pk_indices: Vec<usize>,
    read_prefix_len_hint: usize,
    value_indices: Vec<usize>,
    distribution_key: Vec<usize>,
) -> PbTable {
    let prost_pk = pk_indices
        .iter()
        .zip_eq_fast(order_types.iter())
        .map(|(idx, order)| PbColumnOrder {
            column_index: *idx as _,
            order_type: Some(order.to_protobuf()),
        })
        .collect();
    let prost_columns = column_descs
        .iter()
        .map(|col| ColumnCatalog {
            column_desc: Some(col.to_protobuf()),
            is_hidden: false,
        })
        .collect();

    let value_indices = value_indices.into_iter().map(|i| i as i32).collect_vec();
    let distribution_key = distribution_key.into_iter().map(|i| i as i32).collect_vec();

    PbTable {
        id: table_id.table_id(),
        columns: prost_columns,
        pk: prost_pk,
        read_prefix_len_hint: read_prefix_len_hint as u32,
        value_indices,
        distribution_key,
        ..Default::default()
    }
}