risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_sinks.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::{Fields, JsonbVal, Timestamptz};
16use risingwave_connector::WithOptionsSecResolved;
17use risingwave_frontend_macro::system_catalog;
18use risingwave_pb::user::grant_privilege::Object;
19
20use crate::catalog::system_catalog::rw_catalog::rw_sources::serialize_props_with_secret;
21use crate::catalog::system_catalog::{SysCatalogReaderImpl, get_acl_items};
22use crate::error::Result;
23use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
24
25#[derive(Fields)]
26struct RwSink {
27    #[primary_key]
28    id: i32,
29    name: String,
30    schema_id: i32,
31    owner: i32,
32    connector: String,
33    sink_type: String,
34    connection_id: Option<i32>,
35    definition: String,
36    acl: Vec<String>,
37    initialized_at: Option<Timestamptz>,
38    created_at: Option<Timestamptz>,
39    initialized_at_cluster_version: Option<String>,
40    created_at_cluster_version: Option<String>,
41
42    // connector properties in json format
43    connector_props: JsonbVal,
44    // format and encode properties in json format
45    format_encode_options: JsonbVal,
46}
47
48#[system_catalog(table, "rw_catalog.rw_sinks")]
49fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSink>> {
50    let catalog_reader = reader.catalog_reader.read_guard();
51    let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
52    let user_reader = reader.user_info_reader.read_guard();
53    let current_user = user_reader
54        .get_user_by_name(&reader.auth_context.user_name)
55        .expect("user not found");
56    let users = user_reader.get_all_users();
57    let username_map = user_reader.get_user_name_map();
58
59    Ok(schemas
60        .flat_map(|schema| {
61            schema.iter_sink_with_acl(current_user).map(|sink| {
62                let connector_props = serialize_props_with_secret(
63                    &catalog_reader,
64                    &reader.auth_context.database,
65                    WithOptionsSecResolved::new(sink.properties.clone(), sink.secret_refs.clone()),
66                )
67                .into();
68                let format_encode_options = sink
69                    .format_desc
70                    .as_ref()
71                    .map(|desc| {
72                        serialize_props_with_secret(
73                            &catalog_reader,
74                            &reader.auth_context.database,
75                            WithOptionsSecResolved::new(
76                                desc.options.clone(),
77                                desc.secret_refs.clone(),
78                            ),
79                        )
80                    })
81                    .unwrap_or_else(jsonbb::Value::null)
82                    .into();
83                RwSink {
84                    id: sink.id.sink_id as i32,
85                    name: sink.name.clone(),
86                    schema_id: schema.id() as i32,
87                    owner: sink.owner.user_id as i32,
88                    connector: sink
89                        .properties
90                        .get(UPSTREAM_SOURCE_KEY)
91                        .cloned()
92                        .unwrap_or("".to_owned())
93                        .to_uppercase(),
94                    sink_type: sink.sink_type.to_proto().as_str_name().into(),
95                    connection_id: sink.connection_id.map(|id| id.connection_id() as i32),
96                    definition: sink.create_sql(),
97                    acl: get_acl_items(
98                        &Object::SinkId(sink.id.sink_id),
99                        false,
100                        &users,
101                        username_map,
102                    ),
103                    initialized_at: sink.initialized_at_epoch.map(|e| e.as_timestamptz()),
104                    created_at: sink.created_at_epoch.map(|e| e.as_timestamptz()),
105                    initialized_at_cluster_version: sink.initialized_at_cluster_version.clone(),
106                    created_at_cluster_version: sink.created_at_cluster_version.clone(),
107                    connector_props,
108                    format_encode_options,
109                }
110            })
111        })
112        .collect())
113}
114
115#[system_catalog(
116    view,
117    "rw_catalog.rw_sink_decouple",
118    "WITH decoupled_sink_internal_table_ids AS (
119        SELECT
120            (node->'sink'->'table'->'id')::int as internal_table_id
121        FROM rw_catalog.rw_fragments
122        WHERE
123            'SINK' = any(flags)
124            AND
125            (node->'sink'->'logStoreType')::string = '\"SINK_LOG_STORE_TYPE_KV_LOG_STORE\"'
126    ),
127    internal_table_vnode_count AS (
128        SELECT
129            internal_table_id, count(*)::int as watermark_vnode_count
130        FROM decoupled_sink_internal_table_ids
131            LEFT JOIN
132                rw_catalog.rw_hummock_table_watermark
133            ON decoupled_sink_internal_table_ids.internal_table_id = rw_catalog.rw_hummock_table_watermark.table_id
134        GROUP BY internal_table_id
135    )
136    SELECT
137        rw_catalog.rw_sinks.id as sink_id,
138        (watermark_vnode_count is not null) as is_decouple,
139        watermark_vnode_count
140    FROM rw_catalog.rw_sinks
141        LEFT JOIN
142            (rw_catalog.rw_fragments
143                JOIN
144                    internal_table_vnode_count
145                ON internal_table_id = any(state_table_ids)
146            )
147        ON table_id = rw_catalog.rw_sinks.id
148    "
149)]
150#[derive(Fields)]
151struct RwSinkDecouple {
152    sink_id: i32,
153    is_decouple: bool,
154    watermark_vnode_count: i32,
155}