risingwave_frontend/catalog/system_catalog/rw_catalog/
rw_sinks.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::CreateType;
16use risingwave_common::id::{ConnectionId, SchemaId, SinkId, UserId};
17use risingwave_common::types::{Fields, JsonbVal, Timestamptz};
18use risingwave_connector::WithOptionsSecResolved;
19use risingwave_frontend_macro::system_catalog;
20
21use crate::catalog::system_catalog::rw_catalog::rw_sources::serialize_props_with_secret;
22use crate::catalog::system_catalog::{SysCatalogReaderImpl, get_acl_items};
23use crate::error::Result;
24use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
25
26#[derive(Fields)]
27struct RwSink {
28    #[primary_key]
29    id: SinkId,
30    name: String,
31    schema_id: SchemaId,
32    owner: UserId,
33    connector: String,
34    sink_type: String,
35    connection_id: Option<ConnectionId>,
36    definition: String,
37    acl: Vec<String>,
38    initialized_at: Option<Timestamptz>,
39    created_at: Option<Timestamptz>,
40    initialized_at_cluster_version: Option<String>,
41    created_at_cluster_version: Option<String>,
42    background_ddl: bool,
43
44    // connector properties in json format
45    connector_props: JsonbVal,
46    // format and encode properties in json format
47    format_encode_options: JsonbVal,
48}
49
50#[system_catalog(table, "rw_catalog.rw_sinks")]
51fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSink>> {
52    let catalog_reader = reader.catalog_reader.read_guard();
53    let schemas = catalog_reader.iter_schemas(&reader.auth_context.database)?;
54    let user_reader = reader.user_info_reader.read_guard();
55    let current_user = user_reader
56        .get_user_by_name(&reader.auth_context.user_name)
57        .expect("user not found");
58    let users = user_reader.get_all_users();
59    let username_map = user_reader.get_user_name_map();
60
61    Ok(schemas
62        .flat_map(|schema| {
63            schema.iter_sink_with_acl(current_user).map(|sink| {
64                let connector_props = serialize_props_with_secret(
65                    &catalog_reader,
66                    &reader.auth_context.database,
67                    WithOptionsSecResolved::new(sink.properties.clone(), sink.secret_refs.clone()),
68                )
69                .into();
70                let format_encode_options = sink
71                    .format_desc
72                    .as_ref()
73                    .map(|desc| {
74                        serialize_props_with_secret(
75                            &catalog_reader,
76                            &reader.auth_context.database,
77                            WithOptionsSecResolved::new(
78                                desc.options.clone(),
79                                desc.secret_refs.clone(),
80                            ),
81                        )
82                    })
83                    .unwrap_or_else(jsonbb::Value::null)
84                    .into();
85                RwSink {
86                    id: sink.id,
87                    name: sink.name.clone(),
88                    schema_id: schema.id(),
89                    owner: sink.owner,
90                    connector: sink
91                        .properties
92                        .get(UPSTREAM_SOURCE_KEY)
93                        .cloned()
94                        .unwrap_or("".to_owned())
95                        .to_uppercase(),
96                    sink_type: sink.sink_type.to_proto().as_str_name().into(),
97                    connection_id: sink.connection_id,
98                    definition: sink.create_sql(),
99                    acl: get_acl_items(sink.id, false, &users, username_map),
100                    initialized_at: sink.initialized_at_epoch.map(|e| e.as_timestamptz()),
101                    created_at: sink.created_at_epoch.map(|e| e.as_timestamptz()),
102                    initialized_at_cluster_version: sink.initialized_at_cluster_version.clone(),
103                    created_at_cluster_version: sink.created_at_cluster_version.clone(),
104                    connector_props,
105                    format_encode_options,
106                    background_ddl: sink.create_type == CreateType::Background,
107                }
108            })
109        })
110        .collect())
111}
112
113#[system_catalog(
114    view,
115    "rw_catalog.rw_sink_decouple",
116    "WITH decoupled_sink_internal_table_ids AS (
117        SELECT
118            sink_id,
119            internal_table_id
120        FROM rw_catalog.rw_sink_log_store_tables
121    ),
122    internal_table_vnode_count AS (
123        SELECT
124            internal_table_id, count(*)::int as watermark_vnode_count
125        FROM decoupled_sink_internal_table_ids
126            LEFT JOIN
127                rw_catalog.rw_hummock_table_watermark
128            ON decoupled_sink_internal_table_ids.internal_table_id = rw_catalog.rw_hummock_table_watermark.table_id
129        GROUP BY internal_table_id
130    )
131    SELECT
132        rw_catalog.rw_sinks.id as sink_id,
133        (watermark_vnode_count is not null) as is_decouple,
134        watermark_vnode_count
135    FROM rw_catalog.rw_sinks
136        LEFT JOIN
137            (decoupled_sink_internal_table_ids
138                JOIN
139                    internal_table_vnode_count
140                ON decoupled_sink_internal_table_ids.internal_table_id = internal_table_vnode_count.internal_table_id
141            )
142        ON sink_id = rw_catalog.rw_sinks.id
143    "
144)]
145#[derive(Fields)]
146struct RwSinkDecouple {
147    sink_id: i32,
148    is_decouple: bool,
149    watermark_vnode_count: i32,
150}