risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_sinks.rs1use risingwave_common::types::{Fields, JsonbVal, Timestamptz};
16use risingwave_connector::WithOptionsSecResolved;
17use risingwave_frontend_macro::system_catalog;
18
19use crate::catalog::system_catalog::rw_catalog::rw_sources::serialize_props_with_secret;
20use crate::catalog::system_catalog::{SysCatalogReaderImpl, get_acl_items};
21use crate::error::Result;
22use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
23
24#[derive(Fields)]
25struct RwSink {
26 #[primary_key]
27 id: i32,
28 name: String,
29 schema_id: i32,
30 owner: i32,
31 connector: String,
32 sink_type: String,
33 connection_id: Option<i32>,
34 definition: String,
35 acl: Vec<String>,
36 initialized_at: Option<Timestamptz>,
37 created_at: Option<Timestamptz>,
38 initialized_at_cluster_version: Option<String>,
39 created_at_cluster_version: Option<String>,
40
41 connector_props: JsonbVal,
43 format_encode_options: JsonbVal,
45}
46
47#[system_catalog(table, "rw_catalog.rw_sinks")]
48fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSink>> {
49 let catalog_reader = reader.catalog_reader.read_guard();
50 let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
51 let user_reader = reader.user_info_reader.read_guard();
52 let current_user = user_reader
53 .get_user_by_name(&reader.auth_context.user_name)
54 .expect("user not found");
55 let users = user_reader.get_all_users();
56 let username_map = user_reader.get_user_name_map();
57
58 Ok(schemas
59 .flat_map(|schema| {
60 schema.iter_sink_with_acl(current_user).map(|sink| {
61 let connector_props = serialize_props_with_secret(
62 &catalog_reader,
63 &reader.auth_context.database,
64 WithOptionsSecResolved::new(sink.properties.clone(), sink.secret_refs.clone()),
65 )
66 .into();
67 let format_encode_options = sink
68 .format_desc
69 .as_ref()
70 .map(|desc| {
71 serialize_props_with_secret(
72 &catalog_reader,
73 &reader.auth_context.database,
74 WithOptionsSecResolved::new(
75 desc.options.clone(),
76 desc.secret_refs.clone(),
77 ),
78 )
79 })
80 .unwrap_or_else(jsonbb::Value::null)
81 .into();
82 RwSink {
83 id: sink.id.as_i32_id(),
84 name: sink.name.clone(),
85 schema_id: schema.id().as_i32_id(),
86 owner: sink.owner.user_id as i32,
87 connector: sink
88 .properties
89 .get(UPSTREAM_SOURCE_KEY)
90 .cloned()
91 .unwrap_or("".to_owned())
92 .to_uppercase(),
93 sink_type: sink.sink_type.to_proto().as_str_name().into(),
94 connection_id: sink.connection_id.map(|id| id.as_i32_id()),
95 definition: sink.create_sql(),
96 acl: get_acl_items(sink.id, false, &users, username_map),
97 initialized_at: sink.initialized_at_epoch.map(|e| e.as_timestamptz()),
98 created_at: sink.created_at_epoch.map(|e| e.as_timestamptz()),
99 initialized_at_cluster_version: sink.initialized_at_cluster_version.clone(),
100 created_at_cluster_version: sink.created_at_cluster_version.clone(),
101 connector_props,
102 format_encode_options,
103 }
104 })
105 })
106 .collect())
107}
108
109#[system_catalog(
110 view,
111 "rw_catalog.rw_sink_decouple",
112 "WITH decoupled_sink_internal_table_ids AS (
113 SELECT
114 (node->'sink'->'table'->'id')::int as internal_table_id
115 FROM rw_catalog.rw_fragments
116 WHERE
117 'SINK' = any(flags)
118 AND
119 (node->'sink'->'logStoreType')::string = '\"SINK_LOG_STORE_TYPE_KV_LOG_STORE\"'
120 ),
121 internal_table_vnode_count AS (
122 SELECT
123 internal_table_id, count(*)::int as watermark_vnode_count
124 FROM decoupled_sink_internal_table_ids
125 LEFT JOIN
126 rw_catalog.rw_hummock_table_watermark
127 ON decoupled_sink_internal_table_ids.internal_table_id = rw_catalog.rw_hummock_table_watermark.table_id
128 GROUP BY internal_table_id
129 )
130 SELECT
131 rw_catalog.rw_sinks.id as sink_id,
132 (watermark_vnode_count is not null) as is_decouple,
133 watermark_vnode_count
134 FROM rw_catalog.rw_sinks
135 LEFT JOIN
136 (rw_catalog.rw_fragments
137 JOIN
138 internal_table_vnode_count
139 ON internal_table_id = any(state_table_ids)
140 )
141 ON table_id = rw_catalog.rw_sinks.id
142 "
143)]
144#[derive(Fields)]
145struct RwSinkDecouple {
146 sink_id: i32,
147 is_decouple: bool,
148 watermark_vnode_count: i32,
149}