risingwave_frontend/handler/
create_view.rs1use either::Either;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::util::iter_util::ZipEqFast;
20use risingwave_pb::catalog::PbView;
21use risingwave_sqlparser::ast::{Ident, ObjectName, Query, Statement};
22
23use super::RwPgResponse;
24use crate::binder::Binder;
25use crate::error::Result;
26use crate::handler::HandlerArgs;
27use crate::handler::util::reject_internal_table_dependencies;
28use crate::optimizer::OptimizerContext;
29
30pub async fn handle_create_view(
31 handler_args: HandlerArgs,
32 if_not_exists: bool,
33 name: ObjectName,
34 columns: Vec<Ident>,
35 query: Query,
36) -> Result<RwPgResponse> {
37 let session = handler_args.session.clone();
38 let db_name = &session.database();
39 let (schema_name, view_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
40
41 let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
42
43 let properties = handler_args.with_options.clone();
44
45 if let Either::Right(resp) = session.check_relation_name_duplicated(
46 name.clone(),
47 StatementType::CREATE_VIEW,
48 if_not_exists,
49 )? {
50 return Ok(resp);
51 }
52
53 let (dependent_relations, dependent_secrets, schema) = {
55 let context = OptimizerContext::from_handler_args(handler_args);
56 let super::query::RwBatchQueryPlanResult {
57 schema,
58 dependent_relations,
59 dependent_secrets,
60 ..
61 } = super::query::gen_batch_plan_by_statement(
62 &session,
63 context.into(),
64 Statement::Query(Box::new(query.clone())),
65 )?
66 .unwrap_rw()?;
67
68 reject_internal_table_dependencies(&session, &dependent_relations, "CREATE VIEW")?;
69
70 (dependent_relations, dependent_secrets, schema)
71 };
72
73 let columns = if columns.is_empty() {
74 schema.fields().to_vec()
75 } else {
76 if columns.len() != schema.fields().len() {
77 return Err(crate::error::ErrorCode::InternalError(
78 "view has different number of columns than the query's columns".to_owned(),
79 )
80 .into());
81 }
82 schema
83 .fields()
84 .iter()
85 .zip_eq_fast(columns)
86 .map(|(f, c)| {
87 let mut field = f.clone();
88 field.name = c.real_value();
89 field
90 })
91 .collect()
92 };
93
94 let (properties, secret_refs, connection_refs) = properties.into_parts();
95 if !secret_refs.is_empty() || !connection_refs.is_empty() {
96 return Err(crate::error::ErrorCode::InvalidParameterValue(
97 "Secret reference and Connection reference are not allowed in create view options"
98 .to_owned(),
99 )
100 .into());
101 }
102
103 let view = PbView {
104 id: 0.into(),
105 schema_id,
106 database_id,
107 name: view_name,
108 properties,
109 owner: session.user_id(),
110 sql: format!("{}", query),
111 columns: columns.into_iter().map(|f| f.to_prost()).collect(),
112 created_at_epoch: None,
113 created_at_cluster_version: None,
114 };
115
116 let catalog_writer = session.catalog_writer()?;
117 catalog_writer
118 .create_view(
119 view,
120 dependent_relations
121 .into_iter()
122 .chain(
123 dependent_secrets
124 .iter()
125 .copied()
126 .map(|id| id.as_object_id()),
127 )
128 .collect(),
129 )
130 .await?;
131
132 Ok(PgResponse::empty_result(StatementType::CREATE_VIEW))
133}