risingwave_frontend/handler/
create_mv.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use either::Either;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::catalog::{FunctionId, ObjectId, TableId};
20use risingwave_pb::catalog::PbTable;
21use risingwave_pb::serverless_backfill_controller::{
22    ProvisionRequest, node_group_controller_service_client,
23};
24use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
25use thiserror_ext::AsReport;
26
27use super::RwPgResponse;
28use crate::WithOptions;
29use crate::binder::{Binder, BoundQuery, BoundSetExpr};
30use crate::catalog::check_column_name_not_reserved;
31use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
32use crate::error::{ErrorCode, Result, RwError};
33use crate::handler::HandlerArgs;
34use crate::optimizer::plan_node::Explain;
35use crate::optimizer::plan_node::generic::GenericPlanRef;
36use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, RelationCollectorVisitor};
37use crate::planner::Planner;
38use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
39use crate::session::{SESSION_MANAGER, SessionImpl};
40use crate::stream_fragmenter::{GraphJobType, build_graph};
41use crate::utils::ordinal;
42
/// Key of the `WITH` option whose value is used as the resource group of the materialized view.
pub const RESOURCE_GROUP_KEY: &str = "resource_group";
/// Key of the `WITH` option that requests serverless backfill; its value is parsed as a `bool`.
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
45
46pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
47    if columns.is_empty() {
48        None
49    } else {
50        Some(columns.iter().map(|v| v.real_value()).collect())
51    }
52}
53
54/// If columns is empty, it means that the user did not specify the column names.
55/// In this case, we extract the column names from the query.
56/// If columns is not empty, it means that user specify the column names and the user
57/// should guarantee that the column names number are consistent with the query.
58pub(super) fn get_column_names(
59    bound: &BoundQuery,
60    columns: Vec<Ident>,
61) -> Result<Option<Vec<String>>> {
62    let col_names = parse_column_names(&columns);
63    if let BoundSetExpr::Select(select) = &bound.body {
64        // `InputRef`'s alias will be implicitly assigned in `bind_project`.
65        // If user provides columns name (col_names.is_some()), we don't need alias.
66        // For other expressions (col_names.is_none()), we require the user to explicitly assign an
67        // alias.
68        if col_names.is_none() {
69            for (i, alias) in select.aliases.iter().enumerate() {
70                if alias.is_none() {
71                    return Err(ErrorCode::BindError(format!(
72                    "An alias must be specified for the {} expression (counting from 1) in result relation", ordinal(i+1)
73                ))
74                .into());
75                }
76            }
77        }
78    }
79
80    Ok(col_names)
81}
82
83/// Bind and generate create MV plan, return plan and mv table info.
84pub fn gen_create_mv_plan(
85    session: &SessionImpl,
86    context: OptimizerContextRef,
87    query: Query,
88    name: ObjectName,
89    columns: Vec<Ident>,
90    emit_mode: Option<EmitMode>,
91) -> Result<(PlanRef, PbTable)> {
92    let mut binder = Binder::new_for_stream(session);
93    let bound = binder.bind_query(query)?;
94    gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
95}
96
97/// Generate create MV plan from a bound query
98pub fn gen_create_mv_plan_bound(
99    session: &SessionImpl,
100    context: OptimizerContextRef,
101    query: BoundQuery,
102    name: ObjectName,
103    columns: Vec<Ident>,
104    emit_mode: Option<EmitMode>,
105) -> Result<(PlanRef, PbTable)> {
106    if session.config().create_compaction_group_for_mv() {
107        context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
108    }
109
110    let db_name = &session.database();
111    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, name)?;
112
113    let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
114
115    let definition = context.normalized_sql().to_owned();
116
117    let col_names = get_column_names(&query, columns)?;
118
119    let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
120    if emit_on_window_close {
121        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
122    }
123
124    let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
125    plan_root.set_req_dist_as_same_as_req_order();
126    if let Some(col_names) = col_names {
127        for name in &col_names {
128            check_column_name_not_reserved(name)?;
129        }
130        plan_root.set_out_names(col_names)?;
131    }
132    let materialize = plan_root.gen_materialize_plan(
133        database_id,
134        schema_id,
135        table_name,
136        definition,
137        emit_on_window_close,
138    )?;
139    let mut table = materialize.table().to_prost();
140
141    let plan: PlanRef = materialize.into();
142
143    table.owner = session.user_id();
144
145    let ctx = plan.ctx();
146    let explain_trace = ctx.is_explain_trace();
147    if explain_trace {
148        ctx.trace("Create Materialized View:");
149        ctx.trace(plan.explain_to_string());
150    }
151
152    Ok((plan, table))
153}
154
155pub async fn handle_create_mv(
156    handler_args: HandlerArgs,
157    if_not_exists: bool,
158    name: ObjectName,
159    query: Query,
160    columns: Vec<Ident>,
161    emit_mode: Option<EmitMode>,
162) -> Result<RwPgResponse> {
163    let (dependent_relations, dependent_udfs, bound) = {
164        let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
165        let bound = binder.bind_query(query)?;
166        (
167            binder.included_relations().clone(),
168            binder.included_udfs().clone(),
169            bound,
170        )
171    };
172    handle_create_mv_bound(
173        handler_args,
174        if_not_exists,
175        name,
176        bound,
177        dependent_relations,
178        dependent_udfs,
179        columns,
180        emit_mode,
181    )
182    .await
183}
184
185/// Send a provision request to the serverless backfill controller
186pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
187    let request = tonic::Request::new(ProvisionRequest {});
188    let mut client =
189        node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
190            sbc_addr.clone(),
191        )
192        .await
193        .map_err(|e| {
194            RwError::from(ErrorCode::InternalError(format!(
195                "unable to reach serverless backfill controller at addr {}: {}",
196                sbc_addr,
197                e.as_report()
198            )))
199        })?;
200
201    match client.provision(request).await {
202        Ok(resp) => Ok(resp.into_inner().resource_group),
203        Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
204            "serverless backfill controller returned error :{}",
205            e.as_report()
206        )))),
207    }
208}
209
210fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
211    let context = OptimizerContext::from_handler_args(handler_args);
212    context.with_options().clone()
213}
214
215pub async fn handle_create_mv_bound(
216    handler_args: HandlerArgs,
217    if_not_exists: bool,
218    name: ObjectName,
219    query: BoundQuery,
220    dependent_relations: HashSet<TableId>,
221    dependent_udfs: HashSet<FunctionId>, // TODO(rc): merge with `dependent_relations`
222    columns: Vec<Ident>,
223    emit_mode: Option<EmitMode>,
224) -> Result<RwPgResponse> {
225    let session = handler_args.session.clone();
226
227    // Check cluster limits
228    session.check_cluster_limits().await?;
229
230    if let Either::Right(resp) = session.check_relation_name_duplicated(
231        name.clone(),
232        StatementType::CREATE_MATERIALIZED_VIEW,
233        if_not_exists,
234    )? {
235        return Ok(resp);
236    }
237
238    let (table, graph, dependencies, resource_group) = {
239        let mut with_options = get_with_options(handler_args.clone());
240        let mut resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());
241
242        if resource_group.is_some() {
243            risingwave_common::license::Feature::ResourceGroup
244                .check_available()
245                .map_err(|e| anyhow::anyhow!(e))?;
246        }
247
248        let is_serverless_backfill = with_options
249            .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
250            .unwrap_or_default()
251            .parse::<bool>()
252            .unwrap_or(false);
253
254        if resource_group.is_some() && is_serverless_backfill {
255            return Err(RwError::from(InvalidInputSyntax(
256                "Please do not specify serverless backfilling and resource group together"
257                    .to_owned(),
258            )));
259        }
260
261        if !with_options.is_empty() {
262            // get other useful fields by `remove`, the logic here is to reject unknown options.
263            return Err(RwError::from(ProtocolError(format!(
264                "unexpected options in WITH clause: {:?}",
265                with_options.keys()
266            ))));
267        }
268
269        let sbc_addr = match SESSION_MANAGER.get() {
270            Some(manager) => manager.env().sbc_address(),
271            None => "",
272        }
273        .to_owned();
274
275        if is_serverless_backfill && sbc_addr.is_empty() {
276            return Err(RwError::from(InvalidInputSyntax(
277                "Serverless Backfill is disabled on-premise. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
278            )));
279        }
280
281        if is_serverless_backfill {
282            match provision_resource_group(sbc_addr).await {
283                Err(e) => {
284                    return Err(RwError::from(ProtocolError(format!(
285                        "failed to provision serverless backfill nodes: {}",
286                        e.as_report()
287                    ))));
288                }
289                Ok(val) => resource_group = Some(val),
290            }
291        }
292        tracing::debug!(
293            resource_group = resource_group,
294            "provisioning on resource group"
295        );
296
297        let context = OptimizerContext::from_handler_args(handler_args);
298        let has_order_by = !query.order.is_empty();
299        if has_order_by {
300            context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
301It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
302"#.to_owned());
303        }
304
305        if resource_group.is_some()
306            && !context
307                .session_ctx()
308                .config()
309                .streaming_use_arrangement_backfill()
310        {
311            return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
312        }
313
314        let (plan, table) =
315            gen_create_mv_plan_bound(&session, context.into(), query, name, columns, emit_mode)?;
316
317        // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies
318        // during binding instead of visiting the optimized plan.
319        let dependencies =
320            RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
321                .into_iter()
322                .map(|id| id.table_id() as ObjectId)
323                .chain(
324                    dependent_udfs
325                        .into_iter()
326                        .map(|id| id.function_id() as ObjectId),
327                )
328                .collect();
329
330        let graph = build_graph(plan, Some(GraphJobType::MaterializedView))?;
331
332        (table, graph, dependencies, resource_group)
333    };
334
335    // Ensure writes to `StreamJobTracker` are atomic.
336    let _job_guard =
337        session
338            .env()
339            .creating_streaming_job_tracker()
340            .guard(CreatingStreamingJobInfo::new(
341                session.session_id(),
342                table.database_id,
343                table.schema_id,
344                table.name.clone(),
345            ));
346
347    let session = session.clone();
348    let catalog_writer = session.catalog_writer()?;
349    catalog_writer
350        .create_materialized_view(table, graph, dependencies, resource_group)
351        .await?;
352
353    Ok(PgResponse::empty_result(
354        StatementType::CREATE_MATERIALIZED_VIEW,
355    ))
356}
357
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// Creates a protobuf source and an MV on top of it, then verifies that both are visible in
    /// the catalog and that the MV exposes the expected columns and types.
    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let create_source_sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(create_source_sql).await.unwrap();

        frontend
            .run_sql("create materialized view mv1 as select t1.country from t1")
            .await
            .unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // The source must be registered under its name.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // So must the table backing the materialized view.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        let columns: HashMap<&str, DataType> = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect();

        // `country` is a struct that nests the `city` struct.
        let city_type: DataType = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        .into();
        let country_type: DataType = StructType::new(vec![
            ("address", DataType::Varchar),
            ("city", city_type),
            ("zipcode", DataType::Varchar),
        ])
        .into();

        let expected_columns = HashMap::from([
            (ROW_ID_COLUMN_NAME, DataType::Serial),
            ("country", country_type),
            (RW_TIMESTAMP_COLUMN_NAME, DataType::Timestamptz),
        ]);
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// Every output column of an MV needs a unique name, either derived implicitly or given
    /// through an alias.
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        frontend.run_sql("create table t(x varchar)").await.unwrap();

        // A lone aggregation without an alias is accepted.
        frontend
            .run_sql("create materialized view mv0 as select count(x) from t")
            .await
            .unwrap();

        // Each statement below must be rejected with exactly the paired message.
        let rejected = [
            // Two unaliased aggregations collide on the same column name.
            (
                "create materialized view mv1 as select count(x), count(*) from t",
                "Invalid input syntax: column \"count\" specified more than once",
            ),
            // A literal has no implicit name.
            (
                "create materialized view mv1 as select 1",
                "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation",
            ),
            // Neither does a general expression.
            (
                "create materialized view mv1 as select x is null from t",
                "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation",
            ),
        ];
        for (sql, expected_message) in rejected {
            let err = frontend.run_sql(sql).await.unwrap_err();
            assert_eq!(err.to_string(), expected_message);
        }
    }

    /// Creating an MV succeeds both without and with an `ORDER BY` clause.
    ///
    /// NOTE(review): only the statement type is asserted for the `ORDER BY` case; the notice
    /// issued for it is not checked here.
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        frontend.run_sql("create table t(x varchar)").await.unwrap();

        // No ORDER BY: plain success without any notice.
        let unordered = frontend
            .run_sql("create materialized view mv1 as select * from t")
            .await
            .unwrap();
        assert_eq!(unordered.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(unordered.notices().is_empty());

        // ORDER BY present: the statement is still accepted.
        let ordered = frontend
            .run_sql("create materialized view mv2 as select * from t order by x")
            .await
            .unwrap();
        assert_eq!(ordered.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}