risingwave_frontend/handler/
create_index.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::num::NonZeroU32;
17use std::sync::Arc;
18
19use either::Either;
20use fixedbitset::FixedBitSet;
21use itertools::Itertools;
22use pgwire::pg_response::{PgResponse, StatementType};
23use risingwave_common::catalog::{IndexId, TableId};
24use risingwave_common::types::DataType;
25use risingwave_common::util::recursive::{Recurse, tracker};
26use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
27use risingwave_pb::catalog::{
28    CreateType, PbFlatIndexConfig, PbHnswFlatIndexConfig, PbIndex, PbIndexColumnProperties,
29    PbStreamJobStatus, PbVectorIndexInfo,
30};
31use risingwave_pb::common::PbDistanceType;
32use risingwave_sqlparser::ast;
33use risingwave_sqlparser::ast::{Ident, ObjectName, OrderByExpr};
34use thiserror_ext::AsReport;
35
36use super::RwPgResponse;
37use crate::TableCatalog;
38use crate::binder::Binder;
39use crate::catalog::root_catalog::SchemaPath;
40use crate::catalog::{DatabaseId, SchemaId};
41use crate::error::{ErrorCode, Result};
42use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef, is_impure_func_call};
43use crate::handler::HandlerArgs;
44use crate::handler::util::reject_internal_table_dependency;
45use crate::optimizer::plan_expr_rewriter::ConstEvalRewriter;
46use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
47use crate::optimizer::plan_node::{
48    Explain, LogicalProject, LogicalScan, StreamMaterialize, StreamPlanRef as PlanRef,
49};
50use crate::optimizer::property::{Distribution, Order, RequiredDist};
51use crate::optimizer::{LogicalPlanRoot, OptimizerContext, OptimizerContextRef, PlanRoot};
52use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
53use crate::session::SessionImpl;
54use crate::session::current::notice_to_user;
55use crate::stream_fragmenter::{GraphJobType, build_graph};
56
57pub(crate) fn resolve_index_schema(
58    session: &SessionImpl,
59    index_name: ObjectName,
60    table_name: ObjectName,
61) -> Result<(String, Arc<TableCatalog>, String)> {
62    let db_name = &session.database();
63    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
64    let search_path = session.config().search_path();
65    let user_name = &session.user_name();
66    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
67
68    let index_table_name = Binder::resolve_index_name(index_name)?;
69
70    let catalog_reader = session.env().catalog_reader();
71    let read_guard = catalog_reader.read_guard();
72    let (table, schema_name) =
73        read_guard.get_created_table_by_name(db_name, schema_path, &table_name)?;
74    reject_internal_table_dependency(table.as_ref(), "CREATE INDEX")?;
75    Ok((schema_name.to_owned(), table.clone(), index_table_name))
76}
77
/// Walks an expression tree to check it is usable as an index column,
/// recording the first violation found. Use via
/// [`IndexColumnExprValidator::validate`].
pub(crate) struct IndexColumnExprValidator {
    // Whether impure function calls are permitted in the expression.
    allow_impure: bool,
    // First validation error encountered during the walk, or `Ok(())`.
    result: Result<()>,
}
82
83impl IndexColumnExprValidator {
84    fn unsupported_expr_err(expr: &ExprImpl) -> ErrorCode {
85        ErrorCode::NotSupported(
86            format!("unsupported index column expression type: {:?}", expr),
87            "use columns or expressions instead".into(),
88        )
89    }
90
91    pub(crate) fn validate(expr: &ExprImpl, allow_impure: bool) -> Result<()> {
92        match expr {
93            ExprImpl::InputRef(_) | ExprImpl::FunctionCall(_) => {}
94            other_expr => {
95                return Err(Self::unsupported_expr_err(other_expr).into());
96            }
97        }
98        let mut visitor = Self {
99            allow_impure,
100            result: Ok(()),
101        };
102        visitor.visit_expr(expr);
103        visitor.result
104    }
105}
106
impl ExprVisitor for IndexColumnExprValidator {
    /// Recursively validate an expression tree, recording the first
    /// violation in `self.result` and skipping further work afterwards.
    fn visit_expr(&mut self, expr: &ExprImpl) {
        // Short-circuit: a previous node already failed validation.
        if self.result.is_err() {
            return;
        }
        tracker!().recurse(|t| {
            // Excessive depth only raises a user notice; the walk continues
            // rather than erroring out.
            if t.depth_reaches(crate::expr::EXPR_DEPTH_THRESHOLD) {
                notice_to_user(crate::expr::EXPR_TOO_DEEP_NOTICE);
            }

            match expr {
                // Leaves: column references and literals are always valid.
                ExprImpl::InputRef(_) | ExprImpl::Literal(_) => {}
                ExprImpl::FunctionCall(inner) => {
                    // Impure calls are rejected unless explicitly allowed
                    // (callers enable this for vector indexes).
                    if !self.allow_impure && is_impure_func_call(inner) {
                        self.result = Err(ErrorCode::NotSupported(
                            "this expression is impure".into(),
                            "use a pure expression instead".into(),
                        )
                        .into());
                        return;
                    }
                    self.visit_function_call(inner)
                }
                // Any other expression kind is unsupported as an index column.
                other_expr => {
                    self.result = Err(Self::unsupported_expr_err(other_expr).into());
                }
            }
        })
    }
}
137
/// Generate the streaming plan, index-table catalog, and `PbIndex` proto for
/// a `CREATE INDEX` statement.
///
/// Handles both regular indexes (materialized via `assemble_materialize`)
/// and vector indexes (`USING flat` / `USING hnsw`, which require exactly one
/// column of `Vector` type on an append-only table).
///
/// Errors on: indexing an index, non-owner/non-superuser callers, unknown
/// index methods, invalid `WITH` options, invalid column/DISTRIBUTED BY
/// combinations, and distributed-by columns that are not a prefix of the
/// index columns.
pub(crate) fn gen_create_index_plan(
    session: &SessionImpl,
    context: OptimizerContextRef,
    schema_name: String,
    table: Arc<TableCatalog>,
    index_table_name: String,
    method: Option<Ident>,
    columns: Vec<OrderByExpr>,
    include: Vec<Ident>,
    distributed_by: Vec<ast::Expr>,
) -> Result<(PlanRef, TableCatalog, PbIndex)> {
    // Indexes on indexes are not allowed.
    if table.is_index() {
        return Err(
            ErrorCode::InvalidInputSyntax(format!("\"{}\" is an index", table.name)).into(),
        );
    }

    // Only the table owner (or a superuser) may create an index on it.
    if !session.is_super_user() && session.user_id() != table.owner {
        return Err(ErrorCode::PermissionDenied(format!(
            "must be owner of table \"{}\"",
            table.name
        ))
        .into());
    }

    // Parse the index method: `default` (regular index, same as omitting the
    // method), `flat`, or `hnsw` (vector indexes). `Some(_)` config means a
    // vector index.
    let vector_index_config = if let Some(method) = method {
        match method.real_value().to_ascii_lowercase().as_str() {
            "default" => None,
            "flat" => Some(risingwave_pb::catalog::vector_index_info::PbConfig::Flat(
                PbFlatIndexConfig {},
            )),
            "hnsw" => {
                let with_options = context.with_options();
                // Parse an optional positive-u32 WITH option, falling back to
                // `default` when the key is absent.
                let parse_non_zero_u32 = |key: &str, default| {
                    with_options
                        .get(key)
                        .map(|v| {
                            let v = v.parse::<u32>().map_err(|e| {
                                ErrorCode::InvalidInputSyntax(format!(
                                    "invalid {} value {}: failed to parse {}",
                                    key,
                                    v,
                                    e.as_report()
                                ))
                            })?;
                            if v == 0 {
                                Err(ErrorCode::InvalidInputSyntax(format!(
                                    "invalid {} value {}: should not be zero",
                                    key, v
                                )))
                            } else {
                                Ok(v)
                            }
                        })
                        .transpose()
                        .map(|v| v.unwrap_or(default))
                };
                Some(
                    risingwave_pb::catalog::vector_index_info::PbConfig::HnswFlat(
                        PbHnswFlatIndexConfig {
                            m: parse_non_zero_u32("m", 16)?, /* default value borrowed from pg_vector */
                            ef_construction: parse_non_zero_u32("ef_construction", 64)?, /* default value borrowed from pg_vector */
                            max_level: parse_non_zero_u32("max_level", 10)?,
                        },
                    ),
                )
            }
            _ => {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "invalid index method {}",
                    method
                ))
                .into());
            }
        }
    } else {
        None
    };

    let is_vector_index = vector_index_config.is_some();

    // Vector indexes only support append-only primary tables.
    if is_vector_index && !table.append_only {
        return Err(ErrorCode::InvalidInputSyntax(format!(
            "\"{}\" is not append-only but vector index must be append-only",
            table.name
        ))
        .into());
    }

    // Bind the primary table so column references in the index definition
    // resolve against it.
    let mut binder = Binder::new_for_stream(session);
    binder.bind_table(Some(&schema_name), &table.name)?;

    let mut index_columns_ordered_expr = vec![];
    let mut include_columns_expr = vec![];
    let mut distributed_columns_expr = vec![];
    if is_vector_index && columns.len() != 1 {
        return Err(ErrorCode::InvalidInputSyntax(format!(
            "vector index must be defined on exactly 1 column, but got {}",
            columns.len()
        ))
        .into());
    }
    // Vector dimension, captured from the (single) vector column's type.
    let mut dimension = None;
    for column in columns {
        // ASC/DESC and NULLS FIRST/LAST make no sense for vector indexes.
        if is_vector_index && (column.asc.is_some() || column.nulls_first.is_some()) {
            return Err(ErrorCode::InvalidInputSyntax(
                "vector index cannot specify order".to_owned(),
            )
            .into());
        }
        let order_type = OrderType::from_bools(column.asc, column.nulls_first);
        let expr_impl = binder.bind_expr(&column.expr)?;
        // Do constant folding and timezone transportation on expressions so that batch queries can match it in the same form.
        // NOTE(review): `const_eval.error` is never inspected after the
        // rewrite; a const-eval failure appears to be silently dropped here —
        // confirm this is intended.
        let mut const_eval = ConstEvalRewriter { error: None };
        let expr_impl = const_eval.rewrite_expr(expr_impl);
        let expr_impl = context.session_timezone().rewrite_expr(expr_impl);
        // Vector indexes may use impure expressions; regular indexes may not.
        let allow_impure = is_vector_index;
        IndexColumnExprValidator::validate(&expr_impl, allow_impure)?;
        if is_vector_index {
            match expr_impl.return_type() {
                DataType::Vector(d) => {
                    dimension = Some(d);
                }
                other => {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "vector index must be defined on column of Vector type, but got {:?}",
                        other
                    ))
                    .into());
                }
            }
        }
        index_columns_ordered_expr.push((expr_impl, order_type));
    }

    if include.is_empty() {
        // Create index to include all (non-hidden) columns by default.
        include_columns_expr = table
            .columns()
            .iter()
            .enumerate()
            .filter(|(_, column)| !column.is_hidden)
            .map(|(x, column)| {
                ExprImpl::InputRef(InputRef::new(x, column.column_desc.data_type.clone()).into())
            })
            .collect_vec();
    } else {
        // Explicit INCLUDE list: bind each identifier as a column reference.
        for column in include {
            let expr_impl =
                binder.bind_expr(&risingwave_sqlparser::ast::Expr::Identifier(column))?;
            include_columns_expr.push(expr_impl);
        }
    };

    if is_vector_index && !distributed_by.is_empty() {
        return Err(ErrorCode::InvalidInputSyntax(
            "vector index cannot define DISTRIBUTED BY".to_owned(),
        )
        .into());
    }

    for column in distributed_by {
        let expr_impl = binder.bind_expr(&column)?;
        distributed_columns_expr.push(expr_impl);
    }

    // Remove duplicate column of index columns
    // (only plain column references are deduplicated; function-call
    // expressions always pass through).
    let mut set = HashSet::new();
    index_columns_ordered_expr = index_columns_ordered_expr
        .into_iter()
        .filter(|(expr, _)| match expr {
            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
            ExprImpl::FunctionCall(_) => true,
            // Other variants were already rejected by IndexColumnExprValidator.
            _ => unreachable!(),
        })
        .collect_vec();

    // Remove include columns that are already in index columns.
    // Note: `set` is intentionally shared with the filter above, so a column
    // referenced as an index key is dropped from the include list.
    include_columns_expr = include_columns_expr
        .into_iter()
        .filter(|expr| match expr {
            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
            _ => unreachable!(),
        })
        .collect_vec();

    // Remove duplicate columns of distributed by columns
    let mut set = HashSet::new();
    let distributed_columns_expr = distributed_columns_expr
        .into_iter()
        .filter(|expr| match expr {
            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
            ExprImpl::FunctionCall(_) => true,
            _ => unreachable!(),
        })
        .collect_vec();
    // Distributed by columns should be a prefix of index columns
    if !index_columns_ordered_expr
        .iter()
        .map(|(expr, _)| expr.clone())
        .collect_vec()
        .starts_with(&distributed_columns_expr)
    {
        return Err(ErrorCode::InvalidInputSyntax(
            "Distributed by columns should be a prefix of index columns".to_owned(),
        )
        .into());
    }

    let (index_database_id, index_schema_id) =
        session.get_database_and_schema_id_for_create(Some(schema_name))?;

    // Build the plan: vector indexes go through the vector-index write node
    // on a singleton distribution; regular indexes are materialized as an MV.
    let (plan, mut index_table) = if let Some(vector_index_config) = vector_index_config {
        assert!(distributed_columns_expr.is_empty());
        assert_eq!(1, index_columns_ordered_expr.len());
        let input = assemble_input(
            table.clone(),
            context.clone(),
            &index_columns_ordered_expr,
            &include_columns_expr,
            RequiredDist::PhysicalDist(Distribution::Single),
        )?;
        let definition = context.normalized_sql().to_owned();
        let retention_seconds = table.retention_seconds.and_then(NonZeroU32::new);

        // `distance_type` is a mandatory WITH option for vector indexes.
        let distance_type = match context
            .with_options()
            .get("distance_type")
            .ok_or_else(|| {
                ErrorCode::InvalidInputSyntax(
                    "vector index missing `distance_type` in with options".to_owned(),
                )
            })?
            .to_ascii_lowercase()
            .as_str()
        {
            "l1" => PbDistanceType::L1,
            "l2" => PbDistanceType::L2Sqr,
            "inner_product" => PbDistanceType::InnerProduct,
            "cosine" => PbDistanceType::Cosine,
            other => {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "unsupported vector index distance type: {}",
                    other
                ))
                .into());
            }
        };

        let vector_index_write = input.gen_vector_index_plan(
            index_table_name.clone(),
            index_database_id,
            index_schema_id,
            definition,
            retention_seconds,
            PbVectorIndexInfo {
                dimension: dimension.expect("should be set for vector index") as _,
                config: Some(vector_index_config),
                distance_type: distance_type as _,
            },
        )?;
        let index_table = vector_index_write.table().clone();
        let plan: PlanRef = vector_index_write.into();
        (plan, index_table)
    } else {
        // Manually assemble the materialization plan for the index MV.
        let materialize = assemble_materialize(
            index_database_id,
            index_schema_id,
            table.clone(),
            context.clone(),
            index_table_name.clone(),
            &index_columns_ordered_expr,
            &include_columns_expr,
            // We use the first index column as distributed key by default if users
            // haven't specified the distributed by columns.
            if distributed_columns_expr.is_empty() {
                1
            } else {
                distributed_columns_expr.len()
            },
        )?;
        let index_table = materialize.table().clone();
        let plan: PlanRef = materialize.into();
        (plan, index_table)
    };

    {
        // Inherit table properties
        index_table.retention_seconds = table.retention_seconds;
    }

    // The index table belongs to the primary table's owner, not necessarily
    // the current user (a superuser may create it).
    index_table.owner = table.owner;

    let index_columns_len = index_columns_ordered_expr.len() as u32;
    let index_column_properties = index_columns_ordered_expr
        .iter()
        .map(|(_, order)| PbIndexColumnProperties {
            is_desc: order.is_descending(),
            nulls_first: order.nulls_are_first(),
        })
        .collect();
    let index_item = build_index_item(
        &index_table,
        table.name(),
        &table,
        index_columns_ordered_expr,
    );

    // Background DDL only when the session enables it and the plan supports it.
    let create_type =
        if context.session_ctx().config().background_ddl() && plan_can_use_background_ddl(&plan) {
            CreateType::Background
        } else {
            CreateType::Foreground
        };

    // Catalog proto; id/table-id are placeholders filled in by the meta node.
    let index_prost = PbIndex {
        id: IndexId::placeholder(),
        schema_id: index_schema_id,
        database_id: index_database_id,
        name: index_table_name,
        owner: index_table.owner,
        index_table_id: TableId::placeholder(),
        primary_table_id: table.id,
        index_item,
        index_column_properties,
        index_columns_len,
        initialized_at_epoch: None,
        created_at_epoch: None,
        stream_job_status: PbStreamJobStatus::Creating.into(),
        initialized_at_cluster_version: None,
        created_at_cluster_version: None,
        create_type: create_type.into(),
    };

    // Dump the plan when EXPLAIN tracing is enabled for this session.
    let ctx = plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Index:");
        ctx.trace(plan.explain_to_string());
    }

    Ok((plan, index_table, index_prost))
}
482
483fn build_index_item(
484    index_table: &TableCatalog,
485    primary_table_name: &str,
486    primary_table: &TableCatalog,
487    index_columns: Vec<(ExprImpl, OrderType)>,
488) -> Vec<risingwave_pb::expr::ExprNode> {
489    let primary_table_desc_map = primary_table
490        .columns
491        .iter()
492        .enumerate()
493        .map(|(x, y)| (y.name.clone(), x))
494        .collect::<HashMap<_, _>>();
495
496    let primary_table_name_prefix = format!("{}.", primary_table_name);
497
498    let index_columns_len = index_columns.len();
499    index_columns
500        .into_iter()
501        .map(|(expr, _)| expr.to_expr_proto())
502        .chain(
503            index_table
504                .columns
505                .iter()
506                .map(|c| &c.column_desc)
507                .skip(index_columns_len)
508                .map(|x| {
509                    let name = if x.name.starts_with(&primary_table_name_prefix) {
510                        x.name[primary_table_name_prefix.len()..].to_string()
511                    } else {
512                        x.name.clone()
513                    };
514
515                    let column_index = *primary_table_desc_map.get(&name).unwrap();
516                    InputRef {
517                        index: column_index,
518                        data_type: primary_table
519                            .columns
520                            .get(column_index)
521                            .unwrap()
522                            .data_type
523                            .clone(),
524                    }
525                    .to_expr_proto()
526                }),
527        )
528        .collect_vec()
529}
530
/// Build the shared logical plan root for an index:
/// a `LogicalProject` of `index_columns ++ include_columns` over a
/// `LogicalScan` of the primary table, with the given required distribution
/// and an output order following the index columns.
fn assemble_input(
    table_catalog: Arc<TableCatalog>,
    context: OptimizerContextRef,
    index_columns: &[(ExprImpl, OrderType)],
    include_columns: &[ExprImpl],
    required_dist: RequiredDist,
) -> Result<LogicalPlanRoot> {
    // Build logical plan and then call gen_create_index_plan
    // LogicalProject(index_columns, include_columns)
    //   LogicalScan(table_desc)

    let logical_scan = LogicalScan::create(table_catalog.clone(), context, None);

    // Projection expressions: index columns first, then include columns.
    // Downstream code relies on this ordering (e.g. hash-shard key indices).
    let exprs = index_columns
        .iter()
        .map(|(expr, _)| expr.clone())
        .chain(include_columns.iter().cloned())
        .collect_vec();

    let logical_project = LogicalProject::create(logical_scan.into(), exprs);
    // All projection outputs are required.
    let mut project_required_cols = FixedBitSet::with_capacity(logical_project.schema().len());
    project_required_cols.toggle_range(0..logical_project.schema().len());

    // State for generating unique names for expression columns.
    // Note: `count` is shared across all expression columns (never reset per
    // column), so numeric suffixes increase globally.
    let mut col_names = HashSet::new();
    let mut count = 0;

    // Output column names: plain column references reuse the source column
    // name; function-call expressions get the function name, suffixed with a
    // counter on collision.
    // NOTE(review): plain column names bypass the `col_names` dedup set, so a
    // collision between an input-column name and a generated name is possible
    // here — confirm this is handled (or impossible) downstream.
    let out_names: Vec<String> = index_columns
        .iter()
        .map(|(expr, _)| match expr {
            ExprImpl::InputRef(input_ref) => table_catalog
                .columns()
                .get(input_ref.index)
                .unwrap()
                .name()
                .to_owned(),
            ExprImpl::FunctionCall(func) => {
                let func_name = func.func_type().as_str_name().to_owned();
                let mut name = func_name.clone();
                while !col_names.insert(name.clone()) {
                    count += 1;
                    name = format!("{}{}", func_name, count);
                }
                name
            }
            // Other variants were rejected by IndexColumnExprValidator.
            _ => unreachable!(),
        })
        .chain(include_columns.iter().map(|expr| {
            match expr {
                // Include columns are always plain column references.
                ExprImpl::InputRef(input_ref) => table_catalog
                    .columns()
                    .get(input_ref.index)
                    .unwrap()
                    .name()
                    .to_owned(),
                _ => unreachable!(),
            }
        }))
        .collect_vec();

    Ok(PlanRoot::new_with_logical_plan(
        logical_project,
        required_dist,
        // Required order follows the index columns, in declaration order.
        Order::new(
            index_columns
                .iter()
                .enumerate()
                .map(|(i, (_, order))| ColumnOrder::new(i, *order))
                .collect(),
        ),
        project_required_cols,
        out_names,
    ))
}
604
605/// Note: distributed by columns must be a prefix of index columns, so we just use
606/// `distributed_by_columns_len` to represent distributed by columns
607fn assemble_materialize(
608    database_id: DatabaseId,
609    schema_id: SchemaId,
610    table_catalog: Arc<TableCatalog>,
611    context: OptimizerContextRef,
612    index_name: String,
613    index_columns: &[(ExprImpl, OrderType)],
614    include_columns: &[ExprImpl],
615    distributed_by_columns_len: usize,
616) -> Result<StreamMaterialize> {
617    let input = assemble_input(
618        table_catalog.clone(),
619        context.clone(),
620        index_columns,
621        include_columns,
622        // schema of logical_project is such that index columns come first.
623        // so we can use distributed_by_columns_len to represent distributed by columns indices.
624        RequiredDist::PhysicalDist(Distribution::HashShard(
625            (0..distributed_by_columns_len).collect(),
626        )),
627    )?;
628
629    let definition = context.normalized_sql().to_owned();
630    let retention_seconds = table_catalog.retention_seconds.and_then(NonZeroU32::new);
631
632    input.gen_index_plan(
633        index_name,
634        database_id,
635        schema_id,
636        definition,
637        retention_seconds,
638    )
639}
640
/// Handle a `CREATE INDEX` statement end to end: resolve names, generate the
/// plan and catalog objects, build the stream graph, and submit the creation
/// to the catalog writer.
///
/// Returns an empty `CREATE_INDEX` response on success, or the duplicate-name
/// response when `if_not_exists` is set and the index already exists.
pub async fn handle_create_index(
    handler_args: HandlerArgs,
    if_not_exists: bool,
    index_name: ObjectName,
    table_name: ObjectName,
    method: Option<Ident>,
    columns: Vec<OrderByExpr>,
    include: Vec<Ident>,
    distributed_by: Vec<ast::Expr>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    // Planning happens in a scope of its own so that planner-side state
    // (e.g. the optimizer context) is dropped before the async submission.
    let (graph, index_table, index) = {
        let (schema_name, table, index_table_name) =
            resolve_index_schema(&session, index_name, table_name)?;
        let qualified_index_name = ObjectName(vec![
            Ident::from_real_value(&schema_name),
            Ident::from_real_value(&index_table_name),
        ]);
        // Early-return the canned response if the name is taken and
        // IF NOT EXISTS was specified; error otherwise.
        if let Either::Right(resp) = session.check_relation_name_duplicated(
            qualified_index_name,
            StatementType::CREATE_INDEX,
            if_not_exists,
        )? {
            return Ok(resp);
        }

        let context = OptimizerContext::from_handler_args(handler_args);
        let (plan, index_table, index) = gen_create_index_plan(
            &session,
            context.into(),
            schema_name,
            table,
            index_table_name,
            method,
            columns,
            include,
            distributed_by,
        )?;
        let graph = build_graph(plan, Some(GraphJobType::Index))?;

        (graph, index_table, index)
    };

    tracing::trace!(
        "name={}, graph=\n{}",
        index.name,
        serde_json::to_string_pretty(&graph).unwrap()
    );

    // Register this job with the creating-streaming-job tracker; the guard
    // deregisters it when dropped (i.e. when this handler returns).
    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                index.database_id,
                index.schema_id,
                index.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    catalog_writer
        .create_index(index, index_table.to_prost(), graph, if_not_exists)
        .await?;

    Ok(PgResponse::empty_result(StatementType::CREATE_INDEX))
}