1use std::collections::{HashMap, HashSet};
16use std::num::NonZeroU32;
17use std::rc::Rc;
18use std::sync::Arc;
19
20use either::Either;
21use fixedbitset::FixedBitSet;
22use itertools::Itertools;
23use pgwire::pg_response::{PgResponse, StatementType};
24use risingwave_common::catalog::{IndexId, TableDesc, TableId};
25use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
26use risingwave_pb::catalog::{PbIndex, PbIndexColumnProperties, PbStreamJobStatus, PbTable};
27use risingwave_sqlparser::ast;
28use risingwave_sqlparser::ast::{Ident, ObjectName, OrderByExpr};
29
30use super::RwPgResponse;
31use crate::TableCatalog;
32use crate::binder::Binder;
33use crate::catalog::root_catalog::SchemaPath;
34use crate::catalog::{DatabaseId, SchemaId};
35use crate::error::{ErrorCode, Result};
36use crate::expr::{Expr, ExprImpl, ExprRewriter, InputRef};
37use crate::handler::HandlerArgs;
38use crate::optimizer::plan_expr_rewriter::ConstEvalRewriter;
39use crate::optimizer::plan_node::{Explain, LogicalProject, LogicalScan, StreamMaterialize};
40use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
41use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
42use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
43use crate::session::SessionImpl;
44use crate::stream_fragmenter::build_graph;
45
46pub(crate) fn resolve_index_schema(
47 session: &SessionImpl,
48 index_name: ObjectName,
49 table_name: ObjectName,
50) -> Result<(String, Arc<TableCatalog>, String)> {
51 let db_name = &session.database();
52 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;
53 let search_path = session.config().search_path();
54 let user_name = &session.user_name();
55 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
56
57 let index_table_name = Binder::resolve_index_name(index_name)?;
58
59 let catalog_reader = session.env().catalog_reader();
60 let read_guard = catalog_reader.read_guard();
61 let (table, schema_name) =
62 read_guard.get_created_table_by_name(db_name, schema_path, &table_name)?;
63 Ok((schema_name.to_owned(), table.clone(), index_table_name))
64}
65
66pub(crate) fn gen_create_index_plan(
67 session: &SessionImpl,
68 context: OptimizerContextRef,
69 schema_name: String,
70 table: Arc<TableCatalog>,
71 index_table_name: String,
72 columns: Vec<OrderByExpr>,
73 include: Vec<Ident>,
74 distributed_by: Vec<ast::Expr>,
75) -> Result<(PlanRef, PbTable, PbIndex)> {
76 let table_name = table.name.clone();
77
78 if table.is_index() {
79 return Err(
80 ErrorCode::InvalidInputSyntax(format!("\"{}\" is an index", table.name)).into(),
81 );
82 }
83
84 if !session.is_super_user() && session.user_id() != table.owner {
85 return Err(
86 ErrorCode::PermissionDenied(format!("must be owner of table {}", table.name)).into(),
87 );
88 }
89
90 let mut binder = Binder::new_for_stream(session);
91 binder.bind_table(Some(&schema_name), &table_name)?;
92
93 let mut index_columns_ordered_expr = vec![];
94 let mut include_columns_expr = vec![];
95 let mut distributed_columns_expr = vec![];
96 for column in columns {
97 let order_type = OrderType::from_bools(column.asc, column.nulls_first);
98 let expr_impl = binder.bind_expr(column.expr)?;
99 let mut const_eval = ConstEvalRewriter { error: None };
101 let expr_impl = const_eval.rewrite_expr(expr_impl);
102 let expr_impl = context.session_timezone().rewrite_expr(expr_impl);
103 match expr_impl {
104 ExprImpl::InputRef(_) => {}
105 ExprImpl::FunctionCall(_) => {
106 if expr_impl.is_impure() {
107 return Err(ErrorCode::NotSupported(
108 "this expression is impure".into(),
109 "use a pure expression instead".into(),
110 )
111 .into());
112 }
113 }
114 _ => {
115 return Err(ErrorCode::NotSupported(
116 "index columns should be columns or expressions".into(),
117 "use columns or expressions instead".into(),
118 )
119 .into());
120 }
121 }
122 index_columns_ordered_expr.push((expr_impl, order_type));
123 }
124
125 if include.is_empty() {
126 include_columns_expr = table
128 .columns()
129 .iter()
130 .enumerate()
131 .filter(|(_, column)| !column.is_hidden)
132 .map(|(x, column)| {
133 ExprImpl::InputRef(InputRef::new(x, column.column_desc.data_type.clone()).into())
134 })
135 .collect_vec();
136 } else {
137 for column in include {
138 let expr_impl =
139 binder.bind_expr(risingwave_sqlparser::ast::Expr::Identifier(column))?;
140 include_columns_expr.push(expr_impl);
141 }
142 };
143
144 for column in distributed_by {
145 let expr_impl = binder.bind_expr(column)?;
146 distributed_columns_expr.push(expr_impl);
147 }
148
149 let table_desc = Rc::new(table.table_desc());
150
151 let mut set = HashSet::new();
153 index_columns_ordered_expr = index_columns_ordered_expr
154 .into_iter()
155 .filter(|(expr, _)| match expr {
156 ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
157 ExprImpl::FunctionCall(_) => true,
158 _ => unreachable!(),
159 })
160 .collect_vec();
161
162 include_columns_expr = include_columns_expr
164 .into_iter()
165 .filter(|expr| match expr {
166 ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
167 _ => unreachable!(),
168 })
169 .collect_vec();
170
171 let mut set = HashSet::new();
173 let distributed_columns_expr = distributed_columns_expr
174 .into_iter()
175 .filter(|expr| match expr {
176 ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
177 ExprImpl::FunctionCall(_) => true,
178 _ => unreachable!(),
179 })
180 .collect_vec();
181 if !index_columns_ordered_expr
183 .iter()
184 .map(|(expr, _)| expr.clone())
185 .collect_vec()
186 .starts_with(&distributed_columns_expr)
187 {
188 return Err(ErrorCode::InvalidInputSyntax(
189 "Distributed by columns should be a prefix of index columns".to_owned(),
190 )
191 .into());
192 }
193
194 let (index_database_id, index_schema_id) =
195 session.get_database_and_schema_id_for_create(Some(schema_name))?;
196
197 let materialize = assemble_materialize(
199 table_name,
200 index_database_id,
201 index_schema_id,
202 table.clone(),
203 context,
204 index_table_name.clone(),
205 &index_columns_ordered_expr,
206 &include_columns_expr,
207 if distributed_columns_expr.is_empty() {
210 1
211 } else {
212 distributed_columns_expr.len()
213 },
214 table.cardinality,
215 )?;
216
217 let index_table = materialize.table();
218 let mut index_table_prost = index_table.to_prost();
219 {
220 index_table_prost.retention_seconds = table.retention_seconds;
222 }
223
224 index_table_prost.owner = table.owner;
225 index_table_prost.dependent_relations = vec![table.id.table_id];
226
227 let index_columns_len = index_columns_ordered_expr.len() as u32;
228 let index_column_properties = index_columns_ordered_expr
229 .iter()
230 .map(|(_, order)| PbIndexColumnProperties {
231 is_desc: order.is_descending(),
232 nulls_first: order.nulls_are_first(),
233 })
234 .collect();
235 let index_item = build_index_item(
236 index_table,
237 table.name(),
238 table_desc,
239 index_columns_ordered_expr,
240 );
241
242 let index_prost = PbIndex {
243 id: IndexId::placeholder().index_id,
244 schema_id: index_schema_id,
245 database_id: index_database_id,
246 name: index_table_name,
247 owner: index_table_prost.owner,
248 index_table_id: TableId::placeholder().table_id,
249 primary_table_id: table.id.table_id,
250 index_item,
251 index_column_properties,
252 index_columns_len,
253 initialized_at_epoch: None,
254 created_at_epoch: None,
255 stream_job_status: PbStreamJobStatus::Creating.into(),
256 initialized_at_cluster_version: None,
257 created_at_cluster_version: None,
258 };
259
260 let plan: PlanRef = materialize.into();
261 let ctx = plan.ctx();
262 let explain_trace = ctx.is_explain_trace();
263 if explain_trace {
264 ctx.trace("Create Index:");
265 ctx.trace(plan.explain_to_string());
266 }
267
268 Ok((plan, index_table_prost, index_prost))
269}
270
271fn build_index_item(
272 index_table: &TableCatalog,
273 primary_table_name: &str,
274 primary_table_desc: Rc<TableDesc>,
275 index_columns: Vec<(ExprImpl, OrderType)>,
276) -> Vec<risingwave_pb::expr::ExprNode> {
277 let primary_table_desc_map = primary_table_desc
278 .columns
279 .iter()
280 .enumerate()
281 .map(|(x, y)| (y.name.clone(), x))
282 .collect::<HashMap<_, _>>();
283
284 let primary_table_name_prefix = format!("{}.", primary_table_name);
285
286 let index_columns_len = index_columns.len();
287 index_columns
288 .into_iter()
289 .map(|(expr, _)| expr.to_expr_proto())
290 .chain(
291 index_table
292 .columns
293 .iter()
294 .map(|c| &c.column_desc)
295 .skip(index_columns_len)
296 .map(|x| {
297 let name = if x.name.starts_with(&primary_table_name_prefix) {
298 x.name[primary_table_name_prefix.len()..].to_string()
299 } else {
300 x.name.clone()
301 };
302
303 let column_index = *primary_table_desc_map.get(&name).unwrap();
304 InputRef {
305 index: column_index,
306 data_type: primary_table_desc
307 .columns
308 .get(column_index)
309 .unwrap()
310 .data_type
311 .clone(),
312 }
313 .to_expr_proto()
314 }),
315 )
316 .collect_vec()
317}
318
319fn assemble_materialize(
322 table_name: String,
323 database_id: DatabaseId,
324 schema_id: SchemaId,
325 table_catalog: Arc<TableCatalog>,
326 context: OptimizerContextRef,
327 index_name: String,
328 index_columns: &[(ExprImpl, OrderType)],
329 include_columns: &[ExprImpl],
330 distributed_by_columns_len: usize,
331 cardinality: Cardinality,
332) -> Result<StreamMaterialize> {
333 let definition = context.normalized_sql().to_owned();
338 let retention_seconds = table_catalog.retention_seconds.and_then(NonZeroU32::new);
339
340 let logical_scan = LogicalScan::create(
341 table_name,
342 table_catalog.clone(),
343 vec![],
345 context,
346 None,
347 cardinality,
348 );
349
350 let exprs = index_columns
351 .iter()
352 .map(|(expr, _)| expr.clone())
353 .chain(include_columns.iter().cloned())
354 .collect_vec();
355
356 let logical_project = LogicalProject::create(logical_scan.into(), exprs);
357 let mut project_required_cols = FixedBitSet::with_capacity(logical_project.schema().len());
358 project_required_cols.toggle_range(0..logical_project.schema().len());
359
360 let mut col_names = HashSet::new();
361 let mut count = 0;
362
363 let out_names: Vec<String> = index_columns
364 .iter()
365 .map(|(expr, _)| match expr {
366 ExprImpl::InputRef(input_ref) => table_catalog
367 .columns()
368 .get(input_ref.index)
369 .unwrap()
370 .name()
371 .to_owned(),
372 ExprImpl::FunctionCall(func) => {
373 let func_name = func.func_type().as_str_name().to_owned();
374 let mut name = func_name.clone();
375 while !col_names.insert(name.clone()) {
376 count += 1;
377 name = format!("{}{}", func_name, count);
378 }
379 name
380 }
381 _ => unreachable!(),
382 })
383 .chain(include_columns.iter().map(|expr| {
384 match expr {
385 ExprImpl::InputRef(input_ref) => table_catalog
386 .columns()
387 .get(input_ref.index)
388 .unwrap()
389 .name()
390 .to_owned(),
391 _ => unreachable!(),
392 }
393 }))
394 .collect_vec();
395
396 PlanRoot::new_with_logical_plan(
397 logical_project,
398 RequiredDist::PhysicalDist(Distribution::HashShard(
401 (0..distributed_by_columns_len).collect(),
402 )),
403 Order::new(
404 index_columns
405 .iter()
406 .enumerate()
407 .map(|(i, (_, order))| ColumnOrder::new(i, *order))
408 .collect(),
409 ),
410 project_required_cols,
411 out_names,
412 )
413 .gen_index_plan(
414 index_name,
415 database_id,
416 schema_id,
417 definition,
418 retention_seconds,
419 )
420}
421
422pub async fn handle_create_index(
423 handler_args: HandlerArgs,
424 if_not_exists: bool,
425 index_name: ObjectName,
426 table_name: ObjectName,
427 columns: Vec<OrderByExpr>,
428 include: Vec<Ident>,
429 distributed_by: Vec<ast::Expr>,
430) -> Result<RwPgResponse> {
431 let session = handler_args.session.clone();
432
433 let (graph, index_table, index) = {
434 let (schema_name, table, index_table_name) =
435 resolve_index_schema(&session, index_name, table_name)?;
436 let qualified_index_name = ObjectName(vec![
437 Ident::with_quote_unchecked('"', &schema_name),
438 Ident::with_quote_unchecked('"', &index_table_name),
439 ]);
440 if let Either::Right(resp) = session.check_relation_name_duplicated(
441 qualified_index_name,
442 StatementType::CREATE_INDEX,
443 if_not_exists,
444 )? {
445 return Ok(resp);
446 }
447
448 let context = OptimizerContext::from_handler_args(handler_args);
449 let (plan, index_table, index) = gen_create_index_plan(
450 &session,
451 context.into(),
452 schema_name,
453 table,
454 index_table_name,
455 columns,
456 include,
457 distributed_by,
458 )?;
459 let graph = build_graph(plan)?;
460
461 (graph, index_table, index)
462 };
463
464 tracing::trace!(
465 "name={}, graph=\n{}",
466 index.name,
467 serde_json::to_string_pretty(&graph).unwrap()
468 );
469
470 let _job_guard =
471 session
472 .env()
473 .creating_streaming_job_tracker()
474 .guard(CreatingStreamingJobInfo::new(
475 session.session_id(),
476 index.database_id,
477 index.schema_id,
478 index.name.clone(),
479 ));
480
481 let catalog_writer = session.catalog_writer()?;
482 catalog_writer
483 .create_index(index, index_table, graph)
484 .await?;
485
486 Ok(PgResponse::empty_result(StatementType::CREATE_INDEX))
487}