1use std::collections::{HashMap, HashSet};
16use std::num::NonZeroU32;
17use std::sync::Arc;
18
19use either::Either;
20use fixedbitset::FixedBitSet;
21use itertools::Itertools;
22use pgwire::pg_response::{PgResponse, StatementType};
23use risingwave_common::catalog::{IndexId, TableId};
24use risingwave_common::types::DataType;
25use risingwave_common::util::recursive::{Recurse, tracker};
26use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
27use risingwave_pb::catalog::{
28 CreateType, PbFlatIndexConfig, PbHnswFlatIndexConfig, PbIndex, PbIndexColumnProperties,
29 PbStreamJobStatus, PbVectorIndexInfo,
30};
31use risingwave_pb::common::PbDistanceType;
32use risingwave_sqlparser::ast;
33use risingwave_sqlparser::ast::{Ident, ObjectName, OrderByExpr};
34use thiserror_ext::AsReport;
35
36use super::RwPgResponse;
37use crate::TableCatalog;
38use crate::binder::Binder;
39use crate::catalog::root_catalog::SchemaPath;
40use crate::catalog::{DatabaseId, SchemaId};
41use crate::error::{ErrorCode, Result};
42use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef, is_impure_func_call};
43use crate::handler::HandlerArgs;
44use crate::optimizer::plan_expr_rewriter::ConstEvalRewriter;
45use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
46use crate::optimizer::plan_node::{
47 Explain, LogicalProject, LogicalScan, StreamMaterialize, StreamPlanRef as PlanRef,
48};
49use crate::optimizer::property::{Distribution, Order, RequiredDist};
50use crate::optimizer::{LogicalPlanRoot, OptimizerContext, OptimizerContextRef, PlanRoot};
51use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
52use crate::session::SessionImpl;
53use crate::session::current::notice_to_user;
54use crate::stream_fragmenter::{GraphJobType, build_graph};
55
56pub(crate) fn resolve_index_schema(
57 session: &SessionImpl,
58 index_name: ObjectName,
59 table_name: ObjectName,
60) -> Result<(String, Arc<TableCatalog>, String)> {
61 let db_name = &session.database();
62 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
63 let search_path = session.config().search_path();
64 let user_name = &session.user_name();
65 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
66
67 let index_table_name = Binder::resolve_index_name(index_name)?;
68
69 let catalog_reader = session.env().catalog_reader();
70 let read_guard = catalog_reader.read_guard();
71 let (table, schema_name) =
72 read_guard.get_created_table_by_name(db_name, schema_path, &table_name)?;
73 Ok((schema_name.to_owned(), table.clone(), index_table_name))
74}
75
76struct IndexColumnExprValidator {
77 allow_impure: bool,
78 result: Result<()>,
79}
80
81impl IndexColumnExprValidator {
82 fn unsupported_expr_err(expr: &ExprImpl) -> ErrorCode {
83 ErrorCode::NotSupported(
84 format!("unsupported index column expression type: {:?}", expr),
85 "use columns or expressions instead".into(),
86 )
87 }
88
89 fn validate(expr: &ExprImpl, allow_impure: bool) -> Result<()> {
90 match expr {
91 ExprImpl::InputRef(_) | ExprImpl::FunctionCall(_) => {}
92 other_expr => {
93 return Err(Self::unsupported_expr_err(other_expr).into());
94 }
95 }
96 let mut visitor = Self {
97 allow_impure,
98 result: Ok(()),
99 };
100 visitor.visit_expr(expr);
101 visitor.result
102 }
103}
104
105impl ExprVisitor for IndexColumnExprValidator {
106 fn visit_expr(&mut self, expr: &ExprImpl) {
107 if self.result.is_err() {
108 return;
109 }
110 tracker!().recurse(|t| {
111 if t.depth_reaches(crate::expr::EXPR_DEPTH_THRESHOLD) {
112 notice_to_user(crate::expr::EXPR_TOO_DEEP_NOTICE);
113 }
114
115 match expr {
116 ExprImpl::InputRef(_) | ExprImpl::Literal(_) => {}
117 ExprImpl::FunctionCall(inner) => {
118 if !self.allow_impure && is_impure_func_call(inner) {
119 self.result = Err(ErrorCode::NotSupported(
120 "this expression is impure".into(),
121 "use a pure expression instead".into(),
122 )
123 .into());
124 return;
125 }
126 self.visit_function_call(inner)
127 }
128 other_expr => {
129 self.result = Err(Self::unsupported_expr_err(other_expr).into());
130 }
131 }
132 })
133 }
134}
135
136pub(crate) fn gen_create_index_plan(
137 session: &SessionImpl,
138 context: OptimizerContextRef,
139 schema_name: String,
140 table: Arc<TableCatalog>,
141 index_table_name: String,
142 method: Option<Ident>,
143 columns: Vec<OrderByExpr>,
144 include: Vec<Ident>,
145 distributed_by: Vec<ast::Expr>,
146) -> Result<(PlanRef, TableCatalog, PbIndex)> {
147 if table.is_index() {
148 return Err(
149 ErrorCode::InvalidInputSyntax(format!("\"{}\" is an index", table.name)).into(),
150 );
151 }
152
153 if !session.is_super_user() && session.user_id() != table.owner {
154 return Err(ErrorCode::PermissionDenied(format!(
155 "must be owner of table \"{}\"",
156 table.name
157 ))
158 .into());
159 }
160
161 let vector_index_config = if let Some(method) = method {
162 match method.real_value().to_ascii_lowercase().as_str() {
163 "default" => None,
164 "flat" => Some(risingwave_pb::catalog::vector_index_info::PbConfig::Flat(
165 PbFlatIndexConfig {},
166 )),
167 "hnsw" => {
168 let with_options = context.with_options();
169 let parse_non_zero_u32 = |key: &str, default| {
170 with_options
171 .get(key)
172 .map(|v| {
173 let v = v.parse::<u32>().map_err(|e| {
174 ErrorCode::InvalidInputSyntax(format!(
175 "invalid {} value {}: failed to parse {}",
176 key,
177 v,
178 e.as_report()
179 ))
180 })?;
181 if v == 0 {
182 Err(ErrorCode::InvalidInputSyntax(format!(
183 "invalid {} value {}: should not be zero",
184 key, v
185 )))
186 } else {
187 Ok(v)
188 }
189 })
190 .transpose()
191 .map(|v| v.unwrap_or(default))
192 };
193 Some(
194 risingwave_pb::catalog::vector_index_info::PbConfig::HnswFlat(
195 PbHnswFlatIndexConfig {
196 m: parse_non_zero_u32("m", 16)?, ef_construction: parse_non_zero_u32("ef_construction", 64)?, max_level: parse_non_zero_u32("max_level", 10)?,
199 },
200 ),
201 )
202 }
203 _ => {
204 return Err(ErrorCode::InvalidInputSyntax(format!(
205 "invalid index method {}",
206 method
207 ))
208 .into());
209 }
210 }
211 } else {
212 None
213 };
214
215 let is_vector_index = vector_index_config.is_some();
216
217 if is_vector_index && !table.append_only {
218 return Err(ErrorCode::InvalidInputSyntax(format!(
219 "\"{}\" is not append-only but vector index must be append-only",
220 table.name
221 ))
222 .into());
223 }
224
225 let mut binder = Binder::new_for_stream(session);
226 binder.bind_table(Some(&schema_name), &table.name)?;
227
228 let mut index_columns_ordered_expr = vec![];
229 let mut include_columns_expr = vec![];
230 let mut distributed_columns_expr = vec![];
231 if is_vector_index && columns.len() != 1 {
232 return Err(ErrorCode::InvalidInputSyntax(format!(
233 "vector index must be defined on exactly 1 column, but got {}",
234 columns.len()
235 ))
236 .into());
237 }
238 let mut dimension = None;
239 for column in columns {
240 if is_vector_index && (column.asc.is_some() || column.nulls_first.is_some()) {
241 return Err(ErrorCode::InvalidInputSyntax(
242 "vector index cannot specify order".to_owned(),
243 )
244 .into());
245 }
246 let order_type = OrderType::from_bools(column.asc, column.nulls_first);
247 let expr_impl = binder.bind_expr(&column.expr)?;
248 let mut const_eval = ConstEvalRewriter { error: None };
250 let expr_impl = const_eval.rewrite_expr(expr_impl);
251 let expr_impl = context.session_timezone().rewrite_expr(expr_impl);
252 let allow_impure = is_vector_index;
253 IndexColumnExprValidator::validate(&expr_impl, allow_impure)?;
254 if is_vector_index {
255 match expr_impl.return_type() {
256 DataType::Vector(d) => {
257 dimension = Some(d);
258 }
259 other => {
260 return Err(ErrorCode::InvalidInputSyntax(format!(
261 "vector index must be defined on column of Vector type, but got {:?}",
262 other
263 ))
264 .into());
265 }
266 }
267 }
268 index_columns_ordered_expr.push((expr_impl, order_type));
269 }
270
271 if include.is_empty() {
272 include_columns_expr = table
274 .columns()
275 .iter()
276 .enumerate()
277 .filter(|(_, column)| !column.is_hidden)
278 .map(|(x, column)| {
279 ExprImpl::InputRef(InputRef::new(x, column.column_desc.data_type.clone()).into())
280 })
281 .collect_vec();
282 } else {
283 for column in include {
284 let expr_impl =
285 binder.bind_expr(&risingwave_sqlparser::ast::Expr::Identifier(column))?;
286 include_columns_expr.push(expr_impl);
287 }
288 };
289
290 if is_vector_index && !distributed_by.is_empty() {
291 return Err(ErrorCode::InvalidInputSyntax(
292 "vector index cannot define DISTRIBUTED BY".to_owned(),
293 )
294 .into());
295 }
296
297 for column in distributed_by {
298 let expr_impl = binder.bind_expr(&column)?;
299 distributed_columns_expr.push(expr_impl);
300 }
301
302 let mut set = HashSet::new();
304 index_columns_ordered_expr = index_columns_ordered_expr
305 .into_iter()
306 .filter(|(expr, _)| match expr {
307 ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
308 ExprImpl::FunctionCall(_) => true,
309 _ => unreachable!(),
310 })
311 .collect_vec();
312
313 include_columns_expr = include_columns_expr
315 .into_iter()
316 .filter(|expr| match expr {
317 ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
318 _ => unreachable!(),
319 })
320 .collect_vec();
321
322 let mut set = HashSet::new();
324 let distributed_columns_expr = distributed_columns_expr
325 .into_iter()
326 .filter(|expr| match expr {
327 ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
328 ExprImpl::FunctionCall(_) => true,
329 _ => unreachable!(),
330 })
331 .collect_vec();
332 if !index_columns_ordered_expr
334 .iter()
335 .map(|(expr, _)| expr.clone())
336 .collect_vec()
337 .starts_with(&distributed_columns_expr)
338 {
339 return Err(ErrorCode::InvalidInputSyntax(
340 "Distributed by columns should be a prefix of index columns".to_owned(),
341 )
342 .into());
343 }
344
345 let (index_database_id, index_schema_id) =
346 session.get_database_and_schema_id_for_create(Some(schema_name))?;
347
348 let (plan, mut index_table) = if let Some(vector_index_config) = vector_index_config {
349 assert!(distributed_columns_expr.is_empty());
350 assert_eq!(1, index_columns_ordered_expr.len());
351 let input = assemble_input(
352 table.clone(),
353 context.clone(),
354 &index_columns_ordered_expr,
355 &include_columns_expr,
356 RequiredDist::PhysicalDist(Distribution::Single),
357 )?;
358 let definition = context.normalized_sql().to_owned();
359 let retention_seconds = table.retention_seconds.and_then(NonZeroU32::new);
360
361 let distance_type = match context
362 .with_options()
363 .get("distance_type")
364 .ok_or_else(|| {
365 ErrorCode::InvalidInputSyntax(
366 "vector index missing `distance_type` in with options".to_owned(),
367 )
368 })?
369 .to_ascii_lowercase()
370 .as_str()
371 {
372 "l1" => PbDistanceType::L1,
373 "l2" => PbDistanceType::L2Sqr,
374 "inner_product" => PbDistanceType::InnerProduct,
375 "cosine" => PbDistanceType::Cosine,
376 other => {
377 return Err(ErrorCode::InvalidInputSyntax(format!(
378 "unsupported vector index distance type: {}",
379 other
380 ))
381 .into());
382 }
383 };
384
385 let vector_index_write = input.gen_vector_index_plan(
386 index_table_name.clone(),
387 index_database_id,
388 index_schema_id,
389 definition,
390 retention_seconds,
391 PbVectorIndexInfo {
392 dimension: dimension.expect("should be set for vector index") as _,
393 config: Some(vector_index_config),
394 distance_type: distance_type as _,
395 },
396 )?;
397 let index_table = vector_index_write.table().clone();
398 let plan: PlanRef = vector_index_write.into();
399 (plan, index_table)
400 } else {
401 let materialize = assemble_materialize(
403 index_database_id,
404 index_schema_id,
405 table.clone(),
406 context.clone(),
407 index_table_name.clone(),
408 &index_columns_ordered_expr,
409 &include_columns_expr,
410 if distributed_columns_expr.is_empty() {
413 1
414 } else {
415 distributed_columns_expr.len()
416 },
417 )?;
418 let index_table = materialize.table().clone();
419 let plan: PlanRef = materialize.into();
420 (plan, index_table)
421 };
422
423 {
424 index_table.retention_seconds = table.retention_seconds;
426 }
427
428 index_table.owner = table.owner;
429
430 let index_columns_len = index_columns_ordered_expr.len() as u32;
431 let index_column_properties = index_columns_ordered_expr
432 .iter()
433 .map(|(_, order)| PbIndexColumnProperties {
434 is_desc: order.is_descending(),
435 nulls_first: order.nulls_are_first(),
436 })
437 .collect();
438 let index_item = build_index_item(
439 &index_table,
440 table.name(),
441 &table,
442 index_columns_ordered_expr,
443 );
444
445 let create_type =
446 if context.session_ctx().config().background_ddl() && plan_can_use_background_ddl(&plan) {
447 CreateType::Background
448 } else {
449 CreateType::Foreground
450 };
451
452 let index_prost = PbIndex {
453 id: IndexId::placeholder().index_id,
454 schema_id: index_schema_id,
455 database_id: index_database_id,
456 name: index_table_name,
457 owner: index_table.owner,
458 index_table_id: TableId::placeholder().table_id,
459 primary_table_id: table.id.table_id,
460 index_item,
461 index_column_properties,
462 index_columns_len,
463 initialized_at_epoch: None,
464 created_at_epoch: None,
465 stream_job_status: PbStreamJobStatus::Creating.into(),
466 initialized_at_cluster_version: None,
467 created_at_cluster_version: None,
468 create_type: create_type.into(),
469 };
470
471 let ctx = plan.ctx();
472 let explain_trace = ctx.is_explain_trace();
473 if explain_trace {
474 ctx.trace("Create Index:");
475 ctx.trace(plan.explain_to_string());
476 }
477
478 Ok((plan, index_table, index_prost))
479}
480
481fn build_index_item(
482 index_table: &TableCatalog,
483 primary_table_name: &str,
484 primary_table: &TableCatalog,
485 index_columns: Vec<(ExprImpl, OrderType)>,
486) -> Vec<risingwave_pb::expr::ExprNode> {
487 let primary_table_desc_map = primary_table
488 .columns
489 .iter()
490 .enumerate()
491 .map(|(x, y)| (y.name.clone(), x))
492 .collect::<HashMap<_, _>>();
493
494 let primary_table_name_prefix = format!("{}.", primary_table_name);
495
496 let index_columns_len = index_columns.len();
497 index_columns
498 .into_iter()
499 .map(|(expr, _)| expr.to_expr_proto())
500 .chain(
501 index_table
502 .columns
503 .iter()
504 .map(|c| &c.column_desc)
505 .skip(index_columns_len)
506 .map(|x| {
507 let name = if x.name.starts_with(&primary_table_name_prefix) {
508 x.name[primary_table_name_prefix.len()..].to_string()
509 } else {
510 x.name.clone()
511 };
512
513 let column_index = *primary_table_desc_map.get(&name).unwrap();
514 InputRef {
515 index: column_index,
516 data_type: primary_table
517 .columns
518 .get(column_index)
519 .unwrap()
520 .data_type
521 .clone(),
522 }
523 .to_expr_proto()
524 }),
525 )
526 .collect_vec()
527}
528
529fn assemble_input(
530 table_catalog: Arc<TableCatalog>,
531 context: OptimizerContextRef,
532 index_columns: &[(ExprImpl, OrderType)],
533 include_columns: &[ExprImpl],
534 required_dist: RequiredDist,
535) -> Result<LogicalPlanRoot> {
536 let logical_scan = LogicalScan::create(table_catalog.clone(), context, None);
541
542 let exprs = index_columns
543 .iter()
544 .map(|(expr, _)| expr.clone())
545 .chain(include_columns.iter().cloned())
546 .collect_vec();
547
548 let logical_project = LogicalProject::create(logical_scan.into(), exprs);
549 let mut project_required_cols = FixedBitSet::with_capacity(logical_project.schema().len());
550 project_required_cols.toggle_range(0..logical_project.schema().len());
551
552 let mut col_names = HashSet::new();
553 let mut count = 0;
554
555 let out_names: Vec<String> = index_columns
556 .iter()
557 .map(|(expr, _)| match expr {
558 ExprImpl::InputRef(input_ref) => table_catalog
559 .columns()
560 .get(input_ref.index)
561 .unwrap()
562 .name()
563 .to_owned(),
564 ExprImpl::FunctionCall(func) => {
565 let func_name = func.func_type().as_str_name().to_owned();
566 let mut name = func_name.clone();
567 while !col_names.insert(name.clone()) {
568 count += 1;
569 name = format!("{}{}", func_name, count);
570 }
571 name
572 }
573 _ => unreachable!(),
574 })
575 .chain(include_columns.iter().map(|expr| {
576 match expr {
577 ExprImpl::InputRef(input_ref) => table_catalog
578 .columns()
579 .get(input_ref.index)
580 .unwrap()
581 .name()
582 .to_owned(),
583 _ => unreachable!(),
584 }
585 }))
586 .collect_vec();
587
588 Ok(PlanRoot::new_with_logical_plan(
589 logical_project,
590 required_dist,
591 Order::new(
592 index_columns
593 .iter()
594 .enumerate()
595 .map(|(i, (_, order))| ColumnOrder::new(i, *order))
596 .collect(),
597 ),
598 project_required_cols,
599 out_names,
600 ))
601}
602
603fn assemble_materialize(
606 database_id: DatabaseId,
607 schema_id: SchemaId,
608 table_catalog: Arc<TableCatalog>,
609 context: OptimizerContextRef,
610 index_name: String,
611 index_columns: &[(ExprImpl, OrderType)],
612 include_columns: &[ExprImpl],
613 distributed_by_columns_len: usize,
614) -> Result<StreamMaterialize> {
615 let input = assemble_input(
616 table_catalog.clone(),
617 context.clone(),
618 index_columns,
619 include_columns,
620 RequiredDist::PhysicalDist(Distribution::HashShard(
623 (0..distributed_by_columns_len).collect(),
624 )),
625 )?;
626
627 let definition = context.normalized_sql().to_owned();
628 let retention_seconds = table_catalog.retention_seconds.and_then(NonZeroU32::new);
629
630 input.gen_index_plan(
631 index_name,
632 database_id,
633 schema_id,
634 definition,
635 retention_seconds,
636 )
637}
638
639pub async fn handle_create_index(
640 handler_args: HandlerArgs,
641 if_not_exists: bool,
642 index_name: ObjectName,
643 table_name: ObjectName,
644 method: Option<Ident>,
645 columns: Vec<OrderByExpr>,
646 include: Vec<Ident>,
647 distributed_by: Vec<ast::Expr>,
648) -> Result<RwPgResponse> {
649 let session = handler_args.session.clone();
650
651 let (graph, index_table, index) = {
652 let (schema_name, table, index_table_name) =
653 resolve_index_schema(&session, index_name, table_name)?;
654 let qualified_index_name = ObjectName(vec![
655 Ident::from_real_value(&schema_name),
656 Ident::from_real_value(&index_table_name),
657 ]);
658 if let Either::Right(resp) = session.check_relation_name_duplicated(
659 qualified_index_name,
660 StatementType::CREATE_INDEX,
661 if_not_exists,
662 )? {
663 return Ok(resp);
664 }
665
666 let context = OptimizerContext::from_handler_args(handler_args);
667 let (plan, index_table, index) = gen_create_index_plan(
668 &session,
669 context.into(),
670 schema_name,
671 table,
672 index_table_name,
673 method,
674 columns,
675 include,
676 distributed_by,
677 )?;
678 let graph = build_graph(plan, Some(GraphJobType::Index))?;
679
680 (graph, index_table, index)
681 };
682
683 tracing::trace!(
684 "name={}, graph=\n{}",
685 index.name,
686 serde_json::to_string_pretty(&graph).unwrap()
687 );
688
689 let _job_guard =
690 session
691 .env()
692 .creating_streaming_job_tracker()
693 .guard(CreatingStreamingJobInfo::new(
694 session.session_id(),
695 index.database_id,
696 index.schema_id,
697 index.name.clone(),
698 ));
699
700 let catalog_writer = session.catalog_writer()?;
701 catalog_writer
702 .create_index(index, index_table.to_prost(), graph, if_not_exists)
703 .await?;
704
705 Ok(PgResponse::empty_result(StatementType::CREATE_INDEX))
706}