risingwave_frontend/handler/
create_subscription.rs1use std::rc::Rc;
16
17use either::Either;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::catalog::UserId;
20use risingwave_sqlparser::ast::CreateSubscriptionStatement;
21
22use super::{HandlerArgs, RwPgResponse};
23use crate::catalog::subscription_catalog::{
24 SubscriptionCatalog, SubscriptionId, SubscriptionState,
25};
26use crate::error::Result;
27use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
28use crate::session::SessionImpl;
29use crate::{Binder, OptimizerContext, OptimizerContextRef};
30
31pub fn create_subscription_catalog(
32 session: &SessionImpl,
33 context: OptimizerContextRef,
34 stmt: CreateSubscriptionStatement,
35) -> Result<SubscriptionCatalog> {
36 let db_name = &session.database();
37 let (subscription_schema_name, subscription_name) =
38 Binder::resolve_schema_qualified_name(db_name, stmt.subscription_name.clone())?;
39 let (table_schema_name, subscription_from_table_name) =
40 Binder::resolve_schema_qualified_name(db_name, stmt.subscription_from.clone())?;
41 let (table_database_id, table_schema_id) =
42 session.get_database_and_schema_id_for_create(table_schema_name.clone())?;
43 let (subscription_database_id, subscription_schema_id) =
44 session.get_database_and_schema_id_for_create(subscription_schema_name.clone())?;
45 let definition = context.normalized_sql().to_owned();
46 let dependent_table_id = session
47 .get_table_by_name(
48 &subscription_from_table_name,
49 table_database_id,
50 table_schema_id,
51 )?
52 .id;
53
54 let mut subscription_catalog = SubscriptionCatalog {
55 id: SubscriptionId::placeholder(),
56 name: subscription_name,
57 definition,
58 retention_seconds: 0,
59 database_id: subscription_database_id,
60 schema_id: subscription_schema_id,
61 dependent_table_id,
62 owner: UserId::new(session.user_id()),
63 initialized_at_epoch: None,
64 created_at_epoch: None,
65 created_at_cluster_version: None,
66 initialized_at_cluster_version: None,
67 subscription_state: SubscriptionState::Init,
68 };
69
70 subscription_catalog.set_retention_seconds(context.with_options())?;
71
72 Ok(subscription_catalog)
73}
74
75pub async fn handle_create_subscription(
76 handle_args: HandlerArgs,
77 stmt: CreateSubscriptionStatement,
78) -> Result<RwPgResponse> {
79 let session = handle_args.session.clone();
80
81 if let Either::Right(resp) = session.check_relation_name_duplicated(
82 stmt.subscription_name.clone(),
83 StatementType::CREATE_SUBSCRIPTION,
84 stmt.if_not_exists,
85 )? {
86 return Ok(resp);
87 };
88 let subscription_catalog = {
89 let context = Rc::new(OptimizerContext::from_handler_args(handle_args));
90 create_subscription_catalog(&session, context.clone(), stmt)?
91 };
92
93 let _job_guard =
94 session
95 .env()
96 .creating_streaming_job_tracker()
97 .guard(CreatingStreamingJobInfo::new(
98 session.session_id(),
99 subscription_catalog.database_id,
100 subscription_catalog.schema_id,
101 subscription_catalog.name.clone(),
102 ));
103
104 let catalog_writer = session.catalog_writer()?;
105 catalog_writer
106 .create_subscription(subscription_catalog.to_proto())
107 .await?;
108
109 Ok(PgResponse::empty_result(StatementType::CREATE_SUBSCRIPTION))
110}