1use std::collections::HashSet;
16
17use either::Either;
18use itertools::Itertools;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::catalog::{FunctionId, ObjectId, SecretId};
21use risingwave_pb::ddl_service::streaming_job_resource_type;
22use risingwave_pb::serverless_backfill_controller::{
23 ProvisionRequest, node_group_controller_service_client,
24};
25use risingwave_pb::stream_plan::PbStreamFragmentGraph;
26use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
27use thiserror_ext::AsReport;
28
29use super::RwPgResponse;
30use crate::binder::{Binder, BoundQuery, BoundSetExpr};
31use crate::catalog::check_column_name_not_reserved;
32use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
33use crate::error::{ErrorCode, Result, RwError};
34use crate::handler::HandlerArgs;
35use crate::handler::util::{
36 LongRunningNotificationAction, execute_with_long_running_notification,
37 reject_internal_table_dependencies,
38};
39use crate::optimizer::backfill_order_strategy::plan_backfill_order;
40use crate::optimizer::plan_node::generic::GenericPlanRef;
41use crate::optimizer::plan_node::{
42 BackfillType, Explain, StreamPlanRef as PlanRef, ensure_sync_log_store_fragment_root,
43};
44use crate::optimizer::{OptimizerContext, OptimizerContextRef, RelationCollectorVisitor};
45use crate::planner::Planner;
46use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
47use crate::session::{SESSION_MANAGER, SessionImpl};
48use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
49use crate::utils::{MV_REFRESH_INTERVAL_SEC_KEY, ordinal};
50use crate::{TableCatalog, WithOptions};
51
/// Key of the `WITH` option naming the resource group for the materialized view job.
pub const RESOURCE_GROUP_KEY: &str = "resource_group";
/// Key of the `WITH` option toggling serverless backfill; its value is parsed as a `bool`.
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
54
55pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
56 if columns.is_empty() {
57 None
58 } else {
59 Some(columns.iter().map(|v| v.real_value()).collect())
60 }
61}
62
63pub(super) fn get_column_names(
68 bound: &BoundQuery,
69 columns: Vec<Ident>,
70) -> Result<Option<Vec<String>>> {
71 let col_names = parse_column_names(&columns);
72 if let BoundSetExpr::Select(select) = &bound.body {
73 if col_names.is_none() {
78 for (i, alias) in select.aliases.iter().enumerate() {
79 if alias.is_none() {
80 return Err(ErrorCode::BindError(format!(
81 "An alias must be specified for the {} expression (counting from 1) in result relation", ordinal(i+1)
82 ))
83 .into());
84 }
85 }
86 }
87 }
88
89 Ok(col_names)
90}
91
92pub fn explain_create_mv_plan(
94 session: &SessionImpl,
95 context: OptimizerContextRef,
96 query: Query,
97 name: ObjectName,
98 columns: Vec<Ident>,
99 emit_mode: Option<EmitMode>,
100) -> Result<(PlanRef, TableCatalog)> {
101 let mut binder = Binder::new_for_stream(session);
102 let bound = binder.bind_query(&query)?;
103 gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode, None)
104}
105
106pub fn gen_create_mv_plan_bound(
108 session: &SessionImpl,
109 context: OptimizerContextRef,
110 query: BoundQuery,
111 name: ObjectName,
112 columns: Vec<Ident>,
113 emit_mode: Option<EmitMode>,
114 refresh_interval_sec: Option<u64>,
115) -> Result<(PlanRef, TableCatalog)> {
116 if session.config().create_compaction_group_for_mv() {
117 context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
118 }
119
120 let db_name = &session.database();
121 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
122
123 let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
124
125 let definition = context.normalized_sql().to_owned();
126
127 let col_names = get_column_names(&query, columns)?;
128
129 let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
130 if emit_on_window_close {
131 context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
132 }
133
134 let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
135 if let Some(col_names) = col_names {
136 for name in &col_names {
137 check_column_name_not_reserved(name)?;
138 }
139 plan_root.set_out_names(col_names)?;
140 }
141
142 let backfill_type = if refresh_interval_sec.is_some() {
143 plan_root.require_snapshot_backfill_for_batch_refresh()?;
144 BackfillType::SnapshotBackfill
145 } else {
146 plan_root.derive_backfill_type(true)
147 };
148
149 let materialize = plan_root.gen_materialize_plan(
150 database_id,
151 schema_id,
152 table_name,
153 definition,
154 emit_on_window_close,
155 backfill_type,
156 )?;
157
158 let mut table = materialize.table().clone();
159 table.owner = session.user_id();
160
161 let plan: PlanRef = ensure_sync_log_store_fragment_root(materialize.into());
162
163 let ctx = plan.ctx();
164 let explain_trace = ctx.is_explain_trace();
165 if explain_trace {
166 ctx.trace("Create Materialized View:");
167 ctx.trace(plan.explain_to_string());
168 }
169
170 Ok((plan, table))
171}
172
173pub async fn handle_create_mv(
174 handler_args: HandlerArgs,
175 if_not_exists: bool,
176 name: ObjectName,
177 query: Query,
178 columns: Vec<Ident>,
179 emit_mode: Option<EmitMode>,
180) -> Result<RwPgResponse> {
181 let (dependent_relations, dependent_udfs, dependent_secrets, bound_query) = {
182 let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
183 let bound_query = binder.bind_query(&query)?;
184 (
185 binder.included_relations().clone(),
186 binder.included_udfs().clone(),
187 binder.included_secrets().clone(),
188 bound_query,
189 )
190 };
191 handle_create_mv_bound(
192 handler_args,
193 if_not_exists,
194 name,
195 bound_query,
196 dependent_relations,
197 dependent_udfs,
198 dependent_secrets,
199 columns,
200 emit_mode,
201 )
202 .await
203}
204
205pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
207 let request = tonic::Request::new(ProvisionRequest {});
208 let mut client =
209 node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
210 sbc_addr.clone(),
211 )
212 .await
213 .map_err(|e| {
214 RwError::from(ErrorCode::InternalError(format!(
215 "unable to reach serverless backfill controller at addr {}: {}",
216 sbc_addr,
217 e.as_report()
218 )))
219 })?;
220
221 match client.provision(request).await {
222 Ok(resp) => Ok(resp.into_inner().resource_group),
223 Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
224 "serverless backfill controller returned error :{}",
225 e.as_report()
226 )))),
227 }
228}
229
230fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
231 let context = OptimizerContext::from_handler_args(handler_args);
232 context.with_options().clone()
233}
234
235pub async fn handle_create_mv_bound(
236 handler_args: HandlerArgs,
237 if_not_exists: bool,
238 name: ObjectName,
239 query: BoundQuery,
240 dependent_relations: HashSet<ObjectId>,
241 dependent_udfs: HashSet<FunctionId>, dependent_secrets: HashSet<SecretId>,
243 columns: Vec<Ident>,
244 emit_mode: Option<EmitMode>,
245) -> Result<RwPgResponse> {
246 let session = handler_args.session.clone();
247
248 session.check_cluster_limits().await?;
250
251 if let Either::Right(resp) = session.check_relation_name_duplicated(
252 name.clone(),
253 StatementType::CREATE_MATERIALIZED_VIEW,
254 if_not_exists,
255 )? {
256 return Ok(resp);
257 }
258
259 reject_internal_table_dependencies(
260 session.as_ref(),
261 &dependent_relations,
262 "CREATE MATERIALIZED VIEW",
263 )?;
264
265 let (table, graph, dependencies, resource_type, refresh_interval_sec) = {
266 gen_create_mv_graph(
267 handler_args,
268 name,
269 query,
270 dependent_relations,
271 dependent_udfs,
272 dependent_secrets,
273 columns,
274 emit_mode,
275 )
276 .await?
277 };
278
279 let _job_guard =
281 session
282 .env()
283 .creating_streaming_job_tracker()
284 .guard(CreatingStreamingJobInfo::new(
285 session.session_id(),
286 table.database_id,
287 table.schema_id,
288 table.name.clone(),
289 ));
290
291 let catalog_writer = session.catalog_writer()?;
292 execute_with_long_running_notification(
293 catalog_writer.create_materialized_view(
294 table.to_prost(),
295 graph,
296 dependencies,
297 resource_type,
298 if_not_exists,
299 refresh_interval_sec,
300 ),
301 &session,
302 "CREATE MATERIALIZED VIEW",
303 LongRunningNotificationAction::MonitorBackfillJob,
304 )
305 .await?;
306
307 Ok(PgResponse::empty_result(
308 StatementType::CREATE_MATERIALIZED_VIEW,
309 ))
310}
311
312pub(crate) async fn gen_create_mv_graph(
313 handler_args: HandlerArgs,
314 name: ObjectName,
315 query: BoundQuery,
316 dependent_relations: HashSet<ObjectId>,
317 dependent_udfs: HashSet<FunctionId>,
318 dependent_secrets: HashSet<SecretId>,
319 columns: Vec<Ident>,
320 emit_mode: Option<EmitMode>,
321) -> Result<(
322 TableCatalog,
323 PbStreamFragmentGraph,
324 HashSet<ObjectId>,
325 streaming_job_resource_type::ResourceType,
326 Option<u64>,
327)> {
328 let mut with_options = get_with_options(handler_args.clone());
329 let refresh_interval_sec = with_options.refresh_interval_sec()?;
330 with_options.remove(MV_REFRESH_INTERVAL_SEC_KEY);
331 let resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());
332
333 if resource_group.is_some() {
334 risingwave_common::license::Feature::ResourceGroup.check_available()?;
335 }
336
337 let serverless_backfill_from_with = with_options
338 .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
339 .map(|value| value.parse::<bool>().unwrap_or(false));
340 let is_serverless_backfill = match serverless_backfill_from_with {
341 Some(value) => value,
342 None => {
343 if resource_group.is_some() {
344 false
345 } else {
346 handler_args.session.config().enable_serverless_backfill()
347 }
348 }
349 };
350
351 if resource_group.is_some() && is_serverless_backfill {
352 return Err(RwError::from(InvalidInputSyntax(
353 "Please do not specify serverless backfilling and resource group together".to_owned(),
354 )));
355 }
356
357 if !with_options.is_empty() {
358 return Err(RwError::from(ProtocolError(format!(
360 "unexpected options in WITH clause: {:?}",
361 with_options.keys()
362 ))));
363 }
364
365 let sbc_addr = match SESSION_MANAGER.get() {
366 Some(manager) => manager.env().sbc_address(),
367 None => "",
368 }
369 .to_owned();
370
371 if is_serverless_backfill && sbc_addr.is_empty() {
372 return Err(RwError::from(InvalidInputSyntax(
373 "Serverless Backfill is disabled. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
374 )));
375 }
376
377 let resource_type = if is_serverless_backfill {
378 assert_eq!(resource_group, None);
379 match provision_resource_group(sbc_addr).await {
380 Err(e) => {
381 return Err(RwError::from(ProtocolError(format!(
382 "failed to provision serverless backfill nodes: {}",
383 e.as_report()
384 ))));
385 }
386 Ok(group) => {
387 tracing::info!(
388 resource_group = group,
389 "provisioning serverless backfill resource group"
390 );
391 streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(group)
392 }
393 }
394 } else if let Some(group) = resource_group {
395 streaming_job_resource_type::ResourceType::SpecificResourceGroup(group)
396 } else {
397 streaming_job_resource_type::ResourceType::Regular(true)
398 };
399 let context = OptimizerContext::from_handler_args(handler_args);
400 let has_order_by = !query.order.is_empty();
401 if has_order_by {
402 context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
403It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
404"#.to_owned());
405 }
406
407 if resource_type.resource_group().is_some()
408 && !context
409 .session_ctx()
410 .config()
411 .streaming_use_arrangement_backfill()
412 {
413 return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
414 }
415
416 let context: OptimizerContextRef = context.into();
417 let session = context.session_ctx().as_ref();
418
419 let (plan, table) = gen_create_mv_plan_bound(
420 session,
421 context.clone(),
422 query,
423 name,
424 columns,
425 emit_mode,
426 refresh_interval_sec,
427 )?;
428
429 let backfill_order = plan_backfill_order(
430 session,
431 context.with_options().backfill_order_strategy(),
432 plan.clone(),
433 )?;
434
435 let dependencies = RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
438 .into_iter()
439 .chain(dependent_udfs.iter().copied().map_into())
440 .chain(
441 dependent_secrets
442 .iter()
443 .copied()
444 .map(|id| id.as_object_id()),
445 )
446 .collect();
447
448 let graph = build_graph_with_strategy(
449 plan,
450 Some(GraphJobType::MaterializedView),
451 Some(backfill_order),
452 )?;
453
454 Ok((
455 table,
456 graph,
457 dependencies,
458 resource_type,
459 refresh_interval_sec,
460 ))
461}
462
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// Runs `sql` and asserts that it fails with exactly `expected` as the rendered error.
    async fn assert_sql_fails_with(
        frontend: &LocalFrontend,
        sql: &'static str,
        expected: &'static str,
    ) {
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(err.to_string(), expected);
    }

    /// Creates a protobuf-backed source and a materialized view over one of its struct
    /// columns, then checks the catalog entries and the view's column types.
    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let create_source = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(create_source).await.unwrap();

        let create_mv = "create materialized view mv1 as select t1.country from t1";
        frontend.run_sql(create_mv).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // The source must be registered under its name.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // The materialized view must be registered as a created table.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        let columns: HashMap<&str, DataType> = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect();

        let city_type: DataType = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        .into();
        let country_type: DataType = StructType::new(vec![
            ("address", DataType::Varchar),
            ("city", city_type),
            ("zipcode", DataType::Varchar),
        ])
        .into();
        let expected_columns = HashMap::from([
            (ROW_ID_COLUMN_NAME, DataType::Serial),
            ("country", country_type),
            (RW_TIMESTAMP_COLUMN_NAME, DataType::Timestamptz),
        ]);
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// Checks which un-aliased select items are accepted and which are rejected.
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        frontend.run_sql("create table t(x varchar)").await.unwrap();

        // A single aggregate without an alias is accepted.
        frontend
            .run_sql("create materialized view mv0 as select count(x) from t")
            .await
            .unwrap();

        // Two aggregates without aliases are rejected with a duplicate column name error.
        assert_sql_fails_with(
            &frontend,
            "create materialized view mv1 as select count(x), count(*) from t",
            "Invalid input syntax: column \"count\" specified more than once",
        )
        .await;

        // A bare literal needs an alias.
        assert_sql_fails_with(
            &frontend,
            "create materialized view mv1 as select 1",
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation",
        )
        .await;

        // So does a general expression.
        assert_sql_fails_with(
            &frontend,
            "create materialized view mv1 as select x is null from t",
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation",
        )
        .await;
    }

    /// Creating a view without `ORDER BY` yields no notices; with `ORDER BY` it still
    /// succeeds with the same statement type.
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        frontend.run_sql("create table t(x varchar)").await.unwrap();

        let unordered = frontend
            .run_sql("create materialized view mv1 as select * from t")
            .await
            .unwrap();
        assert_eq!(unordered.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(unordered.notices().is_empty());

        let ordered = frontend
            .run_sql("create materialized view mv2 as select * from t order by x")
            .await
            .unwrap();
        assert_eq!(ordered.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}