1use std::collections::HashSet;
16
17use either::Either;
18use itertools::Itertools;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::catalog::{FunctionId, ObjectId, SecretId};
21use risingwave_common::license::Feature;
22use risingwave_pb::ddl_service::streaming_job_resource_type;
23use risingwave_pb::serverless_backfill_controller::{
24 ProvisionRequest, node_group_controller_service_client,
25};
26use risingwave_pb::stream_plan::PbStreamFragmentGraph;
27use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
28use thiserror_ext::AsReport;
29
30use super::RwPgResponse;
31use crate::binder::{Binder, BoundQuery, BoundSetExpr};
32use crate::catalog::check_column_name_not_reserved;
33use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
34use crate::error::{ErrorCode, Result, RwError};
35use crate::handler::HandlerArgs;
36use crate::handler::util::{
37 LongRunningNotificationAction, execute_with_long_running_notification,
38 reject_internal_table_dependencies,
39};
40use crate::optimizer::backfill_order_strategy::plan_backfill_order;
41use crate::optimizer::plan_node::generic::GenericPlanRef;
42use crate::optimizer::plan_node::{
43 BackfillType, Explain, StreamPlanRef as PlanRef, ensure_sync_log_store_fragment_root,
44};
45use crate::optimizer::{OptimizerContext, OptimizerContextRef, RelationCollectorVisitor};
46use crate::planner::Planner;
47use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
48use crate::session::{SESSION_MANAGER, SessionImpl};
49use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
50use crate::utils::{MV_REFRESH_INTERVAL_SEC_KEY, ordinal};
51use crate::{TableCatalog, WithOptions};
52
/// `WITH` option key whose value names the resource group requested for a streaming job.
pub const RESOURCE_GROUP_KEY: &str = "resource_group";
/// `WITH` option key whose boolean value turns serverless backfill on or off for a streaming job.
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
55
/// Resource-placement options pulled out of a streaming job's `WITH` clause.
///
/// Both fields are `None` when the corresponding option was not supplied.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct StreamingJobResourceOptions {
    /// Value of the `resource_group` option, if it was present.
    pub resource_group: Option<String>,
    /// Parsed value of the `cloud.serverless_backfill_enabled` option, if it was present.
    pub serverless_backfill_enabled: Option<bool>,
}
60
61pub(crate) fn extract_streaming_job_resource_options(
62 with_options: &mut WithOptions,
63) -> StreamingJobResourceOptions {
64 StreamingJobResourceOptions {
65 resource_group: with_options.remove(RESOURCE_GROUP_KEY),
66 serverless_backfill_enabled: with_options
67 .remove(CLOUD_SERVERLESS_BACKFILL_ENABLED)
68 .map(|value| value.parse::<bool>().unwrap_or(false)),
69 }
70}
71
72pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
73 if columns.is_empty() {
74 None
75 } else {
76 Some(columns.iter().map(|v| v.real_value()).collect())
77 }
78}
79
80pub(super) fn get_column_names(
85 bound: &BoundQuery,
86 columns: Vec<Ident>,
87) -> Result<Option<Vec<String>>> {
88 let col_names = parse_column_names(&columns);
89 if let BoundSetExpr::Select(select) = &bound.body {
90 if col_names.is_none() {
95 for (i, alias) in select.aliases.iter().enumerate() {
96 if alias.is_none() {
97 return Err(ErrorCode::BindError(format!(
98 "An alias must be specified for the {} expression (counting from 1) in result relation", ordinal(i+1)
99 ))
100 .into());
101 }
102 }
103 }
104 }
105
106 Ok(col_names)
107}
108
109pub fn explain_create_mv_plan(
111 session: &SessionImpl,
112 context: OptimizerContextRef,
113 query: Query,
114 name: ObjectName,
115 columns: Vec<Ident>,
116 emit_mode: Option<EmitMode>,
117) -> Result<(PlanRef, TableCatalog)> {
118 let mut binder = Binder::new_for_stream(session);
119 let bound = binder.bind_query(&query)?;
120 gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode, None)
121}
122
123pub fn gen_create_mv_plan_bound(
125 session: &SessionImpl,
126 context: OptimizerContextRef,
127 query: BoundQuery,
128 name: ObjectName,
129 columns: Vec<Ident>,
130 emit_mode: Option<EmitMode>,
131 refresh_interval_sec: Option<u64>,
132) -> Result<(PlanRef, TableCatalog)> {
133 if session.config().create_compaction_group_for_mv() {
134 context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
135 }
136
137 let db_name = &session.database();
138 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
139
140 let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
141
142 let definition = context.normalized_sql().to_owned();
143
144 let col_names = get_column_names(&query, columns)?;
145
146 let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
147 if emit_on_window_close {
148 context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
149 }
150
151 let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
152 if let Some(col_names) = col_names {
153 for name in &col_names {
154 check_column_name_not_reserved(name)?;
155 }
156 plan_root.set_out_names(col_names)?;
157 }
158
159 let backfill_type = if refresh_interval_sec.is_some() {
160 plan_root.require_snapshot_backfill_for_batch_refresh()?;
161 BackfillType::SnapshotBackfill
162 } else {
163 plan_root.derive_backfill_type(true)
164 };
165
166 let materialize = plan_root.gen_materialize_plan(
167 database_id,
168 schema_id,
169 table_name,
170 definition,
171 emit_on_window_close,
172 backfill_type,
173 )?;
174
175 let mut table = materialize.table().clone();
176 table.owner = session.user_id();
177
178 let plan: PlanRef = ensure_sync_log_store_fragment_root(materialize.into());
179
180 let ctx = plan.ctx();
181 let explain_trace = ctx.is_explain_trace();
182 if explain_trace {
183 ctx.trace("Create Materialized View:");
184 ctx.trace(plan.explain_to_string());
185 }
186
187 Ok((plan, table))
188}
189
190pub async fn handle_create_mv(
191 handler_args: HandlerArgs,
192 if_not_exists: bool,
193 name: ObjectName,
194 query: Query,
195 columns: Vec<Ident>,
196 emit_mode: Option<EmitMode>,
197) -> Result<RwPgResponse> {
198 let (dependent_relations, dependent_udfs, dependent_secrets, bound_query) = {
199 let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
200 let bound_query = binder.bind_query(&query)?;
201 (
202 binder.included_relations().clone(),
203 binder.included_udfs().clone(),
204 binder.included_secrets().clone(),
205 bound_query,
206 )
207 };
208 handle_create_mv_bound(
209 handler_args,
210 if_not_exists,
211 name,
212 bound_query,
213 dependent_relations,
214 dependent_udfs,
215 dependent_secrets,
216 columns,
217 emit_mode,
218 )
219 .await
220}
221
222pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
224 let request = tonic::Request::new(ProvisionRequest {});
225 let mut client =
226 node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
227 sbc_addr.clone(),
228 )
229 .await
230 .map_err(|e| {
231 RwError::from(ErrorCode::InternalError(format!(
232 "unable to reach serverless backfill controller at addr {}: {}",
233 sbc_addr,
234 e.as_report()
235 )))
236 })?;
237
238 match client.provision(request).await {
239 Ok(resp) => Ok(resp.into_inner().resource_group),
240 Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
241 "serverless backfill controller returned error :{}",
242 e.as_report()
243 )))),
244 }
245}
246
247pub(crate) async fn resolve_streaming_job_resource_type(
248 session: &SessionImpl,
249 with_options: &mut WithOptions,
250) -> Result<streaming_job_resource_type::ResourceType> {
251 let StreamingJobResourceOptions {
252 resource_group,
253 serverless_backfill_enabled,
254 } = extract_streaming_job_resource_options(with_options);
255
256 if resource_group.is_some() {
257 Feature::ResourceGroup.check_available()?;
258 }
259
260 let is_serverless_backfill = match serverless_backfill_enabled {
261 Some(value) => value,
262 None => {
263 if resource_group.is_some() {
264 false
265 } else {
266 session.config().enable_serverless_backfill()
267 }
268 }
269 };
270
271 if resource_group.is_some() && is_serverless_backfill {
272 return Err(RwError::from(InvalidInputSyntax(
273 "Please do not specify serverless backfilling and resource group together".to_owned(),
274 )));
275 }
276
277 let sbc_addr = match SESSION_MANAGER.get() {
278 Some(manager) => manager.env().sbc_address(),
279 None => "",
280 }
281 .to_owned();
282
283 if is_serverless_backfill && sbc_addr.is_empty() {
284 return Err(RwError::from(InvalidInputSyntax(
285 "Serverless Backfill is disabled. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
286 )));
287 }
288
289 let resource_type = if is_serverless_backfill {
290 assert_eq!(resource_group, None);
291 match provision_resource_group(sbc_addr).await {
292 Err(e) => {
293 return Err(RwError::from(ProtocolError(format!(
294 "failed to provision serverless backfill nodes: {}",
295 e.as_report()
296 ))));
297 }
298 Ok(group) => {
299 tracing::info!(
300 resource_group = group,
301 "provisioning serverless backfill resource group"
302 );
303 streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(group)
304 }
305 }
306 } else if let Some(group) = resource_group {
307 streaming_job_resource_type::ResourceType::SpecificResourceGroup(group)
308 } else {
309 streaming_job_resource_type::ResourceType::Regular(true)
310 };
311
312 if resource_type.resource_group().is_some()
313 && !session.config().streaming_use_arrangement_backfill()
314 {
315 return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
316 }
317
318 Ok(resource_type)
319}
320
321fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
322 let context = OptimizerContext::from_handler_args(handler_args);
323 context.with_options().clone()
324}
325
326pub async fn handle_create_mv_bound(
327 handler_args: HandlerArgs,
328 if_not_exists: bool,
329 name: ObjectName,
330 query: BoundQuery,
331 dependent_relations: HashSet<ObjectId>,
332 dependent_udfs: HashSet<FunctionId>, dependent_secrets: HashSet<SecretId>,
334 columns: Vec<Ident>,
335 emit_mode: Option<EmitMode>,
336) -> Result<RwPgResponse> {
337 let session = handler_args.session.clone();
338
339 session.check_cluster_limits().await?;
341
342 if let Either::Right(resp) = session.check_relation_name_duplicated(
343 name.clone(),
344 StatementType::CREATE_MATERIALIZED_VIEW,
345 if_not_exists,
346 )? {
347 return Ok(resp);
348 }
349
350 reject_internal_table_dependencies(
351 session.as_ref(),
352 &dependent_relations,
353 "CREATE MATERIALIZED VIEW",
354 )?;
355
356 let (table, graph, dependencies, resource_type, refresh_interval_sec) = {
357 gen_create_mv_graph(
358 handler_args,
359 name,
360 query,
361 dependent_relations,
362 dependent_udfs,
363 dependent_secrets,
364 columns,
365 emit_mode,
366 )
367 .await?
368 };
369
370 let _job_guard =
372 session
373 .env()
374 .creating_streaming_job_tracker()
375 .guard(CreatingStreamingJobInfo::new(
376 session.session_id(),
377 table.database_id,
378 table.schema_id,
379 table.name.clone(),
380 ));
381
382 let catalog_writer = session.catalog_writer()?;
383 execute_with_long_running_notification(
384 catalog_writer.create_materialized_view(
385 table.to_prost(),
386 graph,
387 dependencies,
388 resource_type,
389 if_not_exists,
390 refresh_interval_sec,
391 ),
392 &session,
393 "CREATE MATERIALIZED VIEW",
394 LongRunningNotificationAction::MonitorBackfillJob,
395 )
396 .await?;
397
398 Ok(PgResponse::empty_result(
399 StatementType::CREATE_MATERIALIZED_VIEW,
400 ))
401}
402
403pub(crate) async fn gen_create_mv_graph(
404 handler_args: HandlerArgs,
405 name: ObjectName,
406 query: BoundQuery,
407 dependent_relations: HashSet<ObjectId>,
408 dependent_udfs: HashSet<FunctionId>,
409 dependent_secrets: HashSet<SecretId>,
410 columns: Vec<Ident>,
411 emit_mode: Option<EmitMode>,
412) -> Result<(
413 TableCatalog,
414 PbStreamFragmentGraph,
415 HashSet<ObjectId>,
416 streaming_job_resource_type::ResourceType,
417 Option<u64>,
418)> {
419 let mut with_options = get_with_options(handler_args.clone());
420 let refresh_interval_sec = with_options.refresh_interval_sec()?;
421 with_options.remove(MV_REFRESH_INTERVAL_SEC_KEY);
422 let resource_type =
423 resolve_streaming_job_resource_type(handler_args.session.as_ref(), &mut with_options)
424 .await?;
425
426 if !with_options.is_empty() {
427 return Err(RwError::from(ProtocolError(format!(
429 "unexpected options in WITH clause: {:?}",
430 with_options.keys()
431 ))));
432 }
433
434 let context = OptimizerContext::from_handler_args(handler_args);
435 let has_order_by = !query.order.is_empty();
436 if has_order_by {
437 context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
438It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
439"#.to_owned());
440 }
441
442 let context: OptimizerContextRef = context.into();
443 let session = context.session_ctx().as_ref();
444
445 let (plan, table) = gen_create_mv_plan_bound(
446 session,
447 context.clone(),
448 query,
449 name,
450 columns,
451 emit_mode,
452 refresh_interval_sec,
453 )?;
454
455 let backfill_order = plan_backfill_order(
456 session,
457 context.with_options().backfill_order_strategy(),
458 plan.clone(),
459 )?;
460
461 let dependencies = RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
464 .into_iter()
465 .chain(dependent_udfs.iter().copied().map_into())
466 .chain(
467 dependent_secrets
468 .iter()
469 .copied()
470 .map(|id| id.as_object_id()),
471 )
472 .collect();
473
474 let graph = build_graph_with_strategy(
475 plan,
476 Some(GraphJobType::MaterializedView),
477 Some(backfill_order),
478 )?;
479
480 Ok((
481 table,
482 graph,
483 dependencies,
484 resource_type,
485 refresh_interval_sec,
486 ))
487}
488
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// Creates a protobuf-backed source and a materialized view on it, then checks that both
    /// are in the catalog and that the view has the expected columns.
    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let create_source = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(create_source).await.unwrap();

        frontend
            .run_sql("create materialized view mv1 as select t1.country from t1")
            .await
            .unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // The source must be registered.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // The materialized view must be registered as a created table.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        let columns: HashMap<&str, DataType> = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect();

        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        .into();
        let country_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("city", city_type),
            ("zipcode", DataType::Varchar),
        ])
        .into();
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => country_type,
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// Exercises the outcomes for select items that carry no explicit alias.
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        frontend.run_sql("create table t(x varchar)").await.unwrap();

        // A single aggregate call without an alias is accepted.
        frontend
            .run_sql("create materialized view mv0 as select count(x) from t")
            .await
            .unwrap();

        // Two aggregates of the same function collide on the column name.
        let err = frontend
            .run_sql("create materialized view mv1 as select count(x), count(*) from t")
            .await
            .unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid input syntax: column \"count\" specified more than once"
        );

        // A literal and a general expression both require an explicit alias.
        for sql in [
            "create materialized view mv1 as select 1",
            "create materialized view mv1 as select x is null from t",
        ] {
            let err = frontend.run_sql(sql).await.unwrap_err();
            assert_eq!(
                err.to_string(),
                "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
            );
        }
    }

    /// Creates one materialized view without `ORDER BY` and one with it.
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        frontend.run_sql("create table t(x varchar)").await.unwrap();

        // Without ORDER BY: no notice is attached to the response.
        let plain = frontend
            .run_sql("create materialized view mv1 as select * from t")
            .await
            .unwrap();
        assert_eq!(plain.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(plain.notices().is_empty());

        // With ORDER BY: the statement still succeeds.
        let ordered = frontend
            .run_sql("create materialized view mv2 as select * from t order by x")
            .await
            .unwrap();
        assert_eq!(ordered.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}