risingwave_meta/manager/
mod.rs1pub mod diagnose;
16mod env;
17pub mod event_log;
18pub mod iceberg_compaction;
19mod idle;
20mod license;
21mod metadata;
22mod notification;
23mod notification_version;
24pub mod sink_coordination;
25mod streaming_job;
26
27use std::collections::HashSet;
28use std::hash::{Hash, Hasher};
29
30pub use env::{MetaSrvEnv, *};
31pub use event_log::EventLogManagerRef;
32pub use idle::*;
33pub use metadata::*;
34pub use notification::{LocalNotification, MessageStatus, NotificationManagerRef, *};
35use risingwave_common::id::WorkerId;
36pub use risingwave_meta_model::prelude;
37use risingwave_meta_model::{ConnectionId, SecretId};
38use risingwave_pb::catalog::{PbSink, PbSource};
39use risingwave_pb::common::PbHostAddress;
40pub use streaming_job::*;
41
42use crate::MetaResult;
43
44#[derive(Clone, Debug)]
45pub struct WorkerKey(pub PbHostAddress);
46
47impl PartialEq<Self> for WorkerKey {
48 fn eq(&self, other: &Self) -> bool {
49 self.0.eq(&other.0)
50 }
51}
52
53impl Eq for WorkerKey {}
54
55impl Hash for WorkerKey {
56 fn hash<H: Hasher>(&self, state: &mut H) {
57 self.0.host.hash(state);
58 self.0.port.hash(state);
59 }
60}
61
62pub const META_NODE_ID: WorkerId = WorkerId::new(0);
64
65pub fn get_referred_secret_ids_from_source(source: &PbSource) -> MetaResult<HashSet<SecretId>> {
66 let mut secret_ids = HashSet::new();
67 for secret_ref in source.get_secret_refs().values() {
68 secret_ids.insert(secret_ref.secret_id);
69 }
70 for secret_ref in source.get_info()?.get_format_encode_secret_refs().values() {
72 secret_ids.insert(secret_ref.secret_id);
73 }
74 Ok(secret_ids)
75}
76
77pub fn get_referred_connection_ids_from_source(source: &PbSource) -> HashSet<ConnectionId> {
78 let mut connection_ids = HashSet::new();
79 if let Some(conn_id) = source.connection_id {
80 connection_ids.insert(conn_id);
81 }
82 if let Some(info) = &source.info
83 && let Some(conn_id) = info.connection_id
84 {
85 connection_ids.insert(conn_id);
86 }
87 connection_ids
88}
89
90pub fn get_referred_connection_ids_from_sink(sink: &PbSink) -> HashSet<ConnectionId> {
91 let mut connection_ids = HashSet::new();
92 if let Some(format_desc) = &sink.format_desc
93 && let Some(conn_id) = format_desc.connection_id
94 {
95 connection_ids.insert(conn_id);
96 }
97 if let Some(conn_id) = sink.connection_id {
98 connection_ids.insert(conn_id);
99 }
100 connection_ids
101}
102
103pub fn get_referred_secret_ids_from_sink(sink: &PbSink) -> HashSet<SecretId> {
104 let mut secret_ids = HashSet::new();
105 for secret_ref in sink.get_secret_refs().values() {
106 secret_ids.insert(secret_ref.secret_id);
107 }
108 if let Some(format_desc) = &sink.format_desc {
110 for secret_ref in format_desc.get_secret_refs().values() {
111 secret_ids.insert(secret_ref.secret_id);
112 }
113 }
114 secret_ids
115}