risingwave_frontend/handler/
create_index.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::num::NonZeroU32;
17use std::rc::Rc;
18use std::sync::Arc;
19
20use either::Either;
21use fixedbitset::FixedBitSet;
22use itertools::Itertools;
23use pgwire::pg_response::{PgResponse, StatementType};
24use risingwave_common::catalog::{IndexId, TableDesc, TableId};
25use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
26use risingwave_pb::catalog::{PbIndex, PbIndexColumnProperties, PbStreamJobStatus, PbTable};
27use risingwave_sqlparser::ast;
28use risingwave_sqlparser::ast::{Ident, ObjectName, OrderByExpr};
29
30use super::RwPgResponse;
31use crate::TableCatalog;
32use crate::binder::Binder;
33use crate::catalog::root_catalog::SchemaPath;
34use crate::catalog::{DatabaseId, SchemaId};
35use crate::error::{ErrorCode, Result};
36use crate::expr::{Expr, ExprImpl, ExprRewriter, InputRef};
37use crate::handler::HandlerArgs;
38use crate::optimizer::plan_expr_rewriter::ConstEvalRewriter;
39use crate::optimizer::plan_node::{Explain, LogicalProject, LogicalScan, StreamMaterialize};
40use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
41use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
42use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
43use crate::session::SessionImpl;
44use crate::stream_fragmenter::build_graph;
45
46pub(crate) fn resolve_index_schema(
47    session: &SessionImpl,
48    index_name: ObjectName,
49    table_name: ObjectName,
50) -> Result<(String, Arc<TableCatalog>, String)> {
51    let db_name = &session.database();
52    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;
53    let search_path = session.config().search_path();
54    let user_name = &session.user_name();
55    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
56
57    let index_table_name = Binder::resolve_index_name(index_name)?;
58
59    let catalog_reader = session.env().catalog_reader();
60    let read_guard = catalog_reader.read_guard();
61    let (table, schema_name) =
62        read_guard.get_created_table_by_name(db_name, schema_path, &table_name)?;
63    Ok((schema_name.to_owned(), table.clone(), index_table_name))
64}
65
66pub(crate) fn gen_create_index_plan(
67    session: &SessionImpl,
68    context: OptimizerContextRef,
69    schema_name: String,
70    table: Arc<TableCatalog>,
71    index_table_name: String,
72    columns: Vec<OrderByExpr>,
73    include: Vec<Ident>,
74    distributed_by: Vec<ast::Expr>,
75) -> Result<(PlanRef, PbTable, PbIndex)> {
76    let table_name = table.name.clone();
77
78    if table.is_index() {
79        return Err(
80            ErrorCode::InvalidInputSyntax(format!("\"{}\" is an index", table.name)).into(),
81        );
82    }
83
84    if !session.is_super_user() && session.user_id() != table.owner {
85        return Err(
86            ErrorCode::PermissionDenied(format!("must be owner of table {}", table.name)).into(),
87        );
88    }
89
90    let mut binder = Binder::new_for_stream(session);
91    binder.bind_table(Some(&schema_name), &table_name)?;
92
93    let mut index_columns_ordered_expr = vec![];
94    let mut include_columns_expr = vec![];
95    let mut distributed_columns_expr = vec![];
96    for column in columns {
97        let order_type = OrderType::from_bools(column.asc, column.nulls_first);
98        let expr_impl = binder.bind_expr(column.expr)?;
99        // Do constant folding and timezone transportation on expressions so that batch queries can match it in the same form.
100        let mut const_eval = ConstEvalRewriter { error: None };
101        let expr_impl = const_eval.rewrite_expr(expr_impl);
102        let expr_impl = context.session_timezone().rewrite_expr(expr_impl);
103        match expr_impl {
104            ExprImpl::InputRef(_) => {}
105            ExprImpl::FunctionCall(_) => {
106                if expr_impl.is_impure() {
107                    return Err(ErrorCode::NotSupported(
108                        "this expression is impure".into(),
109                        "use a pure expression instead".into(),
110                    )
111                    .into());
112                }
113            }
114            _ => {
115                return Err(ErrorCode::NotSupported(
116                    "index columns should be columns or expressions".into(),
117                    "use columns or expressions instead".into(),
118                )
119                .into());
120            }
121        }
122        index_columns_ordered_expr.push((expr_impl, order_type));
123    }
124
125    if include.is_empty() {
126        // Create index to include all (non-hidden) columns by default.
127        include_columns_expr = table
128            .columns()
129            .iter()
130            .enumerate()
131            .filter(|(_, column)| !column.is_hidden)
132            .map(|(x, column)| {
133                ExprImpl::InputRef(InputRef::new(x, column.column_desc.data_type.clone()).into())
134            })
135            .collect_vec();
136    } else {
137        for column in include {
138            let expr_impl =
139                binder.bind_expr(risingwave_sqlparser::ast::Expr::Identifier(column))?;
140            include_columns_expr.push(expr_impl);
141        }
142    };
143
144    for column in distributed_by {
145        let expr_impl = binder.bind_expr(column)?;
146        distributed_columns_expr.push(expr_impl);
147    }
148
149    let table_desc = Rc::new(table.table_desc());
150
151    // Remove duplicate column of index columns
152    let mut set = HashSet::new();
153    index_columns_ordered_expr = index_columns_ordered_expr
154        .into_iter()
155        .filter(|(expr, _)| match expr {
156            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
157            ExprImpl::FunctionCall(_) => true,
158            _ => unreachable!(),
159        })
160        .collect_vec();
161
162    // Remove include columns are already in index columns
163    include_columns_expr = include_columns_expr
164        .into_iter()
165        .filter(|expr| match expr {
166            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
167            _ => unreachable!(),
168        })
169        .collect_vec();
170
171    // Remove duplicate columns of distributed by columns
172    let mut set = HashSet::new();
173    let distributed_columns_expr = distributed_columns_expr
174        .into_iter()
175        .filter(|expr| match expr {
176            ExprImpl::InputRef(input_ref) => set.insert(input_ref.index),
177            ExprImpl::FunctionCall(_) => true,
178            _ => unreachable!(),
179        })
180        .collect_vec();
181    // Distributed by columns should be a prefix of index columns
182    if !index_columns_ordered_expr
183        .iter()
184        .map(|(expr, _)| expr.clone())
185        .collect_vec()
186        .starts_with(&distributed_columns_expr)
187    {
188        return Err(ErrorCode::InvalidInputSyntax(
189            "Distributed by columns should be a prefix of index columns".to_owned(),
190        )
191        .into());
192    }
193
194    let (index_database_id, index_schema_id) =
195        session.get_database_and_schema_id_for_create(Some(schema_name))?;
196
197    // Manually assemble the materialization plan for the index MV.
198    let materialize = assemble_materialize(
199        table_name,
200        index_database_id,
201        index_schema_id,
202        table.clone(),
203        context,
204        index_table_name.clone(),
205        &index_columns_ordered_expr,
206        &include_columns_expr,
207        // We use the first index column as distributed key by default if users
208        // haven't specified the distributed by columns.
209        if distributed_columns_expr.is_empty() {
210            1
211        } else {
212            distributed_columns_expr.len()
213        },
214        table.cardinality,
215    )?;
216
217    let index_table = materialize.table();
218    let mut index_table_prost = index_table.to_prost();
219    {
220        // Inherit table properties
221        index_table_prost.retention_seconds = table.retention_seconds;
222    }
223
224    index_table_prost.owner = table.owner;
225    index_table_prost.dependent_relations = vec![table.id.table_id];
226
227    let index_columns_len = index_columns_ordered_expr.len() as u32;
228    let index_column_properties = index_columns_ordered_expr
229        .iter()
230        .map(|(_, order)| PbIndexColumnProperties {
231            is_desc: order.is_descending(),
232            nulls_first: order.nulls_are_first(),
233        })
234        .collect();
235    let index_item = build_index_item(
236        index_table,
237        table.name(),
238        table_desc,
239        index_columns_ordered_expr,
240    );
241
242    let index_prost = PbIndex {
243        id: IndexId::placeholder().index_id,
244        schema_id: index_schema_id,
245        database_id: index_database_id,
246        name: index_table_name,
247        owner: index_table_prost.owner,
248        index_table_id: TableId::placeholder().table_id,
249        primary_table_id: table.id.table_id,
250        index_item,
251        index_column_properties,
252        index_columns_len,
253        initialized_at_epoch: None,
254        created_at_epoch: None,
255        stream_job_status: PbStreamJobStatus::Creating.into(),
256        initialized_at_cluster_version: None,
257        created_at_cluster_version: None,
258    };
259
260    let plan: PlanRef = materialize.into();
261    let ctx = plan.ctx();
262    let explain_trace = ctx.is_explain_trace();
263    if explain_trace {
264        ctx.trace("Create Index:");
265        ctx.trace(plan.explain_to_string());
266    }
267
268    Ok((plan, index_table_prost, index_prost))
269}
270
271fn build_index_item(
272    index_table: &TableCatalog,
273    primary_table_name: &str,
274    primary_table_desc: Rc<TableDesc>,
275    index_columns: Vec<(ExprImpl, OrderType)>,
276) -> Vec<risingwave_pb::expr::ExprNode> {
277    let primary_table_desc_map = primary_table_desc
278        .columns
279        .iter()
280        .enumerate()
281        .map(|(x, y)| (y.name.clone(), x))
282        .collect::<HashMap<_, _>>();
283
284    let primary_table_name_prefix = format!("{}.", primary_table_name);
285
286    let index_columns_len = index_columns.len();
287    index_columns
288        .into_iter()
289        .map(|(expr, _)| expr.to_expr_proto())
290        .chain(
291            index_table
292                .columns
293                .iter()
294                .map(|c| &c.column_desc)
295                .skip(index_columns_len)
296                .map(|x| {
297                    let name = if x.name.starts_with(&primary_table_name_prefix) {
298                        x.name[primary_table_name_prefix.len()..].to_string()
299                    } else {
300                        x.name.clone()
301                    };
302
303                    let column_index = *primary_table_desc_map.get(&name).unwrap();
304                    InputRef {
305                        index: column_index,
306                        data_type: primary_table_desc
307                            .columns
308                            .get(column_index)
309                            .unwrap()
310                            .data_type
311                            .clone(),
312                    }
313                    .to_expr_proto()
314                }),
315        )
316        .collect_vec()
317}
318
319/// Note: distributed by columns must be a prefix of index columns, so we just use
320/// `distributed_by_columns_len` to represent distributed by columns
321fn assemble_materialize(
322    table_name: String,
323    database_id: DatabaseId,
324    schema_id: SchemaId,
325    table_catalog: Arc<TableCatalog>,
326    context: OptimizerContextRef,
327    index_name: String,
328    index_columns: &[(ExprImpl, OrderType)],
329    include_columns: &[ExprImpl],
330    distributed_by_columns_len: usize,
331    cardinality: Cardinality,
332) -> Result<StreamMaterialize> {
333    // Build logical plan and then call gen_create_index_plan
334    // LogicalProject(index_columns, include_columns)
335    //   LogicalScan(table_desc)
336
337    let definition = context.normalized_sql().to_owned();
338    let retention_seconds = table_catalog.retention_seconds.and_then(NonZeroU32::new);
339
340    let logical_scan = LogicalScan::create(
341        table_name,
342        table_catalog.clone(),
343        // Index table has no indexes.
344        vec![],
345        context,
346        None,
347        cardinality,
348    );
349
350    let exprs = index_columns
351        .iter()
352        .map(|(expr, _)| expr.clone())
353        .chain(include_columns.iter().cloned())
354        .collect_vec();
355
356    let logical_project = LogicalProject::create(logical_scan.into(), exprs);
357    let mut project_required_cols = FixedBitSet::with_capacity(logical_project.schema().len());
358    project_required_cols.toggle_range(0..logical_project.schema().len());
359
360    let mut col_names = HashSet::new();
361    let mut count = 0;
362
363    let out_names: Vec<String> = index_columns
364        .iter()
365        .map(|(expr, _)| match expr {
366            ExprImpl::InputRef(input_ref) => table_catalog
367                .columns()
368                .get(input_ref.index)
369                .unwrap()
370                .name()
371                .to_owned(),
372            ExprImpl::FunctionCall(func) => {
373                let func_name = func.func_type().as_str_name().to_owned();
374                let mut name = func_name.clone();
375                while !col_names.insert(name.clone()) {
376                    count += 1;
377                    name = format!("{}{}", func_name, count);
378                }
379                name
380            }
381            _ => unreachable!(),
382        })
383        .chain(include_columns.iter().map(|expr| {
384            match expr {
385                ExprImpl::InputRef(input_ref) => table_catalog
386                    .columns()
387                    .get(input_ref.index)
388                    .unwrap()
389                    .name()
390                    .to_owned(),
391                _ => unreachable!(),
392            }
393        }))
394        .collect_vec();
395
396    PlanRoot::new_with_logical_plan(
397        logical_project,
398        // schema of logical_project is such that index columns come first.
399        // so we can use distributed_by_columns_len to represent distributed by columns indices.
400        RequiredDist::PhysicalDist(Distribution::HashShard(
401            (0..distributed_by_columns_len).collect(),
402        )),
403        Order::new(
404            index_columns
405                .iter()
406                .enumerate()
407                .map(|(i, (_, order))| ColumnOrder::new(i, *order))
408                .collect(),
409        ),
410        project_required_cols,
411        out_names,
412    )
413    .gen_index_plan(
414        index_name,
415        database_id,
416        schema_id,
417        definition,
418        retention_seconds,
419    )
420}
421
422pub async fn handle_create_index(
423    handler_args: HandlerArgs,
424    if_not_exists: bool,
425    index_name: ObjectName,
426    table_name: ObjectName,
427    columns: Vec<OrderByExpr>,
428    include: Vec<Ident>,
429    distributed_by: Vec<ast::Expr>,
430) -> Result<RwPgResponse> {
431    let session = handler_args.session.clone();
432
433    let (graph, index_table, index) = {
434        let (schema_name, table, index_table_name) =
435            resolve_index_schema(&session, index_name, table_name)?;
436        let qualified_index_name = ObjectName(vec![
437            Ident::with_quote_unchecked('"', &schema_name),
438            Ident::with_quote_unchecked('"', &index_table_name),
439        ]);
440        if let Either::Right(resp) = session.check_relation_name_duplicated(
441            qualified_index_name,
442            StatementType::CREATE_INDEX,
443            if_not_exists,
444        )? {
445            return Ok(resp);
446        }
447
448        let context = OptimizerContext::from_handler_args(handler_args);
449        let (plan, index_table, index) = gen_create_index_plan(
450            &session,
451            context.into(),
452            schema_name,
453            table,
454            index_table_name,
455            columns,
456            include,
457            distributed_by,
458        )?;
459        let graph = build_graph(plan)?;
460
461        (graph, index_table, index)
462    };
463
464    tracing::trace!(
465        "name={}, graph=\n{}",
466        index.name,
467        serde_json::to_string_pretty(&graph).unwrap()
468    );
469
470    let _job_guard =
471        session
472            .env()
473            .creating_streaming_job_tracker()
474            .guard(CreatingStreamingJobInfo::new(
475                session.session_id(),
476                index.database_id,
477                index.schema_id,
478                index.name.clone(),
479            ));
480
481    let catalog_writer = session.catalog_writer()?;
482    catalog_writer
483        .create_index(index, index_table, graph)
484        .await?;
485
486    Ok(PgResponse::empty_result(StatementType::CREATE_INDEX))
487}