risingwave_frontend/handler/
create_table_as.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use either::Either;
18use pgwire::pg_response::StatementType;
19use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
20use risingwave_pb::ddl_service::TableJobType;
21use risingwave_sqlparser::ast::{ColumnDef, ObjectName, OnConflict, Query, Statement};
22
23use super::{HandlerArgs, RwPgResponse};
24use crate::binder::BoundStatement;
25use crate::error::{ErrorCode, Result};
26use crate::handler::create_table::{
27    ColumnIdGenerator, CreateTableProps, gen_create_table_plan_without_source,
28};
29use crate::handler::query::handle_query;
30use crate::handler::util::{LongRunningNotificationAction, execute_with_long_running_notification};
31use crate::stream_fragmenter::GraphJobType;
32use crate::{Binder, OptimizerContext, build_graph};
33
34pub async fn handle_create_as(
35    handler_args: HandlerArgs,
36    table_name: ObjectName,
37    if_not_exists: bool,
38    query: Box<Query>,
39    column_defs: Vec<ColumnDef>,
40    append_only: bool,
41    on_conflict: Option<OnConflict>,
42    with_version_columns: Vec<String>,
43    ast_engine: risingwave_sqlparser::ast::Engine,
44) -> Result<RwPgResponse> {
45    if column_defs.iter().any(|column| column.data_type.is_some()) {
46        return Err(ErrorCode::InvalidInputSyntax(
47            "Should not specify data type in CREATE TABLE AS".into(),
48        )
49        .into());
50    }
51    let engine = match ast_engine {
52        risingwave_sqlparser::ast::Engine::Hummock => risingwave_common::catalog::Engine::Hummock,
53        risingwave_sqlparser::ast::Engine::Iceberg => risingwave_common::catalog::Engine::Iceberg,
54    };
55
56    let session = handler_args.session.clone();
57
58    if let Either::Right(resp) = session.check_relation_name_duplicated(
59        table_name.clone(),
60        StatementType::CREATE_TABLE,
61        if_not_exists,
62    )? {
63        return Ok(resp);
64    }
65
66    let mut col_id_gen = ColumnIdGenerator::new_initial();
67
68    // Generate catalog descs from query
69    let mut columns: Vec<_> = {
70        let mut binder = Binder::new_for_batch(&session);
71        let bound = binder.bind(Statement::Query(query.clone()))?;
72        if let BoundStatement::Query(query) = bound {
73            // Create ColumnCatelog by Field
74            query
75                .schema()
76                .fields()
77                .iter()
78                .map(|field| ColumnCatalog {
79                    column_desc: ColumnDesc::from_field_without_column_id(field),
80                    is_hidden: false,
81                })
82                .collect()
83        } else {
84            unreachable!()
85        }
86    };
87
88    // Generate column id.
89    for c in &mut columns {
90        col_id_gen.generate(c)?;
91    }
92
93    if column_defs.len() > columns.len() {
94        return Err(ErrorCode::InvalidInputSyntax(
95            "too many column names were specified".to_owned(),
96        )
97        .into());
98    }
99
100    // Override column name if it specified in create statement.
101    column_defs.iter().enumerate().for_each(|(idx, column)| {
102        columns[idx].column_desc.name = column.name.real_value();
103    });
104
105    let (graph, source, table) = {
106        let context = OptimizerContext::from_handler_args(handler_args.clone());
107        let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
108        if !secret_refs.is_empty() || !connection_refs.is_empty() {
109            return Err(crate::error::ErrorCode::InvalidParameterValue(
110                "Secret reference and Connection reference are not allowed in options for CREATE TABLE AS".to_owned(),
111            )
112            .into());
113        }
114        let (plan, table) = gen_create_table_plan_without_source(
115            context,
116            table_name.clone(),
117            columns,
118            vec![],
119            vec![],
120            vec![], // No watermark should be defined in for `CREATE TABLE AS`
121            col_id_gen.into_version(),
122            CreateTableProps {
123                // Note: by providing and persisting an empty definition, querying the definition of the table
124                // will hit the purification logic, which will construct it based on the catalog.
125                definition: "".to_owned(),
126                append_only,
127                on_conflict: on_conflict.into(),
128                with_version_columns,
129                webhook_info: None,
130                engine,
131            },
132        )?;
133        let graph = build_graph(plan, Some(GraphJobType::Table))?;
134
135        (graph, None, table)
136    };
137
138    tracing::trace!(
139        "name={}, graph=\n{}",
140        table_name,
141        serde_json::to_string_pretty(&graph).unwrap()
142    );
143
144    let catalog_writer = session.catalog_writer()?;
145    execute_with_long_running_notification(
146        catalog_writer.create_table(
147            source,
148            table.to_prost(),
149            graph,
150            TableJobType::Unspecified,
151            if_not_exists,
152            HashSet::default(),
153        ),
154        &session,
155        "CREATE TABLE AS",
156        LongRunningNotificationAction::DiagnoseBarrierLatency,
157    )
158    .await?;
159
160    // Generate insert
161    let insert = Statement::Insert {
162        table_name,
163        columns: vec![],
164        source: query,
165        returning: vec![],
166    };
167
168    handle_query(handler_args, insert, vec![]).await
169}