1use std::collections::HashSet;
16
17use either::Either;
18use itertools::Itertools;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::catalog::{FunctionId, ObjectId};
21use risingwave_pb::serverless_backfill_controller::{
22 ProvisionRequest, node_group_controller_service_client,
23};
24use risingwave_pb::stream_plan::PbStreamFragmentGraph;
25use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
26use thiserror_ext::AsReport;
27
28use super::RwPgResponse;
29use crate::binder::{Binder, BoundQuery, BoundSetExpr};
30use crate::catalog::check_column_name_not_reserved;
31use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
32use crate::error::{ErrorCode, Result, RwError};
33use crate::handler::HandlerArgs;
34use crate::optimizer::backfill_order_strategy::plan_backfill_order;
35use crate::optimizer::plan_node::generic::GenericPlanRef;
36use crate::optimizer::plan_node::{Explain, StreamPlanRef as PlanRef};
37use crate::optimizer::{OptimizerContext, OptimizerContextRef, RelationCollectorVisitor};
38use crate::planner::Planner;
39use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
40use crate::session::{SESSION_MANAGER, SessionImpl};
41use crate::stream_fragmenter::{GraphJobType, build_graph_with_strategy};
42use crate::utils::ordinal;
43use crate::{TableCatalog, WithOptions};
44
/// Key of the `WITH` option naming the resource group of a materialized view.
///
/// `gen_create_mv_graph` removes this key from the statement's `WITH` options and returns its
/// value as the resource group of the new job.
pub const RESOURCE_GROUP_KEY: &str = "resource_group";
/// Key of the `WITH` option that requests serverless backfill.
///
/// `gen_create_mv_graph` parses the value as a `bool`; it cannot be combined with
/// [`RESOURCE_GROUP_KEY`].
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
47
48pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
49 if columns.is_empty() {
50 None
51 } else {
52 Some(columns.iter().map(|v| v.real_value()).collect())
53 }
54}
55
56pub(super) fn get_column_names(
61 bound: &BoundQuery,
62 columns: Vec<Ident>,
63) -> Result<Option<Vec<String>>> {
64 let col_names = parse_column_names(&columns);
65 if let BoundSetExpr::Select(select) = &bound.body {
66 if col_names.is_none() {
71 for (i, alias) in select.aliases.iter().enumerate() {
72 if alias.is_none() {
73 return Err(ErrorCode::BindError(format!(
74 "An alias must be specified for the {} expression (counting from 1) in result relation", ordinal(i+1)
75 ))
76 .into());
77 }
78 }
79 }
80 }
81
82 Ok(col_names)
83}
84
85pub fn gen_create_mv_plan(
87 session: &SessionImpl,
88 context: OptimizerContextRef,
89 query: Query,
90 name: ObjectName,
91 columns: Vec<Ident>,
92 emit_mode: Option<EmitMode>,
93) -> Result<(PlanRef, TableCatalog)> {
94 let mut binder = Binder::new_for_stream(session);
95 let bound = binder.bind_query(&query)?;
96 gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
97}
98
99pub fn gen_create_mv_plan_bound(
101 session: &SessionImpl,
102 context: OptimizerContextRef,
103 query: BoundQuery,
104 name: ObjectName,
105 columns: Vec<Ident>,
106 emit_mode: Option<EmitMode>,
107) -> Result<(PlanRef, TableCatalog)> {
108 if session.config().create_compaction_group_for_mv() {
109 context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
110 }
111
112 let db_name = &session.database();
113 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
114
115 let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
116
117 let definition = context.normalized_sql().to_owned();
118
119 let col_names = get_column_names(&query, columns)?;
120
121 let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
122 if emit_on_window_close {
123 context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
124 }
125
126 let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
127 if let Some(col_names) = col_names {
128 for name in &col_names {
129 check_column_name_not_reserved(name)?;
130 }
131 plan_root.set_out_names(col_names)?;
132 }
133 let materialize = plan_root.gen_materialize_plan(
134 database_id,
135 schema_id,
136 table_name,
137 definition,
138 emit_on_window_close,
139 )?;
140
141 let mut table = materialize.table().clone();
142 table.owner = session.user_id();
143
144 let plan: PlanRef = materialize.into();
145
146 let ctx = plan.ctx();
147 let explain_trace = ctx.is_explain_trace();
148 if explain_trace {
149 ctx.trace("Create Materialized View:");
150 ctx.trace(plan.explain_to_string());
151 }
152
153 Ok((plan, table))
154}
155
156pub async fn handle_create_mv(
157 handler_args: HandlerArgs,
158 if_not_exists: bool,
159 name: ObjectName,
160 query: Query,
161 columns: Vec<Ident>,
162 emit_mode: Option<EmitMode>,
163) -> Result<RwPgResponse> {
164 let (dependent_relations, dependent_udfs, bound_query) = {
165 let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
166 let bound_query = binder.bind_query(&query)?;
167 (
168 binder.included_relations().clone(),
169 binder.included_udfs().clone(),
170 bound_query,
171 )
172 };
173 handle_create_mv_bound(
174 handler_args,
175 if_not_exists,
176 name,
177 bound_query,
178 dependent_relations,
179 dependent_udfs,
180 columns,
181 emit_mode,
182 )
183 .await
184}
185
186pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
188 let request = tonic::Request::new(ProvisionRequest {});
189 let mut client =
190 node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
191 sbc_addr.clone(),
192 )
193 .await
194 .map_err(|e| {
195 RwError::from(ErrorCode::InternalError(format!(
196 "unable to reach serverless backfill controller at addr {}: {}",
197 sbc_addr,
198 e.as_report()
199 )))
200 })?;
201
202 match client.provision(request).await {
203 Ok(resp) => Ok(resp.into_inner().resource_group),
204 Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
205 "serverless backfill controller returned error :{}",
206 e.as_report()
207 )))),
208 }
209}
210
211fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
212 let context = OptimizerContext::from_handler_args(handler_args);
213 context.with_options().clone()
214}
215
216pub async fn handle_create_mv_bound(
217 handler_args: HandlerArgs,
218 if_not_exists: bool,
219 name: ObjectName,
220 query: BoundQuery,
221 dependent_relations: HashSet<ObjectId>,
222 dependent_udfs: HashSet<FunctionId>, columns: Vec<Ident>,
224 emit_mode: Option<EmitMode>,
225) -> Result<RwPgResponse> {
226 let session = handler_args.session.clone();
227
228 session.check_cluster_limits().await?;
230
231 if let Either::Right(resp) = session.check_relation_name_duplicated(
232 name.clone(),
233 StatementType::CREATE_MATERIALIZED_VIEW,
234 if_not_exists,
235 )? {
236 return Ok(resp);
237 }
238
239 let (table, graph, dependencies, resource_group) = {
240 gen_create_mv_graph(
241 handler_args,
242 name,
243 query,
244 dependent_relations,
245 dependent_udfs,
246 columns,
247 emit_mode,
248 )
249 .await?
250 };
251
252 let _job_guard =
254 session
255 .env()
256 .creating_streaming_job_tracker()
257 .guard(CreatingStreamingJobInfo::new(
258 session.session_id(),
259 table.database_id,
260 table.schema_id,
261 table.name.clone(),
262 ));
263
264 let catalog_writer = session.catalog_writer()?;
265 catalog_writer
266 .create_materialized_view(
267 table.to_prost(),
268 graph,
269 dependencies,
270 resource_group,
271 if_not_exists,
272 )
273 .await?;
274
275 Ok(PgResponse::empty_result(
276 StatementType::CREATE_MATERIALIZED_VIEW,
277 ))
278}
279
280pub(crate) async fn gen_create_mv_graph(
281 handler_args: HandlerArgs,
282 name: ObjectName,
283 query: BoundQuery,
284 dependent_relations: HashSet<ObjectId>,
285 dependent_udfs: HashSet<FunctionId>,
286 columns: Vec<Ident>,
287 emit_mode: Option<EmitMode>,
288) -> Result<(
289 TableCatalog,
290 PbStreamFragmentGraph,
291 HashSet<ObjectId>,
292 Option<String>,
293)> {
294 let mut with_options = get_with_options(handler_args.clone());
295 let mut resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());
296
297 if resource_group.is_some() {
298 risingwave_common::license::Feature::ResourceGroup.check_available()?;
299 }
300
301 let is_serverless_backfill = with_options
302 .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
303 .unwrap_or_default()
304 .parse::<bool>()
305 .unwrap_or(false);
306
307 if resource_group.is_some() && is_serverless_backfill {
308 return Err(RwError::from(InvalidInputSyntax(
309 "Please do not specify serverless backfilling and resource group together".to_owned(),
310 )));
311 }
312
313 if !with_options.is_empty() {
314 return Err(RwError::from(ProtocolError(format!(
316 "unexpected options in WITH clause: {:?}",
317 with_options.keys()
318 ))));
319 }
320
321 let sbc_addr = match SESSION_MANAGER.get() {
322 Some(manager) => manager.env().sbc_address(),
323 None => "",
324 }
325 .to_owned();
326
327 if is_serverless_backfill && sbc_addr.is_empty() {
328 return Err(RwError::from(InvalidInputSyntax(
329 "Serverless Backfill is disabled on-premise. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
330 )));
331 }
332
333 if is_serverless_backfill {
334 match provision_resource_group(sbc_addr).await {
335 Err(e) => {
336 return Err(RwError::from(ProtocolError(format!(
337 "failed to provision serverless backfill nodes: {}",
338 e.as_report()
339 ))));
340 }
341 Ok(val) => resource_group = Some(val),
342 }
343 }
344 tracing::debug!(
345 resource_group = resource_group,
346 "provisioning on resource group"
347 );
348
349 let context = OptimizerContext::from_handler_args(handler_args);
350 let has_order_by = !query.order.is_empty();
351 if has_order_by {
352 context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
353It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
354"#.to_owned());
355 }
356
357 if resource_group.is_some()
358 && !context
359 .session_ctx()
360 .config()
361 .streaming_use_arrangement_backfill()
362 {
363 return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
364 }
365
366 let context: OptimizerContextRef = context.into();
367 let session = context.session_ctx().as_ref();
368
369 let (plan, table) =
370 gen_create_mv_plan_bound(session, context.clone(), query, name, columns, emit_mode)?;
371
372 let backfill_order = plan_backfill_order(
373 session,
374 context.with_options().backfill_order_strategy(),
375 plan.clone(),
376 )?;
377
378 let dependencies = RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
381 .into_iter()
382 .chain(dependent_udfs.iter().copied().map_into())
383 .collect();
384
385 let graph = build_graph_with_strategy(
386 plan,
387 Some(GraphJobType::MaterializedView),
388 Some(backfill_order),
389 )?;
390
391 Ok((table, graph, dependencies, resource_group))
392}
393
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// Creates a protobuf source and a materialized view on top of it, then checks that both
    /// are registered in the catalog and that the view has the expected columns.
    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let create_source = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(create_source).await.unwrap();

        frontend
            .run_sql("create materialized view mv1 as select t1.country from t1")
            .await
            .unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // The source must be registered.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // So must the materialized view.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        let columns: HashMap<&str, DataType> = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect();

        let city_type: DataType = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        .into();
        let country_type: DataType = StructType::new(vec![
            ("address", DataType::Varchar),
            ("city", city_type),
            ("zipcode", DataType::Varchar),
        ])
        .into();
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => country_type,
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// Checks how select items without an explicit alias are treated.
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        frontend.run_sql("create table t(x varchar)").await.unwrap();

        // A single aggregate without an alias is accepted.
        frontend
            .run_sql("create materialized view mv0 as select count(x) from t")
            .await
            .unwrap();

        // Each of these statements must fail with exactly the paired message.
        let failing_cases = [
            (
                "create materialized view mv1 as select count(x), count(*) from t",
                "Invalid input syntax: column \"count\" specified more than once",
            ),
            (
                "create materialized view mv1 as select 1",
                "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation",
            ),
            (
                "create materialized view mv1 as select x is null from t",
                "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation",
            ),
        ];
        for (sql, expected_message) in failing_cases {
            let err = frontend.run_sql(sql).await.unwrap_err();
            assert_eq!(err.to_string(), expected_message);
        }
    }

    /// Creates materialized views with and without `ORDER BY`.
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        frontend.run_sql("create table t(x varchar)").await.unwrap();

        // Without ORDER BY: succeeds and produces no notices.
        let plain = frontend
            .run_sql("create materialized view mv1 as select * from t")
            .await
            .unwrap();
        assert_eq!(plain.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(plain.notices().is_empty());

        // With ORDER BY: only the statement type is checked here, not the notices.
        let ordered = frontend
            .run_sql("create materialized view mv2 as select * from t order by x")
            .await
            .unwrap();
        assert_eq!(ordered.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}