risingwave_frontend/handler/
create_subscription.rs1use std::rc::Rc;
16
17use either::Either;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_sqlparser::ast::CreateSubscriptionStatement;
20
21use super::{HandlerArgs, RwPgResponse};
22use crate::catalog::subscription_catalog::{
23 SubscriptionCatalog, SubscriptionId, SubscriptionState,
24};
25use crate::error::Result;
26use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
27use crate::session::SessionImpl;
28use crate::{Binder, OptimizerContext, OptimizerContextRef};
29
30pub fn create_subscription_catalog(
31 session: &SessionImpl,
32 context: OptimizerContextRef,
33 stmt: CreateSubscriptionStatement,
34) -> Result<SubscriptionCatalog> {
35 let db_name = &session.database();
36 let (subscription_schema_name, subscription_name) =
37 Binder::resolve_schema_qualified_name(db_name, &stmt.subscription_name)?;
38 let (table_schema_name, subscription_from_table_name) =
39 Binder::resolve_schema_qualified_name(db_name, &stmt.subscription_from)?;
40 let (table_database_id, table_schema_id) =
41 session.get_database_and_schema_id_for_create(table_schema_name)?;
42 let (subscription_database_id, subscription_schema_id) =
43 session.get_database_and_schema_id_for_create(subscription_schema_name)?;
44 let definition = context.normalized_sql().to_owned();
45 let dependent_table_id = session
46 .get_table_by_name(
47 &subscription_from_table_name,
48 table_database_id,
49 table_schema_id,
50 )?
51 .id;
52
53 let mut subscription_catalog = SubscriptionCatalog {
54 id: SubscriptionId::placeholder(),
55 name: subscription_name,
56 definition,
57 retention_seconds: 0,
58 database_id: subscription_database_id,
59 schema_id: subscription_schema_id,
60 dependent_table_id,
61 owner: session.user_id(),
62 initialized_at_epoch: None,
63 created_at_epoch: None,
64 created_at_cluster_version: None,
65 initialized_at_cluster_version: None,
66 subscription_state: SubscriptionState::Init,
67 };
68
69 subscription_catalog.set_retention_seconds(context.with_options())?;
70
71 Ok(subscription_catalog)
72}
73
74pub async fn handle_create_subscription(
75 handle_args: HandlerArgs,
76 stmt: CreateSubscriptionStatement,
77) -> Result<RwPgResponse> {
78 let session = handle_args.session.clone();
79
80 if let Either::Right(resp) = session.check_relation_name_duplicated(
81 stmt.subscription_name.clone(),
82 StatementType::CREATE_SUBSCRIPTION,
83 stmt.if_not_exists,
84 )? {
85 return Ok(resp);
86 };
87 let subscription_catalog = {
88 let context = Rc::new(OptimizerContext::from_handler_args(handle_args));
89 create_subscription_catalog(&session, context, stmt)?
90 };
91
92 let _job_guard =
93 session
94 .env()
95 .creating_streaming_job_tracker()
96 .guard(CreatingStreamingJobInfo::new(
97 session.session_id(),
98 subscription_catalog.database_id,
99 subscription_catalog.schema_id,
100 subscription_catalog.name.clone(),
101 ));
102
103 let catalog_writer = session.catalog_writer()?;
104 catalog_writer
105 .create_subscription(subscription_catalog.to_proto())
106 .await?;
107
108 Ok(PgResponse::empty_result(StatementType::CREATE_SUBSCRIPTION))
109}