1use std::collections::{HashMap, HashSet};
16use std::num::NonZeroU32;
17use std::sync::Arc;
18
19use either::Either;
20use fixedbitset::FixedBitSet;
21use itertools::Itertools;
22use pgwire::pg_response::{PgResponse, StatementType};
23use risingwave_common::catalog::{IndexId, TableId};
24use risingwave_common::types::DataType;
25use risingwave_common::util::recursive::{Recurse, tracker};
26use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
27use risingwave_pb::catalog::{
28 CreateType, PbFlatIndexConfig, PbHnswFlatIndexConfig, PbIndex, PbIndexColumnProperties,
29 PbStreamJobStatus, PbVectorIndexInfo,
30};
31use risingwave_pb::common::PbDistanceType;
32use risingwave_sqlparser::ast;
33use risingwave_sqlparser::ast::{Ident, ObjectName, OrderByExpr};
34use thiserror_ext::AsReport;
35
36use super::RwPgResponse;
37use crate::TableCatalog;
38use crate::binder::Binder;
39use crate::catalog::root_catalog::SchemaPath;
40use crate::catalog::{DatabaseId, SchemaId};
41use crate::error::{ErrorCode, Result};
42use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef, is_impure_func_call};
43use crate::handler::HandlerArgs;
44use crate::handler::util::reject_internal_table_dependency;
45use crate::optimizer::plan_expr_rewriter::ConstEvalRewriter;
46use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
47use crate::optimizer::plan_node::{
48 Explain, LogicalProject, LogicalScan, StreamMaterialize, StreamPlanRef as PlanRef,
49 ensure_sync_log_store_fragment_root,
50};
51use crate::optimizer::property::{Distribution, Order, RequiredDist};
52use crate::optimizer::{LogicalPlanRoot, OptimizerContext, OptimizerContextRef, PlanRoot};
53use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
54use crate::session::SessionImpl;
55use crate::session::current::notice_to_user;
56use crate::stream_fragmenter::{GraphJobType, build_graph};
57
58pub(crate) fn resolve_index_schema(
59 session: &SessionImpl,
60 index_name: ObjectName,
61 table_name: ObjectName,
62) -> Result<(String, Arc<TableCatalog>, String)> {
63 let db_name = &session.database();
64 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
65 let search_path = session.config().search_path();
66 let user_name = &session.user_name();
67 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
68
69 let index_table_name = Binder::resolve_index_name(index_name)?;
70
71 let catalog_reader = session.env().catalog_reader();
72 let read_guard = catalog_reader.read_guard();
73 let (table, schema_name) =
74 read_guard.get_created_table_by_name(db_name, schema_path, &table_name)?;
75 reject_internal_table_dependency(table.as_ref(), "CREATE INDEX")?;
76 Ok((schema_name.to_owned(), table.clone(), index_table_name))
77}
78
/// Expression visitor that decides whether an expression may be used as an index column.
///
/// Use [`IndexColumnExprValidator::validate`] rather than constructing it directly.
pub(crate) struct IndexColumnExprValidator {
    /// When `false`, function calls flagged by `is_impure_func_call` are rejected.
    allow_impure: bool,
    /// Outcome of the validation so far; starts as `Ok(())` and is replaced by the first error
    /// encountered while visiting.
    result: Result<()>,
}
83
84impl IndexColumnExprValidator {
85 fn unsupported_expr_err(expr: &ExprImpl) -> ErrorCode {
86 ErrorCode::NotSupported(
87 format!("unsupported index column expression type: {:?}", expr),
88 "use columns or expressions instead".into(),
89 )
90 }
91
92 pub(crate) fn validate(expr: &ExprImpl, allow_impure: bool) -> Result<()> {
93 match expr {
94 ExprImpl::InputRef(_) | ExprImpl::FunctionCall(_) => {}
95 other_expr => {
96 return Err(Self::unsupported_expr_err(other_expr).into());
97 }
98 }
99 let mut visitor = Self {
100 allow_impure,
101 result: Ok(()),
102 };
103 visitor.visit_expr(expr);
104 visitor.result
105 }
106}
107
108impl ExprVisitor for IndexColumnExprValidator {
109 fn visit_expr(&mut self, expr: &ExprImpl) {
110 if self.result.is_err() {
111 return;
112 }
113 tracker!().recurse(|t| {
114 if t.depth_reaches(crate::expr::EXPR_DEPTH_THRESHOLD) {
115 notice_to_user(crate::expr::EXPR_TOO_DEEP_NOTICE);
116 }
117
118 match expr {
119 ExprImpl::InputRef(_) | ExprImpl::Literal(_) => {}
120 ExprImpl::FunctionCall(inner) => {
121 if !self.allow_impure && is_impure_func_call(inner) {
122 self.result = Err(ErrorCode::NotSupported(
123 "this expression is impure".into(),
124 "use a pure expression instead".into(),
125 )
126 .into());
127 return;
128 }
129 self.visit_function_call(inner)
130 }
131 other_expr => {
132 self.result = Err(Self::unsupported_expr_err(other_expr).into());
133 }
134 }
135 })
136 }
137}
138
139pub(crate) fn gen_create_index_plan(
140 session: &SessionImpl,
141 context: OptimizerContextRef,
142 schema_name: String,
143 table: Arc<TableCatalog>,
144 index_table_name: String,
145 method: Option<Ident>,
146 columns: Vec<OrderByExpr>,
147 include: Vec<Ident>,
148 distributed_by: Vec<ast::Expr>,
149) -> Result<(PlanRef, TableCatalog, PbIndex)> {
150 if table.is_index() {
151 return Err(
152 ErrorCode::InvalidInputSyntax(format!("\"{}\" is an index", table.name)).into(),
153 );
154 }
155
156 if !session.is_super_user() && session.user_id() != table.owner {
157 return Err(ErrorCode::PermissionDenied(format!(
158 "must be owner of table \"{}\"",
159 table.name
160 ))
161 .into());
162 }
163
164 let vector_index_config = if let Some(method) = method {
165 match method.real_value().to_ascii_lowercase().as_str() {
166 "default" => None,
167 "flat" => Some(risingwave_pb::catalog::vector_index_info::PbConfig::Flat(
168 PbFlatIndexConfig {},
169 )),
170 "hnsw" => {
171 let with_options = context.with_options();
172 let parse_non_zero_u32 = |key: &str, default| {
173 with_options
174 .get(key)
175 .map(|v| {
176 let v = v.parse::<u32>().map_err(|e| {
177 ErrorCode::InvalidInputSyntax(format!(
178 "invalid {} value {}: failed to parse {}",
179 key,
180 v,
181 e.as_report()
182 ))
183 })?;
184 if v == 0 {
185 Err(ErrorCode::InvalidInputSyntax(format!(
186 "invalid {} value {}: should not be zero",
187 key, v
188 )))
189 } else {
190 Ok(v)
191 }
192 })
193 .transpose()
194 .map(|v| v.unwrap_or(default))
195 };
196 Some(
197 risingwave_pb::catalog::vector_index_info::PbConfig::HnswFlat(
198 PbHnswFlatIndexConfig {
199 m: parse_non_zero_u32("m", 16)?, ef_construction: parse_non_zero_u32("ef_construction", 64)?, max_level: parse_non_zero_u32("max_level", 10)?,
202 },
203 ),
204 )
205 }
206 _ => {
207 return Err(ErrorCode::InvalidInputSyntax(format!(
208 "invalid index method {}",
209 method
210 ))
211 .into());
212 }
213 }
214 } else {
215 None
216 };
217
218 let is_vector_index = vector_index_config.is_some();
219
220 if is_vector_index && !table.append_only {
221 return Err(ErrorCode::InvalidInputSyntax(format!(
222 "\"{}\" is not append-only but vector index must be append-only",
223 table.name
224 ))
225 .into());
226 }
227
228 let mut binder = Binder::new_for_stream(session);
229 binder.bind_table(Some(&schema_name), &table.name)?;
230
231 let mut index_columns_ordered_expr = vec![];
232 let mut include_columns_expr = vec![];
233 let mut distributed_columns_expr = vec![];
234 if is_vector_index && columns.len() != 1 {
235 return Err(ErrorCode::InvalidInputSyntax(format!(
236 "vector index must be defined on exactly 1 column, but got {}",
237 columns.len()
238 ))
239 .into());
240 }
241 let mut dimension = None;
242 for column in columns {
243 if is_vector_index && (column.asc.is_some() || column.nulls_first.is_some()) {
244 return Err(ErrorCode::InvalidInputSyntax(
245 "vector index cannot specify order".to_owned(),
246 )
247 .into());
248 }
249 let order_type = OrderType::from_bools(column.asc, column.nulls_first);
250 let expr_impl = binder.bind_expr(&column.expr)?;
251 let mut const_eval = ConstEvalRewriter { error: None };
253 let expr_impl = const_eval.rewrite_expr(expr_impl);
254 let expr_impl = context.session_timezone().rewrite_expr(expr_impl);
255 let allow_impure = is_vector_index;
256 IndexColumnExprValidator::validate(&expr_impl, allow_impure)?;
257 if is_vector_index {
258 match expr_impl.return_type() {
259 DataType::Vector(d) => {
260 dimension = Some(d);
261 }
262 other => {
263 return Err(ErrorCode::InvalidInputSyntax(format!(
264 "vector index must be defined on column of Vector type, but got {:?}",
265 other
266 ))
267 .into());
268 }
269 }
270 }
271 index_columns_ordered_expr.push((expr_impl, order_type));
272 }
273
274 if include.is_empty() {
275 include_columns_expr = table
277 .columns()
278 .iter()
279 .enumerate()
280 .filter(|(_, column)| !column.is_hidden)
281 .map(|(x, column)| {
282 ExprImpl::InputRef(InputRef::new(x, column.column_desc.data_type.clone()).into())
283 })
284 .collect_vec();
285 } else {
286 for column in include {
287 let expr_impl =
288 binder.bind_expr(&risingwave_sqlparser::ast::Expr::Identifier(column))?;
289 include_columns_expr.push(expr_impl);
290 }
291 };
292
293 if is_vector_index && !distributed_by.is_empty() {
294 return Err(ErrorCode::InvalidInputSyntax(
295 "vector index cannot define DISTRIBUTED BY".to_owned(),
296 )
297 .into());
298 }
299
300 for column in distributed_by {
301 let expr_impl = binder.bind_expr(&column)?;
302 distributed_columns_expr.push(expr_impl);
303 }
304
305 let mut set = HashSet::new();
307 index_columns_ordered_expr = index_columns_ordered_expr
308 .into_iter()
309 .filter(|(expr, _)| match expr {
310 ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
311 ExprImpl::FunctionCall(_) => true,
312 _ => unreachable!(),
313 })
314 .collect_vec();
315
316 include_columns_expr = include_columns_expr
318 .into_iter()
319 .filter(|expr| match expr {
320 ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
321 _ => unreachable!(),
322 })
323 .collect_vec();
324
325 let mut set = HashSet::new();
327 let distributed_columns_expr = distributed_columns_expr
328 .into_iter()
329 .filter(|expr| match expr {
330 ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
331 ExprImpl::FunctionCall(_) => true,
332 _ => unreachable!(),
333 })
334 .collect_vec();
335 if !index_columns_ordered_expr
337 .iter()
338 .map(|(expr, _)| expr.clone())
339 .collect_vec()
340 .starts_with(&distributed_columns_expr)
341 {
342 return Err(ErrorCode::InvalidInputSyntax(
343 "Distributed by columns should be a prefix of index columns".to_owned(),
344 )
345 .into());
346 }
347
348 let (index_database_id, index_schema_id) =
349 session.get_database_and_schema_id_for_create(Some(schema_name))?;
350
351 let (plan, mut index_table) = if let Some(vector_index_config) = vector_index_config {
352 assert!(distributed_columns_expr.is_empty());
353 assert_eq!(1, index_columns_ordered_expr.len());
354 let input = assemble_input(
355 table.clone(),
356 context.clone(),
357 &index_columns_ordered_expr,
358 &include_columns_expr,
359 RequiredDist::PhysicalDist(Distribution::Single),
360 )?;
361 let definition = context.normalized_sql().to_owned();
362 let retention_seconds = table.retention_seconds.and_then(NonZeroU32::new);
363
364 let distance_type = match context
365 .with_options()
366 .get("distance_type")
367 .ok_or_else(|| {
368 ErrorCode::InvalidInputSyntax(
369 "vector index missing `distance_type` in with options".to_owned(),
370 )
371 })?
372 .to_ascii_lowercase()
373 .as_str()
374 {
375 "l1" => PbDistanceType::L1,
376 "l2" => PbDistanceType::L2Sqr,
377 "inner_product" => PbDistanceType::InnerProduct,
378 "cosine" => PbDistanceType::Cosine,
379 other => {
380 return Err(ErrorCode::InvalidInputSyntax(format!(
381 "unsupported vector index distance type: {}",
382 other
383 ))
384 .into());
385 }
386 };
387
388 let vector_index_write = input.gen_vector_index_plan(
389 index_table_name.clone(),
390 index_database_id,
391 index_schema_id,
392 definition,
393 retention_seconds,
394 PbVectorIndexInfo {
395 dimension: dimension.expect("should be set for vector index") as _,
396 config: Some(vector_index_config),
397 distance_type: distance_type as _,
398 },
399 )?;
400 let index_table = vector_index_write.table().clone();
401 let plan: PlanRef = ensure_sync_log_store_fragment_root(vector_index_write.into());
402 (plan, index_table)
403 } else {
404 let materialize = assemble_materialize(
406 index_database_id,
407 index_schema_id,
408 table.clone(),
409 context.clone(),
410 index_table_name.clone(),
411 &index_columns_ordered_expr,
412 &include_columns_expr,
413 if distributed_columns_expr.is_empty() {
416 1
417 } else {
418 distributed_columns_expr.len()
419 },
420 )?;
421 let index_table = materialize.table().clone();
422 let plan: PlanRef = ensure_sync_log_store_fragment_root(materialize.into());
423 (plan, index_table)
424 };
425
426 {
427 index_table.retention_seconds = table.retention_seconds;
429 }
430
431 index_table.owner = table.owner;
432
433 let index_columns_len = index_columns_ordered_expr.len() as u32;
434 let index_column_properties = index_columns_ordered_expr
435 .iter()
436 .map(|(_, order)| PbIndexColumnProperties {
437 is_desc: order.is_descending(),
438 nulls_first: order.nulls_are_first(),
439 })
440 .collect();
441 let index_item = build_index_item(
442 &index_table,
443 table.name(),
444 &table,
445 index_columns_ordered_expr,
446 );
447
448 let create_type =
449 if context.session_ctx().config().background_ddl() && plan_can_use_background_ddl(&plan) {
450 CreateType::Background
451 } else {
452 CreateType::Foreground
453 };
454
455 let index_prost = PbIndex {
456 id: IndexId::placeholder(),
457 schema_id: index_schema_id,
458 database_id: index_database_id,
459 name: index_table_name,
460 owner: index_table.owner,
461 index_table_id: TableId::placeholder(),
462 primary_table_id: table.id,
463 index_item,
464 index_column_properties,
465 index_columns_len,
466 initialized_at_epoch: None,
467 created_at_epoch: None,
468 stream_job_status: PbStreamJobStatus::Creating.into(),
469 initialized_at_cluster_version: None,
470 created_at_cluster_version: None,
471 create_type: create_type.into(),
472 };
473
474 let ctx = plan.ctx();
475 let explain_trace = ctx.is_explain_trace();
476 if explain_trace {
477 ctx.trace("Create Index:");
478 ctx.trace(plan.explain_to_string());
479 }
480
481 Ok((plan, index_table, index_prost))
482}
483
484fn build_index_item(
485 index_table: &TableCatalog,
486 primary_table_name: &str,
487 primary_table: &TableCatalog,
488 index_columns: Vec<(ExprImpl, OrderType)>,
489) -> Vec<risingwave_pb::expr::ExprNode> {
490 let primary_table_desc_map = primary_table
491 .columns
492 .iter()
493 .enumerate()
494 .map(|(x, y)| (y.name.clone(), x))
495 .collect::<HashMap<_, _>>();
496
497 let primary_table_name_prefix = format!("{}.", primary_table_name);
498
499 let index_columns_len = index_columns.len();
500 index_columns
501 .into_iter()
502 .map(|(expr, _)| expr.to_expr_proto())
503 .chain(
504 index_table
505 .columns
506 .iter()
507 .map(|c| &c.column_desc)
508 .skip(index_columns_len)
509 .map(|x| {
510 let name = if x.name.starts_with(&primary_table_name_prefix) {
511 x.name[primary_table_name_prefix.len()..].to_string()
512 } else {
513 x.name.clone()
514 };
515
516 let column_index = *primary_table_desc_map.get(&name).unwrap();
517 InputRef {
518 index: column_index,
519 data_type: primary_table
520 .columns
521 .get(column_index)
522 .unwrap()
523 .data_type
524 .clone(),
525 }
526 .to_expr_proto()
527 }),
528 )
529 .collect_vec()
530}
531
532fn assemble_input(
533 table_catalog: Arc<TableCatalog>,
534 context: OptimizerContextRef,
535 index_columns: &[(ExprImpl, OrderType)],
536 include_columns: &[ExprImpl],
537 required_dist: RequiredDist,
538) -> Result<LogicalPlanRoot> {
539 let logical_scan = LogicalScan::create(table_catalog.clone(), context, None);
544
545 let exprs = index_columns
546 .iter()
547 .map(|(expr, _)| expr.clone())
548 .chain(include_columns.iter().cloned())
549 .collect_vec();
550
551 let logical_project = LogicalProject::create(logical_scan.into(), exprs);
552 let mut project_required_cols = FixedBitSet::with_capacity(logical_project.schema().len());
553 project_required_cols.toggle_range(0..logical_project.schema().len());
554
555 let mut col_names = HashSet::new();
556 let mut count = 0;
557
558 let out_names: Vec<String> = index_columns
559 .iter()
560 .map(|(expr, _)| match expr {
561 ExprImpl::InputRef(input_ref) => table_catalog
562 .columns()
563 .get(input_ref.index)
564 .unwrap()
565 .name()
566 .to_owned(),
567 ExprImpl::FunctionCall(func) => {
568 let func_name = func.func_type().as_str_name().to_owned();
569 let mut name = func_name.clone();
570 while !col_names.insert(name.clone()) {
571 count += 1;
572 name = format!("{}{}", func_name, count);
573 }
574 name
575 }
576 _ => unreachable!(),
577 })
578 .chain(include_columns.iter().map(|expr| {
579 match expr {
580 ExprImpl::InputRef(input_ref) => table_catalog
581 .columns()
582 .get(input_ref.index)
583 .unwrap()
584 .name()
585 .to_owned(),
586 _ => unreachable!(),
587 }
588 }))
589 .collect_vec();
590
591 Ok(PlanRoot::new_with_logical_plan(
592 logical_project,
593 required_dist,
594 Order::new(
595 index_columns
596 .iter()
597 .enumerate()
598 .map(|(i, (_, order))| ColumnOrder::new(i, *order))
599 .collect(),
600 ),
601 project_required_cols,
602 out_names,
603 ))
604}
605
606fn assemble_materialize(
609 database_id: DatabaseId,
610 schema_id: SchemaId,
611 table_catalog: Arc<TableCatalog>,
612 context: OptimizerContextRef,
613 index_name: String,
614 index_columns: &[(ExprImpl, OrderType)],
615 include_columns: &[ExprImpl],
616 distributed_by_columns_len: usize,
617) -> Result<StreamMaterialize> {
618 let input = assemble_input(
619 table_catalog.clone(),
620 context.clone(),
621 index_columns,
622 include_columns,
623 RequiredDist::PhysicalDist(Distribution::HashShard(
626 (0..distributed_by_columns_len).collect(),
627 )),
628 )?;
629
630 let definition = context.normalized_sql().to_owned();
631 let retention_seconds = table_catalog.retention_seconds.and_then(NonZeroU32::new);
632
633 input.gen_index_plan(
634 index_name,
635 database_id,
636 schema_id,
637 definition,
638 retention_seconds,
639 )
640}
641
642pub async fn handle_create_index(
643 handler_args: HandlerArgs,
644 if_not_exists: bool,
645 index_name: ObjectName,
646 table_name: ObjectName,
647 method: Option<Ident>,
648 columns: Vec<OrderByExpr>,
649 include: Vec<Ident>,
650 distributed_by: Vec<ast::Expr>,
651) -> Result<RwPgResponse> {
652 let session = handler_args.session.clone();
653
654 let (graph, index_table, index) = {
655 let (schema_name, table, index_table_name) =
656 resolve_index_schema(&session, index_name, table_name)?;
657 let qualified_index_name = ObjectName(vec![
658 Ident::from_real_value(&schema_name),
659 Ident::from_real_value(&index_table_name),
660 ]);
661 if let Either::Right(resp) = session.check_relation_name_duplicated(
662 qualified_index_name,
663 StatementType::CREATE_INDEX,
664 if_not_exists,
665 )? {
666 return Ok(resp);
667 }
668
669 let context = OptimizerContext::from_handler_args(handler_args);
670 let (plan, index_table, index) = gen_create_index_plan(
671 &session,
672 context.into(),
673 schema_name,
674 table,
675 index_table_name,
676 method,
677 columns,
678 include,
679 distributed_by,
680 )?;
681 let graph = build_graph(plan, Some(GraphJobType::Index))?;
682
683 (graph, index_table, index)
684 };
685
686 tracing::trace!(
687 "name={}, graph=\n{}",
688 index.name,
689 serde_json::to_string_pretty(&graph).unwrap()
690 );
691
692 let _job_guard =
693 session
694 .env()
695 .creating_streaming_job_tracker()
696 .guard(CreatingStreamingJobInfo::new(
697 session.session_id(),
698 index.database_id,
699 index.schema_id,
700 index.name.clone(),
701 ));
702
703 let catalog_writer = session.catalog_writer()?;
704 catalog_writer
705 .create_index(index, index_table.to_prost(), graph, if_not_exists)
706 .await?;
707
708 Ok(PgResponse::empty_result(StatementType::CREATE_INDEX))
709}