risingwave_frontend/handler/
create_view.rs1use either::Either;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::util::iter_util::ZipEqFast;
20use risingwave_pb::catalog::PbView;
21use risingwave_sqlparser::ast::{Ident, ObjectName, Query, Statement};
22
23use super::RwPgResponse;
24use crate::binder::Binder;
25use crate::error::Result;
26use crate::handler::HandlerArgs;
27use crate::optimizer::OptimizerContext;
28
29pub async fn handle_create_view(
30 handler_args: HandlerArgs,
31 if_not_exists: bool,
32 name: ObjectName,
33 columns: Vec<Ident>,
34 query: Query,
35) -> Result<RwPgResponse> {
36 let session = handler_args.session.clone();
37 let db_name = &session.database();
38 let (schema_name, view_name) = Binder::resolve_schema_qualified_name(db_name, name.clone())?;
39
40 let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
41
42 let properties = handler_args.with_options.clone();
43
44 if let Either::Right(resp) = session.check_relation_name_duplicated(
45 name.clone(),
46 StatementType::CREATE_VIEW,
47 if_not_exists,
48 )? {
49 return Ok(resp);
50 }
51
52 let (dependent_relations, schema) = {
54 let context = OptimizerContext::from_handler_args(handler_args);
55 let super::query::BatchQueryPlanResult {
56 schema,
57 dependent_relations,
58 ..
59 } = super::query::gen_batch_plan_by_statement(
60 &session,
61 context.into(),
62 Statement::Query(Box::new(query.clone())),
63 )?;
64
65 (dependent_relations, schema)
66 };
67
68 let columns = if columns.is_empty() {
69 schema.fields().to_vec()
70 } else {
71 if columns.len() != schema.fields().len() {
72 return Err(crate::error::ErrorCode::InternalError(
73 "view has different number of columns than the query's columns".to_owned(),
74 )
75 .into());
76 }
77 schema
78 .fields()
79 .iter()
80 .zip_eq_fast(columns)
81 .map(|(f, c)| {
82 let mut field = f.clone();
83 field.name = c.real_value();
84 field
85 })
86 .collect()
87 };
88
89 let (properties, secret_refs, connection_refs) = properties.into_parts();
90 if !secret_refs.is_empty() || !connection_refs.is_empty() {
91 return Err(crate::error::ErrorCode::InvalidParameterValue(
92 "Secret reference and Connection reference are not allowed in create view options"
93 .to_owned(),
94 )
95 .into());
96 }
97
98 let view = PbView {
99 id: 0,
100 schema_id,
101 database_id,
102 name: view_name,
103 properties,
104 owner: session.user_id(),
105 sql: format!("{}", query),
106 columns: columns.into_iter().map(|f| f.to_prost()).collect(),
107 };
108
109 let catalog_writer = session.catalog_writer()?;
110 catalog_writer
111 .create_view(
112 view,
113 dependent_relations
114 .into_iter()
115 .map(|t| t.table_id)
116 .collect(),
117 )
118 .await?;
119
120 Ok(PgResponse::empty_result(StatementType::CREATE_VIEW))
121}