risingwave_frontend/catalog/
index_catalog.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::Deref;
17use std::sync::Arc;
18
19use educe::Educe;
20use itertools::Itertools;
21use risingwave_common::catalog::{ColumnDesc, Field, IndexId, Schema};
22use risingwave_common::util::epoch::Epoch;
23use risingwave_common::util::sort_util::ColumnOrder;
24use risingwave_pb::catalog::{PbIndex, PbIndexColumnProperties, PbVectorIndexInfo};
25
26use crate::catalog::table_catalog::TableType;
27use crate::catalog::{OwnedByUserCatalog, TableCatalog};
28use crate::expr::{ExprDisplay, ExprImpl, ExprRewriter as _, FunctionCall};
29use crate::user::UserId;
30
31#[derive(Clone, Debug, Educe)]
32#[educe(PartialEq, Eq, Hash)]
33pub struct TableIndex {
34    pub name: String,
35    /// The properties of the index columns.
36    /// <https://www.postgresql.org/docs/current/functions-info.html#FUNCTIONS-INFO-INDEX-COLUMN-PROPS>
37    pub index_column_properties: Vec<PbIndexColumnProperties>,
38
39    pub index_table: Arc<TableCatalog>,
40
41    pub primary_table: Arc<TableCatalog>,
42
43    pub primary_to_secondary_mapping: BTreeMap<usize, usize>,
44
45    pub secondary_to_primary_mapping: BTreeMap<usize, usize>,
46
47    /// Map function call from the primary table to the index table.
48    /// Use `HashMap` instead of `BTreeMap`, because `FunctionCall` can't be used as the key for
49    /// `BTreeMap`. BTW, the trait `std::hash::Hash` is not implemented for
50    /// `HashMap<function_call::FunctionCall, usize>`, so we need to ignore it. It will not
51    /// affect the correctness, since it can be derived by `index_item`.
52    #[educe(PartialEq(ignore))]
53    #[educe(Hash(ignore))]
54    pub function_mapping: HashMap<FunctionCall, usize>,
55
56    pub index_columns_len: u32,
57}
58
59#[derive(Clone, Debug, PartialEq, Eq, Educe)]
60#[educe(Hash)]
61pub struct VectorIndex {
62    pub index_table: Arc<TableCatalog>,
63    pub vector_expr: ExprImpl,
64    #[educe(Hash(ignore))]
65    pub primary_to_included_info_column_mapping: HashMap<usize, usize>,
66    pub primary_key_idx_in_info_columns: Vec<usize>,
67    pub included_info_columns: Vec<usize>,
68    pub vector_index_info: PbVectorIndexInfo,
69}
70
71impl VectorIndex {
72    pub fn info_column_desc(&self) -> Vec<ColumnDesc> {
73        self.index_table.columns[1..=self.included_info_columns.len()]
74            .iter()
75            .map(|col| col.column_desc.clone())
76            .collect()
77    }
78}
79
80#[derive(Clone, Debug, Hash, PartialEq, Eq)]
81pub enum IndexType {
82    Table(Arc<TableIndex>),
83    Vector(Arc<VectorIndex>),
84}
85
86#[derive(Clone, Debug, Hash, PartialEq, Eq)]
87pub struct IndexCatalog {
88    pub id: IndexId,
89
90    pub name: String,
91
92    /// Only `InputRef` and `FuncCall` type index is supported Now.
93    /// The index of `InputRef` is the column index of the primary table.
94    /// The `index_item` size is equal to the index table columns size
95    /// The input args of `FuncCall` is also the column index of the primary table.
96    pub index_item: Vec<ExprImpl>,
97
98    pub index_type: IndexType,
99
100    pub primary_table: Arc<TableCatalog>,
101
102    pub created_at_epoch: Option<Epoch>,
103
104    pub initialized_at_epoch: Option<Epoch>,
105
106    pub created_at_cluster_version: Option<String>,
107
108    pub initialized_at_cluster_version: Option<String>,
109}
110
111impl IndexCatalog {
112    pub fn build_from(
113        index_prost: &PbIndex,
114        index_table: &Arc<TableCatalog>,
115        primary_table: &Arc<TableCatalog>,
116    ) -> Self {
117        let index_item: Vec<ExprImpl> = index_prost
118            .index_item
119            .iter()
120            .map(|expr| ExprImpl::from_expr_proto(expr).unwrap())
121            .map(|expr| item_rewriter::CompositeCastEliminator.rewrite_expr(expr))
122            .collect();
123
124        let index_type = match index_table.table_type {
125            TableType::Index => {
126                let primary_to_secondary_mapping: BTreeMap<usize, usize> = index_item
127                    .iter()
128                    .enumerate()
129                    .filter_map(|(i, expr)| match expr {
130                        ExprImpl::InputRef(input_ref) => Some((input_ref.index, i)),
131                        ExprImpl::FunctionCall(_) => None,
132                        _ => unreachable!(),
133                    })
134                    .collect();
135
136                let secondary_to_primary_mapping = BTreeMap::from_iter(
137                    primary_to_secondary_mapping
138                        .clone()
139                        .into_iter()
140                        .map(|(x, y)| (y, x)),
141                );
142
143                let function_mapping: HashMap<FunctionCall, usize> = index_item
144                    .iter()
145                    .enumerate()
146                    .filter_map(|(i, expr)| match expr {
147                        ExprImpl::InputRef(_) => None,
148                        ExprImpl::FunctionCall(func) => Some((func.deref().clone(), i)),
149                        _ => unreachable!(),
150                    })
151                    .collect();
152                IndexType::Table(Arc::new(TableIndex {
153                    name: index_prost.name.clone(),
154                    index_column_properties: index_prost.index_column_properties.clone(),
155                    index_columns_len: index_prost.index_columns_len,
156                    index_table: index_table.clone(),
157                    primary_table: primary_table.clone(),
158                    primary_to_secondary_mapping,
159                    secondary_to_primary_mapping,
160                    function_mapping,
161                }))
162            }
163            TableType::VectorIndex => {
164                assert_eq!(index_prost.index_columns_len, 1);
165                let included_info_columns = index_item[1..].iter().map(|item| {
166                    let ExprImpl::InputRef(input) = item else {
167                        panic!("vector index included columns must be from direct input column, but got: {:?}", item);
168                    };
169                    input.index
170                }).collect_vec();
171                let primary_to_included_info_column_mapping: HashMap<_, _> = included_info_columns
172                    .iter()
173                    .enumerate()
174                    .map(|(included_info_column_idx, primary_column_idx)| {
175                        (*primary_column_idx, included_info_column_idx)
176                    })
177                    .collect();
178                let primary_key_idx_in_info_columns = primary_table
179                    .pk()
180                    .iter()
181                    .map(|order| primary_to_included_info_column_mapping[&order.column_index])
182                    .collect();
183                IndexType::Vector(Arc::new(VectorIndex {
184                    index_table: index_table.clone(),
185                    vector_expr: index_item[0].clone(),
186                    primary_to_included_info_column_mapping,
187                    primary_key_idx_in_info_columns,
188                    included_info_columns,
189                    vector_index_info: index_table
190                        .vector_index_info
191                        .expect("should exist for vector index"),
192                }))
193            }
194            TableType::Table | TableType::MaterializedView | TableType::Internal => {
195                unreachable!()
196            }
197        };
198
199        IndexCatalog {
200            id: index_prost.id.into(),
201            name: index_prost.name.clone(),
202            index_item,
203            index_type,
204            primary_table: primary_table.clone(),
205            created_at_epoch: index_prost.created_at_epoch.map(Epoch::from),
206            initialized_at_epoch: index_prost.initialized_at_epoch.map(Epoch::from),
207            created_at_cluster_version: index_prost.created_at_cluster_version.clone(),
208            initialized_at_cluster_version: index_prost.initialized_at_cluster_version.clone(),
209        }
210    }
211}
212
213impl TableIndex {
214    pub fn primary_table_pk_ref_to_index_table(&self) -> Vec<ColumnOrder> {
215        let mapping = self.primary_to_secondary_mapping();
216
217        self.primary_table
218            .pk
219            .iter()
220            .map(|x| ColumnOrder::new(*mapping.get(&x.column_index).unwrap(), x.order_type))
221            .collect_vec()
222    }
223
224    pub fn primary_table_distribute_key_ref_to_index_table(&self) -> Vec<usize> {
225        let mapping = self.primary_to_secondary_mapping();
226
227        self.primary_table
228            .distribution_key
229            .iter()
230            .map(|x| *mapping.get(x).unwrap())
231            .collect_vec()
232    }
233
234    pub fn full_covering(&self) -> bool {
235        self.index_table.columns.len() == self.primary_table.columns.len()
236    }
237
238    /// A mapping maps the column index of the secondary index to the column index of the primary
239    /// table.
240    pub fn secondary_to_primary_mapping(&self) -> &BTreeMap<usize, usize> {
241        &self.secondary_to_primary_mapping
242    }
243
244    /// A mapping maps the column index of the primary table to the column index of the secondary
245    /// index.
246    pub fn primary_to_secondary_mapping(&self) -> &BTreeMap<usize, usize> {
247        &self.primary_to_secondary_mapping
248    }
249
250    pub fn function_mapping(&self) -> &HashMap<FunctionCall, usize> {
251        &self.function_mapping
252    }
253}
254
255impl IndexCatalog {
256    pub fn index_table(&self) -> &Arc<TableCatalog> {
257        match &self.index_type {
258            IndexType::Table(index) => &index.index_table,
259            IndexType::Vector(index) => &index.index_table,
260        }
261    }
262
263    /// Get the column properties of the index column.
264    pub fn get_column_properties(&self, column_idx: usize) -> Option<PbIndexColumnProperties> {
265        match &self.index_type {
266            IndexType::Table(index) => index.index_column_properties.get(column_idx).cloned(),
267            IndexType::Vector { .. } => {
268                if column_idx == 0 {
269                    // return with the default value defined in [https://www.postgresql.org/docs/current/sql-createindex.html]
270                    Some(PbIndexColumnProperties {
271                        is_desc: false,
272                        nulls_first: false,
273                    })
274                } else {
275                    None
276                }
277            }
278        }
279    }
280
281    pub fn get_column_def(&self, column_idx: usize) -> Option<String> {
282        if let Some(col) = self.index_table().columns.get(column_idx) {
283            if col.is_hidden {
284                return None;
285            }
286        } else {
287            return None;
288        }
289        let expr_display = ExprDisplay {
290            expr: &self.index_item[column_idx],
291            input_schema: &Schema::new(
292                self.primary_table
293                    .columns
294                    .iter()
295                    .map(|col| Field::from(&col.column_desc))
296                    .collect_vec(),
297            ),
298        };
299
300        // TODO(Kexiang): Currently, extra info like ":Int32" introduced by `ExprDisplay` is kept for simplity.
301        // We'd better remove it in the future.
302        Some(expr_display.to_string())
303    }
304
305    pub fn display(&self) -> IndexDisplay {
306        let index_table = self.index_table().clone();
307        let index_columns_with_ordering = index_table
308            .pk
309            .iter()
310            .filter(|x| !index_table.columns[x.column_index].is_hidden)
311            .map(|x| {
312                let index_column_name = index_table.columns[x.column_index].name().to_owned();
313                format!("{} {}", index_column_name, x.order_type)
314            })
315            .collect_vec();
316
317        let pk_column_index_set = index_table
318            .pk
319            .iter()
320            .map(|x| x.column_index)
321            .collect::<HashSet<_>>();
322
323        let include_columns = index_table
324            .columns
325            .iter()
326            .enumerate()
327            .filter(|(i, _)| !pk_column_index_set.contains(i))
328            .filter(|(_, x)| !x.is_hidden)
329            .map(|(_, x)| x.name().to_owned())
330            .collect_vec();
331
332        let distributed_by_columns = index_table
333            .distribution_key
334            .iter()
335            .map(|&x| index_table.columns[x].name().to_owned())
336            .collect_vec();
337
338        IndexDisplay {
339            index_columns_with_ordering,
340            include_columns,
341            distributed_by_columns,
342        }
343    }
344
345    pub fn is_created(&self) -> bool {
346        self.index_table().is_created()
347    }
348}
349
/// Human-readable description of an index, as produced by `IndexCatalog::display`.
pub struct IndexDisplay {
    /// One `"<column name> <order type>"` entry per non-hidden primary-key column of the
    /// index table.
    pub index_columns_with_ordering: Vec<String>,
    /// Names of the non-hidden index-table columns that are not part of its primary key.
    pub include_columns: Vec<String>,
    /// Names of the index table's distribution-key columns.
    pub distributed_by_columns: Vec<String>,
}
355
356impl OwnedByUserCatalog for IndexCatalog {
357    fn owner(&self) -> UserId {
358        self.index_table().owner
359    }
360}
361
362mod item_rewriter {
363    use risingwave_pb::expr::expr_node;
364
365    use crate::expr::{Expr, ExprImpl, ExprRewriter, FunctionCall};
366
367    /// Rewrite the expression of index item to eliminate `CompositeCast`, if any. This is needed
368    /// if the type of a column was changed and there's functional index on it.
369    ///
370    /// # Example
371    ///
372    /// Imagine there's a table created with `CREATE TABLE t (v struct<a int, b int>)`.
373    /// Then we create an index on it with `CREATE INDEX idx ON t ((v).a)`, which will create an
374    /// index item `Field(InputRef(0), 0)`.
375    ///
376    /// If we alter the column with `ALTER TABLE t ALTER COLUMN v TYPE struct<x varchar, a int>`,
377    /// the meta service will wrap the `InputRef(0)` with a `CompositeCast` to maintain the correct
378    /// return type. The index item will now become `Field(CompositeCast(InputRef(0)), 0)`.
379    ///
380    /// `CompositeCast` is for internal use only, and cannot be constructed or executed. To allow
381    /// this functional index to work and be matched with user queries, we need to eliminate it
382    /// here. By comparing the input and output types of `CompositeCast` and matching the field id,
383    /// we can find the real `Field` index and rewrite it to `Field(InputRef(0), 1)`.
384    ///
385    /// Note that if the field is dropped, we will leave the index item as is. This makes the index
386    /// item invalid, and it will never be matched and used.
387    pub struct CompositeCastEliminator;
388
389    impl ExprRewriter for CompositeCastEliminator {
390        fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
391            let (func_type, inputs, ret) = func_call.decompose();
392
393            // Flatten consecutive `CompositeCast`.
394            if func_type == expr_node::Type::CompositeCast {
395                let child = inputs[0].clone();
396
397                if let Some(child) = child.as_function_call()
398                    && child.func_type() == expr_node::Type::CompositeCast
399                {
400                    let new_child = child.inputs()[0].clone();
401
402                    // If the type already matches, no need to wrap again.
403                    // Recursively eliminate more composite cast by calling rewrite again.
404                    if new_child.return_type() == ret {
405                        return self.rewrite_expr(new_child);
406                    } else {
407                        let new_composite_cast =
408                            FunctionCall::new_unchecked(func_type, vec![new_child], ret);
409                        return self.rewrite_function_call(new_composite_cast);
410                    }
411                }
412            }
413            // Rewrite `Field(CompositeCast(x), y)` to `Field(x, y')`.
414            // TODO: also support rewriting `ArrayAccess` and `MapAccess`.
415            else if func_type == expr_node::Type::Field {
416                let child = inputs[0].clone();
417
418                if let Some(child) = child.as_function_call()
419                    && child.func_type() == expr_node::Type::CompositeCast
420                {
421                    let index = (inputs[1].clone().into_literal().unwrap())
422                        .get_data()
423                        .clone()
424                        .unwrap()
425                        .into_int32();
426
427                    let struct_type = child.return_type().into_struct();
428                    let field_id = struct_type
429                        .id_at(index as usize)
430                        .expect("ids should be set");
431
432                    // Unwrap the composite cast.
433                    let new_child = child.inputs()[0].clone();
434                    let new_struct_type = new_child.return_type().into_struct();
435
436                    let Some(new_index) = new_struct_type
437                        .ids()
438                        .expect("ids should be set")
439                        .position(|x| x == field_id)
440                    else {
441                        // Previously we have index on this field, but now it's dropped.
442                        // As a result, this entire index item becomes invalid.
443                        // Simply leave it as is. Users cannot construct a `CompositeCast` (which is
444                        // not user-facing), thus this index item will never be matched and used.
445                        return FunctionCall::new_unchecked(func_type, inputs, ret).into();
446                    };
447                    let new_index = ExprImpl::literal_int(new_index as i32);
448
449                    let new_inputs = vec![new_child, new_index];
450                    let new_field_call = FunctionCall::new_unchecked(func_type, new_inputs, ret);
451
452                    // Recursively eliminate more composite cast by calling rewrite again.
453                    return self.rewrite_function_call(new_field_call);
454                }
455            }
456
457            let inputs = inputs
458                .into_iter()
459                .map(|expr| self.rewrite_expr(expr))
460                .collect();
461            FunctionCall::new_unchecked(func_type, inputs, ret).into()
462        }
463    }
464}