risingwave_frontend/handler/
create_mv.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashSet;
16
17use either::Either;
18use itertools::Itertools;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::catalog::{FunctionId, ObjectId};
21use risingwave_pb::serverless_backfill_controller::{
22    ProvisionRequest, node_group_controller_service_client,
23};
24use risingwave_pb::stream_plan::PbStreamFragmentGraph;
25use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
26use thiserror_ext::AsReport;
27
28use super::RwPgResponse;
29use crate::binder::{Binder, BoundQuery, BoundSetExpr};
30use crate::catalog::check_column_name_not_reserved;
31use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
32use crate::error::{ErrorCode, Result, RwError};
33use crate::handler::HandlerArgs;
34use crate::optimizer::backfill_order_strategy::plan_backfill_order;
35use crate::optimizer::plan_node::generic::GenericPlanRef;
36use crate::optimizer::plan_node::{Explain, StreamPlanRef as PlanRef};
37use crate::optimizer::{OptimizerContext, OptimizerContextRef, RelationCollectorVisitor};
38use crate::planner::Planner;
39use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
40use crate::session::{SESSION_MANAGER, SessionImpl};
41use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
42use crate::utils::ordinal;
43use crate::{TableCatalog, WithOptions};
44
/// Key of the `WITH` option naming the resource group a materialized view should be created in.
pub const RESOURCE_GROUP_KEY: &str = "resource_group";
/// Key of the `WITH` option that requests serverless backfill; its value is parsed as a `bool`.
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
47
48pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
49    if columns.is_empty() {
50        None
51    } else {
52        Some(columns.iter().map(|v| v.real_value()).collect())
53    }
54}
55
56/// If columns is empty, it means that the user did not specify the column names.
57/// In this case, we extract the column names from the query.
58/// If columns is not empty, it means that user specify the column names and the user
59/// should guarantee that the column names number are consistent with the query.
60pub(super) fn get_column_names(
61    bound: &BoundQuery,
62    columns: Vec<Ident>,
63) -> Result<Option<Vec<String>>> {
64    let col_names = parse_column_names(&columns);
65    if let BoundSetExpr::Select(select) = &bound.body {
66        // `InputRef`'s alias will be implicitly assigned in `bind_project`.
67        // If user provides columns name (col_names.is_some()), we don't need alias.
68        // For other expressions (col_names.is_none()), we require the user to explicitly assign an
69        // alias.
70        if col_names.is_none() {
71            for (i, alias) in select.aliases.iter().enumerate() {
72                if alias.is_none() {
73                    return Err(ErrorCode::BindError(format!(
74                    "An alias must be specified for the {} expression (counting from 1) in result relation", ordinal(i+1)
75                ))
76                .into());
77                }
78            }
79        }
80    }
81
82    Ok(col_names)
83}
84
85/// Bind and generate create MV plan, return plan and mv table info.
86pub fn gen_create_mv_plan(
87    session: &SessionImpl,
88    context: OptimizerContextRef,
89    query: Query,
90    name: ObjectName,
91    columns: Vec<Ident>,
92    emit_mode: Option<EmitMode>,
93) -> Result<(PlanRef, TableCatalog)> {
94    let mut binder = Binder::new_for_stream(session);
95    let bound = binder.bind_query(&query)?;
96    gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
97}
98
99/// Generate create MV plan from a bound query
100pub fn gen_create_mv_plan_bound(
101    session: &SessionImpl,
102    context: OptimizerContextRef,
103    query: BoundQuery,
104    name: ObjectName,
105    columns: Vec<Ident>,
106    emit_mode: Option<EmitMode>,
107) -> Result<(PlanRef, TableCatalog)> {
108    if session.config().create_compaction_group_for_mv() {
109        context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
110    }
111
112    let db_name = &session.database();
113    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
114
115    let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
116
117    let definition = context.normalized_sql().to_owned();
118
119    let col_names = get_column_names(&query, columns)?;
120
121    let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
122    if emit_on_window_close {
123        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
124    }
125
126    let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
127    if let Some(col_names) = col_names {
128        for name in &col_names {
129            check_column_name_not_reserved(name)?;
130        }
131        plan_root.set_out_names(col_names)?;
132    }
133    let materialize = plan_root.gen_materialize_plan(
134        database_id,
135        schema_id,
136        table_name,
137        definition,
138        emit_on_window_close,
139    )?;
140
141    let mut table = materialize.table().clone();
142    table.owner = session.user_id();
143
144    let plan: PlanRef = materialize.into();
145
146    let ctx = plan.ctx();
147    let explain_trace = ctx.is_explain_trace();
148    if explain_trace {
149        ctx.trace("Create Materialized View:");
150        ctx.trace(plan.explain_to_string());
151    }
152
153    Ok((plan, table))
154}
155
156pub async fn handle_create_mv(
157    handler_args: HandlerArgs,
158    if_not_exists: bool,
159    name: ObjectName,
160    query: Query,
161    columns: Vec<Ident>,
162    emit_mode: Option<EmitMode>,
163) -> Result<RwPgResponse> {
164    let (dependent_relations, dependent_udfs, bound_query) = {
165        let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
166        let bound_query = binder.bind_query(&query)?;
167        (
168            binder.included_relations().clone(),
169            binder.included_udfs().clone(),
170            bound_query,
171        )
172    };
173    handle_create_mv_bound(
174        handler_args,
175        if_not_exists,
176        name,
177        bound_query,
178        dependent_relations,
179        dependent_udfs,
180        columns,
181        emit_mode,
182    )
183    .await
184}
185
186/// Send a provision request to the serverless backfill controller
187pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
188    let request = tonic::Request::new(ProvisionRequest {});
189    let mut client =
190        node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
191            sbc_addr.clone(),
192        )
193        .await
194        .map_err(|e| {
195            RwError::from(ErrorCode::InternalError(format!(
196                "unable to reach serverless backfill controller at addr {}: {}",
197                sbc_addr,
198                e.as_report()
199            )))
200        })?;
201
202    match client.provision(request).await {
203        Ok(resp) => Ok(resp.into_inner().resource_group),
204        Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
205            "serverless backfill controller returned error :{}",
206            e.as_report()
207        )))),
208    }
209}
210
211fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
212    let context = OptimizerContext::from_handler_args(handler_args);
213    context.with_options().clone()
214}
215
216pub async fn handle_create_mv_bound(
217    handler_args: HandlerArgs,
218    if_not_exists: bool,
219    name: ObjectName,
220    query: BoundQuery,
221    dependent_relations: HashSet<ObjectId>,
222    dependent_udfs: HashSet<FunctionId>, // TODO(rc): merge with `dependent_relations`
223    columns: Vec<Ident>,
224    emit_mode: Option<EmitMode>,
225) -> Result<RwPgResponse> {
226    let session = handler_args.session.clone();
227
228    // Check cluster limits
229    session.check_cluster_limits().await?;
230
231    if let Either::Right(resp) = session.check_relation_name_duplicated(
232        name.clone(),
233        StatementType::CREATE_MATERIALIZED_VIEW,
234        if_not_exists,
235    )? {
236        return Ok(resp);
237    }
238
239    let (table, graph, dependencies, resource_group) = {
240        gen_create_mv_graph(
241            handler_args,
242            name,
243            query,
244            dependent_relations,
245            dependent_udfs,
246            columns,
247            emit_mode,
248        )
249        .await?
250    };
251
252    // Ensure writes to `StreamJobTracker` are atomic.
253    let _job_guard =
254        session
255            .env()
256            .creating_streaming_job_tracker()
257            .guard(CreatingStreamingJobInfo::new(
258                session.session_id(),
259                table.database_id,
260                table.schema_id,
261                table.name.clone(),
262            ));
263
264    let catalog_writer = session.catalog_writer()?;
265    catalog_writer
266        .create_materialized_view(
267            table.to_prost(),
268            graph,
269            dependencies,
270            resource_group,
271            if_not_exists,
272        )
273        .await?;
274
275    Ok(PgResponse::empty_result(
276        StatementType::CREATE_MATERIALIZED_VIEW,
277    ))
278}
279
280pub(crate) async fn gen_create_mv_graph(
281    handler_args: HandlerArgs,
282    name: ObjectName,
283    query: BoundQuery,
284    dependent_relations: HashSet<ObjectId>,
285    dependent_udfs: HashSet<FunctionId>,
286    columns: Vec<Ident>,
287    emit_mode: Option<EmitMode>,
288) -> Result<(
289    TableCatalog,
290    PbStreamFragmentGraph,
291    HashSet<ObjectId>,
292    Option<String>,
293)> {
294    let mut with_options = get_with_options(handler_args.clone());
295    let mut resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());
296
297    if resource_group.is_some() {
298        risingwave_common::license::Feature::ResourceGroup.check_available()?;
299    }
300
301    let is_serverless_backfill = with_options
302        .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
303        .unwrap_or_default()
304        .parse::<bool>()
305        .unwrap_or(false);
306
307    if resource_group.is_some() && is_serverless_backfill {
308        return Err(RwError::from(InvalidInputSyntax(
309            "Please do not specify serverless backfilling and resource group together".to_owned(),
310        )));
311    }
312
313    if !with_options.is_empty() {
314        // get other useful fields by `remove`, the logic here is to reject unknown options.
315        return Err(RwError::from(ProtocolError(format!(
316            "unexpected options in WITH clause: {:?}",
317            with_options.keys()
318        ))));
319    }
320
321    let sbc_addr = match SESSION_MANAGER.get() {
322        Some(manager) => manager.env().sbc_address(),
323        None => "",
324    }
325    .to_owned();
326
327    if is_serverless_backfill && sbc_addr.is_empty() {
328        return Err(RwError::from(InvalidInputSyntax(
329            "Serverless Backfill is disabled on-premise. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
330        )));
331    }
332
333    if is_serverless_backfill {
334        match provision_resource_group(sbc_addr).await {
335            Err(e) => {
336                return Err(RwError::from(ProtocolError(format!(
337                    "failed to provision serverless backfill nodes: {}",
338                    e.as_report()
339                ))));
340            }
341            Ok(val) => resource_group = Some(val),
342        }
343    }
344    tracing::debug!(
345        resource_group = resource_group,
346        "provisioning on resource group"
347    );
348
349    let context = OptimizerContext::from_handler_args(handler_args);
350    let has_order_by = !query.order.is_empty();
351    if has_order_by {
352        context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
353It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
354"#.to_owned());
355    }
356
357    if resource_group.is_some()
358        && !context
359            .session_ctx()
360            .config()
361            .streaming_use_arrangement_backfill()
362    {
363        return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
364    }
365
366    let context: OptimizerContextRef = context.into();
367    let session = context.session_ctx().as_ref();
368
369    let (plan, table) =
370        gen_create_mv_plan_bound(session, context.clone(), query, name, columns, emit_mode)?;
371
372    let backfill_order = plan_backfill_order(
373        session,
374        context.with_options().backfill_order_strategy(),
375        plan.clone(),
376    )?;
377
378    // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies
379    // during binding instead of visiting the optimized plan.
380    let dependencies = RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
381        .into_iter()
382        .chain(dependent_udfs.iter().copied().map_into())
383        .collect();
384
385    let graph = build_graph_with_strategy(
386        plan,
387        Some(GraphJobType::MaterializedView),
388        Some(backfill_order),
389    )?;
390
391    Ok((table, graph, dependencies, resource_group))
392}
393
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// Creates a protobuf source and a materialized view on top of it, then checks that both
    /// are present in the catalog and that the view has the expected columns.
    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let create_source_sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(create_source_sql).await.unwrap();

        frontend
            .run_sql("create materialized view mv1 as select t1.country from t1")
            .await
            .unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check table exists.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        // Actual columns of the materialized view, keyed by column name.
        let columns: HashMap<&str, DataType> = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect();

        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        // .with_ids([5, 6].map(ColumnId::new))
        .into();
        let country_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("city", city_type),
            ("zipcode", DataType::Varchar),
        ])
        // .with_ids([3, 4, 7].map(ColumnId::new))
        .into();
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => country_type,
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// When creating MV, a unique column name must be specified for each column
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        frontend.run_sql("create table t(x varchar)").await.unwrap();

        // Aggregation without alias is ok.
        frontend
            .run_sql("create materialized view mv0 as select count(x) from t")
            .await
            .unwrap();

        // Same aggregations without alias is forbidden, because it make the same column name.
        let duplicate_err = frontend
            .run_sql("create materialized view mv1 as select count(x), count(*) from t")
            .await
            .unwrap_err();
        assert_eq!(
            duplicate_err.to_string(),
            "Invalid input syntax: column \"count\" specified more than once"
        );

        // A literal, and some other expression, without alias are both forbidden.
        let unaliased_statements = [
            "create materialized view mv1 as select 1",
            "create materialized view mv1 as select x is null from t",
        ];
        for sql in unaliased_statements {
            let err = frontend.run_sql(sql).await.unwrap_err();
            assert_eq!(
                err.to_string(),
                "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
            );
        }
    }

    /// Creating MV with order by returns a special notice
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        frontend.run_sql("create table t(x varchar)").await.unwrap();

        // Without order by
        let plain_response = frontend
            .run_sql("create materialized view mv1 as select * from t")
            .await
            .unwrap();
        assert_eq!(plain_response.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(plain_response.notices().is_empty());

        // With order by
        let ordered_response = frontend
            .run_sql("create materialized view mv2 as select * from t order by x")
            .await
            .unwrap();
        assert_eq!(ordered_response.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}