risingwave_frontend/handler/
create_mv.rs1use std::collections::HashSet;
16
17use either::Either;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::catalog::{FunctionId, ObjectId, TableId};
20use risingwave_pb::catalog::PbTable;
21use risingwave_pb::serverless_backfill_controller::{
22 ProvisionRequest, node_group_controller_service_client,
23};
24use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
25use thiserror_ext::AsReport;
26
27use super::RwPgResponse;
28use crate::WithOptions;
29use crate::binder::{Binder, BoundQuery, BoundSetExpr};
30use crate::catalog::check_column_name_not_reserved;
31use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
32use crate::error::{ErrorCode, Result, RwError};
33use crate::handler::HandlerArgs;
34use crate::optimizer::plan_node::Explain;
35use crate::optimizer::plan_node::generic::GenericPlanRef;
36use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, RelationCollectorVisitor};
37use crate::planner::Planner;
38use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
39use crate::session::{SESSION_MANAGER, SessionImpl};
40use crate::stream_fragmenter::{GraphJobType, build_graph};
41use crate::utils::ordinal;
42
/// `WITH` option key naming the resource group the materialized view's job should use.
///
/// `handle_create_mv_bound` removes this key from the statement's options and, when present,
/// first checks that the `ResourceGroup` license feature is available.
pub const RESOURCE_GROUP_KEY: &str = "resource_group";

