risingwave_frontend/handler/
create_view.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handle creation of logical (non-materialized) views.
16
17use either::Either;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::util::iter_util::ZipEqFast;
20use risingwave_pb::catalog::PbView;
21use risingwave_sqlparser::ast::{Ident, ObjectName, Query, Statement};
22
23use super::RwPgResponse;
24use crate::binder::Binder;
25use crate::error::Result;
26use crate::handler::HandlerArgs;
27use crate::handler::util::reject_internal_table_dependencies;
28use crate::optimizer::OptimizerContext;
29
30pub async fn handle_create_view(
31    handler_args: HandlerArgs,
32    if_not_exists: bool,
33    name: ObjectName,
34    columns: Vec<Ident>,
35    query: Query,
36) -> Result<RwPgResponse> {
37    let session = handler_args.session.clone();
38    let db_name = &session.database();
39    let (schema_name, view_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
40
41    let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
42
43    let properties = handler_args.with_options.clone();
44
45    if let Either::Right(resp) = session.check_relation_name_duplicated(
46        name.clone(),
47        StatementType::CREATE_VIEW,
48        if_not_exists,
49    )? {
50        return Ok(resp);
51    }
52
53    // plan the query to validate it and resolve dependencies
54    let (dependent_relations, dependent_secrets, schema) = {
55        let context = OptimizerContext::from_handler_args(handler_args);
56        let super::query::RwBatchQueryPlanResult {
57            schema,
58            dependent_relations,
59            dependent_secrets,
60            ..
61        } = super::query::gen_batch_plan_by_statement(
62            &session,
63            context.into(),
64            Statement::Query(Box::new(query.clone())),
65        )?
66        .unwrap_rw()?;
67
68        reject_internal_table_dependencies(&session, &dependent_relations, "CREATE VIEW")?;
69
70        (dependent_relations, dependent_secrets, schema)
71    };
72
73    let columns = if columns.is_empty() {
74        schema.fields().to_vec()
75    } else {
76        if columns.len() != schema.fields().len() {
77            return Err(crate::error::ErrorCode::InternalError(
78                "view has different number of columns than the query's columns".to_owned(),
79            )
80            .into());
81        }
82        schema
83            .fields()
84            .iter()
85            .zip_eq_fast(columns)
86            .map(|(f, c)| {
87                let mut field = f.clone();
88                field.name = c.real_value();
89                field
90            })
91            .collect()
92    };
93
94    let (properties, secret_refs, connection_refs) = properties.into_parts();
95    if !secret_refs.is_empty() || !connection_refs.is_empty() {
96        return Err(crate::error::ErrorCode::InvalidParameterValue(
97            "Secret reference and Connection reference are not allowed in create view options"
98                .to_owned(),
99        )
100        .into());
101    }
102
103    let view = PbView {
104        id: 0.into(),
105        schema_id,
106        database_id,
107        name: view_name,
108        properties,
109        owner: session.user_id(),
110        sql: format!("{}", query),
111        columns: columns.into_iter().map(|f| f.to_prost()).collect(),
112        created_at_epoch: None,
113        created_at_cluster_version: None,
114    };
115
116    let catalog_writer = session.catalog_writer()?;
117    catalog_writer
118        .create_view(
119            view,
120            dependent_relations
121                .into_iter()
122                .chain(
123                    dependent_secrets
124                        .iter()
125                        .copied()
126                        .map(|id| id.as_object_id()),
127                )
128                .collect(),
129        )
130        .await?;
131
132    Ok(PgResponse::empty_result(StatementType::CREATE_VIEW))
133}