risingwave_frontend/handler/
create_subscription.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::rc::Rc;
16
17use either::Either;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::catalog::UserId;
20use risingwave_sqlparser::ast::CreateSubscriptionStatement;
21
22use super::{HandlerArgs, RwPgResponse};
23use crate::catalog::subscription_catalog::{
24    SubscriptionCatalog, SubscriptionId, SubscriptionState,
25};
26use crate::error::Result;
27use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
28use crate::session::SessionImpl;
29use crate::{Binder, OptimizerContext, OptimizerContextRef};
30
31pub fn create_subscription_catalog(
32    session: &SessionImpl,
33    context: OptimizerContextRef,
34    stmt: CreateSubscriptionStatement,
35) -> Result<SubscriptionCatalog> {
36    let db_name = &session.database();
37    let (subscription_schema_name, subscription_name) =
38        Binder::resolve_schema_qualified_name(db_name, stmt.subscription_name.clone())?;
39    let (table_schema_name, subscription_from_table_name) =
40        Binder::resolve_schema_qualified_name(db_name, stmt.subscription_from.clone())?;
41    let (table_database_id, table_schema_id) =
42        session.get_database_and_schema_id_for_create(table_schema_name.clone())?;
43    let (subscription_database_id, subscription_schema_id) =
44        session.get_database_and_schema_id_for_create(subscription_schema_name.clone())?;
45    let definition = context.normalized_sql().to_owned();
46    let dependent_table_id = session
47        .get_table_by_name(
48            &subscription_from_table_name,
49            table_database_id,
50            table_schema_id,
51        )?
52        .id;
53
54    let mut subscription_catalog = SubscriptionCatalog {
55        id: SubscriptionId::placeholder(),
56        name: subscription_name,
57        definition,
58        retention_seconds: 0,
59        database_id: subscription_database_id,
60        schema_id: subscription_schema_id,
61        dependent_table_id,
62        owner: UserId::new(session.user_id()),
63        initialized_at_epoch: None,
64        created_at_epoch: None,
65        created_at_cluster_version: None,
66        initialized_at_cluster_version: None,
67        subscription_state: SubscriptionState::Init,
68    };
69
70    subscription_catalog.set_retention_seconds(context.with_options())?;
71
72    Ok(subscription_catalog)
73}
74
75pub async fn handle_create_subscription(
76    handle_args: HandlerArgs,
77    stmt: CreateSubscriptionStatement,
78) -> Result<RwPgResponse> {
79    let session = handle_args.session.clone();
80
81    if let Either::Right(resp) = session.check_relation_name_duplicated(
82        stmt.subscription_name.clone(),
83        StatementType::CREATE_SUBSCRIPTION,
84        stmt.if_not_exists,
85    )? {
86        return Ok(resp);
87    };
88    let subscription_catalog = {
89        let context = Rc::new(OptimizerContext::from_handler_args(handle_args));
90        create_subscription_catalog(&session, context.clone(), stmt)?
91    };
92
93    let _job_guard =
94        session
95            .env()
96            .creating_streaming_job_tracker()
97            .guard(CreatingStreamingJobInfo::new(
98                session.session_id(),
99                subscription_catalog.database_id,
100                subscription_catalog.schema_id,
101                subscription_catalog.name.clone(),
102            ));
103
104    let catalog_writer = session.catalog_writer()?;
105    catalog_writer
106        .create_subscription(subscription_catalog.to_proto())
107        .await?;
108
109    Ok(PgResponse::empty_result(StatementType::CREATE_SUBSCRIPTION))
110}