Skip to main content

risingwave_meta/manager/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod diagnose;
16mod env;
17pub mod event_log;
18mod exactly_once_util;
19pub mod iceberg_compaction;
20pub mod iceberg_v3_sink;
21mod idle;
22mod license;
23mod metadata;
24mod notification;
25mod notification_version;
26pub mod sink_coordination;
27mod streaming_job;
28
29use std::collections::HashSet;
30use std::hash::{Hash, Hasher};
31
32pub use env::{MetaSrvEnv, *};
33pub use event_log::EventLogManagerRef;
34pub use idle::*;
35pub use metadata::*;
36pub use notification::{LocalNotification, MessageStatus, NotificationManagerRef, *};
37use risingwave_common::id::WorkerId;
38pub use risingwave_meta_model::prelude;
39use risingwave_meta_model::{ConnectionId, SecretId};
40use risingwave_pb::catalog::{PbSink, PbSource};
41use risingwave_pb::common::PbHostAddress;
42pub use streaming_job::*;
43
44use crate::MetaResult;
45
46#[derive(Clone, Debug)]
47pub struct WorkerKey(pub PbHostAddress);
48
49impl PartialEq<Self> for WorkerKey {
50    fn eq(&self, other: &Self) -> bool {
51        self.0.eq(&other.0)
52    }
53}
54
55impl Eq for WorkerKey {}
56
57impl Hash for WorkerKey {
58    fn hash<H: Hasher>(&self, state: &mut H) {
59        self.0.host.hash(state);
60        self.0.port.hash(state);
61    }
62}
63
/// The worker id reserved for the meta node itself (always `0`).
///
/// Note that there is no corresponding entry for it in the cluster manager.
pub const META_NODE_ID: WorkerId = WorkerId::new(0);
66
67pub fn get_referred_secret_ids_from_source(source: &PbSource) -> MetaResult<HashSet<SecretId>> {
68    let mut secret_ids = HashSet::new();
69    for secret_ref in source.get_secret_refs().values() {
70        secret_ids.insert(secret_ref.secret_id);
71    }
72    // `info` must exist in `Source`
73    for secret_ref in source.get_info()?.get_format_encode_secret_refs().values() {
74        secret_ids.insert(secret_ref.secret_id);
75    }
76    Ok(secret_ids)
77}
78
79pub fn get_referred_connection_ids_from_source(source: &PbSource) -> HashSet<ConnectionId> {
80    let mut connection_ids = HashSet::new();
81    if let Some(conn_id) = source.connection_id {
82        connection_ids.insert(conn_id);
83    }
84    if let Some(info) = &source.info
85        && let Some(conn_id) = info.connection_id
86    {
87        connection_ids.insert(conn_id);
88    }
89    connection_ids
90}
91
92pub fn get_referred_connection_ids_from_sink(sink: &PbSink) -> HashSet<ConnectionId> {
93    let mut connection_ids = HashSet::new();
94    if let Some(format_desc) = &sink.format_desc
95        && let Some(conn_id) = format_desc.connection_id
96    {
97        connection_ids.insert(conn_id);
98    }
99    if let Some(conn_id) = sink.connection_id {
100        connection_ids.insert(conn_id);
101    }
102    connection_ids
103}
104
105pub fn get_referred_secret_ids_from_sink(sink: &PbSink) -> HashSet<SecretId> {
106    let mut secret_ids = HashSet::new();
107    for secret_ref in sink.get_secret_refs().values() {
108        secret_ids.insert(secret_ref.secret_id);
109    }
110    // `format_desc` may not exist in `Sink`
111    if let Some(format_desc) = &sink.format_desc {
112        for secret_ref in format_desc.get_secret_refs().values() {
113            secret_ids.insert(secret_ref.secret_id);
114        }
115    }
116    secret_ids
117}