risingwave_frontend/handler/
create_subscription.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::rc::Rc;
16
17use either::Either;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_sqlparser::ast::CreateSubscriptionStatement;
20
21use super::{HandlerArgs, RwPgResponse};
22use crate::catalog::subscription_catalog::{
23    SubscriptionCatalog, SubscriptionId, SubscriptionState,
24};
25use crate::error::Result;
26use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
27use crate::session::SessionImpl;
28use crate::{Binder, OptimizerContext, OptimizerContextRef};
29
30pub fn create_subscription_catalog(
31    session: &SessionImpl,
32    context: OptimizerContextRef,
33    stmt: CreateSubscriptionStatement,
34) -> Result<SubscriptionCatalog> {
35    let db_name = &session.database();
36    let (subscription_schema_name, subscription_name) =
37        Binder::resolve_schema_qualified_name(db_name, &stmt.subscription_name)?;
38    let (table_schema_name, subscription_from_table_name) =
39        Binder::resolve_schema_qualified_name(db_name, &stmt.subscription_from)?;
40    let (table_database_id, table_schema_id) =
41        session.get_database_and_schema_id_for_create(table_schema_name)?;
42    let (subscription_database_id, subscription_schema_id) =
43        session.get_database_and_schema_id_for_create(subscription_schema_name)?;
44    let definition = context.normalized_sql().to_owned();
45    let dependent_table_id = session
46        .get_table_by_name(
47            &subscription_from_table_name,
48            table_database_id,
49            table_schema_id,
50        )?
51        .id;
52
53    let mut subscription_catalog = SubscriptionCatalog {
54        id: SubscriptionId::placeholder(),
55        name: subscription_name,
56        definition,
57        retention_seconds: 0,
58        database_id: subscription_database_id,
59        schema_id: subscription_schema_id,
60        dependent_table_id,
61        owner: session.user_id(),
62        initialized_at_epoch: None,
63        created_at_epoch: None,
64        created_at_cluster_version: None,
65        initialized_at_cluster_version: None,
66        subscription_state: SubscriptionState::Init,
67    };
68
69    subscription_catalog.set_retention_seconds(context.with_options())?;
70
71    Ok(subscription_catalog)
72}
73
74pub async fn handle_create_subscription(
75    handle_args: HandlerArgs,
76    stmt: CreateSubscriptionStatement,
77) -> Result<RwPgResponse> {
78    let session = handle_args.session.clone();
79
80    if let Either::Right(resp) = session.check_relation_name_duplicated(
81        stmt.subscription_name.clone(),
82        StatementType::CREATE_SUBSCRIPTION,
83        stmt.if_not_exists,
84    )? {
85        return Ok(resp);
86    };
87    let subscription_catalog = {
88        let context = Rc::new(OptimizerContext::from_handler_args(handle_args));
89        create_subscription_catalog(&session, context, stmt)?
90    };
91
92    let _job_guard =
93        session
94            .env()
95            .creating_streaming_job_tracker()
96            .guard(CreatingStreamingJobInfo::new(
97                session.session_id(),
98                subscription_catalog.database_id,
99                subscription_catalog.schema_id,
100                subscription_catalog.name.clone(),
101            ));
102
103    let catalog_writer = session.catalog_writer()?;
104    catalog_writer
105        .create_subscription(subscription_catalog.to_proto())
106        .await?;
107
108    Ok(PgResponse::empty_result(StatementType::CREATE_SUBSCRIPTION))
109}