use std::collections::HashSet;

use either::Either;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{FunctionId, ObjectId};
use risingwave_pb::ddl_service::streaming_job_resource_type;
use risingwave_pb::serverless_backfill_controller::{
    ProvisionRequest, node_group_controller_service_client,
};
use risingwave_pb::stream_plan::PbStreamFragmentGraph;
use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
use thiserror_ext::AsReport;

use super::RwPgResponse;
use crate::binder::{Binder, BoundQuery, BoundSetExpr};
use crate::catalog::check_column_name_not_reserved;
use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
use crate::error::{ErrorCode, Result, RwError};
use crate::handler::HandlerArgs;
use crate::handler::util::{LongRunningNotificationAction, execute_with_long_running_notification};
use crate::optimizer::backfill_order_strategy::plan_backfill_order;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{Explain, StreamPlanRef as PlanRef};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, RelationCollectorVisitor};
use crate::planner::Planner;
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::{SESSION_MANAGER, SessionImpl};
use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
use crate::utils::ordinal;
use crate::{TableCatalog, WithOptions};

pub const RESOURCE_GROUP_KEY: &str = "resource_group";
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";

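/// Normalize an explicit column list from `CREATE MATERIALIZED VIEW name (col, ...)`
/// into plain strings via `Ident::real_value`, or return `None` when no list
/// was given.
///
/// A minimal behavioral sketch (the idents here are constructed ad hoc for
/// illustration; real callers pass in idents from the parsed statement):
///
/// ```ignore
/// let cols = vec![Ident::new_unchecked("country")];
/// assert_eq!(parse_column_names(&cols), Some(vec!["country".to_owned()]));
/// assert_eq!(parse_column_names(&[]), None);
/// ```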
pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
    if columns.is_empty() {
        None
    } else {
        Some(columns.iter().map(|v| v.real_value()).collect())
    }
}

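/// Resolve the output column names of the materialized view. An explicit
/// `columns` list takes precedence; otherwise, when the body is a `SELECT`,
/// every select item must already carry a usable alias.
///
/// A hedged sketch of the error path (binding elided for brevity;
/// `bound_select_one` is a hypothetical bound query for `SELECT 1`):
///
/// ```ignore
/// // `SELECT 1` derives no alias, so with no explicit column list this
/// // returns: "An alias must be specified for the 1st expression ...".
/// let err = get_column_names(&bound_select_one, vec![]).unwrap_err();
/// ```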
pub(super) fn get_column_names(
    bound: &BoundQuery,
    columns: Vec<Ident>,
) -> Result<Option<Vec<String>>> {
    let col_names = parse_column_names(&columns);
    if let BoundSetExpr::Select(select) = &bound.body {
        if col_names.is_none() {
            for (i, alias) in select.aliases.iter().enumerate() {
                if alias.is_none() {
                    return Err(ErrorCode::BindError(format!(
                        "An alias must be specified for the {} expression (counting from 1) in result relation",
                        ordinal(i + 1)
                    ))
                    .into());
                }
            }
        }
    }

    Ok(col_names)
}

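/// Bind `query` against the current catalog, then generate the stream plan and
/// the [`TableCatalog`] for the materialized view. This is a thin wrapper over
/// [`gen_create_mv_plan_bound`] for callers starting from an unbound AST.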
pub fn gen_create_mv_plan(
    session: &SessionImpl,
    context: OptimizerContextRef,
    query: Query,
    name: ObjectName,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(PlanRef, TableCatalog)> {
    let mut binder = Binder::new_for_stream(session);
    let bound = binder.bind_query(&query)?;
    gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
}

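/// Generate the stream plan and `TableCatalog` from an already-bound query:
/// resolve the target schema, validate or derive output column names, apply
/// the requested emit mode, and wrap the plan in a `Materialize` node owned by
/// the current user.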
pub fn gen_create_mv_plan_bound(
    session: &SessionImpl,
    context: OptimizerContextRef,
    query: BoundQuery,
    name: ObjectName,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(PlanRef, TableCatalog)> {
    if session.config().create_compaction_group_for_mv() {
        context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
    }

    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;

    let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;

    let definition = context.normalized_sql().to_owned();

    let col_names = get_column_names(&query, columns)?;

    let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
    if emit_on_window_close {
        context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
    }

    let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
    if let Some(col_names) = col_names {
        for name in &col_names {
            check_column_name_not_reserved(name)?;
        }
        plan_root.set_out_names(col_names)?;
    }
    let materialize = plan_root.gen_materialize_plan(
        database_id,
        schema_id,
        table_name,
        definition,
        emit_on_window_close,
    )?;

    let mut table = materialize.table().clone();
    table.owner = session.user_id();

    let plan: PlanRef = materialize.into();

    let ctx = plan.ctx();
    let explain_trace = ctx.is_explain_trace();
    if explain_trace {
        ctx.trace("Create Materialized View:");
        ctx.trace(plan.explain_to_string());
    }

    Ok((plan, table))
}

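/// Handle `CREATE MATERIALIZED VIEW`: bind the query, capture the relations
/// and UDFs it references (needed for dependency tracking), and delegate to
/// [`handle_create_mv_bound`].
///
/// A hedged end-to-end sketch using the existing test harness (mirrors the
/// tests at the bottom of this file):
///
/// ```ignore
/// let frontend = LocalFrontend::new(Default::default()).await;
/// frontend.run_sql("create table t(x varchar)").await?;
/// frontend.run_sql("create materialized view mv as select x from t").await?;
/// ```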
pub async fn handle_create_mv(
    handler_args: HandlerArgs,
    if_not_exists: bool,
    name: ObjectName,
    query: Query,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<RwPgResponse> {
    let (dependent_relations, dependent_udfs, bound_query) = {
        let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
        let bound_query = binder.bind_query(&query)?;
        (
            binder.included_relations().clone(),
            binder.included_udfs().clone(),
            bound_query,
        )
    };
    handle_create_mv_bound(
        handler_args,
        if_not_exists,
        name,
        bound_query,
        dependent_relations,
        dependent_udfs,
        columns,
        emit_mode,
    )
    .await
}

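/// Ask the serverless backfill controller (SBC) at `sbc_addr` to provision a
/// node group, returning the name of the resource group it created.
///
/// A sketch of the call (the address is illustrative; in practice it comes
/// from the session manager's environment, see [`gen_create_mv_graph`]):
///
/// ```ignore
/// let group = provision_resource_group("http://sbc:50051".to_owned()).await?;
/// tracing::info!(resource_group = group, "provisioned");
/// ```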
pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
    let request = tonic::Request::new(ProvisionRequest {});
    let mut client =
        node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
            sbc_addr.clone(),
        )
        .await
        .map_err(|e| {
            RwError::from(ErrorCode::InternalError(format!(
                "unable to reach serverless backfill controller at addr {}: {}",
                sbc_addr,
                e.as_report()
            )))
        })?;

    match client.provision(request).await {
        Ok(resp) => Ok(resp.into_inner().resource_group),
        Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
            "serverless backfill controller returned error: {}",
            e.as_report()
        )))),
    }
}

fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
    let context = OptimizerContext::from_handler_args(handler_args);
    context.with_options().clone()
}

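/// Create a materialized view from an already-bound query: check cluster
/// limits and name conflicts, build the fragment graph, then submit the job to
/// the meta service while holding a guard that tracks the in-flight creation.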
pub async fn handle_create_mv_bound(
    handler_args: HandlerArgs,
    if_not_exists: bool,
    name: ObjectName,
    query: BoundQuery,
    dependent_relations: HashSet<ObjectId>,
    dependent_udfs: HashSet<FunctionId>,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();

    session.check_cluster_limits().await?;

    if let Either::Right(resp) = session.check_relation_name_duplicated(
        name.clone(),
        StatementType::CREATE_MATERIALIZED_VIEW,
        if_not_exists,
    )? {
        return Ok(resp);
    }

    let (table, graph, dependencies, resource_type) = {
        gen_create_mv_graph(
            handler_args,
            name,
            query,
            dependent_relations,
            dependent_udfs,
            columns,
            emit_mode,
        )
        .await?
    };

    let _job_guard =
        session
            .env()
            .creating_streaming_job_tracker()
            .guard(CreatingStreamingJobInfo::new(
                session.session_id(),
                table.database_id,
                table.schema_id,
                table.name.clone(),
            ));

    let catalog_writer = session.catalog_writer()?;
    execute_with_long_running_notification(
        catalog_writer.create_materialized_view(
            table.to_prost(),
            graph,
            dependencies,
            resource_type,
            if_not_exists,
        ),
        &session,
        "CREATE MATERIALIZED VIEW",
        LongRunningNotificationAction::MonitorBackfillJob,
    )
    .await?;

    Ok(PgResponse::empty_result(
        StatementType::CREATE_MATERIALIZED_VIEW,
    ))
}

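/// Plan the bound query and build the stream fragment graph, deciding which
/// resource type the streaming job runs under. The precedence implemented
/// below, summarized for reference:
///
/// 1. `resource_group` in the WITH clause -> `SpecificResourceGroup`
///    (a licensed feature; requires arrangement backfill).
/// 2. `cloud.serverless_backfill_enabled` in the WITH clause, or the session
///    config `enable_serverless_backfill` when neither option is present
///    -> `ServerlessBackfillResourceGroup`, provisioned via the SBC.
/// 3. Otherwise -> `Regular`.
///
/// Specifying both a resource group and serverless backfill is rejected.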
pub(crate) async fn gen_create_mv_graph(
    handler_args: HandlerArgs,
    name: ObjectName,
    query: BoundQuery,
    dependent_relations: HashSet<ObjectId>,
    dependent_udfs: HashSet<FunctionId>,
    columns: Vec<Ident>,
    emit_mode: Option<EmitMode>,
) -> Result<(
    TableCatalog,
    PbStreamFragmentGraph,
    HashSet<ObjectId>,
    streaming_job_resource_type::ResourceType,
)> {
    let mut with_options = get_with_options(handler_args.clone());
    let resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());

    if resource_group.is_some() {
        risingwave_common::license::Feature::ResourceGroup.check_available()?;
    }

    // A `cloud.serverless_backfill_enabled` option in the WITH clause takes
    // precedence; otherwise fall back to the session config, unless a resource
    // group was explicitly specified.
    let serverless_backfill_from_with = with_options
        .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
        .map(|value| value.parse::<bool>().unwrap_or(false));
    let is_serverless_backfill = match serverless_backfill_from_with {
        Some(value) => value,
        None => {
            if resource_group.is_some() {
                false
            } else {
                handler_args.session.config().enable_serverless_backfill()
            }
        }
    };

    if resource_group.is_some() && is_serverless_backfill {
        return Err(RwError::from(InvalidInputSyntax(
            "Please do not specify serverless backfilling and resource group together".to_owned(),
        )));
    }

    if !with_options.is_empty() {
        return Err(RwError::from(ProtocolError(format!(
            "unexpected options in WITH clause: {:?}",
            with_options.keys()
        ))));
    }

    let sbc_addr = match SESSION_MANAGER.get() {
        Some(manager) => manager.env().sbc_address(),
        None => "",
    }
    .to_owned();

    if is_serverless_backfill && sbc_addr.is_empty() {
        return Err(RwError::from(InvalidInputSyntax(
            "Serverless Backfill is disabled. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
        )));
    }

    let resource_type = if is_serverless_backfill {
        assert_eq!(resource_group, None);
        match provision_resource_group(sbc_addr).await {
            Err(e) => {
                return Err(RwError::from(ProtocolError(format!(
                    "failed to provision serverless backfill nodes: {}",
                    e.as_report()
                ))));
            }
            Ok(group) => {
                tracing::info!(
                    resource_group = group,
                    "provisioning serverless backfill resource group"
                );
                streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(group)
            }
        }
    } else if let Some(group) = resource_group {
        streaming_job_resource_type::ResourceType::SpecificResourceGroup(group)
    } else {
        streaming_job_resource_type::ResourceType::Regular(true)
    };
    let context = OptimizerContext::from_handler_args(handler_args);
    let has_order_by = !query.order.is_empty();
    if has_order_by {
        context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected from this materialized view are returned in this order.
It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
"#.to_owned());
    }

    if resource_type.resource_group().is_some()
        && !context
            .session_ctx()
            .config()
            .streaming_use_arrangement_backfill()
    {
        return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
    }

    let context: OptimizerContextRef = context.into();
    let session = context.session_ctx().as_ref();

    let (plan, table) =
        gen_create_mv_plan_bound(session, context.clone(), query, name, columns, emit_mode)?;

    let backfill_order = plan_backfill_order(
        session,
        context.with_options().backfill_order_strategy(),
        plan.clone(),
    )?;

    // Collect the dependencies of the plan: base relations plus referenced UDFs.
    let dependencies = RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
        .into_iter()
        .chain(dependent_udfs.iter().copied().map_into())
        .collect();

    let graph = build_graph_with_strategy(
        plan,
        Some(GraphJobType::MaterializedView),
        Some(backfill_order),
    )?;

    Ok((table, graph, dependencies, resource_type))
}

#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1";
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        .into();
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => StructType::new(
                vec![("address", DataType::Varchar), ("city", city_type), ("zipcode", DataType::Varchar)],
            )
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // A single aggregate without an alias is fine: it derives the name "count".
        let sql = "create materialized view mv0 as select count(x) from t";
        frontend.run_sql(sql).await.unwrap();

        // Two aggregates both deriving the name "count" collide.
        let sql = "create materialized view mv1 as select count(x), count(*) from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid input syntax: column \"count\" specified more than once"
        );

        // A bare literal derives no alias at all.
        let sql = "create materialized view mv1 as select 1";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );

        // Neither does an `IS NULL` expression.
        let sql = "create materialized view mv1 as select x is null from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );
    }

    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // Without ORDER BY, no notice is emitted.
        let sql = "create materialized view mv1 as select * from t";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(response.notices().is_empty());

        // With ORDER BY, the statement still succeeds; the clause only affects
        // physical clustering and triggers a user-facing notice.
        let sql = "create materialized view mv2 as select * from t order by x";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}