risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_sinks.rs1use risingwave_common::catalog::CreateType;
16use risingwave_common::id::{ConnectionId, SchemaId, SinkId, UserId};
17use risingwave_common::types::{Fields, JsonbVal, Timestamptz};
18use risingwave_connector::WithOptionsSecResolved;
19use risingwave_frontend_macro::system_catalog;
20
21use crate::catalog::system_catalog::rw_catalog::rw_sources::serialize_props_with_secret;
22use crate::catalog::system_catalog::{SysCatalogReaderImpl, get_acl_items};
23use crate::error::Result;
24use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
25
26#[derive(Fields)]
27struct RwSink {
28 #[primary_key]
29 id: SinkId,
30 name: String,
31 schema_id: SchemaId,
32 owner: UserId,
33 connector: String,
34 sink_type: String,
35 connection_id: Option<ConnectionId>,
36 definition: String,
37 acl: Vec<String>,
38 initialized_at: Option<Timestamptz>,
39 created_at: Option<Timestamptz>,
40 initialized_at_cluster_version: Option<String>,
41 created_at_cluster_version: Option<String>,
42 background_ddl: bool,
43
44 connector_props: JsonbVal,
46 format_encode_options: JsonbVal,
48}
49
50#[system_catalog(table, "rw_catalog.rw_sinks")]
51fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSink>> {
52 let catalog_reader = reader.catalog_reader.read_guard();
53 let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
54 let user_reader = reader.user_info_reader.read_guard();
55 let current_user = user_reader
56 .get_user_by_name(&reader.auth_context.user_name)
57 .expect("user not found");
58 let users = user_reader.get_all_users();
59 let username_map = user_reader.get_user_name_map();
60
61 Ok(schemas
62 .flat_map(|schema| {
63 schema.iter_sink_with_acl(current_user).map(|sink| {
64 let connector_props = serialize_props_with_secret(
65 &catalog_reader,
66 &reader.auth_context.database,
67 WithOptionsSecResolved::new(sink.properties.clone(), sink.secret_refs.clone()),
68 )
69 .into();
70 let format_encode_options = sink
71 .format_desc
72 .as_ref()
73 .map(|desc| {
74 serialize_props_with_secret(
75 &catalog_reader,
76 &reader.auth_context.database,
77 WithOptionsSecResolved::new(
78 desc.options.clone(),
79 desc.secret_refs.clone(),
80 ),
81 )
82 })
83 .unwrap_or_else(jsonbb::Value::null)
84 .into();
85 RwSink {
86 id: sink.id,
87 name: sink.name.clone(),
88 schema_id: schema.id(),
89 owner: sink.owner,
90 connector: sink
91 .properties
92 .get(UPSTREAM_SOURCE_KEY)
93 .cloned()
94 .unwrap_or("".to_owned())
95 .to_uppercase(),
96 sink_type: sink.sink_type.to_proto().as_str_name().into(),
97 connection_id: sink.connection_id,
98 definition: sink.create_sql(),
99 acl: get_acl_items(sink.id, false, &users, username_map),
100 initialized_at: sink.initialized_at_epoch.map(|e| e.as_timestamptz()),
101 created_at: sink.created_at_epoch.map(|e| e.as_timestamptz()),
102 initialized_at_cluster_version: sink.initialized_at_cluster_version.clone(),
103 created_at_cluster_version: sink.created_at_cluster_version.clone(),
104 connector_props,
105 format_encode_options,
106 background_ddl: sink.create_type == CreateType::Background,
107 }
108 })
109 })
110 .collect())
111}
112
113#[system_catalog(
114 view,
115 "rw_catalog.rw_sink_decouple",
116 "WITH decoupled_sink_internal_table_ids AS (
117 SELECT
118 sink_id,
119 internal_table_id
120 FROM rw_catalog.rw_sink_log_store_tables
121 ),
122 internal_table_vnode_count AS (
123 SELECT
124 internal_table_id, count(*)::int as watermark_vnode_count
125 FROM decoupled_sink_internal_table_ids
126 LEFT JOIN
127 rw_catalog.rw_hummock_table_watermark
128 ON decoupled_sink_internal_table_ids.internal_table_id = rw_catalog.rw_hummock_table_watermark.table_id
129 GROUP BY internal_table_id
130 )
131 SELECT
132 rw_catalog.rw_sinks.id as sink_id,
133 (watermark_vnode_count is not null) as is_decouple,
134 watermark_vnode_count
135 FROM rw_catalog.rw_sinks
136 LEFT JOIN
137 (decoupled_sink_internal_table_ids
138 JOIN
139 internal_table_vnode_count
140 ON decoupled_sink_internal_table_ids.internal_table_id = internal_table_vnode_count.internal_table_id
141 )
142 ON sink_id = rw_catalog.rw_sinks.id
143 "
144)]
145#[derive(Fields)]
146struct RwSinkDecouple {
147 sink_id: i32,
148 is_decouple: bool,
149 watermark_vnode_count: i32,
150}