risingwave_frontend/handler/
create_mv.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use either::Either;
18use itertools::Itertools;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::catalog::{FunctionId, ObjectId, SecretId};
21use risingwave_pb::ddl_service::streaming_job_resource_type;
22use risingwave_pb::serverless_backfill_controller::{
23    ProvisionRequest, node_group_controller_service_client,
24};
25use risingwave_pb::stream_plan::PbStreamFragmentGraph;
26use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
27use thiserror_ext::AsReport;
28
29use super::RwPgResponse;
30use crate::binder::{Binder, BoundQuery, BoundSetExpr};
31use crate::catalog::check_column_name_not_reserved;
32use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
33use crate::error::{ErrorCode, Result, RwError};
34use crate::handler::HandlerArgs;
35use crate::handler::util::{LongRunningNotificationAction, execute_with_long_running_notification};
36use crate::optimizer::backfill_order_strategy::plan_backfill_order;
37use crate::optimizer::plan_node::generic::GenericPlanRef;
38use crate::optimizer::plan_node::{Explain, StreamPlanRef as PlanRef};
39use crate::optimizer::{OptimizerContext, OptimizerContextRef, RelationCollectorVisitor};
40use crate::planner::Planner;
41use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
42use crate::session::{SESSION_MANAGER, SessionImpl};
43use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
44use crate::utils::ordinal;
45use crate::{TableCatalog, WithOptions};
46
/// WITH-clause option key that pins the streaming job to a specific resource group
/// (gated behind a license feature; see `gen_create_mv_graph`).
pub const RESOURCE_GROUP_KEY: &str = "resource_group";
/// WITH-clause option key toggling serverless backfill (RisingWave Cloud deployments).
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
49
50pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
51    if columns.is_empty() {
52        None
53    } else {
54        Some(columns.iter().map(|v| v.real_value()).collect())
55    }
56}
57
58/// If columns is empty, it means that the user did not specify the column names.
59/// In this case, we extract the column names from the query.
60/// If columns is not empty, it means that user specify the column names and the user
61/// should guarantee that the column names number are consistent with the query.
62pub(super) fn get_column_names(
63    bound: &BoundQuery,
64    columns: Vec<Ident>,
65) -> Result<Option<Vec<String>>> {
66    let col_names = parse_column_names(&columns);
67    if let BoundSetExpr::Select(select) = &bound.body {
68        // `InputRef`'s alias will be implicitly assigned in `bind_project`.
69        // If user provides columns name (col_names.is_some()), we don't need alias.
70        // For other expressions (col_names.is_none()), we require the user to explicitly assign an
71        // alias.
72        if col_names.is_none() {
73            for (i, alias) in select.aliases.iter().enumerate() {
74                if alias.is_none() {
75                    return Err(ErrorCode::BindError(format!(
76                    "An alias must be specified for the {} expression (counting from 1) in result relation", ordinal(i+1)
77                ))
78                .into());
79                }
80            }
81        }
82    }
83
84    Ok(col_names)
85}
86
87/// Bind and generate create MV plan, return plan and mv table info.
88pub fn gen_create_mv_plan(
89    session: &SessionImpl,
90    context: OptimizerContextRef,
91    query: Query,
92    name: ObjectName,
93    columns: Vec<Ident>,
94    emit_mode: Option<EmitMode>,
95) -> Result<(PlanRef, TableCatalog)> {
96    let mut binder = Binder::new_for_stream(session);
97    let bound = binder.bind_query(&query)?;
98    gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
99}
100
/// Generate create MV plan from a bound query
///
/// Plans the bound `query` as a streaming job, applies optional output column
/// renames, and wraps the plan root in a `Materialize` node whose table
/// catalog describes the MV. The returned table's owner is the current user.
pub fn gen_create_mv_plan_bound(
    session: &SessionImpl,
    context: OptimizerContextRef,
    query: BoundQuery,
    name: ObjectName,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(PlanRef, TableCatalog)> {
    // The variable is deprecated: warn instead of erroring so existing
    // workloads keep running.
    if session.config().create_compaction_group_for_mv() {
        context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
    }

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;

    let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;

    // The normalized SQL text becomes the MV's stored definition.
    let definition = context.normalized_sql().to_owned();

    let col_names = get_column_names(&query, columns)?;

    let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
    // Apply user-specified output names, rejecting reserved column names.
    if let Some(col_names) = col_names {
        for name in &col_names {
            check_column_name_not_reserved(name)?;
        }
        plan_root.set_out_names(col_names)?;
    }
    let materialize = plan_root.gen_materialize_plan(
        database_id,
        schema_id,
        table_name,
        definition,
        emit_on_window_close,
    )?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();

    let plan: PlanRef = materialize.into();

    // Under `EXPLAIN (TRACE)`, record the final plan in the trace output.
    let ctx = plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Materialized View:");
        ctx.trace(plan.explain_to_string());
    }

    Ok((plan, table))
}
157
158pub async fn handle_create_mv(
159    handler_args: HandlerArgs,
160    if_not_exists: bool,
161    name: ObjectName,
162    query: Query,
163    columns: Vec<Ident>,
164    emit_mode: Option<EmitMode>,
165) -> Result<RwPgResponse> {
166    let (dependent_relations, dependent_udfs, dependent_secrets, bound_query) = {
167        let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
168        let bound_query = binder.bind_query(&query)?;
169        (
170            binder.included_relations().clone(),
171            binder.included_udfs().clone(),
172            binder.included_secrets().clone(),
173            bound_query,
174        )
175    };
176    handle_create_mv_bound(
177        handler_args,
178        if_not_exists,
179        name,
180        bound_query,
181        dependent_relations,
182        dependent_udfs,
183        dependent_secrets,
184        columns,
185        emit_mode,
186    )
187    .await
188}
189
190/// Send a provision request to the serverless backfill controller
191pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
192    let request = tonic::Request::new(ProvisionRequest {});
193    let mut client =
194        node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
195            sbc_addr.clone(),
196        )
197        .await
198        .map_err(|e| {
199            RwError::from(ErrorCode::InternalError(format!(
200                "unable to reach serverless backfill controller at addr {}: {}",
201                sbc_addr,
202                e.as_report()
203            )))
204        })?;
205
206    match client.provision(request).await {
207        Ok(resp) => Ok(resp.into_inner().resource_group),
208        Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
209            "serverless backfill controller returned error :{}",
210            e.as_report()
211        )))),
212    }
213}
214
/// Extract the WITH-clause options from `handler_args`.
///
/// Builds a throwaway `OptimizerContext` just to obtain the parsed options and
/// discards it. Takes `handler_args` by value because
/// `OptimizerContext::from_handler_args` consumes it — callers clone first.
fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
    let context = OptimizerContext::from_handler_args(handler_args);
    context.with_options().clone()
}
219
220pub async fn handle_create_mv_bound(
221    handler_args: HandlerArgs,
222    if_not_exists: bool,
223    name: ObjectName,
224    query: BoundQuery,
225    dependent_relations: HashSet<ObjectId>,
226    dependent_udfs: HashSet<FunctionId>, // TODO(rc): merge with `dependent_relations`
227    dependent_secrets: HashSet<SecretId>,
228    columns: Vec<Ident>,
229    emit_mode: Option<EmitMode>,
230) -> Result<RwPgResponse> {
231    let session = handler_args.session.clone();
232
233    // Check cluster limits
234    session.check_cluster_limits().await?;
235
236    if let Either::Right(resp) = session.check_relation_name_duplicated(
237        name.clone(),
238        StatementType::CREATE_MATERIALIZED_VIEW,
239        if_not_exists,
240    )? {
241        return Ok(resp);
242    }
243
244    let (table, graph, dependencies, resource_type) = {
245        gen_create_mv_graph(
246            handler_args,
247            name,
248            query,
249            dependent_relations,
250            dependent_udfs,
251            dependent_secrets,
252            columns,
253            emit_mode,
254        )
255        .await?
256    };
257
258    // Ensure writes to `StreamJobTracker` are atomic.
259    let _job_guard =
260        session
261            .env()
262            .creating_streaming_job_tracker()
263            .guard(CreatingStreamingJobInfo::new(
264                session.session_id(),
265                table.database_id,
266                table.schema_id,
267                table.name.clone(),
268            ));
269
270    let catalog_writer = session.catalog_writer()?;
271    execute_with_long_running_notification(
272        catalog_writer.create_materialized_view(
273            table.to_prost(),
274            graph,
275            dependencies,
276            resource_type,
277            if_not_exists,
278        ),
279        &session,
280        "CREATE MATERIALIZED VIEW",
281        LongRunningNotificationAction::MonitorBackfillJob,
282    )
283    .await?;
284
285    Ok(PgResponse::empty_result(
286        StatementType::CREATE_MATERIALIZED_VIEW,
287    ))
288}
289
/// Plan a materialized view from a bound query and build its stream fragment
/// graph, resolving which resource type the backfill should run under.
///
/// The WITH clause may carry:
/// * `resource_group` — pin the job to a specific resource group (licensed
///   feature), or
/// * `cloud.serverless_backfill_enabled` — provision an ephemeral node group
///   through the serverless backfill controller.
///
/// The two options are mutually exclusive; any other WITH option is rejected.
///
/// Returns the MV table catalog, the fragment graph, the complete dependency
/// set (relations, UDFs, secrets) and the resolved resource type.
pub(crate) async fn gen_create_mv_graph(
    handler_args: HandlerArgs,
    name: ObjectName,
    query: BoundQuery,
    dependent_relations: HashSet<ObjectId>,
    dependent_udfs: HashSet<FunctionId>,
    dependent_secrets: HashSet<SecretId>,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(
    TableCatalog,
    PbStreamFragmentGraph,
    HashSet<ObjectId>,
    streaming_job_resource_type::ResourceType,
)> {
    let mut with_options = get_with_options(handler_args.clone());
    let resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());

    // Specifying a resource group is gated behind a license feature.
    if resource_group.is_some() {
        risingwave_common::license::Feature::ResourceGroup.check_available()?;
    }

    // NOTE(review): a malformed boolean value here is silently treated as
    // `false` rather than rejected — confirm this leniency is intended.
    let serverless_backfill_from_with = with_options
        .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
        .map(|value| value.parse::<bool>().unwrap_or(false));
    // The WITH option takes precedence; otherwise fall back to the session
    // config, unless a resource group was explicitly requested.
    let is_serverless_backfill = match serverless_backfill_from_with {
        Some(value) => value,
        None => {
            if resource_group.is_some() {
                false
            } else {
                handler_args.session.config().enable_serverless_backfill()
            }
        }
    };

    if resource_group.is_some() && is_serverless_backfill {
        return Err(RwError::from(InvalidInputSyntax(
            "Please do not specify serverless backfilling and resource group together".to_owned(),
        )));
    }

    if !with_options.is_empty() {
        // get other useful fields by `remove`, the logic here is to reject unknown options.
        return Err(RwError::from(ProtocolError(format!(
            "unexpected options in WITH clause: {:?}",
            with_options.keys()
        ))));
    }

    // Address of the serverless backfill controller; empty when there is no
    // session manager or no controller is configured for this deployment.
    let sbc_addr = match SESSION_MANAGER.get() {
        Some(manager) => manager.env().sbc_address(),
        None => "",
    }
    .to_owned();

    if is_serverless_backfill && sbc_addr.is_empty() {
        return Err(RwError::from(InvalidInputSyntax(
            "Serverless Backfill is disabled. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
        )));
    }

    // Resolve the resource type: serverless (provisioned on demand via RPC),
    // a user-specified group, or the regular default.
    let resource_type = if is_serverless_backfill {
        assert_eq!(resource_group, None);
        match provision_resource_group(sbc_addr).await {
            Err(e) => {
                return Err(RwError::from(ProtocolError(format!(
                    "failed to provision serverless backfill nodes: {}",
                    e.as_report()
                ))));
            }
            Ok(group) => {
                tracing::info!(
                    resource_group = group,
                    "provisioning serverless backfill resource group"
                );
                streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(group)
            }
        }
    } else if let Some(group) = resource_group {
        streaming_job_resource_type::ResourceType::SpecificResourceGroup(group)
    } else {
        streaming_job_resource_type::ResourceType::Regular(true)
    };
    let context = OptimizerContext::from_handler_args(handler_args);
    // ORDER BY on an MV only hints at physical clustering; warn that it does
    // not guarantee read order.
    let has_order_by = !query.order.is_empty();
    if has_order_by {
        context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
"#.to_owned());
    }

    // Any non-default resource group requires arrangement backfill.
    if resource_type.resource_group().is_some()
        && !context
            .session_ctx()
            .config()
            .streaming_use_arrangement_backfill()
    {
        return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
    }

    let context: OptimizerContextRef = context.into();
    let session = context.session_ctx().as_ref();

    let (plan, table) =
        gen_create_mv_plan_bound(session, context.clone(), query, name, columns, emit_mode)?;

    // Decide the order in which upstream relations are backfilled.
    let backfill_order = plan_backfill_order(
        session,
        context.with_options().backfill_order_strategy(),
        plan.clone(),
    )?;

    // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies
    // during binding instead of visiting the optimized plan.
    let dependencies = RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
        .into_iter()
        .chain(dependent_udfs.iter().copied().map_into())
        .chain(
            dependent_secrets
                .iter()
                .copied()
                .map(|id| id.as_object_id()),
        )
        .collect();

    let graph = build_graph_with_strategy(
        plan,
        Some(GraphJobType::MaterializedView),
        Some(backfill_order),
    )?;

    Ok((table, graph, dependencies, resource_type))
}
424
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// End-to-end: create a protobuf-backed source, build an MV on it, and
    /// verify both the catalog entries and the MV's inferred column types.
    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1";
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check table exists.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        // Compare name -> type for every column, including the hidden
        // row-id and rw_timestamp columns added by the system.
        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        // .with_ids([5, 6].map(ColumnId::new))
        .into();
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => StructType::new(
                 vec![("address", DataType::Varchar),("city", city_type),("zipcode", DataType::Varchar)],
            )
            // .with_ids([3, 4, 7].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// When creating MV, a unique column name must be specified for each column
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // Aggregation without alias is ok.
        let sql = "create materialized view mv0 as select count(x) from t";
        frontend.run_sql(sql).await.unwrap();

        // Same aggregations without alias is forbidden, because it make the same column name.
        let sql = "create materialized view mv1 as select count(x), count(*) from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid input syntax: column \"count\" specified more than once"
        );

        // Literal without alias is forbidden.
        let sql = "create materialized view mv1 as select 1";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );

        // some expression without alias is forbidden.
        let sql = "create materialized view mv1 as select x is null from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );
    }

    /// Creating MV with order by returns a special notice
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // Without order by
        let sql = "create materialized view mv1 as select * from t";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(response.notices().is_empty());

        // With order by
        let sql = "create materialized view mv2 as select * from t order by x";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}