risingwave_frontend/catalog/
index_catalog.rs1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::Deref;
17use std::sync::Arc;
18
19use educe::Educe;
20use itertools::Itertools;
21use risingwave_common::catalog::{Field, IndexId, Schema};
22use risingwave_common::util::epoch::Epoch;
23use risingwave_common::util::sort_util::ColumnOrder;
24use risingwave_pb::catalog::{PbIndex, PbIndexColumnProperties};
25
26use crate::catalog::table_catalog::TableType;
27use crate::catalog::{OwnedByUserCatalog, TableCatalog};
28use crate::expr::{ExprDisplay, ExprImpl, ExprRewriter as _, FunctionCall};
29use crate::user::UserId;
30
31#[derive(Clone, Debug, Educe)]
32#[educe(PartialEq, Eq, Hash)]
33pub struct TableIndex {
34 pub name: String,
35 pub index_column_properties: Vec<PbIndexColumnProperties>,
38
39 pub index_table: Arc<TableCatalog>,
40
41 pub primary_table: Arc<TableCatalog>,
42
43 pub primary_to_secondary_mapping: BTreeMap<usize, usize>,
44
45 pub secondary_to_primary_mapping: BTreeMap<usize, usize>,
46
47 #[educe(PartialEq(ignore))]
53 #[educe(Hash(ignore))]
54 pub function_mapping: HashMap<FunctionCall, usize>,
55
56 pub index_columns_len: u32,
57}
58
59#[derive(Clone, Debug, Hash, PartialEq, Eq)]
60pub enum IndexType {
61 Table(Arc<TableIndex>),
62}
63
64#[derive(Clone, Debug, Hash, PartialEq, Eq)]
65pub struct IndexCatalog {
66 pub id: IndexId,
67
68 pub name: String,
69
70 pub index_item: Vec<ExprImpl>,
75
76 pub index_type: IndexType,
77
78 pub primary_table: Arc<TableCatalog>,
79
80 pub created_at_epoch: Option<Epoch>,
81
82 pub initialized_at_epoch: Option<Epoch>,
83
84 pub created_at_cluster_version: Option<String>,
85
86 pub initialized_at_cluster_version: Option<String>,
87}
88
89impl IndexCatalog {
90 pub fn build_from(
91 index_prost: &PbIndex,
92 index_table: &Arc<TableCatalog>,
93 primary_table: &Arc<TableCatalog>,
94 ) -> Self {
95 let index_item: Vec<ExprImpl> = index_prost
96 .index_item
97 .iter()
98 .map(|expr| ExprImpl::from_expr_proto(expr).unwrap())
99 .map(|expr| item_rewriter::CompositeCastEliminator.rewrite_expr(expr))
100 .collect();
101
102 let index_type = match index_table.table_type {
103 TableType::Index => {
104 let primary_to_secondary_mapping: BTreeMap<usize, usize> = index_item
105 .iter()
106 .enumerate()
107 .filter_map(|(i, expr)| match expr {
108 ExprImpl::InputRef(input_ref) => Some((input_ref.index, i)),
109 ExprImpl::FunctionCall(_) => None,
110 _ => unreachable!(),
111 })
112 .collect();
113
114 let secondary_to_primary_mapping = BTreeMap::from_iter(
115 primary_to_secondary_mapping
116 .clone()
117 .into_iter()
118 .map(|(x, y)| (y, x)),
119 );
120
121 let function_mapping: HashMap<FunctionCall, usize> = index_item
122 .iter()
123 .enumerate()
124 .filter_map(|(i, expr)| match expr {
125 ExprImpl::InputRef(_) => None,
126 ExprImpl::FunctionCall(func) => Some((func.deref().clone(), i)),
127 _ => unreachable!(),
128 })
129 .collect();
130 IndexType::Table(Arc::new(TableIndex {
131 name: index_prost.name.clone(),
132 index_column_properties: index_prost.index_column_properties.clone(),
133 index_columns_len: index_prost.index_columns_len,
134 index_table: index_table.clone(),
135 primary_table: primary_table.clone(),
136 primary_to_secondary_mapping,
137 secondary_to_primary_mapping,
138 function_mapping,
139 }))
140 }
141 TableType::Table | TableType::MaterializedView | TableType::Internal => {
142 unreachable!()
143 }
144 };
145
146 IndexCatalog {
147 id: index_prost.id.into(),
148 name: index_prost.name.clone(),
149 index_item,
150 index_type,
151 primary_table: primary_table.clone(),
152 created_at_epoch: index_prost.created_at_epoch.map(Epoch::from),
153 initialized_at_epoch: index_prost.initialized_at_epoch.map(Epoch::from),
154 created_at_cluster_version: index_prost.created_at_cluster_version.clone(),
155 initialized_at_cluster_version: index_prost.initialized_at_cluster_version.clone(),
156 }
157 }
158}
159
160impl TableIndex {
161 pub fn primary_table_pk_ref_to_index_table(&self) -> Vec<ColumnOrder> {
162 let mapping = self.primary_to_secondary_mapping();
163
164 self.primary_table
165 .pk
166 .iter()
167 .map(|x| ColumnOrder::new(*mapping.get(&x.column_index).unwrap(), x.order_type))
168 .collect_vec()
169 }
170
171 pub fn primary_table_distribute_key_ref_to_index_table(&self) -> Vec<usize> {
172 let mapping = self.primary_to_secondary_mapping();
173
174 self.primary_table
175 .distribution_key
176 .iter()
177 .map(|x| *mapping.get(x).unwrap())
178 .collect_vec()
179 }
180
181 pub fn full_covering(&self) -> bool {
182 self.index_table.columns.len() == self.primary_table.columns.len()
183 }
184
185 pub fn secondary_to_primary_mapping(&self) -> &BTreeMap<usize, usize> {
188 &self.secondary_to_primary_mapping
189 }
190
191 pub fn primary_to_secondary_mapping(&self) -> &BTreeMap<usize, usize> {
194 &self.primary_to_secondary_mapping
195 }
196
197 pub fn function_mapping(&self) -> &HashMap<FunctionCall, usize> {
198 &self.function_mapping
199 }
200}
201
202impl IndexCatalog {
203 pub fn index_table(&self) -> &Arc<TableCatalog> {
204 match &self.index_type {
205 IndexType::Table(index) => &index.index_table,
206 }
207 }
208
209 pub fn get_column_properties(&self, column_idx: usize) -> Option<PbIndexColumnProperties> {
211 match &self.index_type {
212 IndexType::Table(index) => index.index_column_properties.get(column_idx).cloned(),
213 }
214 }
215
216 pub fn get_column_def(&self, column_idx: usize) -> Option<String> {
217 if let Some(col) = self.index_table().columns.get(column_idx) {
218 if col.is_hidden {
219 return None;
220 }
221 } else {
222 return None;
223 }
224 let expr_display = ExprDisplay {
225 expr: &self.index_item[column_idx],
226 input_schema: &Schema::new(
227 self.primary_table
228 .columns
229 .iter()
230 .map(|col| Field::from(&col.column_desc))
231 .collect_vec(),
232 ),
233 };
234
235 Some(expr_display.to_string())
238 }
239
240 pub fn display(&self) -> IndexDisplay {
241 let index_table = self.index_table().clone();
242 let index_columns_with_ordering = index_table
243 .pk
244 .iter()
245 .filter(|x| !index_table.columns[x.column_index].is_hidden)
246 .map(|x| {
247 let index_column_name = index_table.columns[x.column_index].name().to_owned();
248 format!("{} {}", index_column_name, x.order_type)
249 })
250 .collect_vec();
251
252 let pk_column_index_set = index_table
253 .pk
254 .iter()
255 .map(|x| x.column_index)
256 .collect::<HashSet<_>>();
257
258 let include_columns = index_table
259 .columns
260 .iter()
261 .enumerate()
262 .filter(|(i, _)| !pk_column_index_set.contains(i))
263 .filter(|(_, x)| !x.is_hidden)
264 .map(|(_, x)| x.name().to_owned())
265 .collect_vec();
266
267 let distributed_by_columns = index_table
268 .distribution_key
269 .iter()
270 .map(|&x| index_table.columns[x].name().to_owned())
271 .collect_vec();
272
273 IndexDisplay {
274 index_columns_with_ordering,
275 include_columns,
276 distributed_by_columns,
277 }
278 }
279}
280
281pub struct IndexDisplay {
282 pub index_columns_with_ordering: Vec<String>,
283 pub include_columns: Vec<String>,
284 pub distributed_by_columns: Vec<String>,
285}
286
287impl OwnedByUserCatalog for IndexCatalog {
288 fn owner(&self) -> UserId {
289 self.index_table().owner
290 }
291}
292
293mod item_rewriter {
294 use risingwave_pb::expr::expr_node;
295
296 use crate::expr::{Expr, ExprImpl, ExprRewriter, FunctionCall};
297
298 pub struct CompositeCastEliminator;
319
320 impl ExprRewriter for CompositeCastEliminator {
321 fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
322 let (func_type, inputs, ret) = func_call.decompose();
323
324 if func_type == expr_node::Type::CompositeCast {
326 let child = inputs[0].clone();
327
328 if let Some(child) = child.as_function_call()
329 && child.func_type() == expr_node::Type::CompositeCast
330 {
331 let new_child = child.inputs()[0].clone();
332
333 if new_child.return_type() == ret {
336 return self.rewrite_expr(new_child);
337 } else {
338 let new_composite_cast =
339 FunctionCall::new_unchecked(func_type, vec![new_child], ret);
340 return self.rewrite_function_call(new_composite_cast);
341 }
342 }
343 }
344 else if func_type == expr_node::Type::Field {
347 let child = inputs[0].clone();
348
349 if let Some(child) = child.as_function_call()
350 && child.func_type() == expr_node::Type::CompositeCast
351 {
352 let index = (inputs[1].clone().into_literal().unwrap())
353 .get_data()
354 .clone()
355 .unwrap()
356 .into_int32();
357
358 let struct_type = child.return_type().into_struct();
359 let field_id = struct_type
360 .id_at(index as usize)
361 .expect("ids should be set");
362
363 let new_child = child.inputs()[0].clone();
365 let new_struct_type = new_child.return_type().into_struct();
366
367 let Some(new_index) = new_struct_type
368 .ids()
369 .expect("ids should be set")
370 .position(|x| x == field_id)
371 else {
372 return FunctionCall::new_unchecked(func_type, inputs, ret).into();
377 };
378 let new_index = ExprImpl::literal_int(new_index as i32);
379
380 let new_inputs = vec![new_child, new_index];
381 let new_field_call = FunctionCall::new_unchecked(func_type, new_inputs, ret);
382
383 return self.rewrite_function_call(new_field_call);
385 }
386 }
387
388 let inputs = inputs
389 .into_iter()
390 .map(|expr| self.rewrite_expr(expr))
391 .collect();
392 FunctionCall::new_unchecked(func_type, inputs, ret).into()
393 }
394 }
395}