risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_sinks.rs1use risingwave_common::types::{Fields, JsonbVal, Timestamptz};
16use risingwave_connector::WithOptionsSecResolved;
17use risingwave_frontend_macro::system_catalog;
18use risingwave_pb::user::grant_privilege::Object;
19
20use crate::catalog::system_catalog::rw_catalog::rw_sources::serialize_props_with_secret;
21use crate::catalog::system_catalog::{SysCatalogReaderImpl, get_acl_items};
22use crate::error::Result;
23use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
24
25#[derive(Fields)]
26struct RwSink {
27 #[primary_key]
28 id: i32,
29 name: String,
30 schema_id: i32,
31 owner: i32,
32 connector: String,
33 sink_type: String,
34 connection_id: Option<i32>,
35 definition: String,
36 acl: Vec<String>,
37 initialized_at: Option<Timestamptz>,
38 created_at: Option<Timestamptz>,
39 initialized_at_cluster_version: Option<String>,
40 created_at_cluster_version: Option<String>,
41
42 connector_props: JsonbVal,
44 format_encode_options: JsonbVal,
46}
47
48#[system_catalog(table, "rw_catalog.rw_sinks")]
49fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSink>> {
50 let catalog_reader = reader.catalog_reader.read_guard();
51 let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
52 let user_reader = reader.user_info_reader.read_guard();
53 let current_user = user_reader
54 .get_user_by_name(&reader.auth_context.user_name)
55 .expect("user not found");
56 let users = user_reader.get_all_users();
57 let username_map = user_reader.get_user_name_map();
58
59 Ok(schemas
60 .flat_map(|schema| {
61 schema.iter_sink_with_acl(current_user).map(|sink| {
62 let connector_props = serialize_props_with_secret(
63 &catalog_reader,
64 &reader.auth_context.database,
65 WithOptionsSecResolved::new(sink.properties.clone(), sink.secret_refs.clone()),
66 )
67 .into();
68 let format_encode_options = sink
69 .format_desc
70 .as_ref()
71 .map(|desc| {
72 serialize_props_with_secret(
73 &catalog_reader,
74 &reader.auth_context.database,
75 WithOptionsSecResolved::new(
76 desc.options.clone(),
77 desc.secret_refs.clone(),
78 ),
79 )
80 })
81 .unwrap_or_else(jsonbb::Value::null)
82 .into();
83 RwSink {
84 id: sink.id.sink_id as i32,
85 name: sink.name.clone(),
86 schema_id: schema.id() as i32,
87 owner: sink.owner.user_id as i32,
88 connector: sink
89 .properties
90 .get(UPSTREAM_SOURCE_KEY)
91 .cloned()
92 .unwrap_or("".to_owned())
93 .to_uppercase(),
94 sink_type: sink.sink_type.to_proto().as_str_name().into(),
95 connection_id: sink.connection_id.map(|id| id.connection_id() as i32),
96 definition: sink.create_sql(),
97 acl: get_acl_items(
98 &Object::SinkId(sink.id.sink_id),
99 false,
100 &users,
101 username_map,
102 ),
103 initialized_at: sink.initialized_at_epoch.map(|e| e.as_timestamptz()),
104 created_at: sink.created_at_epoch.map(|e| e.as_timestamptz()),
105 initialized_at_cluster_version: sink.initialized_at_cluster_version.clone(),
106 created_at_cluster_version: sink.created_at_cluster_version.clone(),
107 connector_props,
108 format_encode_options,
109 }
110 })
111 })
112 .collect())
113}
114
115#[system_catalog(
116 view,
117 "rw_catalog.rw_sink_decouple",
118 "WITH decoupled_sink_internal_table_ids AS (
119 SELECT
120 (node->'sink'->'table'->'id')::int as internal_table_id
121 FROM rw_catalog.rw_fragments
122 WHERE
123 'SINK' = any(flags)
124 AND
125 (node->'sink'->'logStoreType')::string = '\"SINK_LOG_STORE_TYPE_KV_LOG_STORE\"'
126 ),
127 internal_table_vnode_count AS (
128 SELECT
129 internal_table_id, count(*)::int as watermark_vnode_count
130 FROM decoupled_sink_internal_table_ids
131 LEFT JOIN
132 rw_catalog.rw_hummock_table_watermark
133 ON decoupled_sink_internal_table_ids.internal_table_id = rw_catalog.rw_hummock_table_watermark.table_id
134 GROUP BY internal_table_id
135 )
136 SELECT
137 rw_catalog.rw_sinks.id as sink_id,
138 (watermark_vnode_count is not null) as is_decouple,
139 watermark_vnode_count
140 FROM rw_catalog.rw_sinks
141 LEFT JOIN
142 (rw_catalog.rw_fragments
143 JOIN
144 internal_table_vnode_count
145 ON internal_table_id = any(state_table_ids)
146 )
147 ON table_id = rw_catalog.rw_sinks.id
148 "
149)]
150#[derive(Fields)]
151struct RwSinkDecouple {
152 sink_id: i32,
153 is_decouple: bool,
154 watermark_vnode_count: i32,
155}