risingwave_meta/manager/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod diagnose;
16mod env;
17pub mod event_log;
18mod idle;
19mod license;
20mod metadata;
21mod notification;
22mod notification_version;
23pub mod sink_coordination;
24mod streaming_job;
25
26use std::collections::HashSet;
27use std::hash::{Hash, Hasher};
28
29pub use env::{MetaSrvEnv, *};
30pub use event_log::EventLogManagerRef;
31pub use idle::*;
32pub use metadata::*;
33pub use notification::{LocalNotification, MessageStatus, NotificationManagerRef, *};
34pub use risingwave_meta_model::prelude;
35use risingwave_pb::catalog::{PbSink, PbSource};
36use risingwave_pb::common::PbHostAddress;
37pub use streaming_job::*;
38
39use crate::MetaResult;
40
/// Newtype wrapper around a [`PbHostAddress`].
///
/// `PartialEq`, `Eq` and `Hash` are implemented by hand for this type in this
/// module, so it can be stored in hash-based collections.
// NOTE(review): presumably identifies a worker node by its address — confirm with callers.
#[derive(Clone, Debug)]
pub struct WorkerKey(pub PbHostAddress);
43
44impl PartialEq<Self> for WorkerKey {
45    fn eq(&self, other: &Self) -> bool {
46        self.0.eq(&other.0)
47    }
48}
49
// Marker impl: `Eq` adds no methods and reuses the equality defined by `PartialEq` above.
impl Eq for WorkerKey {}
51
52impl Hash for WorkerKey {
53    fn hash<H: Hasher>(&self, state: &mut H) {
54        self.0.host.hash(state);
55        self.0.port.hash(state);
56    }
57}
58
/// The id reserved for the meta node. Note that there is no such entry in the cluster manager.
pub const META_NODE_ID: u32 = 0;
61
62pub fn get_referred_secret_ids_from_source(source: &PbSource) -> MetaResult<HashSet<u32>> {
63    let mut secret_ids = HashSet::new();
64    for secret_ref in source.get_secret_refs().values() {
65        secret_ids.insert(secret_ref.secret_id);
66    }
67    // `info` must exist in `Source`
68    for secret_ref in source.get_info()?.get_format_encode_secret_refs().values() {
69        secret_ids.insert(secret_ref.secret_id);
70    }
71    Ok(secret_ids)
72}
73
74pub fn get_referred_connection_ids_from_source(source: &PbSource) -> HashSet<u32> {
75    let mut connection_ids = HashSet::new();
76    if let Some(conn_id) = source.connection_id {
77        connection_ids.insert(conn_id);
78    }
79    if let Some(info) = &source.info
80        && let Some(conn_id) = info.connection_id
81    {
82        connection_ids.insert(conn_id);
83    }
84    connection_ids
85}
86
87pub fn get_referred_connection_ids_from_sink(sink: &PbSink) -> HashSet<u32> {
88    let mut connection_ids = HashSet::new();
89    if let Some(format_desc) = &sink.format_desc
90        && let Some(conn_id) = format_desc.connection_id
91    {
92        connection_ids.insert(conn_id);
93    }
94    if let Some(conn_id) = sink.connection_id {
95        connection_ids.insert(conn_id);
96    }
97    connection_ids
98}
99
100pub fn get_referred_secret_ids_from_sink(sink: &PbSink) -> HashSet<u32> {
101    let mut secret_ids = HashSet::new();
102    for secret_ref in sink.get_secret_refs().values() {
103        secret_ids.insert(secret_ref.secret_id);
104    }
105    // `format_desc` may not exist in `Sink`
106    if let Some(format_desc) = &sink.format_desc {
107        for secret_ref in format_desc.get_secret_refs().values() {
108            secret_ids.insert(secret_ref.secret_id);
109        }
110    }
111    secret_ids
112}