risingwave_frontend/handler/
create_mv.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use either::Either;
18use itertools::Itertools;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::catalog::{FunctionId, ObjectId};
21use risingwave_pb::serverless_backfill_controller::{
22    ProvisionRequest, node_group_controller_service_client,
23};
24use risingwave_pb::stream_plan::PbStreamFragmentGraph;
25use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
26use thiserror_ext::AsReport;
27
28use super::RwPgResponse;
29use crate::binder::{Binder, BoundQuery, BoundSetExpr};
30use crate::catalog::check_column_name_not_reserved;
31use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
32use crate::error::{ErrorCode, Result, RwError};
33use crate::handler::HandlerArgs;
34use crate::handler::util::{LongRunningNotificationAction, execute_with_long_running_notification};
35use crate::optimizer::backfill_order_strategy::plan_backfill_order;
36use crate::optimizer::plan_node::generic::GenericPlanRef;
37use crate::optimizer::plan_node::{Explain, StreamPlanRef as PlanRef};
38use crate::optimizer::{OptimizerContext, OptimizerContextRef, RelationCollectorVisitor};
39use crate::planner::Planner;
40use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
41use crate::session::{SESSION_MANAGER, SessionImpl};
42use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
43use crate::utils::ordinal;
44use crate::{TableCatalog, WithOptions};
45
/// `WITH` option key naming the resource group the materialized view's streaming job should run
/// in. `gen_create_mv_graph` removes it from the statement's `WITH` options before rejecting any
/// remaining (unknown) options.
pub const RESOURCE_GROUP_KEY: &str = "resource_group";
/// `WITH` option key requesting serverless backfill. `gen_create_mv_graph` removes it from the
/// `WITH` options and parses its value as a `bool`; a missing key or a value that does not parse
/// is treated as `false`.
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
48
49pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
50    if columns.is_empty() {
51        None
52    } else {
53        Some(columns.iter().map(|v| v.real_value()).collect())
54    }
55}
56
57/// If columns is empty, it means that the user did not specify the column names.
58/// In this case, we extract the column names from the query.
59/// If columns is not empty, it means that user specify the column names and the user
60/// should guarantee that the column names number are consistent with the query.
61pub(super) fn get_column_names(
62    bound: &BoundQuery,
63    columns: Vec<Ident>,
64) -> Result<Option<Vec<String>>> {
65    let col_names = parse_column_names(&columns);
66    if let BoundSetExpr::Select(select) = &bound.body {
67        // `InputRef`'s alias will be implicitly assigned in `bind_project`.
68        // If user provides columns name (col_names.is_some()), we don't need alias.
69        // For other expressions (col_names.is_none()), we require the user to explicitly assign an
70        // alias.
71        if col_names.is_none() {
72            for (i, alias) in select.aliases.iter().enumerate() {
73                if alias.is_none() {
74                    return Err(ErrorCode::BindError(format!(
75                    "An alias must be specified for the {} expression (counting from 1) in result relation", ordinal(i+1)
76                ))
77                .into());
78                }
79            }
80        }
81    }
82
83    Ok(col_names)
84}
85
86/// Bind and generate create MV plan, return plan and mv table info.
87pub fn gen_create_mv_plan(
88    session: &SessionImpl,
89    context: OptimizerContextRef,
90    query: Query,
91    name: ObjectName,
92    columns: Vec<Ident>,
93    emit_mode: Option<EmitMode>,
94) -> Result<(PlanRef, TableCatalog)> {
95    let mut binder = Binder::new_for_stream(session);
96    let bound = binder.bind_query(&query)?;
97    gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
98}
99
100/// Generate create MV plan from a bound query
101pub fn gen_create_mv_plan_bound(
102    session: &SessionImpl,
103    context: OptimizerContextRef,
104    query: BoundQuery,
105    name: ObjectName,
106    columns: Vec<Ident>,
107    emit_mode: Option<EmitMode>,
108) -> Result<(PlanRef, TableCatalog)> {
109    if session.config().create_compaction_group_for_mv() {
110        context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
111    }
112
113    let db_name = &session.database();
114    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
115
116    let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
117
118    let definition = context.normalized_sql().to_owned();
119
120    let col_names = get_column_names(&query, columns)?;
121
122    let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
123    if emit_on_window_close {
124        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
125    }
126
127    let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
128    if let Some(col_names) = col_names {
129        for name in &col_names {
130            check_column_name_not_reserved(name)?;
131        }
132        plan_root.set_out_names(col_names)?;
133    }
134    let materialize = plan_root.gen_materialize_plan(
135        database_id,
136        schema_id,
137        table_name,
138        definition,
139        emit_on_window_close,
140    )?;
141
142    let mut table = materialize.table().clone();
143    table.owner = session.user_id();
144
145    let plan: PlanRef = materialize.into();
146
147    let ctx = plan.ctx();
148    let explain_trace = ctx.is_explain_trace();
149    if explain_trace {
150        ctx.trace("Create Materialized View:");
151        ctx.trace(plan.explain_to_string());
152    }
153
154    Ok((plan, table))
155}
156
157pub async fn handle_create_mv(
158    handler_args: HandlerArgs,
159    if_not_exists: bool,
160    name: ObjectName,
161    query: Query,
162    columns: Vec<Ident>,
163    emit_mode: Option<EmitMode>,
164) -> Result<RwPgResponse> {
165    let (dependent_relations, dependent_udfs, bound_query) = {
166        let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
167        let bound_query = binder.bind_query(&query)?;
168        (
169            binder.included_relations().clone(),
170            binder.included_udfs().clone(),
171            bound_query,
172        )
173    };
174    handle_create_mv_bound(
175        handler_args,
176        if_not_exists,
177        name,
178        bound_query,
179        dependent_relations,
180        dependent_udfs,
181        columns,
182        emit_mode,
183    )
184    .await
185}
186
187/// Send a provision request to the serverless backfill controller
188pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
189    let request = tonic::Request::new(ProvisionRequest {});
190    let mut client =
191        node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
192            sbc_addr.clone(),
193        )
194        .await
195        .map_err(|e| {
196            RwError::from(ErrorCode::InternalError(format!(
197                "unable to reach serverless backfill controller at addr {}: {}",
198                sbc_addr,
199                e.as_report()
200            )))
201        })?;
202
203    match client.provision(request).await {
204        Ok(resp) => Ok(resp.into_inner().resource_group),
205        Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
206            "serverless backfill controller returned error :{}",
207            e.as_report()
208        )))),
209    }
210}
211
212fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
213    let context = OptimizerContext::from_handler_args(handler_args);
214    context.with_options().clone()
215}
216
/// Handles `CREATE MATERIALIZED VIEW` for a query that has already been bound.
///
/// Steps, in order:
/// 1. Check the cluster limits.
/// 2. Check whether `name` is already used by a relation. If that check yields a ready-made
///    response (the `Either::Right` case, presumably `IF NOT EXISTS` on an existing name), it is
///    returned as-is.
/// 3. Generate the MV's table catalog, stream fragment graph, dependencies and resource group via
///    [`gen_create_mv_graph`].
/// 4. Register the job with the creating-streaming-job tracker and submit it through the catalog
///    writer.
///
/// `dependent_relations` and `dependent_udfs` are the relations and UDFs collected while binding
/// the query.
pub async fn handle_create_mv_bound(
    handler_args: HandlerArgs,
    if_not_exists: bool,
    name: ObjectName,
    query: BoundQuery,
    dependent_relations: HashSet<ObjectId>,
    dependent_udfs: HashSet<FunctionId>, // TODO(rc): merge with `dependent_relations`
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    // Check cluster limits
    session.check_cluster_limits().await?;

    // `Either::Right` carries a response to send back to the client instead of creating the MV.
    if let Either::Right(resp) = session.check_relation_name_duplicated(
        name.clone(),
        StatementType::CREATE_MATERIALIZED_VIEW,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (table, graph, dependencies, resource_group) = {
        gen_create_mv_graph(
            handler_args,
            name,
            query,
            dependent_relations,
            dependent_udfs,
            columns,
            emit_mode,
        )
        .await?
    };

    // Ensure writes to `StreamJobTracker` are atomic.
    // The guard is bound to a named variable (not `_`) so it is held until this function returns,
    // i.e. for the whole creation request below.
    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                table.database_id,
                table.schema_id,
                table.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    // Submit the job to the catalog writer. The wrapper is given `MonitorBackfillJob`, presumably
    // so the user is notified while a long-running backfill is in progress. See
    // `execute_with_long_running_notification` for the exact behavior.
    execute_with_long_running_notification(
        catalog_writer.create_materialized_view(
            table.to_prost(),
            graph,
            dependencies,
            resource_group,
            if_not_exists,
        ),
        &session,
        "CREATE MATERIALIZED VIEW",
        LongRunningNotificationAction::MonitorBackfillJob,
    )
    .await?;

    Ok(PgResponse::empty_result(
        StatementType::CREATE_MATERIALIZED_VIEW,
    ))
}
284
285pub(crate) async fn gen_create_mv_graph(
286    handler_args: HandlerArgs,
287    name: ObjectName,
288    query: BoundQuery,
289    dependent_relations: HashSet<ObjectId>,
290    dependent_udfs: HashSet<FunctionId>,
291    columns: Vec<Ident>,
292    emit_mode: Option<EmitMode>,
293) -> Result<(
294    TableCatalog,
295    PbStreamFragmentGraph,
296    HashSet<ObjectId>,
297    Option<String>,
298)> {
299    let mut with_options = get_with_options(handler_args.clone());
300    let mut resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());
301
302    if resource_group.is_some() {
303        risingwave_common::license::Feature::ResourceGroup.check_available()?;
304    }
305
306    let is_serverless_backfill = with_options
307        .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
308        .unwrap_or_default()
309        .parse::<bool>()
310        .unwrap_or(false);
311
312    if resource_group.is_some() && is_serverless_backfill {
313        return Err(RwError::from(InvalidInputSyntax(
314            "Please do not specify serverless backfilling and resource group together".to_owned(),
315        )));
316    }
317
318    if !with_options.is_empty() {
319        // get other useful fields by `remove`, the logic here is to reject unknown options.
320        return Err(RwError::from(ProtocolError(format!(
321            "unexpected options in WITH clause: {:?}",
322            with_options.keys()
323        ))));
324    }
325
326    let sbc_addr = match SESSION_MANAGER.get() {
327        Some(manager) => manager.env().sbc_address(),
328        None => "",
329    }
330    .to_owned();
331
332    if is_serverless_backfill && sbc_addr.is_empty() {
333        return Err(RwError::from(InvalidInputSyntax(
334            "Serverless Backfill is disabled on-premise. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
335        )));
336    }
337
338    if is_serverless_backfill {
339        match provision_resource_group(sbc_addr).await {
340            Err(e) => {
341                return Err(RwError::from(ProtocolError(format!(
342                    "failed to provision serverless backfill nodes: {}",
343                    e.as_report()
344                ))));
345            }
346            Ok(val) => resource_group = Some(val),
347        }
348    }
349    tracing::debug!(
350        resource_group = resource_group,
351        "provisioning on resource group"
352    );
353
354    let context = OptimizerContext::from_handler_args(handler_args);
355    let has_order_by = !query.order.is_empty();
356    if has_order_by {
357        context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
358It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
359"#.to_owned());
360    }
361
362    if resource_group.is_some()
363        && !context
364            .session_ctx()
365            .config()
366            .streaming_use_arrangement_backfill()
367    {
368        return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
369    }
370
371    let context: OptimizerContextRef = context.into();
372    let session = context.session_ctx().as_ref();
373
374    let (plan, table) =
375        gen_create_mv_plan_bound(session, context.clone(), query, name, columns, emit_mode)?;
376
377    let backfill_order = plan_backfill_order(
378        session,
379        context.with_options().backfill_order_strategy(),
380        plan.clone(),
381    )?;
382
383    // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies
384    // during binding instead of visiting the optimized plan.
385    let dependencies = RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
386        .into_iter()
387        .chain(dependent_udfs.iter().copied().map_into())
388        .collect();
389
390    let graph = build_graph_with_strategy(
391        plan,
392        Some(GraphJobType::MaterializedView),
393        Some(backfill_order),
394    )?;
395
396    Ok((table, graph, dependencies, resource_group))
397}
398
/// Handler-level tests for `CREATE MATERIALIZED VIEW`, run against a `LocalFrontend`.
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// Creating an MV over a protobuf source registers both the source and the MV's table in the
    /// catalog. The MV's columns are checked against the expected names and types, including the
    /// hidden row-id and timestamp columns.
    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1";
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check table exists.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        // Map every column of the MV table (including hidden ones) to its data type.
        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        // .with_ids([5, 6].map(ColumnId::new))
        .into();
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => StructType::new(
                 vec![("address", DataType::Varchar),("city", city_type),("zipcode", DataType::Varchar)],
            )
            // .with_ids([3, 4, 7].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// When creating an MV, every output column must end up with a unique name: either an
    /// implicit one (column references, aggregations) or an explicit alias.
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // Aggregation without alias is ok.
        let sql = "create materialized view mv0 as select count(x) from t";
        frontend.run_sql(sql).await.unwrap();

        // Multiple aggregations without aliases are forbidden when they produce the same default
        // column name (both are named `count` here).
        let sql = "create materialized view mv1 as select count(x), count(*) from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid input syntax: column \"count\" specified more than once"
        );

        // Literal without alias is forbidden.
        let sql = "create materialized view mv1 as select 1";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );

        // Other expressions without an alias are forbidden as well.
        let sql = "create materialized view mv1 as select x is null from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );
    }

    /// Creating an MV with and without ORDER BY both succeed with the expected statement type.
    /// NOTE(review): the handler warns the user about ORDER BY semantics, but this test does not
    /// assert that notice. It only checks that the response without ORDER BY has no notices.
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // Without order by
        let sql = "create materialized view mv1 as select * from t";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(response.notices().is_empty());

        // With order by
        let sql = "create materialized view mv2 as select * from t order by x";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}