use std::collections::{HashMap, HashSet};
use std::num::NonZeroU32;
use std::rc::Rc;
use std::sync::Arc;

use either::Either;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{IndexId, TableDesc, TableId};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::catalog::{PbIndex, PbIndexColumnProperties, PbStreamJobStatus};
use risingwave_sqlparser::ast;
use risingwave_sqlparser::ast::{Ident, ObjectName, OrderByExpr};

use super::RwPgResponse;
use crate::TableCatalog;
use crate::binder::Binder;
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::{ErrorCode, Result};
use crate::expr::{Expr, ExprImpl, ExprRewriter, InputRef};
use crate::handler::HandlerArgs;
use crate::optimizer::plan_expr_rewriter::ConstEvalRewriter;
use crate::optimizer::plan_node::{Explain, LogicalProject, LogicalScan, StreamMaterialize};
use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::stream_fragmenter::{GraphJobType, build_graph};

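/// Resolve the base table of a `CREATE INDEX` statement along with the name
/// of the index table to create. Returns `(schema_name, table_catalog,
/// index_table_name)`; the base table is looked up among created tables only.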
pub(crate) fn resolve_index_schema(
    session: &SessionImpl,
    index_name: ObjectName,
    table_name: ObjectName,
) -> Result<(String, Arc<TableCatalog>, String)> {
    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;
    let search_path = session.config().search_path();
    let user_name = &session.user_name();
    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

    let index_table_name = Binder::resolve_index_name(index_name)?;

    let catalog_reader = session.env().catalog_reader();
    let read_guard = catalog_reader.read_guard();
    let (table, schema_name) =
        read_guard.get_created_table_by_name(db_name, schema_path, &table_name)?;
    Ok((schema_name.to_owned(), table.clone(), index_table_name))
}

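/// Plan a `CREATE INDEX` statement: validate the base table and the index
/// definition, then produce the materialize plan, the catalog of the backing
/// index table, and the `PbIndex` to register with the catalog.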
pub(crate) fn gen_create_index_plan(
    session: &SessionImpl,
    context: OptimizerContextRef,
    schema_name: String,
    table: Arc<TableCatalog>,
    index_table_name: String,
    columns: Vec<OrderByExpr>,
    include: Vec<Ident>,
    distributed_by: Vec<ast::Expr>,
) -> Result<(PlanRef, TableCatalog, PbIndex)> {
    let table_name = table.name.clone();

    if table.is_index() {
        return Err(
            ErrorCode::InvalidInputSyntax(format!("\"{}\" is an index", table.name)).into(),
        );
    }

    if !session.is_super_user() && session.user_id() != table.owner {
        return Err(ErrorCode::PermissionDenied(format!(
            "must be owner of table \"{}\"",
            table.name
        ))
        .into());
    }

    let mut binder = Binder::new_for_stream(session);
    binder.bind_table(Some(&schema_name), &table_name)?;

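    // Bind the ORDER BY, INCLUDE and DISTRIBUTED BY clauses against the base
    // table. Index key expressions are constant-folded and timezone-rewritten
    // first, so that equivalent expressions are recognized as equal below.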
    let mut index_columns_ordered_expr = vec![];
    let mut include_columns_expr = vec![];
    let mut distributed_columns_expr = vec![];
    for column in columns {
        let order_type = OrderType::from_bools(column.asc, column.nulls_first);
        let expr_impl = binder.bind_expr(column.expr)?;
        let mut const_eval = ConstEvalRewriter { error: None };
        let expr_impl = const_eval.rewrite_expr(expr_impl);
        let expr_impl = context.session_timezone().rewrite_expr(expr_impl);
        match expr_impl {
            ExprImpl::InputRef(_) => {}
            ExprImpl::FunctionCall(_) => {
                if expr_impl.is_impure() {
                    return Err(ErrorCode::NotSupported(
                        "this expression is impure".into(),
                        "use a pure expression instead".into(),
                    )
                    .into());
                }
            }
            _ => {
                return Err(ErrorCode::NotSupported(
                    "index columns should be columns or expressions".into(),
                    "use columns or expressions instead".into(),
                )
                .into());
            }
        }
        index_columns_ordered_expr.push((expr_impl, order_type));
    }

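    // With no explicit INCLUDE clause, cover every non-hidden column of the
    // base table, making the index a covering index by default.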
    if include.is_empty() {
        include_columns_expr = table
            .columns()
            .iter()
            .enumerate()
            .filter(|(_, column)| !column.is_hidden)
            .map(|(x, column)| {
                ExprImpl::InputRef(InputRef::new(x, column.column_desc.data_type.clone()).into())
            })
            .collect_vec();
    } else {
        for column in include {
            let expr_impl = binder.bind_expr(ast::Expr::Identifier(column))?;
            include_columns_expr.push(expr_impl);
        }
    }

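    // Bind DISTRIBUTED BY expressions as-is; they are validated against the
    // index keys below.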
    for column in distributed_by {
        let expr_impl = binder.bind_expr(column)?;
        distributed_columns_expr.push(expr_impl);
    }

    let table_desc = Rc::new(table.table_desc());

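    // Deduplicate plain index columns by their input-ref index; expression
    // (function-call) keys are kept as-is.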
    let mut set = HashSet::new();
    index_columns_ordered_expr = index_columns_ordered_expr
        .into_iter()
        .filter(|(expr, _)| match expr {
            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
            ExprImpl::FunctionCall(_) => true,
            _ => unreachable!(),
        })
        .collect_vec();

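    // Drop INCLUDE columns that are already index keys; note that `set` is
    // carried over from the filter above.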
    include_columns_expr = include_columns_expr
        .into_iter()
        .filter(|expr| match expr {
            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
            _ => unreachable!(),
        })
        .collect_vec();

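    // Deduplicate DISTRIBUTED BY columns, this time with a fresh set.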
    let mut set = HashSet::new();
    let distributed_columns_expr = distributed_columns_expr
        .into_iter()
        .filter(|expr| match expr {
            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
            ExprImpl::FunctionCall(_) => true,
            _ => unreachable!(),
        })
        .collect_vec();
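
    // DISTRIBUTED BY columns must form a prefix of the index columns, since
    // the index is hash-distributed by the former and sorted by the latter.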
    if !index_columns_ordered_expr
        .iter()
        .map(|(expr, _)| expr.clone())
        .collect_vec()
        .starts_with(&distributed_columns_expr)
    {
        return Err(ErrorCode::InvalidInputSyntax(
            "Distributed by columns should be a prefix of index columns".to_owned(),
        )
        .into());
    }

    let (index_database_id, index_schema_id) =
        session.get_database_and_schema_id_for_create(Some(schema_name))?;

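    // Manually assemble the materialize plan for the index table.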
    let materialize = assemble_materialize(
        table_name,
        index_database_id,
        index_schema_id,
        table.clone(),
        context,
        index_table_name.clone(),
        &index_columns_ordered_expr,
        &include_columns_expr,
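        // Default to distributing by the first index column when DISTRIBUTED
        // BY is not specified.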
        if distributed_columns_expr.is_empty() {
            1
        } else {
            distributed_columns_expr.len()
        },
        table.cardinality,
    )?;

    let mut index_table = materialize.table().clone();
    {
        // The index inherits the data retention setting of its base table.
        index_table.retention_seconds = table.retention_seconds;
    }

    index_table.owner = table.owner;

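    // Record each key's sort properties (DESC / NULLS FIRST) in the catalog.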
    let index_columns_len = index_columns_ordered_expr.len() as u32;
    let index_column_properties = index_columns_ordered_expr
        .iter()
        .map(|(_, order)| PbIndexColumnProperties {
            is_desc: order.is_descending(),
            nulls_first: order.nulls_are_first(),
        })
        .collect();
    let index_item = build_index_item(
        &index_table,
        table.name(),
        table_desc,
        index_columns_ordered_expr,
    );

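    // Assemble the index catalog entry. The index and table ids are
    // placeholders to be assigned during catalog creation.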
    let index_prost = PbIndex {
        id: IndexId::placeholder().index_id,
        schema_id: index_schema_id,
        database_id: index_database_id,
        name: index_table_name,
        owner: index_table.owner,
        index_table_id: TableId::placeholder().table_id,
        primary_table_id: table.id.table_id,
        index_item,
        index_column_properties,
        index_columns_len,
        initialized_at_epoch: None,
        created_at_epoch: None,
        stream_job_status: PbStreamJobStatus::Creating.into(),
        initialized_at_cluster_version: None,
        created_at_cluster_version: None,
    };

    let plan: PlanRef = materialize.into();
    let ctx = plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Index:");
        ctx.trace(plan.explain_to_string());
    }

    Ok((plan, index_table, index_prost))
}

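/// Build the `index_item` expressions stored in the index catalog: one
/// expression over the primary table per column of the index table. The first
/// `index_columns.len()` entries are the bound index key expressions; the
/// remaining (included) columns are mapped back to primary-table columns by
/// name, stripping the `"{primary_table_name}."` prefix that qualifies them
/// in the index table's schema.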
fn build_index_item(
    index_table: &TableCatalog,
    primary_table_name: &str,
    primary_table_desc: Rc<TableDesc>,
    index_columns: Vec<(ExprImpl, OrderType)>,
) -> Vec<risingwave_pb::expr::ExprNode> {
    let primary_table_desc_map = primary_table_desc
        .columns
        .iter()
        .enumerate()
        .map(|(x, y)| (y.name.clone(), x))
        .collect::<HashMap<_, _>>();

    let primary_table_name_prefix = format!("{}.", primary_table_name);

    let index_columns_len = index_columns.len();
    index_columns
        .into_iter()
        .map(|(expr, _)| expr.to_expr_proto())
        .chain(
            index_table
                .columns
                .iter()
                .map(|c| &c.column_desc)
                .skip(index_columns_len)
                .map(|x| {
                    let name = if x.name.starts_with(&primary_table_name_prefix) {
                        x.name[primary_table_name_prefix.len()..].to_string()
                    } else {
                        x.name.clone()
                    };

                    let column_index = *primary_table_desc_map.get(&name).unwrap();
                    InputRef {
                        index: column_index,
                        data_type: primary_table_desc
                            .columns
                            .get(column_index)
                            .unwrap()
                            .data_type
                            .clone(),
                    }
                    .to_expr_proto()
                }),
        )
        .collect_vec()
}

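/// Assemble the streaming plan that maintains the index: a `LogicalScan` of
/// the base table, a `LogicalProject` computing the index keys followed by
/// the INCLUDE columns, materialized with hash distribution on the leading
/// `distributed_by_columns_len` output columns and sorted by the index key
/// order.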
fn assemble_materialize(
    table_name: String,
    database_id: DatabaseId,
    schema_id: SchemaId,
    table_catalog: Arc<TableCatalog>,
    context: OptimizerContextRef,
    index_name: String,
    index_columns: &[(ExprImpl, OrderType)],
    include_columns: &[ExprImpl],
    distributed_by_columns_len: usize,
    cardinality: Cardinality,
) -> Result<StreamMaterialize> {
    let definition = context.normalized_sql().to_owned();
    let retention_seconds = table_catalog.retention_seconds.and_then(NonZeroU32::new);

    let logical_scan = LogicalScan::create(
        table_name,
        table_catalog.clone(),
        vec![],
        context,
        None,
        cardinality,
    );

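    // Project the index key expressions first, then the INCLUDE columns; this
    // output order defines the schema of the index table.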
    let exprs = index_columns
        .iter()
        .map(|(expr, _)| expr.clone())
        .chain(include_columns.iter().cloned())
        .collect_vec();

    let logical_project = LogicalProject::create(logical_scan.into(), exprs);
    let mut project_required_cols = FixedBitSet::with_capacity(logical_project.schema().len());
    project_required_cols.toggle_range(0..logical_project.schema().len());

    let mut col_names = HashSet::new();
    let mut count = 0;

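    // Derive output column names: plain columns keep their names, while
    // expression keys are named after the function, deduplicated with a
    // numeric suffix (e.g. `LOWER`, `LOWER1`, ...).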
    let out_names: Vec<String> = index_columns
        .iter()
        .map(|(expr, _)| match expr {
            ExprImpl::InputRef(input_ref) => table_catalog
                .columns()
                .get(input_ref.index)
                .unwrap()
                .name()
                .to_owned(),
            ExprImpl::FunctionCall(func) => {
                let func_name = func.func_type().as_str_name().to_owned();
                let mut name = func_name.clone();
                while !col_names.insert(name.clone()) {
                    count += 1;
                    name = format!("{}{}", func_name, count);
                }
                name
            }
            _ => unreachable!(),
        })
        .chain(include_columns.iter().map(|expr| {
            match expr {
                ExprImpl::InputRef(input_ref) => table_catalog
                    .columns()
                    .get(input_ref.index)
                    .unwrap()
                    .name()
                    .to_owned(),
                _ => unreachable!(),
            }
        }))
        .collect_vec();

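    // Require hash distribution on the distribution-key prefix and the index
    // key order, then generate the index (materialize) plan.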
    PlanRoot::new_with_logical_plan(
        logical_project,
        RequiredDist::PhysicalDist(Distribution::HashShard(
            (0..distributed_by_columns_len).collect(),
        )),
        Order::new(
            index_columns
                .iter()
                .enumerate()
                .map(|(i, (_, order))| ColumnOrder::new(i, *order))
                .collect(),
        ),
        project_required_cols,
        out_names,
    )
    .gen_index_plan(
        index_name,
        database_id,
        schema_id,
        definition,
        retention_seconds,
    )
}

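/// Handles `CREATE INDEX`, e.g. (with hypothetical table and column names):
///
/// ```sql
/// CREATE INDEX idx ON t (a, lower(b) DESC) INCLUDE (c) DISTRIBUTED BY (a);
/// ```
///
/// Resolves the base table, plans the index as a streaming materialization
/// job, builds its stream graph, and submits it to the catalog writer.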
pub async fn handle_create_index(
    handler_args: HandlerArgs,
    if_not_exists: bool,
    index_name: ObjectName,
    table_name: ObjectName,
    columns: Vec<OrderByExpr>,
    include: Vec<Ident>,
    distributed_by: Vec<ast::Expr>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    let (graph, index_table, index) = {
        let (schema_name, table, index_table_name) =
            resolve_index_schema(&session, index_name, table_name)?;
        let qualified_index_name = ObjectName(vec![
            Ident::from_real_value(&schema_name),
            Ident::from_real_value(&index_table_name),
        ]);
        if let Either::Right(resp) = session.check_relation_name_duplicated(
            qualified_index_name,
            StatementType::CREATE_INDEX,
            if_not_exists,
        )? {
            return Ok(resp);
        }

        let context = OptimizerContext::from_handler_args(handler_args);
        let (plan, index_table, index) = gen_create_index_plan(
            &session,
            context.into(),
            schema_name,
            table,
            index_table_name,
            columns,
            include,
            distributed_by,
        )?;
        let graph = build_graph(plan, Some(GraphJobType::Index))?;

        (graph, index_table, index)
    };

    tracing::trace!(
        "name={}, graph=\n{}",
        index.name,
        serde_json::to_string_pretty(&graph).unwrap()
    );

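    // Keep a guard on the creating streaming job for the rest of the handler,
    // so the in-flight job is tracked and can be cleaned up or cancelled.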
    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                index.database_id,
                index.schema_id,
                index.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    catalog_writer
        .create_index(index, index_table.to_prost(), graph, if_not_exists)
        .await?;

    Ok(PgResponse::empty_result(StatementType::CREATE_INDEX))
}