risingwave_frontend/handler/
create_mv.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use either::Either;
18use itertools::Itertools;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::catalog::{FunctionId, ObjectId, SecretId};
21use risingwave_pb::ddl_service::streaming_job_resource_type;
22use risingwave_pb::serverless_backfill_controller::{
23    ProvisionRequest, node_group_controller_service_client,
24};
25use risingwave_pb::stream_plan::PbStreamFragmentGraph;
26use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
27use thiserror_ext::AsReport;
28
29use super::RwPgResponse;
30use crate::binder::{Binder, BoundQuery, BoundSetExpr};
31use crate::catalog::check_column_name_not_reserved;
32use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
33use crate::error::{ErrorCode, Result, RwError};
34use crate::handler::HandlerArgs;
35use crate::handler::util::{
36    LongRunningNotificationAction, execute_with_long_running_notification,
37    reject_internal_table_dependencies,
38};
39use crate::optimizer::backfill_order_strategy::plan_backfill_order;
40use crate::optimizer::plan_node::generic::GenericPlanRef;
41use crate::optimizer::plan_node::{
42    BackfillType, Explain, StreamPlanRef as PlanRef, ensure_sync_log_store_fragment_root,
43};
44use crate::optimizer::{OptimizerContext, OptimizerContextRef, RelationCollectorVisitor};
45use crate::planner::Planner;
46use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
47use crate::session::{SESSION_MANAGER, SessionImpl};
48use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
49use crate::utils::{MV_REFRESH_INTERVAL_SEC_KEY, ordinal};
50use crate::{TableCatalog, WithOptions};
51
/// `WITH` option key naming the resource group for a materialized view. `gen_create_mv_graph`
/// removes it from the options and, when it is present, checks the `ResourceGroup` license
/// feature.
pub const RESOURCE_GROUP_KEY: &str = "resource_group";
/// `WITH` option key whose value `gen_create_mv_graph` parses as a `bool` to decide whether
/// serverless backfill is requested for the materialized view.
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
54
55pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
56    if columns.is_empty() {
57        None
58    } else {
59        Some(columns.iter().map(|v| v.real_value()).collect())
60    }
61}
62
63/// If columns is empty, it means that the user did not specify the column names.
64/// In this case, we extract the column names from the query.
65/// If columns is not empty, it means that user specify the column names and the user
66/// should guarantee that the column names number are consistent with the query.
67pub(super) fn get_column_names(
68    bound: &BoundQuery,
69    columns: Vec<Ident>,
70) -> Result<Option<Vec<String>>> {
71    let col_names = parse_column_names(&columns);
72    if let BoundSetExpr::Select(select) = &bound.body {
73        // `InputRef`'s alias will be implicitly assigned in `bind_project`.
74        // If user provides columns name (col_names.is_some()), we don't need alias.
75        // For other expressions (col_names.is_none()), we require the user to explicitly assign an
76        // alias.
77        if col_names.is_none() {
78            for (i, alias) in select.aliases.iter().enumerate() {
79                if alias.is_none() {
80                    return Err(ErrorCode::BindError(format!(
81                    "An alias must be specified for the {} expression (counting from 1) in result relation", ordinal(i+1)
82                ))
83                .into());
84                }
85            }
86        }
87    }
88
89    Ok(col_names)
90}
91
92/// Bind and generate create MV plan, return plan and mv table info.
93pub fn explain_create_mv_plan(
94    session: &SessionImpl,
95    context: OptimizerContextRef,
96    query: Query,
97    name: ObjectName,
98    columns: Vec<Ident>,
99    emit_mode: Option<EmitMode>,
100) -> Result<(PlanRef, TableCatalog)> {
101    let mut binder = Binder::new_for_stream(session);
102    let bound = binder.bind_query(&query)?;
103    gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode, None)
104}
105
106/// Generate create MV plan from a bound query
107pub fn gen_create_mv_plan_bound(
108    session: &SessionImpl,
109    context: OptimizerContextRef,
110    query: BoundQuery,
111    name: ObjectName,
112    columns: Vec<Ident>,
113    emit_mode: Option<EmitMode>,
114    refresh_interval_sec: Option<u64>,
115) -> Result<(PlanRef, TableCatalog)> {
116    if session.config().create_compaction_group_for_mv() {
117        context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
118    }
119
120    let db_name = &session.database();
121    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
122
123    let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
124
125    let definition = context.normalized_sql().to_owned();
126
127    let col_names = get_column_names(&query, columns)?;
128
129    let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
130    if emit_on_window_close {
131        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
132    }
133
134    let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
135    if let Some(col_names) = col_names {
136        for name in &col_names {
137            check_column_name_not_reserved(name)?;
138        }
139        plan_root.set_out_names(col_names)?;
140    }
141
142    let backfill_type = if refresh_interval_sec.is_some() {
143        plan_root.require_snapshot_backfill_for_batch_refresh()?;
144        BackfillType::SnapshotBackfill
145    } else {
146        plan_root.derive_backfill_type(true)
147    };
148
149    let materialize = plan_root.gen_materialize_plan(
150        database_id,
151        schema_id,
152        table_name,
153        definition,
154        emit_on_window_close,
155        backfill_type,
156    )?;
157
158    let mut table = materialize.table().clone();
159    table.owner = session.user_id();
160
161    let plan: PlanRef = ensure_sync_log_store_fragment_root(materialize.into());
162
163    let ctx = plan.ctx();
164    let explain_trace = ctx.is_explain_trace();
165    if explain_trace {
166        ctx.trace("Create Materialized View:");
167        ctx.trace(plan.explain_to_string());
168    }
169
170    Ok((plan, table))
171}
172
173pub async fn handle_create_mv(
174    handler_args: HandlerArgs,
175    if_not_exists: bool,
176    name: ObjectName,
177    query: Query,
178    columns: Vec<Ident>,
179    emit_mode: Option<EmitMode>,
180) -> Result<RwPgResponse> {
181    let (dependent_relations, dependent_udfs, dependent_secrets, bound_query) = {
182        let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
183        let bound_query = binder.bind_query(&query)?;
184        (
185            binder.included_relations().clone(),
186            binder.included_udfs().clone(),
187            binder.included_secrets().clone(),
188            bound_query,
189        )
190    };
191    handle_create_mv_bound(
192        handler_args,
193        if_not_exists,
194        name,
195        bound_query,
196        dependent_relations,
197        dependent_udfs,
198        dependent_secrets,
199        columns,
200        emit_mode,
201    )
202    .await
203}
204
205/// Send a provision request to the serverless backfill controller
206pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
207    let request = tonic::Request::new(ProvisionRequest {});
208    let mut client =
209        node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
210            sbc_addr.clone(),
211        )
212        .await
213        .map_err(|e| {
214            RwError::from(ErrorCode::InternalError(format!(
215                "unable to reach serverless backfill controller at addr {}: {}",
216                sbc_addr,
217                e.as_report()
218            )))
219        })?;
220
221    match client.provision(request).await {
222        Ok(resp) => Ok(resp.into_inner().resource_group),
223        Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
224            "serverless backfill controller returned error :{}",
225            e.as_report()
226        )))),
227    }
228}
229
230fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
231    let context = OptimizerContext::from_handler_args(handler_args);
232    context.with_options().clone()
233}
234
235pub async fn handle_create_mv_bound(
236    handler_args: HandlerArgs,
237    if_not_exists: bool,
238    name: ObjectName,
239    query: BoundQuery,
240    dependent_relations: HashSet<ObjectId>,
241    dependent_udfs: HashSet<FunctionId>, // TODO(rc): merge with `dependent_relations`
242    dependent_secrets: HashSet<SecretId>,
243    columns: Vec<Ident>,
244    emit_mode: Option<EmitMode>,
245) -> Result<RwPgResponse> {
246    let session = handler_args.session.clone();
247
248    // Check cluster limits
249    session.check_cluster_limits().await?;
250
251    if let Either::Right(resp) = session.check_relation_name_duplicated(
252        name.clone(),
253        StatementType::CREATE_MATERIALIZED_VIEW,
254        if_not_exists,
255    )? {
256        return Ok(resp);
257    }
258
259    reject_internal_table_dependencies(
260        session.as_ref(),
261        &dependent_relations,
262        "CREATE MATERIALIZED VIEW",
263    )?;
264
265    let (table, graph, dependencies, resource_type, refresh_interval_sec) = {
266        gen_create_mv_graph(
267            handler_args,
268            name,
269            query,
270            dependent_relations,
271            dependent_udfs,
272            dependent_secrets,
273            columns,
274            emit_mode,
275        )
276        .await?
277    };
278
279    // Ensure writes to `StreamJobTracker` are atomic.
280    let _job_guard =
281        session
282            .env()
283            .creating_streaming_job_tracker()
284            .guard(CreatingStreamingJobInfo::new(
285                session.session_id(),
286                table.database_id,
287                table.schema_id,
288                table.name.clone(),
289            ));
290
291    let catalog_writer = session.catalog_writer()?;
292    execute_with_long_running_notification(
293        catalog_writer.create_materialized_view(
294            table.to_prost(),
295            graph,
296            dependencies,
297            resource_type,
298            if_not_exists,
299            refresh_interval_sec,
300        ),
301        &session,
302        "CREATE MATERIALIZED VIEW",
303        LongRunningNotificationAction::MonitorBackfillJob,
304    )
305    .await?;
306
307    Ok(PgResponse::empty_result(
308        StatementType::CREATE_MATERIALIZED_VIEW,
309    ))
310}
311
312pub(crate) async fn gen_create_mv_graph(
313    handler_args: HandlerArgs,
314    name: ObjectName,
315    query: BoundQuery,
316    dependent_relations: HashSet<ObjectId>,
317    dependent_udfs: HashSet<FunctionId>,
318    dependent_secrets: HashSet<SecretId>,
319    columns: Vec<Ident>,
320    emit_mode: Option<EmitMode>,
321) -> Result<(
322    TableCatalog,
323    PbStreamFragmentGraph,
324    HashSet<ObjectId>,
325    streaming_job_resource_type::ResourceType,
326    Option<u64>,
327)> {
328    let mut with_options = get_with_options(handler_args.clone());
329    let refresh_interval_sec = with_options.refresh_interval_sec()?;
330    with_options.remove(MV_REFRESH_INTERVAL_SEC_KEY);
331    let resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());
332
333    if resource_group.is_some() {
334        risingwave_common::license::Feature::ResourceGroup.check_available()?;
335    }
336
337    let serverless_backfill_from_with = with_options
338        .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
339        .map(|value| value.parse::<bool>().unwrap_or(false));
340    let is_serverless_backfill = match serverless_backfill_from_with {
341        Some(value) => value,
342        None => {
343            if resource_group.is_some() {
344                false
345            } else {
346                handler_args.session.config().enable_serverless_backfill()
347            }
348        }
349    };
350
351    if resource_group.is_some() && is_serverless_backfill {
352        return Err(RwError::from(InvalidInputSyntax(
353            "Please do not specify serverless backfilling and resource group together".to_owned(),
354        )));
355    }
356
357    if !with_options.is_empty() {
358        // get other useful fields by `remove`, the logic here is to reject unknown options.
359        return Err(RwError::from(ProtocolError(format!(
360            "unexpected options in WITH clause: {:?}",
361            with_options.keys()
362        ))));
363    }
364
365    let sbc_addr = match SESSION_MANAGER.get() {
366        Some(manager) => manager.env().sbc_address(),
367        None => "",
368    }
369    .to_owned();
370
371    if is_serverless_backfill && sbc_addr.is_empty() {
372        return Err(RwError::from(InvalidInputSyntax(
373            "Serverless Backfill is disabled. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
374        )));
375    }
376
377    let resource_type = if is_serverless_backfill {
378        assert_eq!(resource_group, None);
379        match provision_resource_group(sbc_addr).await {
380            Err(e) => {
381                return Err(RwError::from(ProtocolError(format!(
382                    "failed to provision serverless backfill nodes: {}",
383                    e.as_report()
384                ))));
385            }
386            Ok(group) => {
387                tracing::info!(
388                    resource_group = group,
389                    "provisioning serverless backfill resource group"
390                );
391                streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(group)
392            }
393        }
394    } else if let Some(group) = resource_group {
395        streaming_job_resource_type::ResourceType::SpecificResourceGroup(group)
396    } else {
397        streaming_job_resource_type::ResourceType::Regular(true)
398    };
399    let context = OptimizerContext::from_handler_args(handler_args);
400    let has_order_by = !query.order.is_empty();
401    if has_order_by {
402        context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
403It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
404"#.to_owned());
405    }
406
407    if resource_type.resource_group().is_some()
408        && !context
409            .session_ctx()
410            .config()
411            .streaming_use_arrangement_backfill()
412    {
413        return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
414    }
415
416    let context: OptimizerContextRef = context.into();
417    let session = context.session_ctx().as_ref();
418
419    let (plan, table) = gen_create_mv_plan_bound(
420        session,
421        context.clone(),
422        query,
423        name,
424        columns,
425        emit_mode,
426        refresh_interval_sec,
427    )?;
428
429    let backfill_order = plan_backfill_order(
430        session,
431        context.with_options().backfill_order_strategy(),
432        plan.clone(),
433    )?;
434
435    // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies
436    // during binding instead of visiting the optimized plan.
437    let dependencies = RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
438        .into_iter()
439        .chain(dependent_udfs.iter().copied().map_into())
440        .chain(
441            dependent_secrets
442                .iter()
443                .copied()
444                .map(|id| id.as_object_id()),
445        )
446        .collect();
447
448    let graph = build_graph_with_strategy(
449        plan,
450        Some(GraphJobType::MaterializedView),
451        Some(backfill_order),
452    )?;
453
454    Ok((
455        table,
456        graph,
457        dependencies,
458        resource_type,
459        refresh_interval_sec,
460    ))
461}
462
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// Creates a protobuf source and an MV selecting one of its columns, then checks that both
    /// show up in the catalog and that the MV has the expected column names and types.
    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let create_source = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(create_source).await.unwrap();

        frontend
            .run_sql("create materialized view mv1 as select t1.country from t1")
            .await
            .unwrap();

        let session = frontend.session_ref();
        let catalog = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // The source must be registered.
        let (source, _) = catalog
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // The MV must be registered as a created table.
        let (mv_table, _) = catalog
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(mv_table.name(), "mv1");

        let actual_columns: HashMap<&str, DataType> = mv_table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect();

        let city_type: DataType = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        // .with_ids([5, 6].map(ColumnId::new))
        .into();
        let country_type: DataType = StructType::new(vec![
            ("address", DataType::Varchar),
            ("city", city_type),
            ("zipcode", DataType::Varchar),
        ])
        // .with_ids([3, 4, 7].map(ColumnId::new))
        .into();
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => country_type,
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(actual_columns, expected_columns, "{actual_columns:#?}");
    }

    /// When creating MV, a unique column name must be specified for each column
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        frontend.run_sql("create table t(x varchar)").await.unwrap();

        // A single aggregation without an alias is accepted.
        frontend
            .run_sql("create materialized view mv0 as select count(x) from t")
            .await
            .unwrap();

        // Statements that must be rejected, paired with the exact error message expected.
        let rejected = [
            // Two aggregations without aliases would produce the same column name.
            (
                "create materialized view mv1 as select count(x), count(*) from t",
                "Invalid input syntax: column \"count\" specified more than once",
            ),
            // A literal without an alias is forbidden.
            (
                "create materialized view mv1 as select 1",
                "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation",
            ),
            // Other expressions without an alias are forbidden as well.
            (
                "create materialized view mv1 as select x is null from t",
                "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation",
            ),
        ];
        for (sql, expected_message) in rejected {
            let err = frontend.run_sql(sql).await.unwrap_err();
            assert_eq!(err.to_string(), expected_message);
        }
    }

    /// Creates one MV without and one with `ORDER BY`; both must succeed with the
    /// `CREATE_MATERIALIZED_VIEW` statement type, and the one without `ORDER BY` must carry no
    /// notices.
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        frontend.run_sql("create table t(x varchar)").await.unwrap();

        // Without order by
        let plain_response = frontend
            .run_sql("create materialized view mv1 as select * from t")
            .await
            .unwrap();
        assert_eq!(plain_response.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(plain_response.notices().is_empty());

        // With order by
        let ordered_response = frontend
            .run_sql("create materialized view mv2 as select * from t order by x")
            .await
            .unwrap();
        assert_eq!(ordered_response.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}