risingwave_frontend/catalog/
index_catalog.rs1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::Deref;
17use std::sync::Arc;
18
19use educe::Educe;
20use itertools::Itertools;
21use risingwave_common::catalog::{ColumnDesc, Field, IndexId, Schema};
22use risingwave_common::util::epoch::Epoch;
23use risingwave_common::util::sort_util::ColumnOrder;
24use risingwave_pb::catalog::{PbIndex, PbIndexColumnProperties, PbVectorIndexInfo};
25
26use crate::catalog::table_catalog::TableType;
27use crate::catalog::{OwnedByUserCatalog, TableCatalog};
28use crate::expr::{ExprDisplay, ExprImpl, ExprRewriter as _, FunctionCall};
29use crate::user::UserId;
30
31#[derive(Clone, Debug, Educe)]
32#[educe(PartialEq, Eq, Hash)]
33pub struct TableIndex {
34 pub name: String,
35 pub index_column_properties: Vec<PbIndexColumnProperties>,
38
39 pub index_table: Arc<TableCatalog>,
40
41 pub primary_table: Arc<TableCatalog>,
42
43 pub primary_to_secondary_mapping: BTreeMap<usize, usize>,
44
45 pub secondary_to_primary_mapping: BTreeMap<usize, usize>,
46
47 #[educe(PartialEq(ignore))]
53 #[educe(Hash(ignore))]
54 pub function_mapping: HashMap<FunctionCall, usize>,
55
56 pub index_columns_len: u32,
57}
58
59#[derive(Clone, Debug, PartialEq, Eq, Educe)]
60#[educe(Hash)]
61pub struct VectorIndex {
62 pub index_table: Arc<TableCatalog>,
63 pub vector_expr: ExprImpl,
64 #[educe(Hash(ignore))]
65 pub primary_to_included_info_column_mapping: HashMap<usize, usize>,
66 pub primary_key_idx_in_info_columns: Vec<usize>,
67 pub included_info_columns: Vec<usize>,
68 pub vector_index_info: PbVectorIndexInfo,
69}
70
71impl VectorIndex {
72 pub fn info_column_desc(&self) -> Vec<ColumnDesc> {
73 self.index_table.columns[1..=self.included_info_columns.len()]
74 .iter()
75 .map(|col| col.column_desc.clone())
76 .collect()
77 }
78}
79
80#[derive(Clone, Debug, Hash, PartialEq, Eq)]
81pub enum IndexType {
82 Table(Arc<TableIndex>),
83 Vector(Arc<VectorIndex>),
84}
85
86#[derive(Clone, Debug, Hash, PartialEq, Eq)]
87pub struct IndexCatalog {
88 pub id: IndexId,
89
90 pub name: String,
91
92 pub index_item: Vec<ExprImpl>,
97
98 pub index_type: IndexType,
99
100 pub primary_table: Arc<TableCatalog>,
101
102 pub created_at_epoch: Option<Epoch>,
103
104 pub initialized_at_epoch: Option<Epoch>,
105
106 pub created_at_cluster_version: Option<String>,
107
108 pub initialized_at_cluster_version: Option<String>,
109}
110
111impl IndexCatalog {
112 pub fn build_from(
113 index_prost: &PbIndex,
114 index_table: &Arc<TableCatalog>,
115 primary_table: &Arc<TableCatalog>,
116 ) -> Self {
117 let index_item: Vec<ExprImpl> = index_prost
118 .index_item
119 .iter()
120 .map(|expr| ExprImpl::from_expr_proto(expr).unwrap())
121 .map(|expr| item_rewriter::CompositeCastEliminator.rewrite_expr(expr))
122 .collect();
123
124 let index_type = match index_table.table_type {
125 TableType::Index => {
126 let primary_to_secondary_mapping: BTreeMap<usize, usize> = index_item
127 .iter()
128 .enumerate()
129 .filter_map(|(i, expr)| match expr {
130 ExprImpl::InputRef(input_ref) => Some((input_ref.index, i)),
131 ExprImpl::FunctionCall(_) => None,
132 _ => unreachable!(),
133 })
134 .collect();
135
136 let secondary_to_primary_mapping = BTreeMap::from_iter(
137 primary_to_secondary_mapping
138 .clone()
139 .into_iter()
140 .map(|(x, y)| (y, x)),
141 );
142
143 let function_mapping: HashMap<FunctionCall, usize> = index_item
144 .iter()
145 .enumerate()
146 .filter_map(|(i, expr)| match expr {
147 ExprImpl::InputRef(_) => None,
148 ExprImpl::FunctionCall(func) => Some((func.deref().clone(), i)),
149 _ => unreachable!(),
150 })
151 .collect();
152 IndexType::Table(Arc::new(TableIndex {
153 name: index_prost.name.clone(),
154 index_column_properties: index_prost.index_column_properties.clone(),
155 index_columns_len: index_prost.index_columns_len,
156 index_table: index_table.clone(),
157 primary_table: primary_table.clone(),
158 primary_to_secondary_mapping,
159 secondary_to_primary_mapping,
160 function_mapping,
161 }))
162 }
163 TableType::VectorIndex => {
164 assert_eq!(index_prost.index_columns_len, 1);
165 let included_info_columns = index_item[1..].iter().map(|item| {
166 let ExprImpl::InputRef(input) = item else {
167 panic!("vector index included columns must be from direct input column, but got: {:?}", item);
168 };
169 input.index
170 }).collect_vec();
171 let primary_to_included_info_column_mapping: HashMap<_, _> = included_info_columns
172 .iter()
173 .enumerate()
174 .map(|(included_info_column_idx, primary_column_idx)| {
175 (*primary_column_idx, included_info_column_idx)
176 })
177 .collect();
178 let primary_key_idx_in_info_columns = primary_table
179 .pk()
180 .iter()
181 .map(|order| primary_to_included_info_column_mapping[&order.column_index])
182 .collect();
183 IndexType::Vector(Arc::new(VectorIndex {
184 index_table: index_table.clone(),
185 vector_expr: index_item[0].clone(),
186 primary_to_included_info_column_mapping,
187 primary_key_idx_in_info_columns,
188 included_info_columns,
189 vector_index_info: index_table
190 .vector_index_info
191 .expect("should exist for vector index"),
192 }))
193 }
194 TableType::Table | TableType::MaterializedView | TableType::Internal => {
195 unreachable!()
196 }
197 };
198
199 IndexCatalog {
200 id: index_prost.id.into(),
201 name: index_prost.name.clone(),
202 index_item,
203 index_type,
204 primary_table: primary_table.clone(),
205 created_at_epoch: index_prost.created_at_epoch.map(Epoch::from),
206 initialized_at_epoch: index_prost.initialized_at_epoch.map(Epoch::from),
207 created_at_cluster_version: index_prost.created_at_cluster_version.clone(),
208 initialized_at_cluster_version: index_prost.initialized_at_cluster_version.clone(),
209 }
210 }
211}
212
213impl TableIndex {
214 pub fn primary_table_pk_ref_to_index_table(&self) -> Vec<ColumnOrder> {
215 let mapping = self.primary_to_secondary_mapping();
216
217 self.primary_table
218 .pk
219 .iter()
220 .map(|x| ColumnOrder::new(*mapping.get(&x.column_index).unwrap(), x.order_type))
221 .collect_vec()
222 }
223
224 pub fn primary_table_distribute_key_ref_to_index_table(&self) -> Vec<usize> {
225 let mapping = self.primary_to_secondary_mapping();
226
227 self.primary_table
228 .distribution_key
229 .iter()
230 .map(|x| *mapping.get(x).unwrap())
231 .collect_vec()
232 }
233
234 pub fn full_covering(&self) -> bool {
235 self.index_table.columns.len() == self.primary_table.columns.len()
236 }
237
238 pub fn secondary_to_primary_mapping(&self) -> &BTreeMap<usize, usize> {
241 &self.secondary_to_primary_mapping
242 }
243
244 pub fn primary_to_secondary_mapping(&self) -> &BTreeMap<usize, usize> {
247 &self.primary_to_secondary_mapping
248 }
249
250 pub fn function_mapping(&self) -> &HashMap<FunctionCall, usize> {
251 &self.function_mapping
252 }
253}
254
255impl IndexCatalog {
256 pub fn index_table(&self) -> &Arc<TableCatalog> {
257 match &self.index_type {
258 IndexType::Table(index) => &index.index_table,
259 IndexType::Vector(index) => &index.index_table,
260 }
261 }
262
263 pub fn get_column_properties(&self, column_idx: usize) -> Option<PbIndexColumnProperties> {
265 match &self.index_type {
266 IndexType::Table(index) => index.index_column_properties.get(column_idx).cloned(),
267 IndexType::Vector { .. } => {
268 if column_idx == 0 {
269 Some(PbIndexColumnProperties {
271 is_desc: false,
272 nulls_first: false,
273 })
274 } else {
275 None
276 }
277 }
278 }
279 }
280
281 pub fn get_column_def(&self, column_idx: usize) -> Option<String> {
282 if let Some(col) = self.index_table().columns.get(column_idx) {
283 if col.is_hidden {
284 return None;
285 }
286 } else {
287 return None;
288 }
289 let expr_display = ExprDisplay {
290 expr: &self.index_item[column_idx],
291 input_schema: &Schema::new(
292 self.primary_table
293 .columns
294 .iter()
295 .map(|col| Field::from(&col.column_desc))
296 .collect_vec(),
297 ),
298 };
299
300 Some(expr_display.to_string())
303 }
304
305 pub fn display(&self) -> IndexDisplay {
306 let index_table = self.index_table().clone();
307 let index_columns_with_ordering = index_table
308 .pk
309 .iter()
310 .filter(|x| !index_table.columns[x.column_index].is_hidden)
311 .map(|x| {
312 let index_column_name = index_table.columns[x.column_index].name().to_owned();
313 format!("{} {}", index_column_name, x.order_type)
314 })
315 .collect_vec();
316
317 let pk_column_index_set = index_table
318 .pk
319 .iter()
320 .map(|x| x.column_index)
321 .collect::<HashSet<_>>();
322
323 let include_columns = index_table
324 .columns
325 .iter()
326 .enumerate()
327 .filter(|(i, _)| !pk_column_index_set.contains(i))
328 .filter(|(_, x)| !x.is_hidden)
329 .map(|(_, x)| x.name().to_owned())
330 .collect_vec();
331
332 let distributed_by_columns = index_table
333 .distribution_key
334 .iter()
335 .map(|&x| index_table.columns[x].name().to_owned())
336 .collect_vec();
337
338 IndexDisplay {
339 index_columns_with_ordering,
340 include_columns,
341 distributed_by_columns,
342 }
343 }
344
345 pub fn is_created(&self) -> bool {
346 self.index_table().is_created()
347 }
348}
349
350pub struct IndexDisplay {
351 pub index_columns_with_ordering: Vec<String>,
352 pub include_columns: Vec<String>,
353 pub distributed_by_columns: Vec<String>,
354}
355
356impl OwnedByUserCatalog for IndexCatalog {
357 fn owner(&self) -> UserId {
358 self.index_table().owner
359 }
360}
361
362mod item_rewriter {
363 use risingwave_pb::expr::expr_node;
364
365 use crate::expr::{Expr, ExprImpl, ExprRewriter, FunctionCall};
366
367 pub struct CompositeCastEliminator;
388
389 impl ExprRewriter for CompositeCastEliminator {
390 fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
391 let (func_type, inputs, ret) = func_call.decompose();
392
393 if func_type == expr_node::Type::CompositeCast {
395 let child = inputs[0].clone();
396
397 if let Some(child) = child.as_function_call()
398 && child.func_type() == expr_node::Type::CompositeCast
399 {
400 let new_child = child.inputs()[0].clone();
401
402 if new_child.return_type() == ret {
405 return self.rewrite_expr(new_child);
406 } else {
407 let new_composite_cast =
408 FunctionCall::new_unchecked(func_type, vec![new_child], ret);
409 return self.rewrite_function_call(new_composite_cast);
410 }
411 }
412 }
413 else if func_type == expr_node::Type::Field {
416 let child = inputs[0].clone();
417
418 if let Some(child) = child.as_function_call()
419 && child.func_type() == expr_node::Type::CompositeCast
420 {
421 let index = (inputs[1].clone().into_literal().unwrap())
422 .get_data()
423 .clone()
424 .unwrap()
425 .into_int32();
426
427 let struct_type = child.return_type().into_struct();
428 let field_id = struct_type
429 .id_at(index as usize)
430 .expect("ids should be set");
431
432 let new_child = child.inputs()[0].clone();
434 let new_struct_type = new_child.return_type().into_struct();
435
436 let Some(new_index) = new_struct_type
437 .ids()
438 .expect("ids should be set")
439 .position(|x| x == field_id)
440 else {
441 return FunctionCall::new_unchecked(func_type, inputs, ret).into();
446 };
447 let new_index = ExprImpl::literal_int(new_index as i32);
448
449 let new_inputs = vec![new_child, new_index];
450 let new_field_call = FunctionCall::new_unchecked(func_type, new_inputs, ret);
451
452 return self.rewrite_function_call(new_field_call);
454 }
455 }
456
457 let inputs = inputs
458 .into_iter()
459 .map(|expr| self.rewrite_expr(expr))
460 .collect();
461 FunctionCall::new_unchecked(func_type, inputs, ret).into()
462 }
463 }
464}