risingwave_frontend/handler/
create_mv.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use either::Either;
18use itertools::Itertools;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::catalog::{FunctionId, ObjectId, SecretId};
21use risingwave_common::license::Feature;
22use risingwave_pb::ddl_service::streaming_job_resource_type;
23use risingwave_pb::serverless_backfill_controller::{
24    ProvisionRequest, node_group_controller_service_client,
25};
26use risingwave_pb::stream_plan::PbStreamFragmentGraph;
27use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
28use thiserror_ext::AsReport;
29
30use super::RwPgResponse;
31use crate::binder::{Binder, BoundQuery, BoundSetExpr};
32use crate::catalog::check_column_name_not_reserved;
33use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
34use crate::error::{ErrorCode, Result, RwError};
35use crate::handler::HandlerArgs;
36use crate::handler::util::{
37    LongRunningNotificationAction, execute_with_long_running_notification,
38    reject_internal_table_dependencies,
39};
40use crate::optimizer::backfill_order_strategy::plan_backfill_order;
41use crate::optimizer::plan_node::generic::GenericPlanRef;
42use crate::optimizer::plan_node::{
43    BackfillType, Explain, StreamPlanRef as PlanRef, ensure_sync_log_store_fragment_root,
44};
45use crate::optimizer::{OptimizerContext, OptimizerContextRef, RelationCollectorVisitor};
46use crate::planner::Planner;
47use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
48use crate::session::{SESSION_MANAGER, SessionImpl};
49use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
50use crate::utils::{MV_REFRESH_INTERVAL_SEC_KEY, ordinal};
51use crate::{TableCatalog, WithOptions};
52
/// Option key under which a resource group name is supplied; it is removed from the
/// options by [`extract_streaming_job_resource_options`].
pub const RESOURCE_GROUP_KEY: &str = "resource_group";
/// Option key under which the serverless-backfill switch is supplied; its value is parsed
/// as a `bool` by [`extract_streaming_job_resource_options`].
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
55
/// Resource-related options that [`extract_streaming_job_resource_options`] pulls out of a
/// [`WithOptions`].
pub(crate) struct StreamingJobResourceOptions {
    /// Value that was stored under [`RESOURCE_GROUP_KEY`], or `None` if the key was absent.
    pub resource_group: Option<String>,
    /// Value that was stored under [`CLOUD_SERVERLESS_BACKFILL_ENABLED`], parsed as a `bool`,
    /// or `None` if the key was absent. A value that fails to parse is stored as
    /// `Some(false)`.
    pub serverless_backfill_enabled: Option<bool>,
}
60
61pub(crate) fn extract_streaming_job_resource_options(
62    with_options: &mut WithOptions,
63) -> StreamingJobResourceOptions {
64    StreamingJobResourceOptions {
65        resource_group: with_options.remove(RESOURCE_GROUP_KEY),
66        serverless_backfill_enabled: with_options
67            .remove(CLOUD_SERVERLESS_BACKFILL_ENABLED)
68            .map(|value| value.parse::<bool>().unwrap_or(false)),
69    }
70}
71
72pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
73    if columns.is_empty() {
74        None
75    } else {
76        Some(columns.iter().map(|v| v.real_value()).collect())
77    }
78}
79
80/// If columns is empty, it means that the user did not specify the column names.
81/// In this case, we extract the column names from the query.
82/// If columns is not empty, it means that user specify the column names and the user
83/// should guarantee that the column names number are consistent with the query.
84pub(super) fn get_column_names(
85    bound: &BoundQuery,
86    columns: Vec<Ident>,
87) -> Result<Option<Vec<String>>> {
88    let col_names = parse_column_names(&columns);
89    if let BoundSetExpr::Select(select) = &bound.body {
90        // `InputRef`'s alias will be implicitly assigned in `bind_project`.
91        // If user provides columns name (col_names.is_some()), we don't need alias.
92        // For other expressions (col_names.is_none()), we require the user to explicitly assign an
93        // alias.
94        if col_names.is_none() {
95            for (i, alias) in select.aliases.iter().enumerate() {
96                if alias.is_none() {
97                    return Err(ErrorCode::BindError(format!(
98                    "An alias must be specified for the {} expression (counting from 1) in result relation", ordinal(i+1)
99                ))
100                .into());
101                }
102            }
103        }
104    }
105
106    Ok(col_names)
107}
108
109/// Bind and generate create MV plan, return plan and mv table info.
110pub fn explain_create_mv_plan(
111    session: &SessionImpl,
112    context: OptimizerContextRef,
113    query: Query,
114    name: ObjectName,
115    columns: Vec<Ident>,
116    emit_mode: Option<EmitMode>,
117) -> Result<(PlanRef, TableCatalog)> {
118    let mut binder = Binder::new_for_stream(session);
119    let bound = binder.bind_query(&query)?;
120    gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode, None)
121}
122
123/// Generate create MV plan from a bound query
124pub fn gen_create_mv_plan_bound(
125    session: &SessionImpl,
126    context: OptimizerContextRef,
127    query: BoundQuery,
128    name: ObjectName,
129    columns: Vec<Ident>,
130    emit_mode: Option<EmitMode>,
131    refresh_interval_sec: Option<u64>,
132) -> Result<(PlanRef, TableCatalog)> {
133    if session.config().create_compaction_group_for_mv() {
134        context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
135    }
136
137    let db_name = &session.database();
138    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
139
140    let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
141
142    let definition = context.normalized_sql().to_owned();
143
144    let col_names = get_column_names(&query, columns)?;
145
146    let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
147    if emit_on_window_close {
148        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
149    }
150
151    let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
152    if let Some(col_names) = col_names {
153        for name in &col_names {
154            check_column_name_not_reserved(name)?;
155        }
156        plan_root.set_out_names(col_names)?;
157    }
158
159    let backfill_type = if refresh_interval_sec.is_some() {
160        plan_root.require_snapshot_backfill_for_batch_refresh()?;
161        BackfillType::SnapshotBackfill
162    } else {
163        plan_root.derive_backfill_type(true)
164    };
165
166    let materialize = plan_root.gen_materialize_plan(
167        database_id,
168        schema_id,
169        table_name,
170        definition,
171        emit_on_window_close,
172        backfill_type,
173    )?;
174
175    let mut table = materialize.table().clone();
176    table.owner = session.user_id();
177
178    let plan: PlanRef = ensure_sync_log_store_fragment_root(materialize.into());
179
180    let ctx = plan.ctx();
181    let explain_trace = ctx.is_explain_trace();
182    if explain_trace {
183        ctx.trace("Create Materialized View:");
184        ctx.trace(plan.explain_to_string());
185    }
186
187    Ok((plan, table))
188}
189
190pub async fn handle_create_mv(
191    handler_args: HandlerArgs,
192    if_not_exists: bool,
193    name: ObjectName,
194    query: Query,
195    columns: Vec<Ident>,
196    emit_mode: Option<EmitMode>,
197) -> Result<RwPgResponse> {
198    let (dependent_relations, dependent_udfs, dependent_secrets, bound_query) = {
199        let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
200        let bound_query = binder.bind_query(&query)?;
201        (
202            binder.included_relations().clone(),
203            binder.included_udfs().clone(),
204            binder.included_secrets().clone(),
205            bound_query,
206        )
207    };
208    handle_create_mv_bound(
209        handler_args,
210        if_not_exists,
211        name,
212        bound_query,
213        dependent_relations,
214        dependent_udfs,
215        dependent_secrets,
216        columns,
217        emit_mode,
218    )
219    .await
220}
221
222/// Send a provision request to the serverless backfill controller
223pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
224    let request = tonic::Request::new(ProvisionRequest {});
225    let mut client =
226        node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
227            sbc_addr.clone(),
228        )
229        .await
230        .map_err(|e| {
231            RwError::from(ErrorCode::InternalError(format!(
232                "unable to reach serverless backfill controller at addr {}: {}",
233                sbc_addr,
234                e.as_report()
235            )))
236        })?;
237
238    match client.provision(request).await {
239        Ok(resp) => Ok(resp.into_inner().resource_group),
240        Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
241            "serverless backfill controller returned error :{}",
242            e.as_report()
243        )))),
244    }
245}
246
247pub(crate) async fn resolve_streaming_job_resource_type(
248    session: &SessionImpl,
249    with_options: &mut WithOptions,
250) -> Result<streaming_job_resource_type::ResourceType> {
251    let StreamingJobResourceOptions {
252        resource_group,
253        serverless_backfill_enabled,
254    } = extract_streaming_job_resource_options(with_options);
255
256    if resource_group.is_some() {
257        Feature::ResourceGroup.check_available()?;
258    }
259
260    let is_serverless_backfill = match serverless_backfill_enabled {
261        Some(value) => value,
262        None => {
263            if resource_group.is_some() {
264                false
265            } else {
266                session.config().enable_serverless_backfill()
267            }
268        }
269    };
270
271    if resource_group.is_some() && is_serverless_backfill {
272        return Err(RwError::from(InvalidInputSyntax(
273            "Please do not specify serverless backfilling and resource group together".to_owned(),
274        )));
275    }
276
277    let sbc_addr = match SESSION_MANAGER.get() {
278        Some(manager) => manager.env().sbc_address(),
279        None => "",
280    }
281    .to_owned();
282
283    if is_serverless_backfill && sbc_addr.is_empty() {
284        return Err(RwError::from(InvalidInputSyntax(
285            "Serverless Backfill is disabled. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
286        )));
287    }
288
289    let resource_type = if is_serverless_backfill {
290        assert_eq!(resource_group, None);
291        match provision_resource_group(sbc_addr).await {
292            Err(e) => {
293                return Err(RwError::from(ProtocolError(format!(
294                    "failed to provision serverless backfill nodes: {}",
295                    e.as_report()
296                ))));
297            }
298            Ok(group) => {
299                tracing::info!(
300                    resource_group = group,
301                    "provisioning serverless backfill resource group"
302                );
303                streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(group)
304            }
305        }
306    } else if let Some(group) = resource_group {
307        streaming_job_resource_type::ResourceType::SpecificResourceGroup(group)
308    } else {
309        streaming_job_resource_type::ResourceType::Regular(true)
310    };
311
312    if resource_type.resource_group().is_some()
313        && !session.config().streaming_use_arrangement_backfill()
314    {
315        return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
316    }
317
318    Ok(resource_type)
319}
320
321fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
322    let context = OptimizerContext::from_handler_args(handler_args);
323    context.with_options().clone()
324}
325
326pub async fn handle_create_mv_bound(
327    handler_args: HandlerArgs,
328    if_not_exists: bool,
329    name: ObjectName,
330    query: BoundQuery,
331    dependent_relations: HashSet<ObjectId>,
332    dependent_udfs: HashSet<FunctionId>, // TODO(rc): merge with `dependent_relations`
333    dependent_secrets: HashSet<SecretId>,
334    columns: Vec<Ident>,
335    emit_mode: Option<EmitMode>,
336) -> Result<RwPgResponse> {
337    let session = handler_args.session.clone();
338
339    // Check cluster limits
340    session.check_cluster_limits().await?;
341
342    if let Either::Right(resp) = session.check_relation_name_duplicated(
343        name.clone(),
344        StatementType::CREATE_MATERIALIZED_VIEW,
345        if_not_exists,
346    )? {
347        return Ok(resp);
348    }
349
350    reject_internal_table_dependencies(
351        session.as_ref(),
352        &dependent_relations,
353        "CREATE MATERIALIZED VIEW",
354    )?;
355
356    let (table, graph, dependencies, resource_type, refresh_interval_sec) = {
357        gen_create_mv_graph(
358            handler_args,
359            name,
360            query,
361            dependent_relations,
362            dependent_udfs,
363            dependent_secrets,
364            columns,
365            emit_mode,
366        )
367        .await?
368    };
369
370    // Ensure writes to `StreamJobTracker` are atomic.
371    let _job_guard =
372        session
373            .env()
374            .creating_streaming_job_tracker()
375            .guard(CreatingStreamingJobInfo::new(
376                session.session_id(),
377                table.database_id,
378                table.schema_id,
379                table.name.clone(),
380            ));
381
382    let catalog_writer = session.catalog_writer()?;
383    execute_with_long_running_notification(
384        catalog_writer.create_materialized_view(
385            table.to_prost(),
386            graph,
387            dependencies,
388            resource_type,
389            if_not_exists,
390            refresh_interval_sec,
391        ),
392        &session,
393        "CREATE MATERIALIZED VIEW",
394        LongRunningNotificationAction::MonitorBackfillJob,
395    )
396    .await?;
397
398    Ok(PgResponse::empty_result(
399        StatementType::CREATE_MATERIALIZED_VIEW,
400    ))
401}
402
403pub(crate) async fn gen_create_mv_graph(
404    handler_args: HandlerArgs,
405    name: ObjectName,
406    query: BoundQuery,
407    dependent_relations: HashSet<ObjectId>,
408    dependent_udfs: HashSet<FunctionId>,
409    dependent_secrets: HashSet<SecretId>,
410    columns: Vec<Ident>,
411    emit_mode: Option<EmitMode>,
412) -> Result<(
413    TableCatalog,
414    PbStreamFragmentGraph,
415    HashSet<ObjectId>,
416    streaming_job_resource_type::ResourceType,
417    Option<u64>,
418)> {
419    let mut with_options = get_with_options(handler_args.clone());
420    let refresh_interval_sec = with_options.refresh_interval_sec()?;
421    with_options.remove(MV_REFRESH_INTERVAL_SEC_KEY);
422    let resource_type =
423        resolve_streaming_job_resource_type(handler_args.session.as_ref(), &mut with_options)
424            .await?;
425
426    if !with_options.is_empty() {
427        // get other useful fields by `remove`, the logic here is to reject unknown options.
428        return Err(RwError::from(ProtocolError(format!(
429            "unexpected options in WITH clause: {:?}",
430            with_options.keys()
431        ))));
432    }
433
434    let context = OptimizerContext::from_handler_args(handler_args);
435    let has_order_by = !query.order.is_empty();
436    if has_order_by {
437        context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
438It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
439"#.to_owned());
440    }
441
442    let context: OptimizerContextRef = context.into();
443    let session = context.session_ctx().as_ref();
444
445    let (plan, table) = gen_create_mv_plan_bound(
446        session,
447        context.clone(),
448        query,
449        name,
450        columns,
451        emit_mode,
452        refresh_interval_sec,
453    )?;
454
455    let backfill_order = plan_backfill_order(
456        session,
457        context.with_options().backfill_order_strategy(),
458        plan.clone(),
459    )?;
460
461    // TODO(rc): To be consistent with UDF dependency check, we should collect relation dependencies
462    // during binding instead of visiting the optimized plan.
463    let dependencies = RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
464        .into_iter()
465        .chain(dependent_udfs.iter().copied().map_into())
466        .chain(
467            dependent_secrets
468                .iter()
469                .copied()
470                .map(|id| id.as_object_id()),
471        )
472        .collect();
473
474    let graph = build_graph_with_strategy(
475        plan,
476        Some(GraphJobType::MaterializedView),
477        Some(backfill_order),
478    )?;
479
480    Ok((
481        table,
482        graph,
483        dependencies,
484        resource_type,
485        refresh_interval_sec,
486    ))
487}
488
/// Tests that drive `CREATE MATERIALIZED VIEW` through a [`LocalFrontend`].
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// Creates a protobuf-encoded source and an MV selecting one struct column from it, then
    /// checks that both appear in the catalog and that the MV has the expected columns.
    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1";
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Check source exists.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // Check table exists.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        // Map each MV column name to its data type for comparison.
        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        // .with_ids([5, 6].map(ColumnId::new))
        .into();
        // Besides the selected `country` column, the MV is expected to contain the row id
        // column and the `RW_TIMESTAMP_COLUMN_NAME` column.
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => StructType::new(
                 vec![("address", DataType::Varchar),("city", city_type),("zipcode", DataType::Varchar)],
            )
            // .with_ids([3, 4, 7].map(ColumnId::new))
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// When creating an MV, a unique column name must be specified for each column.
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // A single aggregation without an alias is ok.
        let sql = "create materialized view mv0 as select count(x) from t";
        frontend.run_sql(sql).await.unwrap();

        // Two aggregations without aliases are forbidden, because they produce the same
        // column name.
        let sql = "create materialized view mv1 as select count(x), count(*) from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid input syntax: column \"count\" specified more than once"
        );

        // A literal without an alias is forbidden.
        let sql = "create materialized view mv1 as select 1";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );

        // An expression such as `x is null` without an alias is forbidden.
        let sql = "create materialized view mv1 as select x is null from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );
    }

    /// Creating an MV succeeds both with and without `ORDER BY`.
    ///
    /// Without `ORDER BY` the response carries no notices. The notice emitted for the
    /// `ORDER BY` case is not asserted here.
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // Without order by
        let sql = "create materialized view mv1 as select * from t";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(response.notices().is_empty());

        // With order by
        let sql = "create materialized view mv2 as select * from t order by x";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}