risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_sinks.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::{Fields, JsonbVal, Timestamptz};
16use risingwave_connector::WithOptionsSecResolved;
17use risingwave_frontend_macro::system_catalog;
18
19use crate::catalog::system_catalog::rw_catalog::rw_sources::serialize_props_with_secret;
20use crate::catalog::system_catalog::{SysCatalogReaderImpl, get_acl_items};
21use crate::error::Result;
22use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
23
/// One row of the `rw_catalog.rw_sinks` system table, describing a single sink.
///
/// Rows are produced by `read_rw_sinks_info` in this file; the per-field notes
/// below describe where that function takes each value from.
#[derive(Fields)]
struct RwSink {
    // Sink id (`sink.id`), used as the primary key of the table.
    #[primary_key]
    id: i32,
    // Sink name.
    name: String,
    // Id of the schema the sink was listed under.
    schema_id: i32,
    // User id of the sink owner (`sink.owner.user_id`).
    owner: i32,
    // Upper-cased value of the `UPSTREAM_SOURCE_KEY` entry in the sink
    // properties; empty string when that entry is absent.
    connector: String,
    // String name of the sink type's protobuf enum value.
    sink_type: String,
    // Id of the connection attached to the sink, if any.
    connection_id: Option<i32>,
    // SQL text returned by `sink.create_sql()`.
    definition: String,
    // ACL items for this sink as returned by `get_acl_items`.
    acl: Vec<String>,
    // Timestamps derived from the sink's `initialized_at_epoch` /
    // `created_at_epoch`; `None` when the corresponding epoch is not recorded.
    initialized_at: Option<Timestamptz>,
    created_at: Option<Timestamptz>,
    // Cluster version strings recorded on the sink catalog, if any.
    initialized_at_cluster_version: Option<String>,
    created_at_cluster_version: Option<String>,

    // connector properties in json format
    // (built by `serialize_props_with_secret` from the sink's properties and secret refs)
    connector_props: JsonbVal,
    // format and encode properties in json format
    // (JSON null when the sink has no `format_desc`)
    format_encode_options: JsonbVal,
}
46
47#[system_catalog(table, "rw_catalog.rw_sinks")]
48fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSink>> {
49    let catalog_reader = reader.catalog_reader.read_guard();
50    let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
51    let user_reader = reader.user_info_reader.read_guard();
52    let current_user = user_reader
53        .get_user_by_name(&reader.auth_context.user_name)
54        .expect("user not found");
55    let users = user_reader.get_all_users();
56    let username_map = user_reader.get_user_name_map();
57
58    Ok(schemas
59        .flat_map(|schema| {
60            schema.iter_sink_with_acl(current_user).map(|sink| {
61                let connector_props = serialize_props_with_secret(
62                    &catalog_reader,
63                    &reader.auth_context.database,
64                    WithOptionsSecResolved::new(sink.properties.clone(), sink.secret_refs.clone()),
65                )
66                .into();
67                let format_encode_options = sink
68                    .format_desc
69                    .as_ref()
70                    .map(|desc| {
71                        serialize_props_with_secret(
72                            &catalog_reader,
73                            &reader.auth_context.database,
74                            WithOptionsSecResolved::new(
75                                desc.options.clone(),
76                                desc.secret_refs.clone(),
77                            ),
78                        )
79                    })
80                    .unwrap_or_else(jsonbb::Value::null)
81                    .into();
82                RwSink {
83                    id: sink.id.as_i32_id(),
84                    name: sink.name.clone(),
85                    schema_id: schema.id().as_i32_id(),
86                    owner: sink.owner.user_id as i32,
87                    connector: sink
88                        .properties
89                        .get(UPSTREAM_SOURCE_KEY)
90                        .cloned()
91                        .unwrap_or("".to_owned())
92                        .to_uppercase(),
93                    sink_type: sink.sink_type.to_proto().as_str_name().into(),
94                    connection_id: sink.connection_id.map(|id| id.as_i32_id()),
95                    definition: sink.create_sql(),
96                    acl: get_acl_items(sink.id, false, &users, username_map),
97                    initialized_at: sink.initialized_at_epoch.map(|e| e.as_timestamptz()),
98                    created_at: sink.created_at_epoch.map(|e| e.as_timestamptz()),
99                    initialized_at_cluster_version: sink.initialized_at_cluster_version.clone(),
100                    created_at_cluster_version: sink.created_at_cluster_version.clone(),
101                    connector_props,
102                    format_encode_options,
103                }
104            })
105        })
106        .collect())
107}
108
/// Schema of the `rw_catalog.rw_sink_decouple` view, which reports per sink
/// whether it is decoupled through a KV log store.
///
/// How the view query works:
/// - `decoupled_sink_internal_table_ids` picks, from `rw_catalog.rw_fragments`,
///   the `node->'sink'->'table'->'id'` of every fragment flagged `SINK` whose
///   `logStoreType` is `SINK_LOG_STORE_TYPE_KV_LOG_STORE`.
/// - `internal_table_vnode_count` LEFT JOINs those ids with
///   `rw_catalog.rw_hummock_table_watermark` and counts the rows per internal table.
/// - The final SELECT LEFT JOINs `rw_catalog.rw_sinks` with the fragments that
///   own one of those internal tables (`internal_table_id = any(state_table_ids)`),
///   matching on `table_id = rw_sinks.id`.
///
/// NOTE(review): `count(*)` over a LEFT JOIN also counts the NULL-extended row,
/// so an internal table with no watermark rows would presumably report 1 rather
/// than 0 — confirm whether that is intended.
#[system_catalog(
    view,
    "rw_catalog.rw_sink_decouple",
    "WITH decoupled_sink_internal_table_ids AS (
        SELECT
            (node->'sink'->'table'->'id')::int as internal_table_id
        FROM rw_catalog.rw_fragments
        WHERE
            'SINK' = any(flags)
            AND
            (node->'sink'->'logStoreType')::string = '\"SINK_LOG_STORE_TYPE_KV_LOG_STORE\"'
    ),
    internal_table_vnode_count AS (
        SELECT
            internal_table_id, count(*)::int as watermark_vnode_count
        FROM decoupled_sink_internal_table_ids
            LEFT JOIN
                rw_catalog.rw_hummock_table_watermark
            ON decoupled_sink_internal_table_ids.internal_table_id = rw_catalog.rw_hummock_table_watermark.table_id
        GROUP BY internal_table_id
    )
    SELECT
        rw_catalog.rw_sinks.id as sink_id,
        (watermark_vnode_count is not null) as is_decouple,
        watermark_vnode_count
    FROM rw_catalog.rw_sinks
        LEFT JOIN
            (rw_catalog.rw_fragments
                JOIN
                    internal_table_vnode_count
                ON internal_table_id = any(state_table_ids)
            )
        ON table_id = rw_catalog.rw_sinks.id
    "
)]
#[derive(Fields)]
struct RwSinkDecouple {
    // Id of the sink, taken from `rw_catalog.rw_sinks.id`.
    sink_id: i32,
    // True when the outer LEFT JOIN found a matching row, i.e.
    // `watermark_vnode_count is not null`.
    is_decouple: bool,
    // Row count from `internal_table_vnode_count` for the sink's internal table.
    // NOTE(review): the outer LEFT JOIN leaves this NULL for sinks without a
    // match, although the field is declared as a plain `i32` — confirm that
    // nullability is handled as intended.
    watermark_vnode_count: i32,
}