risingwave_frontend/handler/
create_index.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::num::NonZeroU32;
17use std::sync::Arc;
18
19use either::Either;
20use fixedbitset::FixedBitSet;
21use itertools::Itertools;
22use pgwire::pg_response::{PgResponse, StatementType};
23use risingwave_common::catalog::{IndexId, TableId};
24use risingwave_common::types::DataType;
25use risingwave_common::util::recursive::{Recurse, tracker};
26use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
27use risingwave_pb::catalog::{
28    CreateType, PbFlatIndexConfig, PbHnswFlatIndexConfig, PbIndex, PbIndexColumnProperties,
29    PbStreamJobStatus, PbVectorIndexInfo,
30};
31use risingwave_pb::common::PbDistanceType;
32use risingwave_sqlparser::ast;
33use risingwave_sqlparser::ast::{Ident, ObjectName, OrderByExpr};
34use thiserror_ext::AsReport;
35
36use super::RwPgResponse;
37use crate::TableCatalog;
38use crate::binder::Binder;
39use crate::catalog::root_catalog::SchemaPath;
40use crate::catalog::{DatabaseId, SchemaId};
41use crate::error::{ErrorCode, Result};
42use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef, is_impure_func_call};
43use crate::handler::HandlerArgs;
44use crate::handler::util::reject_internal_table_dependency;
45use crate::optimizer::plan_expr_rewriter::ConstEvalRewriter;
46use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
47use crate::optimizer::plan_node::{
48    Explain, LogicalProject, LogicalScan, StreamMaterialize, StreamPlanRef as PlanRef,
49    ensure_sync_log_store_fragment_root,
50};
51use crate::optimizer::property::{Distribution, Order, RequiredDist};
52use crate::optimizer::{LogicalPlanRoot, OptimizerContext, OptimizerContextRef, PlanRoot};
53use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
54use crate::session::SessionImpl;
55use crate::session::current::notice_to_user;
56use crate::stream_fragmenter::{GraphJobType, build_graph};
57
58pub(crate) fn resolve_index_schema(
59    session: &SessionImpl,
60    index_name: ObjectName,
61    table_name: ObjectName,
62) -> Result<(String, Arc<TableCatalog>, String)> {
63    let db_name = &session.database();
64    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
65    let search_path = session.config().search_path();
66    let user_name = &session.user_name();
67    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
68
69    let index_table_name = Binder::resolve_index_name(index_name)?;
70
71    let catalog_reader = session.env().catalog_reader();
72    let read_guard = catalog_reader.read_guard();
73    let (table, schema_name) =
74        read_guard.get_created_table_by_name(db_name, schema_path, &table_name)?;
75    reject_internal_table_dependency(table.as_ref(), "CREATE INDEX")?;
76    Ok((schema_name.to_owned(), table.clone(), index_table_name))
77}
78
79pub(crate) struct IndexColumnExprValidator {
80    allow_impure: bool,
81    result: Result<()>,
82}
83
84impl IndexColumnExprValidator {
85    fn unsupported_expr_err(expr: &ExprImpl) -> ErrorCode {
86        ErrorCode::NotSupported(
87            format!("unsupported index column expression type: {:?}", expr),
88            "use columns or expressions instead".into(),
89        )
90    }
91
92    pub(crate) fn validate(expr: &ExprImpl, allow_impure: bool) -> Result<()> {
93        match expr {
94            ExprImpl::InputRef(_) | ExprImpl::FunctionCall(_) => {}
95            other_expr => {
96                return Err(Self::unsupported_expr_err(other_expr).into());
97            }
98        }
99        let mut visitor = Self {
100            allow_impure,
101            result: Ok(()),
102        };
103        visitor.visit_expr(expr);
104        visitor.result
105    }
106}
107
108impl ExprVisitor for IndexColumnExprValidator {
109    fn visit_expr(&mut self, expr: &ExprImpl) {
110        if self.result.is_err() {
111            return;
112        }
113        tracker!().recurse(|t| {
114            if t.depth_reaches(crate::expr::EXPR_DEPTH_THRESHOLD) {
115                notice_to_user(crate::expr::EXPR_TOO_DEEP_NOTICE);
116            }
117
118            match expr {
119                ExprImpl::InputRef(_) | ExprImpl::Literal(_) => {}
120                ExprImpl::FunctionCall(inner) => {
121                    if !self.allow_impure && is_impure_func_call(inner) {
122                        self.result = Err(ErrorCode::NotSupported(
123                            "this expression is impure".into(),
124                            "use a pure expression instead".into(),
125                        )
126                        .into());
127                        return;
128                    }
129                    self.visit_function_call(inner)
130                }
131                other_expr => {
132                    self.result = Err(Self::unsupported_expr_err(other_expr).into());
133                }
134            }
135        })
136    }
137}
138
139pub(crate) fn gen_create_index_plan(
140    session: &SessionImpl,
141    context: OptimizerContextRef,
142    schema_name: String,
143    table: Arc<TableCatalog>,
144    index_table_name: String,
145    method: Option<Ident>,
146    columns: Vec<OrderByExpr>,
147    include: Vec<Ident>,
148    distributed_by: Vec<ast::Expr>,
149) -> Result<(PlanRef, TableCatalog, PbIndex)> {
150    if table.is_index() {
151        return Err(
152            ErrorCode::InvalidInputSyntax(format!("\"{}\" is an index", table.name)).into(),
153        );
154    }
155
156    if !session.is_super_user() && session.user_id() != table.owner {
157        return Err(ErrorCode::PermissionDenied(format!(
158            "must be owner of table \"{}\"",
159            table.name
160        ))
161        .into());
162    }
163
164    let vector_index_config = if let Some(method) = method {
165        match method.real_value().to_ascii_lowercase().as_str() {
166            "default" => None,
167            "flat" => Some(risingwave_pb::catalog::vector_index_info::PbConfig::Flat(
168                PbFlatIndexConfig {},
169            )),
170            "hnsw" => {
171                let with_options = context.with_options();
172                let parse_non_zero_u32 = |key: &str, default| {
173                    with_options
174                        .get(key)
175                        .map(|v| {
176                            let v = v.parse::<u32>().map_err(|e| {
177                                ErrorCode::InvalidInputSyntax(format!(
178                                    "invalid {} value {}: failed to parse {}",
179                                    key,
180                                    v,
181                                    e.as_report()
182                                ))
183                            })?;
184                            if v == 0 {
185                                Err(ErrorCode::InvalidInputSyntax(format!(
186                                    "invalid {} value {}: should not be zero",
187                                    key, v
188                                )))
189                            } else {
190                                Ok(v)
191                            }
192                        })
193                        .transpose()
194                        .map(|v| v.unwrap_or(default))
195                };
196                Some(
197                    risingwave_pb::catalog::vector_index_info::PbConfig::HnswFlat(
198                        PbHnswFlatIndexConfig {
199                            m: parse_non_zero_u32("m", 16)?, /* default value borrowed from pg_vector */
200                            ef_construction: parse_non_zero_u32("ef_construction", 64)?, /* default value borrowed from pg_vector */
201                            max_level: parse_non_zero_u32("max_level", 10)?,
202                        },
203                    ),
204                )
205            }
206            _ => {
207                return Err(ErrorCode::InvalidInputSyntax(format!(
208                    "invalid index method {}",
209                    method
210                ))
211                .into());
212            }
213        }
214    } else {
215        None
216    };
217
218    let is_vector_index = vector_index_config.is_some();
219
220    if is_vector_index && !table.append_only {
221        return Err(ErrorCode::InvalidInputSyntax(format!(
222            "\"{}\" is not append-only but vector index must be append-only",
223            table.name
224        ))
225        .into());
226    }
227
228    let mut binder = Binder::new_for_stream(session);
229    binder.bind_table(Some(&schema_name), &table.name)?;
230
231    let mut index_columns_ordered_expr = vec![];
232    let mut include_columns_expr = vec![];
233    let mut distributed_columns_expr = vec![];
234    if is_vector_index && columns.len() != 1 {
235        return Err(ErrorCode::InvalidInputSyntax(format!(
236            "vector index must be defined on exactly 1 column, but got {}",
237            columns.len()
238        ))
239        .into());
240    }
241    let mut dimension = None;
242    for column in columns {
243        if is_vector_index && (column.asc.is_some() || column.nulls_first.is_some()) {
244            return Err(ErrorCode::InvalidInputSyntax(
245                "vector index cannot specify order".to_owned(),
246            )
247            .into());
248        }
249        let order_type = OrderType::from_bools(column.asc, column.nulls_first);
250        let expr_impl = binder.bind_expr(&column.expr)?;
251        // Do constant folding and timezone transportation on expressions so that batch queries can match it in the same form.
252        let mut const_eval = ConstEvalRewriter { error: None };
253        let expr_impl = const_eval.rewrite_expr(expr_impl);
254        let expr_impl = context.session_timezone().rewrite_expr(expr_impl);
255        let allow_impure = is_vector_index;
256        IndexColumnExprValidator::validate(&expr_impl, allow_impure)?;
257        if is_vector_index {
258            match expr_impl.return_type() {
259                DataType::Vector(d) => {
260                    dimension = Some(d);
261                }
262                other => {
263                    return Err(ErrorCode::InvalidInputSyntax(format!(
264                        "vector index must be defined on column of Vector type, but got {:?}",
265                        other
266                    ))
267                    .into());
268                }
269            }
270        }
271        index_columns_ordered_expr.push((expr_impl, order_type));
272    }
273
274    if include.is_empty() {
275        // Create index to include all (non-hidden) columns by default.
276        include_columns_expr = table
277            .columns()
278            .iter()
279            .enumerate()
280            .filter(|(_, column)| !column.is_hidden)
281            .map(|(x, column)| {
282                ExprImpl::InputRef(InputRef::new(x, column.column_desc.data_type.clone()).into())
283            })
284            .collect_vec();
285    } else {
286        for column in include {
287            let expr_impl =
288                binder.bind_expr(&risingwave_sqlparser::ast::Expr::Identifier(column))?;
289            include_columns_expr.push(expr_impl);
290        }
291    };
292
293    if is_vector_index && !distributed_by.is_empty() {
294        return Err(ErrorCode::InvalidInputSyntax(
295            "vector index cannot define DISTRIBUTED BY".to_owned(),
296        )
297        .into());
298    }
299
300    for column in distributed_by {
301        let expr_impl = binder.bind_expr(&column)?;
302        distributed_columns_expr.push(expr_impl);
303    }
304
305    // Remove duplicate column of index columns
306    let mut set = HashSet::new();
307    index_columns_ordered_expr = index_columns_ordered_expr
308        .into_iter()
309        .filter(|(expr, _)| match expr {
310            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
311            ExprImpl::FunctionCall(_) => true,
312            _ => unreachable!(),
313        })
314        .collect_vec();
315
316    // Remove include columns are already in index columns
317    include_columns_expr = include_columns_expr
318        .into_iter()
319        .filter(|expr| match expr {
320            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
321            _ => unreachable!(),
322        })
323        .collect_vec();
324
325    // Remove duplicate columns of distributed by columns
326    let mut set = HashSet::new();
327    let distributed_columns_expr = distributed_columns_expr
328        .into_iter()
329        .filter(|expr| match expr {
330            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
331            ExprImpl::FunctionCall(_) => true,
332            _ => unreachable!(),
333        })
334        .collect_vec();
335    // Distributed by columns should be a prefix of index columns
336    if !index_columns_ordered_expr
337        .iter()
338        .map(|(expr, _)| expr.clone())
339        .collect_vec()
340        .starts_with(&distributed_columns_expr)
341    {
342        return Err(ErrorCode::InvalidInputSyntax(
343            "Distributed by columns should be a prefix of index columns".to_owned(),
344        )
345        .into());
346    }
347
348    let (index_database_id, index_schema_id) =
349        session.get_database_and_schema_id_for_create(Some(schema_name))?;
350
351    let (plan, mut index_table) = if let Some(vector_index_config) = vector_index_config {
352        assert!(distributed_columns_expr.is_empty());
353        assert_eq!(1, index_columns_ordered_expr.len());
354        let input = assemble_input(
355            table.clone(),
356            context.clone(),
357            &index_columns_ordered_expr,
358            &include_columns_expr,
359            RequiredDist::PhysicalDist(Distribution::Single),
360        )?;
361        let definition = context.normalized_sql().to_owned();
362        let retention_seconds = table.retention_seconds.and_then(NonZeroU32::new);
363
364        let distance_type = match context
365            .with_options()
366            .get("distance_type")
367            .ok_or_else(|| {
368                ErrorCode::InvalidInputSyntax(
369                    "vector index missing `distance_type` in with options".to_owned(),
370                )
371            })?
372            .to_ascii_lowercase()
373            .as_str()
374        {
375            "l1" => PbDistanceType::L1,
376            "l2" => PbDistanceType::L2Sqr,
377            "inner_product" => PbDistanceType::InnerProduct,
378            "cosine" => PbDistanceType::Cosine,
379            other => {
380                return Err(ErrorCode::InvalidInputSyntax(format!(
381                    "unsupported vector index distance type: {}",
382                    other
383                ))
384                .into());
385            }
386        };
387
388        let vector_index_write = input.gen_vector_index_plan(
389            index_table_name.clone(),
390            index_database_id,
391            index_schema_id,
392            definition,
393            retention_seconds,
394            PbVectorIndexInfo {
395                dimension: dimension.expect("should be set for vector index") as _,
396                config: Some(vector_index_config),
397                distance_type: distance_type as _,
398            },
399        )?;
400        let index_table = vector_index_write.table().clone();
401        let plan: PlanRef = ensure_sync_log_store_fragment_root(vector_index_write.into());
402        (plan, index_table)
403    } else {
404        // Manually assemble the materialization plan for the index MV.
405        let materialize = assemble_materialize(
406            index_database_id,
407            index_schema_id,
408            table.clone(),
409            context.clone(),
410            index_table_name.clone(),
411            &index_columns_ordered_expr,
412            &include_columns_expr,
413            // We use the first index column as distributed key by default if users
414            // haven't specified the distributed by columns.
415            if distributed_columns_expr.is_empty() {
416                1
417            } else {
418                distributed_columns_expr.len()
419            },
420        )?;
421        let index_table = materialize.table().clone();
422        let plan: PlanRef = ensure_sync_log_store_fragment_root(materialize.into());
423        (plan, index_table)
424    };
425
426    {
427        // Inherit table properties
428        index_table.retention_seconds = table.retention_seconds;
429    }
430
431    index_table.owner = table.owner;
432
433    let index_columns_len = index_columns_ordered_expr.len() as u32;
434    let index_column_properties = index_columns_ordered_expr
435        .iter()
436        .map(|(_, order)| PbIndexColumnProperties {
437            is_desc: order.is_descending(),
438            nulls_first: order.nulls_are_first(),
439        })
440        .collect();
441    let index_item = build_index_item(
442        &index_table,
443        table.name(),
444        &table,
445        index_columns_ordered_expr,
446    );
447
448    let create_type =
449        if context.session_ctx().config().background_ddl() && plan_can_use_background_ddl(&plan) {
450            CreateType::Background
451        } else {
452            CreateType::Foreground
453        };
454
455    let index_prost = PbIndex {
456        id: IndexId::placeholder(),
457        schema_id: index_schema_id,
458        database_id: index_database_id,
459        name: index_table_name,
460        owner: index_table.owner,
461        index_table_id: TableId::placeholder(),
462        primary_table_id: table.id,
463        index_item,
464        index_column_properties,
465        index_columns_len,
466        initialized_at_epoch: None,
467        created_at_epoch: None,
468        stream_job_status: PbStreamJobStatus::Creating.into(),
469        initialized_at_cluster_version: None,
470        created_at_cluster_version: None,
471        create_type: create_type.into(),
472    };
473
474    let ctx = plan.ctx();
475    let explain_trace = ctx.is_explain_trace();
476    if explain_trace {
477        ctx.trace("Create Index:");
478        ctx.trace(plan.explain_to_string());
479    }
480
481    Ok((plan, index_table, index_prost))
482}
483
484fn build_index_item(
485    index_table: &TableCatalog,
486    primary_table_name: &str,
487    primary_table: &TableCatalog,
488    index_columns: Vec<(ExprImpl, OrderType)>,
489) -> Vec<risingwave_pb::expr::ExprNode> {
490    let primary_table_desc_map = primary_table
491        .columns
492        .iter()
493        .enumerate()
494        .map(|(x, y)| (y.name.clone(), x))
495        .collect::<HashMap<_, _>>();
496
497    let primary_table_name_prefix = format!("{}.", primary_table_name);
498
499    let index_columns_len = index_columns.len();
500    index_columns
501        .into_iter()
502        .map(|(expr, _)| expr.to_expr_proto())
503        .chain(
504            index_table
505                .columns
506                .iter()
507                .map(|c| &c.column_desc)
508                .skip(index_columns_len)
509                .map(|x| {
510                    let name = if x.name.starts_with(&primary_table_name_prefix) {
511                        x.name[primary_table_name_prefix.len()..].to_string()
512                    } else {
513                        x.name.clone()
514                    };
515
516                    let column_index = *primary_table_desc_map.get(&name).unwrap();
517                    InputRef {
518                        index: column_index,
519                        data_type: primary_table
520                            .columns
521                            .get(column_index)
522                            .unwrap()
523                            .data_type
524                            .clone(),
525                    }
526                    .to_expr_proto()
527                }),
528        )
529        .collect_vec()
530}
531
532fn assemble_input(
533    table_catalog: Arc<TableCatalog>,
534    context: OptimizerContextRef,
535    index_columns: &[(ExprImpl, OrderType)],
536    include_columns: &[ExprImpl],
537    required_dist: RequiredDist,
538) -> Result<LogicalPlanRoot> {
539    // Build logical plan and then call gen_create_index_plan
540    // LogicalProject(index_columns, include_columns)
541    //   LogicalScan(table_desc)
542
543    let logical_scan = LogicalScan::create(table_catalog.clone(), context, None);
544
545    let exprs = index_columns
546        .iter()
547        .map(|(expr, _)| expr.clone())
548        .chain(include_columns.iter().cloned())
549        .collect_vec();
550
551    let logical_project = LogicalProject::create(logical_scan.into(), exprs);
552    let mut project_required_cols = FixedBitSet::with_capacity(logical_project.schema().len());
553    project_required_cols.toggle_range(0..logical_project.schema().len());
554
555    let mut col_names = HashSet::new();
556    let mut count = 0;
557
558    let out_names: Vec<String> = index_columns
559        .iter()
560        .map(|(expr, _)| match expr {
561            ExprImpl::InputRef(input_ref) => table_catalog
562                .columns()
563                .get(input_ref.index)
564                .unwrap()
565                .name()
566                .to_owned(),
567            ExprImpl::FunctionCall(func) => {
568                let func_name = func.func_type().as_str_name().to_owned();
569                let mut name = func_name.clone();
570                while !col_names.insert(name.clone()) {
571                    count += 1;
572                    name = format!("{}{}", func_name, count);
573                }
574                name
575            }
576            _ => unreachable!(),
577        })
578        .chain(include_columns.iter().map(|expr| {
579            match expr {
580                ExprImpl::InputRef(input_ref) => table_catalog
581                    .columns()
582                    .get(input_ref.index)
583                    .unwrap()
584                    .name()
585                    .to_owned(),
586                _ => unreachable!(),
587            }
588        }))
589        .collect_vec();
590
591    Ok(PlanRoot::new_with_logical_plan(
592        logical_project,
593        required_dist,
594        Order::new(
595            index_columns
596                .iter()
597                .enumerate()
598                .map(|(i, (_, order))| ColumnOrder::new(i, *order))
599                .collect(),
600        ),
601        project_required_cols,
602        out_names,
603    ))
604}
605
606/// Note: distributed by columns must be a prefix of index columns, so we just use
607/// `distributed_by_columns_len` to represent distributed by columns
608fn assemble_materialize(
609    database_id: DatabaseId,
610    schema_id: SchemaId,
611    table_catalog: Arc<TableCatalog>,
612    context: OptimizerContextRef,
613    index_name: String,
614    index_columns: &[(ExprImpl, OrderType)],
615    include_columns: &[ExprImpl],
616    distributed_by_columns_len: usize,
617) -> Result<StreamMaterialize> {
618    let input = assemble_input(
619        table_catalog.clone(),
620        context.clone(),
621        index_columns,
622        include_columns,
623        // schema of logical_project is such that index columns come first.
624        // so we can use distributed_by_columns_len to represent distributed by columns indices.
625        RequiredDist::PhysicalDist(Distribution::HashShard(
626            (0..distributed_by_columns_len).collect(),
627        )),
628    )?;
629
630    let definition = context.normalized_sql().to_owned();
631    let retention_seconds = table_catalog.retention_seconds.and_then(NonZeroU32::new);
632
633    input.gen_index_plan(
634        index_name,
635        database_id,
636        schema_id,
637        definition,
638        retention_seconds,
639    )
640}
641
642pub async fn handle_create_index(
643    handler_args: HandlerArgs,
644    if_not_exists: bool,
645    index_name: ObjectName,
646    table_name: ObjectName,
647    method: Option<Ident>,
648    columns: Vec<OrderByExpr>,
649    include: Vec<Ident>,
650    distributed_by: Vec<ast::Expr>,
651) -> Result<RwPgResponse> {
652    let session = handler_args.session.clone();
653
654    let (graph, index_table, index) = {
655        let (schema_name, table, index_table_name) =
656            resolve_index_schema(&session, index_name, table_name)?;
657        let qualified_index_name = ObjectName(vec![
658            Ident::from_real_value(&schema_name),
659            Ident::from_real_value(&index_table_name),
660        ]);
661        if let Either::Right(resp) = session.check_relation_name_duplicated(
662            qualified_index_name,
663            StatementType::CREATE_INDEX,
664            if_not_exists,
665        )? {
666            return Ok(resp);
667        }
668
669        let context = OptimizerContext::from_handler_args(handler_args);
670        let (plan, index_table, index) = gen_create_index_plan(
671            &session,
672            context.into(),
673            schema_name,
674            table,
675            index_table_name,
676            method,
677            columns,
678            include,
679            distributed_by,
680        )?;
681        let graph = build_graph(plan, Some(GraphJobType::Index))?;
682
683        (graph, index_table, index)
684    };
685
686    tracing::trace!(
687        "name={}, graph=\n{}",
688        index.name,
689        serde_json::to_string_pretty(&graph).unwrap()
690    );
691
692    let _job_guard =
693        session
694            .env()
695            .creating_streaming_job_tracker()
696            .guard(CreatingStreamingJobInfo::new(
697                session.session_id(),
698                index.database_id,
699                index.schema_id,
700                index.name.clone(),
701            ));
702
703    let catalog_writer = session.catalog_writer()?;
704    catalog_writer
705        .create_index(index, index_table.to_prost(), graph, if_not_exists)
706        .await?;
707
708    Ok(PgResponse::empty_result(StatementType::CREATE_INDEX))
709}