risingwave_frontend/handler/
create_table_as.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use either::Either;
18use pgwire::pg_response::StatementType;
19use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
20use risingwave_pb::ddl_service::TableJobType;
21use risingwave_sqlparser::ast::{ColumnDef, ObjectName, OnConflict, Query, Statement};
22
23use super::{HandlerArgs, RwPgResponse};
24use crate::binder::BoundStatement;
25use crate::error::{ErrorCode, Result};
26use crate::handler::create_table::{
27    ColumnIdGenerator, CreateTableProps, gen_create_table_plan_without_source,
28};
29use crate::handler::query::handle_query;
30use crate::handler::util::{
31    LongRunningNotificationAction, execute_with_long_running_notification,
32    reject_internal_table_dependencies,
33};
34use crate::stream_fragmenter::GraphJobType;
35use crate::{Binder, OptimizerContext, build_graph};
36
37pub async fn handle_create_as(
38    handler_args: HandlerArgs,
39    table_name: ObjectName,
40    if_not_exists: bool,
41    query: Box<Query>,
42    column_defs: Vec<ColumnDef>,
43    append_only: bool,
44    on_conflict: Option<OnConflict>,
45    with_version_columns: Vec<String>,
46    ast_engine: risingwave_sqlparser::ast::Engine,
47) -> Result<RwPgResponse> {
48    if column_defs.iter().any(|column| column.data_type.is_some()) {
49        return Err(ErrorCode::InvalidInputSyntax(
50            "Should not specify data type in CREATE TABLE AS".into(),
51        )
52        .into());
53    }
54    let engine = match ast_engine {
55        risingwave_sqlparser::ast::Engine::Hummock => risingwave_common::catalog::Engine::Hummock,
56        risingwave_sqlparser::ast::Engine::Iceberg => risingwave_common::catalog::Engine::Iceberg,
57    };
58
59    let session = handler_args.session.clone();
60
61    if let Either::Right(resp) = session.check_relation_name_duplicated(
62        table_name.clone(),
63        StatementType::CREATE_TABLE,
64        if_not_exists,
65    )? {
66        return Ok(resp);
67    }
68
69    let mut col_id_gen = ColumnIdGenerator::new_initial();
70
71    // Generate catalog descs from query
72    let (mut columns, dependent_relations): (Vec<_>, HashSet<_>) = {
73        let mut binder = Binder::new_for_batch(&session);
74        let bound = binder.bind(Statement::Query(query.clone()))?;
75        if let BoundStatement::Query(query) = bound {
76            // Create ColumnCatelog by Field
77            let columns = query
78                .schema()
79                .fields()
80                .iter()
81                .map(|field| ColumnCatalog {
82                    column_desc: ColumnDesc::from_field_without_column_id(field),
83                    is_hidden: false,
84                })
85                .collect();
86            (columns, binder.included_relations().clone())
87        } else {
88            unreachable!()
89        }
90    };
91
92    reject_internal_table_dependencies(&session, &dependent_relations, "CREATE TABLE AS")?;
93
94    // Generate column id.
95    for c in &mut columns {
96        col_id_gen.generate(c)?;
97    }
98
99    if column_defs.len() > columns.len() {
100        return Err(ErrorCode::InvalidInputSyntax(
101            "too many column names were specified".to_owned(),
102        )
103        .into());
104    }
105
106    // Override column name if it specified in create statement.
107    column_defs.iter().enumerate().for_each(|(idx, column)| {
108        columns[idx].column_desc.name = column.name.real_value();
109    });
110
111    let (graph, source, table) = {
112        let context = OptimizerContext::from_handler_args(handler_args.clone());
113        let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
114        if !secret_refs.is_empty() || !connection_refs.is_empty() {
115            return Err(crate::error::ErrorCode::InvalidParameterValue(
116                "Secret reference and Connection reference are not allowed in options for CREATE TABLE AS".to_owned(),
117            )
118            .into());
119        }
120        let (plan, table) = gen_create_table_plan_without_source(
121            context,
122            table_name.clone(),
123            columns,
124            vec![],
125            vec![],
126            vec![], // No watermark should be defined in for `CREATE TABLE AS`
127            col_id_gen.into_version(),
128            CreateTableProps {
129                // Note: by providing and persisting an empty definition, querying the definition of the table
130                // will hit the purification logic, which will construct it based on the catalog.
131                definition: "".to_owned(),
132                append_only,
133                on_conflict: on_conflict.into(),
134                with_version_columns,
135                webhook_info: None,
136                engine,
137            },
138        )?;
139        let graph = build_graph(plan, Some(GraphJobType::Table))?;
140
141        (graph, None, table)
142    };
143
144    tracing::trace!(
145        "name={}, graph=\n{}",
146        table_name,
147        serde_json::to_string_pretty(&graph).unwrap()
148    );
149
150    let catalog_writer = session.catalog_writer()?;
151    execute_with_long_running_notification(
152        catalog_writer.create_table(
153            source,
154            table.to_prost(),
155            graph,
156            TableJobType::Unspecified,
157            if_not_exists,
158            HashSet::default(),
159        ),
160        &session,
161        "CREATE TABLE AS",
162        LongRunningNotificationAction::DiagnoseBarrierLatency,
163    )
164    .await?;
165
166    // Generate insert
167    let insert = Statement::Insert {
168        table_name,
169        columns: vec![],
170        source: query,
171        returning: vec![],
172    };
173
174    handle_query(handler_args, insert, vec![]).await
175}