risingwave_frontend/catalog/
index_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::Deref;
17use std::sync::Arc;
18
19use educe::Educe;
20use itertools::Itertools;
21use risingwave_common::catalog::{Field, IndexId, Schema};
22use risingwave_common::util::epoch::Epoch;
23use risingwave_common::util::sort_util::ColumnOrder;
24use risingwave_pb::catalog::{PbIndex, PbIndexColumnProperties};
25
26use crate::catalog::table_catalog::TableType;
27use crate::catalog::{OwnedByUserCatalog, TableCatalog};
28use crate::expr::{ExprDisplay, ExprImpl, ExprRewriter as _, FunctionCall};
29use crate::user::UserId;
30
31#[derive(Clone, Debug, Educe)]
32#[educe(PartialEq, Eq, Hash)]
33pub struct TableIndex {
34    pub name: String,
35    /// The properties of the index columns.
36    /// <https://www.postgresql.org/docs/current/functions-info.html#FUNCTIONS-INFO-INDEX-COLUMN-PROPS>
37    pub index_column_properties: Vec<PbIndexColumnProperties>,
38
39    pub index_table: Arc<TableCatalog>,
40
41    pub primary_table: Arc<TableCatalog>,
42
43    pub primary_to_secondary_mapping: BTreeMap<usize, usize>,
44
45    pub secondary_to_primary_mapping: BTreeMap<usize, usize>,
46
47    /// Map function call from the primary table to the index table.
48    /// Use `HashMap` instead of `BTreeMap`, because `FunctionCall` can't be used as the key for
49    /// `BTreeMap`. BTW, the trait `std::hash::Hash` is not implemented for
50    /// `HashMap<function_call::FunctionCall, usize>`, so we need to ignore it. It will not
51    /// affect the correctness, since it can be derived by `index_item`.
52    #[educe(PartialEq(ignore))]
53    #[educe(Hash(ignore))]
54    pub function_mapping: HashMap<FunctionCall, usize>,
55
56    pub index_columns_len: u32,
57}
58
59#[derive(Clone, Debug, Hash, PartialEq, Eq)]
60pub enum IndexType {
61    Table(Arc<TableIndex>),
62}
63
64#[derive(Clone, Debug, Hash, PartialEq, Eq)]
65pub struct IndexCatalog {
66    pub id: IndexId,
67
68    pub name: String,
69
70    /// Only `InputRef` and `FuncCall` type index is supported Now.
71    /// The index of `InputRef` is the column index of the primary table.
72    /// The `index_item` size is equal to the index table columns size
73    /// The input args of `FuncCall` is also the column index of the primary table.
74    pub index_item: Vec<ExprImpl>,
75
76    pub index_type: IndexType,
77
78    pub primary_table: Arc<TableCatalog>,
79
80    pub created_at_epoch: Option<Epoch>,
81
82    pub initialized_at_epoch: Option<Epoch>,
83
84    pub created_at_cluster_version: Option<String>,
85
86    pub initialized_at_cluster_version: Option<String>,
87}
88
89impl IndexCatalog {
90    pub fn build_from(
91        index_prost: &PbIndex,
92        index_table: &Arc<TableCatalog>,
93        primary_table: &Arc<TableCatalog>,
94    ) -> Self {
95        let index_item: Vec<ExprImpl> = index_prost
96            .index_item
97            .iter()
98            .map(|expr| ExprImpl::from_expr_proto(expr).unwrap())
99            .map(|expr| item_rewriter::CompositeCastEliminator.rewrite_expr(expr))
100            .collect();
101
102        let index_type = match index_table.table_type {
103            TableType::Index => {
104                let primary_to_secondary_mapping: BTreeMap<usize, usize> = index_item
105                    .iter()
106                    .enumerate()
107                    .filter_map(|(i, expr)| match expr {
108                        ExprImpl::InputRef(input_ref) => Some((input_ref.index, i)),
109                        ExprImpl::FunctionCall(_) => None,
110                        _ => unreachable!(),
111                    })
112                    .collect();
113
114                let secondary_to_primary_mapping = BTreeMap::from_iter(
115                    primary_to_secondary_mapping
116                        .clone()
117                        .into_iter()
118                        .map(|(x, y)| (y, x)),
119                );
120
121                let function_mapping: HashMap<FunctionCall, usize> = index_item
122                    .iter()
123                    .enumerate()
124                    .filter_map(|(i, expr)| match expr {
125                        ExprImpl::InputRef(_) => None,
126                        ExprImpl::FunctionCall(func) => Some((func.deref().clone(), i)),
127                        _ => unreachable!(),
128                    })
129                    .collect();
130                IndexType::Table(Arc::new(TableIndex {
131                    name: index_prost.name.clone(),
132                    index_column_properties: index_prost.index_column_properties.clone(),
133                    index_columns_len: index_prost.index_columns_len,
134                    index_table: index_table.clone(),
135                    primary_table: primary_table.clone(),
136                    primary_to_secondary_mapping,
137                    secondary_to_primary_mapping,
138                    function_mapping,
139                }))
140            }
141            TableType::Table | TableType::MaterializedView | TableType::Internal => {
142                unreachable!()
143            }
144        };
145
146        IndexCatalog {
147            id: index_prost.id.into(),
148            name: index_prost.name.clone(),
149            index_item,
150            index_type,
151            primary_table: primary_table.clone(),
152            created_at_epoch: index_prost.created_at_epoch.map(Epoch::from),
153            initialized_at_epoch: index_prost.initialized_at_epoch.map(Epoch::from),
154            created_at_cluster_version: index_prost.created_at_cluster_version.clone(),
155            initialized_at_cluster_version: index_prost.initialized_at_cluster_version.clone(),
156        }
157    }
158}
159
160impl TableIndex {
161    pub fn primary_table_pk_ref_to_index_table(&self) -> Vec<ColumnOrder> {
162        let mapping = self.primary_to_secondary_mapping();
163
164        self.primary_table
165            .pk
166            .iter()
167            .map(|x| ColumnOrder::new(*mapping.get(&x.column_index).unwrap(), x.order_type))
168            .collect_vec()
169    }
170
171    pub fn primary_table_distribute_key_ref_to_index_table(&self) -> Vec<usize> {
172        let mapping = self.primary_to_secondary_mapping();
173
174        self.primary_table
175            .distribution_key
176            .iter()
177            .map(|x| *mapping.get(x).unwrap())
178            .collect_vec()
179    }
180
181    pub fn full_covering(&self) -> bool {
182        self.index_table.columns.len() == self.primary_table.columns.len()
183    }
184
185    /// A mapping maps the column index of the secondary index to the column index of the primary
186    /// table.
187    pub fn secondary_to_primary_mapping(&self) -> &BTreeMap<usize, usize> {
188        &self.secondary_to_primary_mapping
189    }
190
191    /// A mapping maps the column index of the primary table to the column index of the secondary
192    /// index.
193    pub fn primary_to_secondary_mapping(&self) -> &BTreeMap<usize, usize> {
194        &self.primary_to_secondary_mapping
195    }
196
197    pub fn function_mapping(&self) -> &HashMap<FunctionCall, usize> {
198        &self.function_mapping
199    }
200}
201
202impl IndexCatalog {
203    pub fn index_table(&self) -> &Arc<TableCatalog> {
204        match &self.index_type {
205            IndexType::Table(index) => &index.index_table,
206        }
207    }
208
209    /// Get the column properties of the index column.
210    pub fn get_column_properties(&self, column_idx: usize) -> Option<PbIndexColumnProperties> {
211        match &self.index_type {
212            IndexType::Table(index) => index.index_column_properties.get(column_idx).cloned(),
213        }
214    }
215
216    pub fn get_column_def(&self, column_idx: usize) -> Option<String> {
217        if let Some(col) = self.index_table().columns.get(column_idx) {
218            if col.is_hidden {
219                return None;
220            }
221        } else {
222            return None;
223        }
224        let expr_display = ExprDisplay {
225            expr: &self.index_item[column_idx],
226            input_schema: &Schema::new(
227                self.primary_table
228                    .columns
229                    .iter()
230                    .map(|col| Field::from(&col.column_desc))
231                    .collect_vec(),
232            ),
233        };
234
235        // TODO(Kexiang): Currently, extra info like ":Int32" introduced by `ExprDisplay` is kept for simplity.
236        // We'd better remove it in the future.
237        Some(expr_display.to_string())
238    }
239
240    pub fn display(&self) -> IndexDisplay {
241        let index_table = self.index_table().clone();
242        let index_columns_with_ordering = index_table
243            .pk
244            .iter()
245            .filter(|x| !index_table.columns[x.column_index].is_hidden)
246            .map(|x| {
247                let index_column_name = index_table.columns[x.column_index].name().to_owned();
248                format!("{} {}", index_column_name, x.order_type)
249            })
250            .collect_vec();
251
252        let pk_column_index_set = index_table
253            .pk
254            .iter()
255            .map(|x| x.column_index)
256            .collect::<HashSet<_>>();
257
258        let include_columns = index_table
259            .columns
260            .iter()
261            .enumerate()
262            .filter(|(i, _)| !pk_column_index_set.contains(i))
263            .filter(|(_, x)| !x.is_hidden)
264            .map(|(_, x)| x.name().to_owned())
265            .collect_vec();
266
267        let distributed_by_columns = index_table
268            .distribution_key
269            .iter()
270            .map(|&x| index_table.columns[x].name().to_owned())
271            .collect_vec();
272
273        IndexDisplay {
274            index_columns_with_ordering,
275            include_columns,
276            distributed_by_columns,
277        }
278    }
279}
280
281pub struct IndexDisplay {
282    pub index_columns_with_ordering: Vec<String>,
283    pub include_columns: Vec<String>,
284    pub distributed_by_columns: Vec<String>,
285}
286
287impl OwnedByUserCatalog for IndexCatalog {
288    fn owner(&self) -> UserId {
289        self.index_table().owner
290    }
291}
292
293mod item_rewriter {
294    use risingwave_pb::expr::expr_node;
295
296    use crate::expr::{Expr, ExprImpl, ExprRewriter, FunctionCall};
297
298    /// Rewrite the expression of index item to eliminate `CompositeCast`, if any. This is needed
299    /// if the type of a column was changed and there's functional index on it.
300    ///
301    /// # Example
302    ///
303    /// Imagine there's a table created with `CREATE TABLE t (v struct<a int, b int>)`.
304    /// Then we create an index on it with `CREATE INDEX idx ON t ((v).a)`, which will create an
305    /// index item `Field(InputRef(0), 0)`.
306    ///
307    /// If we alter the column with `ALTER TABLE t ALTER COLUMN v TYPE struct<x varchar, a int>`,
308    /// the meta service will wrap the `InputRef(0)` with a `CompositeCast` to maintain the correct
309    /// return type. The index item will now become `Field(CompositeCast(InputRef(0)), 0)`.
310    ///
311    /// `CompositeCast` is for internal use only, and cannot be constructed or executed. To allow
312    /// this functional index to work and be matched with user queries, we need to eliminate it
313    /// here. By comparing the input and output types of `CompositeCast` and matching the field id,
314    /// we can find the real `Field` index and rewrite it to `Field(InputRef(0), 1)`.
315    ///
316    /// Note that if the field is dropped, we will leave the index item as is. This makes the index
317    /// item invalid, and it will never be matched and used.
318    pub struct CompositeCastEliminator;
319
320    impl ExprRewriter for CompositeCastEliminator {
321        fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
322            let (func_type, inputs, ret) = func_call.decompose();
323
324            // Flatten consecutive `CompositeCast`.
325            if func_type == expr_node::Type::CompositeCast {
326                let child = inputs[0].clone();
327
328                if let Some(child) = child.as_function_call()
329                    && child.func_type() == expr_node::Type::CompositeCast
330                {
331                    let new_child = child.inputs()[0].clone();
332
333                    // If the type already matches, no need to wrap again.
334                    // Recursively eliminate more composite cast by calling rewrite again.
335                    if new_child.return_type() == ret {
336                        return self.rewrite_expr(new_child);
337                    } else {
338                        let new_composite_cast =
339                            FunctionCall::new_unchecked(func_type, vec![new_child], ret);
340                        return self.rewrite_function_call(new_composite_cast);
341                    }
342                }
343            }
344            // Rewrite `Field(CompositeCast(x), y)` to `Field(x, y')`.
345            // TODO: also support rewriting `ArrayAccess` and `MapAccess`.
346            else if func_type == expr_node::Type::Field {
347                let child = inputs[0].clone();
348
349                if let Some(child) = child.as_function_call()
350                    && child.func_type() == expr_node::Type::CompositeCast
351                {
352                    let index = (inputs[1].clone().into_literal().unwrap())
353                        .get_data()
354                        .clone()
355                        .unwrap()
356                        .into_int32();
357
358                    let struct_type = child.return_type().into_struct();
359                    let field_id = struct_type
360                        .id_at(index as usize)
361                        .expect("ids should be set");
362
363                    // Unwrap the composite cast.
364                    let new_child = child.inputs()[0].clone();
365                    let new_struct_type = new_child.return_type().into_struct();
366
367                    let Some(new_index) = new_struct_type
368                        .ids()
369                        .expect("ids should be set")
370                        .position(|x| x == field_id)
371                    else {
372                        // Previously we have index on this field, but now it's dropped.
373                        // As a result, this entire index item becomes invalid.
374                        // Simply leave it as is. Users cannot construct a `CompositeCast` (which is
375                        // not user-facing), thus this index item will never be matched and used.
376                        return FunctionCall::new_unchecked(func_type, inputs, ret).into();
377                    };
378                    let new_index = ExprImpl::literal_int(new_index as i32);
379
380                    let new_inputs = vec![new_child, new_index];
381                    let new_field_call = FunctionCall::new_unchecked(func_type, new_inputs, ret);
382
383                    // Recursively eliminate more composite cast by calling rewrite again.
384                    return self.rewrite_function_call(new_field_call);
385                }
386            }
387
388            let inputs = inputs
389                .into_iter()
390                .map(|expr| self.rewrite_expr(expr))
391                .collect();
392            FunctionCall::new_unchecked(func_type, inputs, ret).into()
393        }
394    }
395}