risingwave_common/catalog/
internal_table.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::type_name;
16use std::fmt::Debug;
17
18use anyhow::anyhow;
19use itertools::Itertools;
20
21pub const RW_INTERNAL_TABLE_FUNCTION_NAME: &str = "rw_table";
22
23pub fn generate_internal_table_name_with_type(
24    job_name: &str,
25    fragment_id: u32,
26    table_id: u32,
27    table_type: &str,
28) -> String {
29    format!(
30        "__internal_{}_{}_{}_{}",
31        job_name,
32        fragment_id,
33        table_type.to_lowercase(),
34        table_id
35    )
36}
37
38pub fn get_dist_key_in_pk_indices<I: Eq + Copy + Debug, O: TryFrom<usize>>(
39    dist_key_indices: &[I],
40    pk_indices: &[I],
41) -> anyhow::Result<Vec<O>> {
42    dist_key_indices
43        .iter()
44        .map(|&di| {
45            pk_indices
46                .iter()
47                .position(|&pi| di == pi)
48                .ok_or_else(|| {
49                    anyhow!(
50                        "distribution key {:?} must be a subset of primary key {:?}",
51                        dist_key_indices,
52                        pk_indices
53                    )
54                })
55                .map(|idx| match O::try_from(idx) {
56                    Ok(idx) => idx,
57                    Err(_) => unreachable!("failed to cast {} to {}", idx, type_name::<O>()),
58                })
59        })
60        .try_collect()
61}
62
63/// Get distribution key start index in pk, and return None if `dist_key_in_pk_indices` is not empty
64/// or continuous.
65/// Note that `dist_key_in_pk_indices` may be shuffled, the start index should be the
66/// minimum value.
67pub fn get_dist_key_start_index_in_pk(dist_key_in_pk_indices: &[usize]) -> Option<usize> {
68    let mut sorted_dist_key = dist_key_in_pk_indices.iter().sorted();
69    if let Some(min_idx) = sorted_dist_key.next() {
70        let mut prev_idx = min_idx;
71        for idx in sorted_dist_key {
72            if *idx != prev_idx + 1 {
73                return None;
74            }
75            prev_idx = idx;
76        }
77        Some(*min_idx)
78    } else {
79        None
80    }
81}