risingwave_frontend/catalog/
index_catalog.rs1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::Deref;
17use std::sync::Arc;
18
19use educe::Educe;
20use itertools::Itertools;
21use risingwave_common::catalog::{Field, IndexId, Schema};
22use risingwave_common::util::epoch::Epoch;
23use risingwave_common::util::sort_util::ColumnOrder;
24use risingwave_pb::catalog::{PbIndex, PbIndexColumnProperties, PbVectorIndexInfo};
25
26use crate::catalog::table_catalog::TableType;
27use crate::catalog::{OwnedByUserCatalog, TableCatalog};
28use crate::expr::{ExprDisplay, ExprImpl, ExprRewriter as _, FunctionCall};
29use crate::user::UserId;
30
31#[derive(Clone, Debug, Educe)]
32#[educe(PartialEq, Eq, Hash)]
33pub struct TableIndex {
34 pub name: String,
35 pub index_column_properties: Vec<PbIndexColumnProperties>,
38
39 pub index_table: Arc<TableCatalog>,
40
41 pub primary_table: Arc<TableCatalog>,
42
43 pub primary_to_secondary_mapping: BTreeMap<usize, usize>,
44
45 pub secondary_to_primary_mapping: BTreeMap<usize, usize>,
46
47 #[educe(PartialEq(ignore))]
53 #[educe(Hash(ignore))]
54 pub function_mapping: HashMap<FunctionCall, usize>,
55
56 pub index_columns_len: u32,
57}
58
59#[derive(Clone, Debug, PartialEq, Eq, Educe)]
60#[educe(Hash)]
61pub struct VectorIndex {
62 pub index_table: Arc<TableCatalog>,
63 pub vector_expr: ExprImpl,
64 #[educe(Hash(ignore))]
65 pub primary_to_included_info_column_mapping: HashMap<usize, usize>,
66 pub primary_key_idx_in_info_columns: Vec<usize>,
67 pub included_info_columns: Vec<usize>,
68 pub vector_index_info: PbVectorIndexInfo,
69}
70
71#[derive(Clone, Debug, Hash, PartialEq, Eq)]
72pub enum IndexType {
73 Table(Arc<TableIndex>),
74 Vector(Arc<VectorIndex>),
75}
76
77#[derive(Clone, Debug, Hash, PartialEq, Eq)]
78pub struct IndexCatalog {
79 pub id: IndexId,
80
81 pub name: String,
82
83 pub index_item: Vec<ExprImpl>,
88
89 pub index_type: IndexType,
90
91 pub primary_table: Arc<TableCatalog>,
92
93 pub created_at_epoch: Option<Epoch>,
94
95 pub initialized_at_epoch: Option<Epoch>,
96
97 pub created_at_cluster_version: Option<String>,
98
99 pub initialized_at_cluster_version: Option<String>,
100}
101
102impl IndexCatalog {
103 pub fn build_from(
104 index_prost: &PbIndex,
105 index_table: &Arc<TableCatalog>,
106 primary_table: &Arc<TableCatalog>,
107 ) -> Self {
108 let index_item: Vec<ExprImpl> = index_prost
109 .index_item
110 .iter()
111 .map(|expr| ExprImpl::from_expr_proto(expr).unwrap())
112 .map(|expr| item_rewriter::CompositeCastEliminator.rewrite_expr(expr))
113 .collect();
114
115 let index_type = match index_table.table_type {
116 TableType::Index => {
117 let primary_to_secondary_mapping: BTreeMap<usize, usize> = index_item
118 .iter()
119 .enumerate()
120 .filter_map(|(i, expr)| match expr {
121 ExprImpl::InputRef(input_ref) => Some((input_ref.index, i)),
122 ExprImpl::FunctionCall(_) => None,
123 _ => unreachable!(),
124 })
125 .collect();
126
127 let secondary_to_primary_mapping = BTreeMap::from_iter(
128 primary_to_secondary_mapping
129 .clone()
130 .into_iter()
131 .map(|(x, y)| (y, x)),
132 );
133
134 let function_mapping: HashMap<FunctionCall, usize> = index_item
135 .iter()
136 .enumerate()
137 .filter_map(|(i, expr)| match expr {
138 ExprImpl::InputRef(_) => None,
139 ExprImpl::FunctionCall(func) => Some((func.deref().clone(), i)),
140 _ => unreachable!(),
141 })
142 .collect();
143 IndexType::Table(Arc::new(TableIndex {
144 name: index_prost.name.clone(),
145 index_column_properties: index_prost.index_column_properties.clone(),
146 index_columns_len: index_prost.index_columns_len,
147 index_table: index_table.clone(),
148 primary_table: primary_table.clone(),
149 primary_to_secondary_mapping,
150 secondary_to_primary_mapping,
151 function_mapping,
152 }))
153 }
154 TableType::VectorIndex => {
155 assert_eq!(index_prost.index_columns_len, 1);
156 let included_info_columns = index_item[1..].iter().map(|item| {
157 let ExprImpl::InputRef(input) = item else {
158 panic!("vector index included columns must be from direct input column, but got: {:?}", item);
159 };
160 input.index
161 }).collect_vec();
162 let primary_to_included_info_column_mapping: HashMap<_, _> = included_info_columns
163 .iter()
164 .enumerate()
165 .map(|(included_info_column_idx, primary_column_idx)| {
166 (*primary_column_idx, included_info_column_idx)
167 })
168 .collect();
169 let primary_key_idx_in_info_columns = primary_table
170 .pk()
171 .iter()
172 .map(|order| primary_to_included_info_column_mapping[&order.column_index])
173 .collect();
174 IndexType::Vector(Arc::new(VectorIndex {
175 index_table: index_table.clone(),
176 vector_expr: index_item[0].clone(),
177 primary_to_included_info_column_mapping,
178 primary_key_idx_in_info_columns,
179 included_info_columns,
180 vector_index_info: index_table
181 .vector_index_info
182 .expect("should exist for vector index"),
183 }))
184 }
185 TableType::Table | TableType::MaterializedView | TableType::Internal => {
186 unreachable!()
187 }
188 };
189
190 IndexCatalog {
191 id: index_prost.id.into(),
192 name: index_prost.name.clone(),
193 index_item,
194 index_type,
195 primary_table: primary_table.clone(),
196 created_at_epoch: index_prost.created_at_epoch.map(Epoch::from),
197 initialized_at_epoch: index_prost.initialized_at_epoch.map(Epoch::from),
198 created_at_cluster_version: index_prost.created_at_cluster_version.clone(),
199 initialized_at_cluster_version: index_prost.initialized_at_cluster_version.clone(),
200 }
201 }
202}
203
204impl TableIndex {
205 pub fn primary_table_pk_ref_to_index_table(&self) -> Vec<ColumnOrder> {
206 let mapping = self.primary_to_secondary_mapping();
207
208 self.primary_table
209 .pk
210 .iter()
211 .map(|x| ColumnOrder::new(*mapping.get(&x.column_index).unwrap(), x.order_type))
212 .collect_vec()
213 }
214
215 pub fn primary_table_distribute_key_ref_to_index_table(&self) -> Vec<usize> {
216 let mapping = self.primary_to_secondary_mapping();
217
218 self.primary_table
219 .distribution_key
220 .iter()
221 .map(|x| *mapping.get(x).unwrap())
222 .collect_vec()
223 }
224
225 pub fn full_covering(&self) -> bool {
226 self.index_table.columns.len() == self.primary_table.columns.len()
227 }
228
229 pub fn secondary_to_primary_mapping(&self) -> &BTreeMap<usize, usize> {
232 &self.secondary_to_primary_mapping
233 }
234
235 pub fn primary_to_secondary_mapping(&self) -> &BTreeMap<usize, usize> {
238 &self.primary_to_secondary_mapping
239 }
240
241 pub fn function_mapping(&self) -> &HashMap<FunctionCall, usize> {
242 &self.function_mapping
243 }
244}
245
246impl IndexCatalog {
247 pub fn index_table(&self) -> &Arc<TableCatalog> {
248 match &self.index_type {
249 IndexType::Table(index) => &index.index_table,
250 IndexType::Vector(index) => &index.index_table,
251 }
252 }
253
254 pub fn get_column_properties(&self, column_idx: usize) -> Option<PbIndexColumnProperties> {
256 match &self.index_type {
257 IndexType::Table(index) => index.index_column_properties.get(column_idx).cloned(),
258 IndexType::Vector { .. } => {
259 if column_idx == 0 {
260 Some(PbIndexColumnProperties {
262 is_desc: false,
263 nulls_first: false,
264 })
265 } else {
266 None
267 }
268 }
269 }
270 }
271
272 pub fn get_column_def(&self, column_idx: usize) -> Option<String> {
273 if let Some(col) = self.index_table().columns.get(column_idx) {
274 if col.is_hidden {
275 return None;
276 }
277 } else {
278 return None;
279 }
280 let expr_display = ExprDisplay {
281 expr: &self.index_item[column_idx],
282 input_schema: &Schema::new(
283 self.primary_table
284 .columns
285 .iter()
286 .map(|col| Field::from(&col.column_desc))
287 .collect_vec(),
288 ),
289 };
290
291 Some(expr_display.to_string())
294 }
295
296 pub fn display(&self) -> IndexDisplay {
297 let index_table = self.index_table().clone();
298 let index_columns_with_ordering = index_table
299 .pk
300 .iter()
301 .filter(|x| !index_table.columns[x.column_index].is_hidden)
302 .map(|x| {
303 let index_column_name = index_table.columns[x.column_index].name().to_owned();
304 format!("{} {}", index_column_name, x.order_type)
305 })
306 .collect_vec();
307
308 let pk_column_index_set = index_table
309 .pk
310 .iter()
311 .map(|x| x.column_index)
312 .collect::<HashSet<_>>();
313
314 let include_columns = index_table
315 .columns
316 .iter()
317 .enumerate()
318 .filter(|(i, _)| !pk_column_index_set.contains(i))
319 .filter(|(_, x)| !x.is_hidden)
320 .map(|(_, x)| x.name().to_owned())
321 .collect_vec();
322
323 let distributed_by_columns = index_table
324 .distribution_key
325 .iter()
326 .map(|&x| index_table.columns[x].name().to_owned())
327 .collect_vec();
328
329 IndexDisplay {
330 index_columns_with_ordering,
331 include_columns,
332 distributed_by_columns,
333 }
334 }
335
336 pub fn is_created(&self) -> bool {
337 self.index_table().is_created()
338 }
339}
340
341pub struct IndexDisplay {
342 pub index_columns_with_ordering: Vec<String>,
343 pub include_columns: Vec<String>,
344 pub distributed_by_columns: Vec<String>,
345}
346
347impl OwnedByUserCatalog for IndexCatalog {
348 fn owner(&self) -> UserId {
349 self.index_table().owner
350 }
351}
352
353mod item_rewriter {
354 use risingwave_pb::expr::expr_node;
355
356 use crate::expr::{Expr, ExprImpl, ExprRewriter, FunctionCall};
357
358 pub struct CompositeCastEliminator;
379
380 impl ExprRewriter for CompositeCastEliminator {
381 fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
382 let (func_type, inputs, ret) = func_call.decompose();
383
384 if func_type == expr_node::Type::CompositeCast {
386 let child = inputs[0].clone();
387
388 if let Some(child) = child.as_function_call()
389 && child.func_type() == expr_node::Type::CompositeCast
390 {
391 let new_child = child.inputs()[0].clone();
392
393 if new_child.return_type() == ret {
396 return self.rewrite_expr(new_child);
397 } else {
398 let new_composite_cast =
399 FunctionCall::new_unchecked(func_type, vec![new_child], ret);
400 return self.rewrite_function_call(new_composite_cast);
401 }
402 }
403 }
404 else if func_type == expr_node::Type::Field {
407 let child = inputs[0].clone();
408
409 if let Some(child) = child.as_function_call()
410 && child.func_type() == expr_node::Type::CompositeCast
411 {
412 let index = (inputs[1].clone().into_literal().unwrap())
413 .get_data()
414 .clone()
415 .unwrap()
416 .into_int32();
417
418 let struct_type = child.return_type().into_struct();
419 let field_id = struct_type
420 .id_at(index as usize)
421 .expect("ids should be set");
422
423 let new_child = child.inputs()[0].clone();
425 let new_struct_type = new_child.return_type().into_struct();
426
427 let Some(new_index) = new_struct_type
428 .ids()
429 .expect("ids should be set")
430 .position(|x| x == field_id)
431 else {
432 return FunctionCall::new_unchecked(func_type, inputs, ret).into();
437 };
438 let new_index = ExprImpl::literal_int(new_index as i32);
439
440 let new_inputs = vec![new_child, new_index];
441 let new_field_call = FunctionCall::new_unchecked(func_type, new_inputs, ret);
442
443 return self.rewrite_function_call(new_field_call);
445 }
446 }
447
448 let inputs = inputs
449 .into_iter()
450 .map(|expr| self.rewrite_expr(expr))
451 .collect();
452 FunctionCall::new_unchecked(func_type, inputs, ret).into()
453 }
454 }
455}