/// `WITH` option key requesting serverless backfill.
///
/// The value is parsed as a `bool`; a missing or unparsable value is treated as `false`.
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
45
46pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
47 if columns.is_empty() {
48 None
49 } else {
50 Some(columns.iter().map(|v| v.real_value()).collect())
51 }
52}
53
54pub(super) fn get_column_names(
59 bound: &BoundQuery,
60 columns: Vec<Ident>,
61) -> Result<Option<Vec<String>>> {
62 let col_names = parse_column_names(&columns);
63 if let BoundSetExpr::Select(select) = &bound.body {
64 if col_names.is_none() {
69 for (i, alias) in select.aliases.iter().enumerate() {
70 if alias.is_none() {
71 return Err(ErrorCode::BindError(format!(
72 "An alias must be specified for the {} expression (counting from 1) in result relation", ordinal(i+1)
73 ))
74 .into());
75 }
76 }
77 }
78 }
79
80 Ok(col_names)
81}
82
83pub fn gen_create_mv_plan(
85 session: &SessionImpl,
86 context: OptimizerContextRef,
87 query: Query,
88 name: ObjectName,
89 columns: Vec<Ident>,
90 emit_mode: Option<EmitMode>,
91) -> Result<(PlanRef, PbTable)> {
92 let mut binder = Binder::new_for_stream(session);
93 let bound = binder.bind_query(query)?;
94 gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
95}
96
97pub fn gen_create_mv_plan_bound(
99 session: &SessionImpl,
100 context: OptimizerContextRef,
101 query: BoundQuery,
102 name: ObjectName,
103 columns: Vec<Ident>,
104 emit_mode: Option<EmitMode>,
105) -> Result<(PlanRef, PbTable)> {
106 if session.config().create_compaction_group_for_mv() {
107 context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
108 }
109
110 let db_name = &session.database();
111 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, name)?;
112
113 let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
114
115 let definition = context.normalized_sql().to_owned();
116
117 let col_names = get_column_names(&query, columns)?;
118
119 let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
120 if emit_on_window_close {
121 context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
122 }
123
124 let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
125 plan_root.set_req_dist_as_same_as_req_order();
126 if let Some(col_names) = col_names {
127 for name in &col_names {
128 check_column_name_not_reserved(name)?;
129 }
130 plan_root.set_out_names(col_names)?;
131 }
132 let materialize = plan_root.gen_materialize_plan(
133 database_id,
134 schema_id,
135 table_name,
136 definition,
137 emit_on_window_close,
138 )?;
139 let mut table = materialize.table().to_prost();
140
141 let plan: PlanRef = materialize.into();
142
143 table.owner = session.user_id();
144
145 let ctx = plan.ctx();
146 let explain_trace = ctx.is_explain_trace();
147 if explain_trace {
148 ctx.trace("Create Materialized View:");
149 ctx.trace(plan.explain_to_string());
150 }
151
152 Ok((plan, table))
153}
154
155pub async fn handle_create_mv(
156 handler_args: HandlerArgs,
157 if_not_exists: bool,
158 name: ObjectName,
159 query: Query,
160 columns: Vec<Ident>,
161 emit_mode: Option<EmitMode>,
162) -> Result<RwPgResponse> {
163 let (dependent_relations, dependent_udfs, bound) = {
164 let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
165 let bound = binder.bind_query(query)?;
166 (
167 binder.included_relations().clone(),
168 binder.included_udfs().clone(),
169 bound,
170 )
171 };
172 handle_create_mv_bound(
173 handler_args,
174 if_not_exists,
175 name,
176 bound,
177 dependent_relations,
178 dependent_udfs,
179 columns,
180 emit_mode,
181 )
182 .await
183}
184
185pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
187 let request = tonic::Request::new(ProvisionRequest {});
188 let mut client =
189 node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
190 sbc_addr.clone(),
191 )
192 .await
193 .map_err(|e| {
194 RwError::from(ErrorCode::InternalError(format!(
195 "unable to reach serverless backfill controller at addr {}: {}",
196 sbc_addr,
197 e.as_report()
198 )))
199 })?;
200
201 match client.provision(request).await {
202 Ok(resp) => Ok(resp.into_inner().resource_group),
203 Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
204 "serverless backfill controller returned error :{}",
205 e.as_report()
206 )))),
207 }
208}
209
210fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
211 let context = OptimizerContext::from_handler_args(handler_args);
212 context.with_options().clone()
213}
214
215pub async fn handle_create_mv_bound(
216 handler_args: HandlerArgs,
217 if_not_exists: bool,
218 name: ObjectName,
219 query: BoundQuery,
220 dependent_relations: HashSet<TableId>,
221 dependent_udfs: HashSet<FunctionId>, columns: Vec<Ident>,
223 emit_mode: Option<EmitMode>,
224) -> Result<RwPgResponse> {
225 let session = handler_args.session.clone();
226
227 session.check_cluster_limits().await?;
229
230 if let Either::Right(resp) = session.check_relation_name_duplicated(
231 name.clone(),
232 StatementType::CREATE_MATERIALIZED_VIEW,
233 if_not_exists,
234 )? {
235 return Ok(resp);
236 }
237
238 let (table, graph, dependencies, resource_group) = {
239 let mut with_options = get_with_options(handler_args.clone());
240 let mut resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());
241
242 if resource_group.is_some() {
243 risingwave_common::license::Feature::ResourceGroup
244 .check_available()
245 .map_err(|e| anyhow::anyhow!(e))?;
246 }
247
248 let is_serverless_backfill = with_options
249 .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
250 .unwrap_or_default()
251 .parse::<bool>()
252 .unwrap_or(false);
253
254 if resource_group.is_some() && is_serverless_backfill {
255 return Err(RwError::from(InvalidInputSyntax(
256 "Please do not specify serverless backfilling and resource group together"
257 .to_owned(),
258 )));
259 }
260
261 if !with_options.is_empty() {
262 return Err(RwError::from(ProtocolError(format!(
264 "unexpected options in WITH clause: {:?}",
265 with_options.keys()
266 ))));
267 }
268
269 let sbc_addr = match SESSION_MANAGER.get() {
270 Some(manager) => manager.env().sbc_address(),
271 None => "",
272 }
273 .to_owned();
274
275 if is_serverless_backfill && sbc_addr.is_empty() {
276 return Err(RwError::from(InvalidInputSyntax(
277 "Serverless Backfill is disabled on-premise. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
278 )));
279 }
280
281 if is_serverless_backfill {
282 match provision_resource_group(sbc_addr).await {
283 Err(e) => {
284 return Err(RwError::from(ProtocolError(format!(
285 "failed to provision serverless backfill nodes: {}",
286 e.as_report()
287 ))));
288 }
289 Ok(val) => resource_group = Some(val),
290 }
291 }
292 tracing::debug!(
293 resource_group = resource_group,
294 "provisioning on resource group"
295 );
296
297 let context = OptimizerContext::from_handler_args(handler_args);
298 let has_order_by = !query.order.is_empty();
299 if has_order_by {
300 context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
301It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
302"#.to_owned());
303 }
304
305 if resource_group.is_some()
306 && !context
307 .session_ctx()
308 .config()
309 .streaming_use_arrangement_backfill()
310 {
311 return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
312 }
313
314 let (plan, table) =
315 gen_create_mv_plan_bound(&session, context.into(), query, name, columns, emit_mode)?;
316
317 let dependencies =
320 RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
321 .into_iter()
322 .map(|id| id.table_id() as ObjectId)
323 .chain(
324 dependent_udfs
325 .into_iter()
326 .map(|id| id.function_id() as ObjectId),
327 )
328 .collect();
329
330 let graph = build_graph(plan, Some(GraphJobType::MaterializedView))?;
331
332 (table, graph, dependencies, resource_group)
333 };
334
335 let _job_guard =
337 session
338 .env()
339 .creating_streaming_job_tracker()
340 .guard(CreatingStreamingJobInfo::new(
341 session.session_id(),
342 table.database_id,
343 table.schema_id,
344 table.name.clone(),
345 ));
346
347 let session = session.clone();
348 let catalog_writer = session.catalog_writer()?;
349 catalog_writer
350 .create_materialized_view(table, graph, dependencies, resource_group)
351 .await?;
352
353 Ok(PgResponse::empty_result(
354 StatementType::CREATE_MATERIALIZED_VIEW,
355 ))
356}
357
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// Creating an MV over a protobuf-encoded source registers both catalog objects. The MV's
    /// columns are the selected nested struct column plus the row-id and timestamp columns.
    #[tokio::test]
    async fn test_create_mv_handler() {
        // Bound to a local so the proto file is still around when CREATE SOURCE reads it
        // (presumably a temp file removed on drop).
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let create_source_sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(create_source_sql).await.unwrap();

        frontend
            .run_sql("create materialized view mv1 as select t1.country from t1")
            .await
            .unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        let columns: HashMap<&str, DataType> = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect();

        let city_type: DataType = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        .into();
        let country_type: DataType = StructType::new(vec![
            ("address", DataType::Varchar),
            ("city", city_type),
            ("zipcode", DataType::Varchar),
        ])
        .into();

        let expected_columns: HashMap<&str, DataType> = HashMap::from([
            (ROW_ID_COLUMN_NAME, DataType::Serial),
            ("country", country_type),
            (RW_TIMESTAMP_COLUMN_NAME, DataType::Timestamptz),
        ]);
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// Select items without aliases are accepted only when a unique name can be derived. The
    /// other cases are rejected with the messages below.
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        frontend.run_sql("create table t(x varchar)").await.unwrap();

        // A single `count(x)` without an explicit alias is accepted.
        frontend
            .run_sql("create materialized view mv0 as select count(x) from t")
            .await
            .unwrap();

        let rejected = [
            (
                "create materialized view mv1 as select count(x), count(*) from t",
                "Invalid input syntax: column \"count\" specified more than once",
            ),
            (
                "create materialized view mv1 as select 1",
                "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation",
            ),
            (
                "create materialized view mv1 as select x is null from t",
                "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation",
            ),
        ];
        for (sql, expected) in rejected {
            let err = frontend.run_sql(sql).await.unwrap_err();
            assert_eq!(err.to_string(), expected);
        }
    }

    /// `ORDER BY` in the MV definition is accepted. Without it, the response carries no notices.
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        frontend.run_sql("create table t(x varchar)").await.unwrap();

        let unordered = frontend
            .run_sql("create materialized view mv1 as select * from t")
            .await
            .unwrap();
        assert_eq!(unordered.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(unordered.notices().is_empty());

        let ordered = frontend
            .run_sql("create materialized view mv2 as select * from t order by x")
            .await
            .unwrap();
        assert_eq!(ordered.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}