risingwave_frontend/handler/create_mv.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use either::Either;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{FunctionId, ObjectId, TableId};
use risingwave_pb::serverless_backfill_controller::{
    ProvisionRequest, node_group_controller_service_client,
};
use risingwave_pb::stream_plan::PbStreamFragmentGraph;
use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
use thiserror_ext::AsReport;

use super::RwPgResponse;
use crate::binder::{Binder, BoundQuery, BoundSetExpr};
use crate::catalog::check_column_name_not_reserved;
use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
use crate::error::{ErrorCode, Result, RwError};
use crate::handler::HandlerArgs;
use crate::optimizer::backfill_order_strategy::plan_backfill_order;
use crate::optimizer::plan_node::Explain;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, RelationCollectorVisitor};
use crate::planner::Planner;
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::{SESSION_MANAGER, SessionImpl};
use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
use crate::utils::ordinal;
use crate::{TableCatalog, WithOptions};

pub const RESOURCE_GROUP_KEY: &str = "resource_group";
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
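// Both keys are consumed from the statement's `WITH` clause (see
// `gen_create_mv_graph` below). An illustrative sketch of statements that would
// set them (names and values are hypothetical, not taken from this file):
//
//     CREATE MATERIALIZED VIEW mv WITH (resource_group = 'my_group') AS SELECT 1 AS v;
//     CREATE MATERIALIZED VIEW mv WITH (cloud.serverless_backfill_enabled = 'true') AS SELECT 1 AS v;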
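/// Returns `None` if the user specified no column names, otherwise their real
/// (unquoted) values. A minimal behavioral sketch, assuming `idents` parses to
/// the two identifiers `a` and `b`:
///
/// ```ignore
/// assert_eq!(parse_column_names(&[]), None);
/// assert_eq!(
///     parse_column_names(&idents),
///     Some(vec!["a".to_owned(), "b".to_owned()])
/// );
/// ```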
pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
    if columns.is_empty() {
        None
    } else {
        Some(columns.iter().map(|v| v.real_value()).collect())
    }
}

/// If `columns` is empty, the user did not specify column names, so we extract
/// them from the query. If `columns` is not empty, the user has specified the
/// column names and must guarantee that their number matches the number of
/// output columns of the query.
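///
/// For example, `CREATE MATERIALIZED VIEW mv AS SELECT x IS NULL FROM t` is
/// rejected because the expression carries no alias, while adding one
/// (`SELECT x IS NULL AS x_is_null FROM t`) passes; see `test_no_alias` below.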
pub(super) fn get_column_names(
    bound: &BoundQuery,
    columns: Vec<Ident>,
) -> Result<Option<Vec<String>>> {
    let col_names = parse_column_names(&columns);
    if let BoundSetExpr::Select(select) = &bound.body {
        // `InputRef`'s alias will be implicitly assigned in `bind_project`.
        // If the user provides column names (`col_names.is_some()`), we don't need aliases.
        // For other expressions (`col_names.is_none()`), we require the user to
        // explicitly assign an alias.
        if col_names.is_none() {
            for (i, alias) in select.aliases.iter().enumerate() {
                if alias.is_none() {
                    return Err(ErrorCode::BindError(format!(
                        "An alias must be specified for the {} expression (counting from 1) in result relation",
                        ordinal(i + 1)
                    ))
                    .into());
                }
            }
        }
    }

    Ok(col_names)
}

/// Binds the query and generates the create MV plan, returning the plan and the
/// MV's table catalog.
pub fn gen_create_mv_plan(
    session: &SessionImpl,
    context: OptimizerContextRef,
    query: Query,
    name: ObjectName,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(PlanRef, TableCatalog)> {
    let mut binder = Binder::new_for_stream(session);
    let bound = binder.bind_query(query)?;
    gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
}

/// Generates the create MV plan from an already-bound query.
pub fn gen_create_mv_plan_bound(
    session: &SessionImpl,
    context: OptimizerContextRef,
    query: BoundQuery,
    name: ObjectName,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(PlanRef, TableCatalog)> {
    if session.config().create_compaction_group_for_mv() {
        context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
    }

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, name)?;

    let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;

    let definition = context.normalized_sql().to_owned();

    let col_names = get_column_names(&query, columns)?;

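    // `EMIT ON WINDOW CLOSE` is requested with SQL along these lines (an
    // illustrative sketch, not taken from this file):
    //
    //     CREATE MATERIALIZED VIEW mv AS
    //     SELECT window_start, count(*) FROM tumble(t, ts, INTERVAL '1 MINUTE')
    //     GROUP BY window_start
    //     EMIT ON WINDOW CLOSE;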
    let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
    if let Some(col_names) = col_names {
        for name in &col_names {
            check_column_name_not_reserved(name)?;
        }
        plan_root.set_out_names(col_names)?;
    }
    let materialize = plan_root.gen_materialize_plan(
        database_id,
        schema_id,
        table_name,
        definition,
        emit_on_window_close,
    )?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();

    let plan: PlanRef = materialize.into();

    let ctx = plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Materialized View:");
        ctx.trace(plan.explain_to_string());
    }

    Ok((plan, table))
}

pub async fn handle_create_mv(
    handler_args: HandlerArgs,
    if_not_exists: bool,
    name: ObjectName,
    query: Query,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<RwPgResponse> {
    let (dependent_relations, dependent_udfs, bound_query) = {
        let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
        let bound_query = binder.bind_query(query)?;
        (
            binder.included_relations().clone(),
            binder.included_udfs().clone(),
            bound_query,
        )
    };
    handle_create_mv_bound(
        handler_args,
        if_not_exists,
        name,
        bound_query,
        dependent_relations,
        dependent_udfs,
        columns,
        emit_mode,
    )
    .await
}

/// Sends a provision request to the serverless backfill controller.
pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
    let request = tonic::Request::new(ProvisionRequest {});
    let mut client =
        node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
            sbc_addr.clone(),
        )
        .await
        .map_err(|e| {
            RwError::from(ErrorCode::InternalError(format!(
                "unable to reach serverless backfill controller at addr {}: {}",
                sbc_addr,
                e.as_report()
            )))
        })?;

    match client.provision(request).await {
        Ok(resp) => Ok(resp.into_inner().resource_group),
        Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
            "serverless backfill controller returned error: {}",
            e.as_report()
        )))),
    }
}
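
// A minimal usage sketch (the address is hypothetical; in practice it comes
// from the session manager's env via `sbc_address()` in `gen_create_mv_graph`
// below):
//
//     let resource_group = provision_resource_group("http://sbc:1234".to_owned()).await?;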

fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
    let context = OptimizerContext::from_handler_args(handler_args);
    context.with_options().clone()
}

pub async fn handle_create_mv_bound(
    handler_args: HandlerArgs,
    if_not_exists: bool,
    name: ObjectName,
    query: BoundQuery,
    dependent_relations: HashSet<TableId>,
    dependent_udfs: HashSet<FunctionId>, // TODO(rc): merge with `dependent_relations`
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    // Check cluster limits
    session.check_cluster_limits().await?;

    if let Either::Right(resp) = session.check_relation_name_duplicated(
        name.clone(),
        StatementType::CREATE_MATERIALIZED_VIEW,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (table, graph, dependencies, resource_group) = {
        gen_create_mv_graph(
            handler_args,
            name,
            query,
            dependent_relations,
            dependent_udfs,
            columns,
            emit_mode,
        )
        .await?
    };

    // Ensure writes to `StreamJobTracker` are atomic.
    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                table.database_id,
                table.schema_id,
                table.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    catalog_writer
        .create_materialized_view(
            table.to_prost(),
            graph,
            dependencies,
            resource_group,
            if_not_exists,
        )
        .await?;

    Ok(PgResponse::empty_result(
        StatementType::CREATE_MATERIALIZED_VIEW,
    ))
}

pub(crate) async fn gen_create_mv_graph(
    handler_args: HandlerArgs,
    name: ObjectName,
    query: BoundQuery,
    dependent_relations: HashSet<TableId>,
    dependent_udfs: HashSet<FunctionId>,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(
    TableCatalog,
    PbStreamFragmentGraph,
    HashSet<u32>,
    Option<String>,
)> {
    let mut with_options = get_with_options(handler_args.clone());
    let mut resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());

    if resource_group.is_some() {
        risingwave_common::license::Feature::ResourceGroup
            .check_available()
            .map_err(|e| anyhow::anyhow!(e))?;
    }

    // A missing or unparsable value defaults to `false`.
    let is_serverless_backfill = with_options
        .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
        .unwrap_or_default()
        .parse::<bool>()
        .unwrap_or(false);

    if resource_group.is_some() && is_serverless_backfill {
        return Err(RwError::from(InvalidInputSyntax(
            "Please do not specify serverless backfilling and resource group together".to_owned(),
        )));
    }

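    // For example, `WITH (foo = 'bar')` (a hypothetical key) would be rejected
    // by the check below, since `foo` is never consumed.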
    if !with_options.is_empty() {
        // All recognized options have been consumed via `remove` above; reject
        // any remaining (unknown) options.
        return Err(RwError::from(ProtocolError(format!(
            "unexpected options in WITH clause: {:?}",
            with_options.keys()
        ))));
    }

    let sbc_addr = match SESSION_MANAGER.get() {
        Some(manager) => manager.env().sbc_address(),
        None => "",
    }
    .to_owned();

    if is_serverless_backfill && sbc_addr.is_empty() {
        return Err(RwError::from(InvalidInputSyntax(
            "Serverless Backfill is disabled on-premise. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
        )));
    }

    if is_serverless_backfill {
        match provision_resource_group(sbc_addr).await {
            Err(e) => {
                return Err(RwError::from(ProtocolError(format!(
                    "failed to provision serverless backfill nodes: {}",
                    e.as_report()
                ))));
            }
            Ok(val) => resource_group = Some(val),
        }
    }
    tracing::debug!(
        resource_group = resource_group,
        "provisioning on resource group"
    );

    let context = OptimizerContext::from_handler_args(handler_args);
    let has_order_by = !query.order.is_empty();
    if has_order_by {
        context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view are returned in this order.
It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
"#.to_owned());
    }

    if resource_group.is_some()
        && !context
            .session_ctx()
            .config()
            .streaming_use_arrangement_backfill()
    {
        return Err(RwError::from(ProtocolError(
            "The session config `streaming_use_arrangement_backfill` must be enabled to use the resource_group option".to_owned(),
        )));
    }

    let context: OptimizerContextRef = context.into();
    let session = context.session_ctx().as_ref();

    let (plan, table) =
        gen_create_mv_plan_bound(session, context.clone(), query, name, columns, emit_mode)?;

    let backfill_order = plan_backfill_order(
        session,
        context.with_options().backfill_order_strategy(),
        plan.clone(),
    )?;

    // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies
    // during binding instead of visiting the optimized plan.
    let dependencies = RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
        .into_iter()
        .map(|id| id.table_id() as ObjectId)
        .chain(
            dependent_udfs
                .into_iter()
                .map(|id| id.function_id() as ObjectId),
        )
        .collect();

    let graph = build_graph_with_strategy(
        plan,
        Some(GraphJobType::MaterializedView),
        Some(backfill_order),
    )?;

    Ok((table, graph, dependencies, resource_group))
}

#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1";
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check table exists.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        // .with_ids([5, 6].map(ColumnId::new))
        .into();
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => StructType::new(
                vec![("address", DataType::Varchar), ("city", city_type), ("zipcode", DataType::Varchar)],
            )
            // .with_ids([3, 4, 7].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// When creating an MV, a unique column name must be specified for each column.
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // Aggregation without an alias is OK.
        let sql = "create materialized view mv0 as select count(x) from t";
        frontend.run_sql(sql).await.unwrap();

        // Two aggregations that derive the same name without aliases are
        // forbidden, because they produce duplicate column names.
        let sql = "create materialized view mv1 as select count(x), count(*) from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid input syntax: column \"count\" specified more than once"
        );

        // A literal without an alias is forbidden.
        let sql = "create materialized view mv1 as select 1";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );

        // Other expressions without an alias are forbidden as well.
        let sql = "create materialized view mv1 as select x is null from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );
    }

    /// Creating an MV with ORDER BY returns a special notice to the user.
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // Without ORDER BY: no notice is emitted.
        let sql = "create materialized view mv1 as select * from t";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(response.notices().is_empty());

        // With ORDER BY.
        let sql = "create materialized view mv2 as select * from t order by x";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}