risingwave_frontend/handler/
create_table_as.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use either::Either;
16use pgwire::pg_response::StatementType;
17use risingwave_common::catalog::{ColumnCatalog, ColumnDesc};
18use risingwave_pb::ddl_service::TableJobType;
19use risingwave_sqlparser::ast::{ColumnDef, ObjectName, OnConflict, Query, Statement};
20
21use super::{HandlerArgs, RwPgResponse};
22use crate::binder::BoundStatement;
23use crate::error::{ErrorCode, Result};
24use crate::handler::create_table::{
25    ColumnIdGenerator, CreateTableProps, gen_create_table_plan_without_source,
26};
27use crate::handler::query::handle_query;
28use crate::stream_fragmenter::GraphJobType;
29use crate::{Binder, OptimizerContext, build_graph};
30
31pub async fn handle_create_as(
32    handler_args: HandlerArgs,
33    table_name: ObjectName,
34    if_not_exists: bool,
35    query: Box<Query>,
36    column_defs: Vec<ColumnDef>,
37    append_only: bool,
38    on_conflict: Option<OnConflict>,
39    with_version_column: Option<String>,
40    ast_engine: risingwave_sqlparser::ast::Engine,
41) -> Result<RwPgResponse> {
42    if column_defs.iter().any(|column| column.data_type.is_some()) {
43        return Err(ErrorCode::InvalidInputSyntax(
44            "Should not specify data type in CREATE TABLE AS".into(),
45        )
46        .into());
47    }
48    let engine = match ast_engine {
49        risingwave_sqlparser::ast::Engine::Hummock => risingwave_common::catalog::Engine::Hummock,
50        risingwave_sqlparser::ast::Engine::Iceberg => risingwave_common::catalog::Engine::Iceberg,
51    };
52
53    let session = handler_args.session.clone();
54
55    if let Either::Right(resp) = session.check_relation_name_duplicated(
56        table_name.clone(),
57        StatementType::CREATE_TABLE,
58        if_not_exists,
59    )? {
60        return Ok(resp);
61    }
62
63    let mut col_id_gen = ColumnIdGenerator::new_initial();
64
65    // Generate catalog descs from query
66    let mut columns: Vec<_> = {
67        let mut binder = Binder::new(&session);
68        let bound = binder.bind(Statement::Query(query.clone()))?;
69        if let BoundStatement::Query(query) = bound {
70            // Create ColumnCatelog by Field
71            query
72                .schema()
73                .fields()
74                .iter()
75                .map(|field| ColumnCatalog {
76                    column_desc: ColumnDesc::from_field_without_column_id(field),
77                    is_hidden: false,
78                })
79                .collect()
80        } else {
81            unreachable!()
82        }
83    };
84
85    // Generate column id.
86    for c in &mut columns {
87        col_id_gen.generate(c)?;
88    }
89
90    if column_defs.len() > columns.len() {
91        return Err(ErrorCode::InvalidInputSyntax(
92            "too many column names were specified".to_owned(),
93        )
94        .into());
95    }
96
97    // Override column name if it specified in creaet statement.
98    column_defs.iter().enumerate().for_each(|(idx, column)| {
99        columns[idx].column_desc.name = column.name.real_value();
100    });
101
102    let (graph, source, table) = {
103        let context = OptimizerContext::from_handler_args(handler_args.clone());
104        let (_, secret_refs, connection_refs) = context.with_options().clone().into_parts();
105        if !secret_refs.is_empty() || !connection_refs.is_empty() {
106            return Err(crate::error::ErrorCode::InvalidParameterValue(
107                "Secret reference and Connection reference are not allowed in options for CREATE TABLE AS".to_owned(),
108            )
109            .into());
110        }
111        let (plan, table) = gen_create_table_plan_without_source(
112            context,
113            table_name.clone(),
114            columns,
115            vec![],
116            vec![],
117            vec![], // No watermark should be defined in for `CREATE TABLE AS`
118            col_id_gen.into_version(),
119            CreateTableProps {
120                // Note: by providing and persisting an empty definition, querying the definition of the table
121                // will hit the purification logic, which will construct it based on the catalog.
122                definition: "".to_owned(),
123                append_only,
124                on_conflict: on_conflict.into(),
125                with_version_column,
126                webhook_info: None,
127                engine,
128            },
129        )?;
130        let graph = build_graph(plan, Some(GraphJobType::Table))?;
131
132        (graph, None, table)
133    };
134
135    tracing::trace!(
136        "name={}, graph=\n{}",
137        table_name,
138        serde_json::to_string_pretty(&graph).unwrap()
139    );
140
141    let catalog_writer = session.catalog_writer()?;
142    catalog_writer
143        .create_table(source, table, graph, TableJobType::Unspecified)
144        .await?;
145
146    // Generate insert
147    let insert = Statement::Insert {
148        table_name,
149        columns: vec![],
150        source: query,
151        returning: vec![],
152    };
153
154    handle_query(handler_args, insert, vec![]).await
155}