risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_sinks.rs1use risingwave_common::catalog::CreateType;
16use risingwave_common::id::{ConnectionId, SchemaId, SinkId, UserId};
17use risingwave_common::types::{Fields, JsonbVal, Timestamptz};
18use risingwave_connector::WithOptionsSecResolved;
19use risingwave_frontend_macro::system_catalog;
20
21use crate::catalog::system_catalog::rw_catalog::rw_sources::serialize_props_with_secret;
22use crate::catalog::system_catalog::{SysCatalogReaderImpl, get_acl_items};
23use crate::error::Result;
24use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
25
/// One row of the `rw_catalog.rw_sinks` system table, describing a single sink.
///
/// Rows are produced by `read_rw_sinks_info`; the per-field notes below describe how that
/// function fills each column. The field order is the column order of the table.
#[derive(Fields)]
struct RwSink {
    // Id of the sink; marked as the primary key of the table.
    #[primary_key]
    id: SinkId,
    // Name of the sink as stored in the catalog.
    name: String,
    // Id of the schema the sink was found in.
    schema_id: SchemaId,
    // Owner of the sink, as recorded on the sink catalog entry.
    owner: UserId,
    // Upper-cased value of the `UPSTREAM_SOURCE_KEY` property of the sink; empty string when the
    // property is absent.
    connector: String,
    // String name of the protobuf enum value of the sink type.
    sink_type: String,
    // Connection referenced by the sink, if any.
    connection_id: Option<ConnectionId>,
    // SQL text returned by the sink's `create_sql()`.
    definition: String,
    // Name of the sink's target table, resolved through the catalog. `None` when the sink has no
    // target table or when the name lookup fails.
    target_table_name: Option<String>,
    // ACL entries for the sink, as rendered by `get_acl_items`.
    acl: Vec<String>,
    // Timestamp derived from the sink's `initialized_at_epoch`, when set.
    initialized_at: Option<Timestamptz>,
    // Timestamp derived from the sink's `created_at_epoch`, when set.
    created_at: Option<Timestamptz>,
    // Cluster version recorded on the sink for its initialization, when set.
    initialized_at_cluster_version: Option<String>,
    // Cluster version recorded on the sink for its creation, when set.
    created_at_cluster_version: Option<String>,
    // True when the sink's create type is `CreateType::Background`.
    background_ddl: bool,

    // JSONB produced by `serialize_props_with_secret` from the sink's properties and secret refs.
    connector_props: JsonbVal,
    // JSONB produced by `serialize_props_with_secret` from the sink's format description options
    // and secret refs; JSON `null` when the sink has no format description.
    format_encode_options: JsonbVal,
}
50
51#[system_catalog(table, "rw_catalog.rw_sinks")]
52fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSink>> {
53 let catalog_reader = reader.catalog_reader.read_guard();
54 let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
55 let user_reader = reader.user_info_reader.read_guard();
56 let current_user = user_reader
57 .get_user_by_name(&reader.auth_context.user_name)
58 .expect("user not found");
59 let users = user_reader.get_all_users();
60 let username_map = user_reader.get_user_name_map();
61
62 Ok(schemas
63 .flat_map(|schema| {
64 schema.iter_sink_with_acl(current_user).map(|sink| {
65 let connector_props = serialize_props_with_secret(
66 &catalog_reader,
67 &reader.auth_context.database,
68 WithOptionsSecResolved::new(sink.properties.clone(), sink.secret_refs.clone()),
69 )
70 .into();
71 let format_encode_options = sink
72 .format_desc
73 .as_ref()
74 .map(|desc| {
75 serialize_props_with_secret(
76 &catalog_reader,
77 &reader.auth_context.database,
78 WithOptionsSecResolved::new(
79 desc.options.clone(),
80 desc.secret_refs.clone(),
81 ),
82 )
83 })
84 .unwrap_or_else(jsonbb::Value::null)
85 .into();
86 let target_table_name = sink
87 .target_table
88 .and_then(|table_id| catalog_reader.get_table_name_by_id(table_id).ok());
89 RwSink {
90 id: sink.id,
91 name: sink.name.clone(),
92 schema_id: schema.id(),
93 owner: sink.owner,
94 connector: sink
95 .properties
96 .get(UPSTREAM_SOURCE_KEY)
97 .cloned()
98 .unwrap_or("".to_owned())
99 .to_uppercase(),
100 sink_type: sink.sink_type.to_proto().as_str_name().into(),
101 connection_id: sink.connection_id,
102 definition: sink.create_sql(),
103 acl: get_acl_items(sink.id, false, &users, username_map),
104 initialized_at: sink.initialized_at_epoch.map(|e| e.as_timestamptz()),
105 created_at: sink.created_at_epoch.map(|e| e.as_timestamptz()),
106 initialized_at_cluster_version: sink.initialized_at_cluster_version.clone(),
107 created_at_cluster_version: sink.created_at_cluster_version.clone(),
108 connector_props,
109 format_encode_options,
110 background_ddl: sink.create_type == CreateType::Background,
111 target_table_name,
112 }
113 })
114 })
115 .collect())
116}
117
/// Row shape of the `rw_catalog.rw_sink_decouple` view.
///
/// The view starts from every row of `rw_catalog.rw_sinks` and LEFT JOINs it with the sink's
/// entries in `rw_catalog.rw_sink_log_store_tables`, each paired with a count computed from
/// `rw_catalog.rw_hummock_table_watermark` for that internal table. A sink without a matching
/// log store table gets a NULL count, and `is_decouple` is defined as "count is not null".
///
/// NOTE(review): the count is `count(*)` over a LEFT JOIN grouped by `internal_table_id`, so an
/// internal table with no matching watermark row still contributes the null-extended row and is
/// counted as 1 rather than 0 — confirm this is intended.
#[system_catalog(
    view,
    "rw_catalog.rw_sink_decouple",
    "WITH decoupled_sink_internal_table_ids AS (
 SELECT
 sink_id,
 internal_table_id
 FROM rw_catalog.rw_sink_log_store_tables
 ),
 internal_table_vnode_count AS (
 SELECT
 internal_table_id, count(*)::int as watermark_vnode_count
 FROM decoupled_sink_internal_table_ids
 LEFT JOIN
 rw_catalog.rw_hummock_table_watermark
 ON decoupled_sink_internal_table_ids.internal_table_id = rw_catalog.rw_hummock_table_watermark.table_id
 GROUP BY internal_table_id
 )
 SELECT
 rw_catalog.rw_sinks.id as sink_id,
 (watermark_vnode_count is not null) as is_decouple,
 watermark_vnode_count
 FROM rw_catalog.rw_sinks
 LEFT JOIN
 (decoupled_sink_internal_table_ids
 JOIN
 internal_table_vnode_count
 ON decoupled_sink_internal_table_ids.internal_table_id = internal_table_vnode_count.internal_table_id
 )
 ON sink_id = rw_catalog.rw_sinks.id
 "
)]
#[derive(Fields)]
struct RwSinkDecouple {
    // `rw_catalog.rw_sinks.id` of the sink.
    sink_id: i32,
    // True when the sink joined to a log store internal table, i.e. the count below is not NULL.
    is_decouple: bool,
    // Number of joined `rw_hummock_table_watermark` rows for the sink's log store table.
    // NOTE(review): the query yields NULL here for sinks with no log store table, while the field
    // is declared as plain `i32` — presumably this struct only declares column types; confirm.
    watermark_vnode_count: i32,
}