risingwave_frontend/handler/
create_subscription.rs1use std::rc::Rc;
16
17use either::Either;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::catalog::UserId;
20use risingwave_sqlparser::ast::CreateSubscriptionStatement;
21
22use super::{HandlerArgs, RwPgResponse};
23use crate::catalog::subscription_catalog::{SubscriptionCatalog, SubscriptionId};
24use crate::error::Result;
25use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
26use crate::session::SessionImpl;
27use crate::{Binder, OptimizerContext, OptimizerContextRef};
28
29pub fn create_subscription_catalog(
30 session: &SessionImpl,
31 context: OptimizerContextRef,
32 stmt: CreateSubscriptionStatement,
33) -> Result<SubscriptionCatalog> {
34 let db_name = &session.database();
35 let (subscription_schema_name, subscription_name) =
36 Binder::resolve_schema_qualified_name(db_name, stmt.subscription_name.clone())?;
37 let (table_schema_name, subscription_from_table_name) =
38 Binder::resolve_schema_qualified_name(db_name, stmt.subscription_from.clone())?;
39 let (table_database_id, table_schema_id) =
40 session.get_database_and_schema_id_for_create(table_schema_name.clone())?;
41 let (subscription_database_id, subscription_schema_id) =
42 session.get_database_and_schema_id_for_create(subscription_schema_name.clone())?;
43 let definition = context.normalized_sql().to_owned();
44 let dependent_table_id = session
45 .get_table_by_name(
46 &subscription_from_table_name,
47 table_database_id,
48 table_schema_id,
49 )?
50 .id;
51
52 let mut subscription_catalog = SubscriptionCatalog {
53 id: SubscriptionId::placeholder(),
54 name: subscription_name,
55 definition,
56 retention_seconds: 0,
57 database_id: subscription_database_id,
58 schema_id: subscription_schema_id,
59 dependent_table_id,
60 owner: UserId::new(session.user_id()),
61 initialized_at_epoch: None,
62 created_at_epoch: None,
63 created_at_cluster_version: None,
64 initialized_at_cluster_version: None,
65 };
66
67 subscription_catalog.set_retention_seconds(context.with_options())?;
68
69 Ok(subscription_catalog)
70}
71
72pub async fn handle_create_subscription(
73 handle_args: HandlerArgs,
74 stmt: CreateSubscriptionStatement,
75) -> Result<RwPgResponse> {
76 let session = handle_args.session.clone();
77
78 if let Either::Right(resp) = session.check_relation_name_duplicated(
79 stmt.subscription_name.clone(),
80 StatementType::CREATE_SUBSCRIPTION,
81 stmt.if_not_exists,
82 )? {
83 return Ok(resp);
84 };
85 let subscription_catalog = {
86 let context = Rc::new(OptimizerContext::from_handler_args(handle_args));
87 create_subscription_catalog(&session, context.clone(), stmt)?
88 };
89
90 let _job_guard =
91 session
92 .env()
93 .creating_streaming_job_tracker()
94 .guard(CreatingStreamingJobInfo::new(
95 session.session_id(),
96 subscription_catalog.database_id,
97 subscription_catalog.schema_id,
98 subscription_catalog.name.clone(),
99 ));
100
101 let catalog_writer = session.catalog_writer()?;
102 catalog_writer
103 .create_subscription(subscription_catalog.to_proto())
104 .await?;
105
106 Ok(PgResponse::empty_result(StatementType::CREATE_SUBSCRIPTION))
107}