1use std::collections::HashSet;
16
17use either::Either;
18use itertools::Itertools;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::catalog::{FunctionId, ObjectId, SecretId};
21use risingwave_pb::ddl_service::streaming_job_resource_type;
22use risingwave_pb::serverless_backfill_controller::{
23 ProvisionRequest, node_group_controller_service_client,
24};
25use risingwave_pb::stream_plan::PbStreamFragmentGraph;
26use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
27use thiserror_ext::AsReport;
28
29use super::RwPgResponse;
30use crate::binder::{Binder, BoundQuery, BoundSetExpr};
31use crate::catalog::check_column_name_not_reserved;
32use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
33use crate::error::{ErrorCode, Result, RwError};
34use crate::handler::HandlerArgs;
35use crate::handler::util::{LongRunningNotificationAction, execute_with_long_running_notification};
36use crate::optimizer::backfill_order_strategy::plan_backfill_order;
37use crate::optimizer::plan_node::generic::GenericPlanRef;
38use crate::optimizer::plan_node::{Explain, StreamPlanRef as PlanRef};
39use crate::optimizer::{OptimizerContext, OptimizerContextRef, RelationCollectorVisitor};
40use crate::planner::Planner;
41use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
42use crate::session::{SESSION_MANAGER, SessionImpl};
43use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
44use crate::utils::ordinal;
45use crate::{TableCatalog, WithOptions};
46
/// WITH-clause key that pins the streaming job to a specific resource group.
pub const RESOURCE_GROUP_KEY: &str = "resource_group";
/// WITH-clause key that toggles serverless backfill (RisingWave Cloud feature).
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
49
50pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
51 if columns.is_empty() {
52 None
53 } else {
54 Some(columns.iter().map(|v| v.real_value()).collect())
55 }
56}
57
58pub(super) fn get_column_names(
63 bound: &BoundQuery,
64 columns: Vec<Ident>,
65) -> Result<Option<Vec<String>>> {
66 let col_names = parse_column_names(&columns);
67 if let BoundSetExpr::Select(select) = &bound.body {
68 if col_names.is_none() {
73 for (i, alias) in select.aliases.iter().enumerate() {
74 if alias.is_none() {
75 return Err(ErrorCode::BindError(format!(
76 "An alias must be specified for the {} expression (counting from 1) in result relation", ordinal(i+1)
77 ))
78 .into());
79 }
80 }
81 }
82 }
83
84 Ok(col_names)
85}
86
87pub fn gen_create_mv_plan(
89 session: &SessionImpl,
90 context: OptimizerContextRef,
91 query: Query,
92 name: ObjectName,
93 columns: Vec<Ident>,
94 emit_mode: Option<EmitMode>,
95) -> Result<(PlanRef, TableCatalog)> {
96 let mut binder = Binder::new_for_stream(session);
97 let bound = binder.bind_query(&query)?;
98 gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
99}
100
101pub fn gen_create_mv_plan_bound(
103 session: &SessionImpl,
104 context: OptimizerContextRef,
105 query: BoundQuery,
106 name: ObjectName,
107 columns: Vec<Ident>,
108 emit_mode: Option<EmitMode>,
109) -> Result<(PlanRef, TableCatalog)> {
110 if session.config().create_compaction_group_for_mv() {
111 context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
112 }
113
114 let db_name = &session.database();
115 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
116
117 let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
118
119 let definition = context.normalized_sql().to_owned();
120
121 let col_names = get_column_names(&query, columns)?;
122
123 let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
124 if emit_on_window_close {
125 context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
126 }
127
128 let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
129 if let Some(col_names) = col_names {
130 for name in &col_names {
131 check_column_name_not_reserved(name)?;
132 }
133 plan_root.set_out_names(col_names)?;
134 }
135 let materialize = plan_root.gen_materialize_plan(
136 database_id,
137 schema_id,
138 table_name,
139 definition,
140 emit_on_window_close,
141 )?;
142
143 let mut table = materialize.table().clone();
144 table.owner = session.user_id();
145
146 let plan: PlanRef = materialize.into();
147
148 let ctx = plan.ctx();
149 let explain_trace = ctx.is_explain_trace();
150 if explain_trace {
151 ctx.trace("Create Materialized View:");
152 ctx.trace(plan.explain_to_string());
153 }
154
155 Ok((plan, table))
156}
157
158pub async fn handle_create_mv(
159 handler_args: HandlerArgs,
160 if_not_exists: bool,
161 name: ObjectName,
162 query: Query,
163 columns: Vec<Ident>,
164 emit_mode: Option<EmitMode>,
165) -> Result<RwPgResponse> {
166 let (dependent_relations, dependent_udfs, dependent_secrets, bound_query) = {
167 let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
168 let bound_query = binder.bind_query(&query)?;
169 (
170 binder.included_relations().clone(),
171 binder.included_udfs().clone(),
172 binder.included_secrets().clone(),
173 bound_query,
174 )
175 };
176 handle_create_mv_bound(
177 handler_args,
178 if_not_exists,
179 name,
180 bound_query,
181 dependent_relations,
182 dependent_udfs,
183 dependent_secrets,
184 columns,
185 emit_mode,
186 )
187 .await
188}
189
190pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
192 let request = tonic::Request::new(ProvisionRequest {});
193 let mut client =
194 node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
195 sbc_addr.clone(),
196 )
197 .await
198 .map_err(|e| {
199 RwError::from(ErrorCode::InternalError(format!(
200 "unable to reach serverless backfill controller at addr {}: {}",
201 sbc_addr,
202 e.as_report()
203 )))
204 })?;
205
206 match client.provision(request).await {
207 Ok(resp) => Ok(resp.into_inner().resource_group),
208 Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
209 "serverless backfill controller returned error :{}",
210 e.as_report()
211 )))),
212 }
213}
214
215fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
216 let context = OptimizerContext::from_handler_args(handler_args);
217 context.with_options().clone()
218}
219
/// Creates a materialized view from an already-bound query.
///
/// Checks cluster limits and name duplication, generates the stream fragment
/// graph, then submits the DDL to the meta service through the catalog writer.
/// Returns an empty `CREATE_MATERIALIZED_VIEW` response on success.
pub async fn handle_create_mv_bound(
    handler_args: HandlerArgs,
    if_not_exists: bool,
    name: ObjectName,
    query: BoundQuery,
    dependent_relations: HashSet<ObjectId>,
    dependent_udfs: HashSet<FunctionId>, dependent_secrets: HashSet<SecretId>,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    session.check_cluster_limits().await?;

    // Early-return (e.g. a notice response) when the relation name already
    // exists and IF NOT EXISTS was given.
    if let Either::Right(resp) = session.check_relation_name_duplicated(
        name.clone(),
        StatementType::CREATE_MATERIALIZED_VIEW,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (table, graph, dependencies, resource_type) = {
        gen_create_mv_graph(
            handler_args,
            name,
            query,
            dependent_relations,
            dependent_udfs,
            dependent_secrets,
            columns,
            emit_mode,
        )
        .await?
    };

    // The guard must stay alive until the DDL finishes so the job remains
    // tracked as "creating" for its whole duration; hence `_job_guard`, not `_`.
    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                table.database_id,
                table.schema_id,
                table.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    // Wraps the DDL future so the user gets a notification (and backfill
    // monitoring) if the statement runs long.
    execute_with_long_running_notification(
        catalog_writer.create_materialized_view(
            table.to_prost(),
            graph,
            dependencies,
            resource_type,
            if_not_exists,
        ),
        &session,
        "CREATE MATERIALIZED VIEW",
        LongRunningNotificationAction::MonitorBackfillJob,
    )
    .await?;

    Ok(PgResponse::empty_result(
        StatementType::CREATE_MATERIALIZED_VIEW,
    ))
}
289
/// Plans a materialized view and builds its stream fragment graph, resolving
/// which resource type the streaming job runs under (regular, a specific
/// resource group, or a freshly provisioned serverless-backfill group).
///
/// Returns the table catalog, the fragment graph, the complete dependency set
/// (relations, UDFs, secrets as object ids), and the resolved resource type.
pub(crate) async fn gen_create_mv_graph(
    handler_args: HandlerArgs,
    name: ObjectName,
    query: BoundQuery,
    dependent_relations: HashSet<ObjectId>,
    dependent_udfs: HashSet<FunctionId>,
    dependent_secrets: HashSet<SecretId>,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(
    TableCatalog,
    PbStreamFragmentGraph,
    HashSet<ObjectId>,
    streaming_job_resource_type::ResourceType,
)> {
    let mut with_options = get_with_options(handler_args.clone());
    // Consume the handled keys now so the later "unexpected options" check
    // only sees leftovers.
    let resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());

    if resource_group.is_some() {
        risingwave_common::license::Feature::ResourceGroup.check_available()?;
    }

    // Unparsable boolean values are treated as `false` rather than erroring.
    let serverless_backfill_from_with = with_options
        .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
        .map(|value| value.parse::<bool>().unwrap_or(false));
    // The WITH option takes precedence; otherwise fall back to the session
    // config — unless an explicit resource group was specified.
    let is_serverless_backfill = match serverless_backfill_from_with {
        Some(value) => value,
        None => {
            if resource_group.is_some() {
                false
            } else {
                handler_args.session.config().enable_serverless_backfill()
            }
        }
    };

    if resource_group.is_some() && is_serverless_backfill {
        return Err(RwError::from(InvalidInputSyntax(
            "Please do not specify serverless backfilling and resource group together".to_owned(),
        )));
    }

    // Anything still left in WITH was not recognized by this handler.
    if !with_options.is_empty() {
        return Err(RwError::from(ProtocolError(format!(
            "unexpected options in WITH clause: {:?}",
            with_options.keys()
        ))));
    }

    // An empty SBC address means serverless backfill is not available in this
    // deployment.
    let sbc_addr = match SESSION_MANAGER.get() {
        Some(manager) => manager.env().sbc_address(),
        None => "",
    }
    .to_owned();

    if is_serverless_backfill && sbc_addr.is_empty() {
        return Err(RwError::from(InvalidInputSyntax(
            "Serverless Backfill is disabled. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
        )));
    }

    let resource_type = if is_serverless_backfill {
        // Mutual exclusion was enforced above.
        assert_eq!(resource_group, None);
        match provision_resource_group(sbc_addr).await {
            Err(e) => {
                return Err(RwError::from(ProtocolError(format!(
                    "failed to provision serverless backfill nodes: {}",
                    e.as_report()
                ))));
            }
            Ok(group) => {
                tracing::info!(
                    resource_group = group,
                    "provisioning serverless backfill resource group"
                );
                streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(group)
            }
        }
    } else if let Some(group) = resource_group {
        streaming_job_resource_type::ResourceType::SpecificResourceGroup(group)
    } else {
        streaming_job_resource_type::ResourceType::Regular(true)
    };
    let context = OptimizerContext::from_handler_args(handler_args);
    let has_order_by = !query.order.is_empty();
    if has_order_by {
        context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
"#.to_owned());
    }

    // Resource groups require arrangement backfill to be enabled.
    if resource_type.resource_group().is_some()
        && !context
            .session_ctx()
            .config()
            .streaming_use_arrangement_backfill()
    {
        return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
    }

    let context: OptimizerContextRef = context.into();
    let session = context.session_ctx().as_ref();

    let (plan, table) =
        gen_create_mv_plan_bound(session, context.clone(), query, name, columns, emit_mode)?;

    let backfill_order = plan_backfill_order(
        session,
        context.with_options().backfill_order_strategy(),
        plan.clone(),
    )?;

    // Merge binder-reported dependencies with those collected from the plan,
    // plus dependent UDFs and secrets, all as object ids.
    let dependencies = RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
        .into_iter()
        .chain(dependent_udfs.iter().copied().map_into())
        .chain(
            dependent_secrets
                .iter()
                .copied()
                .map(|id| id.as_object_id()),
        )
        .collect();

    let graph = build_graph_with_strategy(
        plan,
        Some(GraphJobType::MaterializedView),
        Some(backfill_order),
    )?;

    Ok((table, graph, dependencies, resource_type))
}
424
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    // End-to-end: create a protobuf-backed source, build an MV over it, then
    // verify the MV's catalog entry exposes the expected columns (including
    // the hidden row-id and rw_timestamp columns).
    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1";
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // The source itself must be registered in the catalog.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // The MV must show up as a created table.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        .into();
        // Expected columns mirror the protobuf schema from PROTO_FILE_DATA.
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => StructType::new(
                vec![("address", DataType::Varchar),("city", city_type),("zipcode", DataType::Varchar)],
            )
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    // Columns without an explicit alias (and without a derivable name) must be
    // rejected with the exact bind-error message from `get_column_names`.
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // `count(x)` derives the name "count" — allowed.
        let sql = "create materialized view mv0 as select count(x) from t";
        frontend.run_sql(sql).await.unwrap();

        // Two derived "count" columns collide.
        let sql = "create materialized view mv1 as select count(x), count(*) from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid input syntax: column \"count\" specified more than once"
        );

        // A bare literal has no derivable name.
        let sql = "create materialized view mv1 as select 1";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );

        let sql = "create materialized view mv1 as select x is null from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );
    }

    // ORDER BY in CREATE MV is accepted (it only hints physical clustering).
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // Without ORDER BY — no notices expected.
        let sql = "create materialized view mv1 as select * from t";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(response.notices().is_empty());

        // With ORDER BY — still succeeds (a warning notice is emitted).
        let sql = "create materialized view mv2 as select * from t order by x";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}
549}