risingwave_meta/manager/
mod.rs1pub mod diagnose;
16mod env;
17pub mod event_log;
18mod idle;
19mod license;
20mod metadata;
21mod notification;
22mod notification_version;
23pub mod sink_coordination;
24mod streaming_job;
25
26use std::collections::HashSet;
27use std::hash::{Hash, Hasher};
28
29pub use env::{MetaSrvEnv, *};
30pub use event_log::EventLogManagerRef;
31pub use idle::*;
32pub use metadata::*;
33pub use notification::{LocalNotification, MessageStatus, NotificationManagerRef, *};
34pub use risingwave_meta_model::prelude;
35use risingwave_pb::catalog::{PbSink, PbSource};
36use risingwave_pb::common::PbHostAddress;
37pub use streaming_job::*;
38
39use crate::MetaResult;
40
41#[derive(Clone, Debug)]
42pub struct WorkerKey(pub PbHostAddress);
43
44impl PartialEq<Self> for WorkerKey {
45 fn eq(&self, other: &Self) -> bool {
46 self.0.eq(&other.0)
47 }
48}
49
50impl Eq for WorkerKey {}
51
52impl Hash for WorkerKey {
53 fn hash<H: Hasher>(&self, state: &mut H) {
54 self.0.host.hash(state);
55 self.0.port.hash(state);
56 }
57}
58
/// Node id constant with value `0`.
///
/// NOTE(review): judging by the name, this is the id reserved for the meta node itself —
/// confirm against the call sites.
pub const META_NODE_ID: u32 = 0;
61
62pub fn get_referred_secret_ids_from_source(source: &PbSource) -> MetaResult<HashSet<u32>> {
63 let mut secret_ids = HashSet::new();
64 for secret_ref in source.get_secret_refs().values() {
65 secret_ids.insert(secret_ref.secret_id);
66 }
67 for secret_ref in source.get_info()?.get_format_encode_secret_refs().values() {
69 secret_ids.insert(secret_ref.secret_id);
70 }
71 Ok(secret_ids)
72}
73
74pub fn get_referred_connection_ids_from_source(source: &PbSource) -> HashSet<u32> {
75 let mut connection_ids = HashSet::new();
76 if let Some(conn_id) = source.connection_id {
77 connection_ids.insert(conn_id);
78 }
79 if let Some(info) = &source.info
80 && let Some(conn_id) = info.connection_id
81 {
82 connection_ids.insert(conn_id);
83 }
84 connection_ids
85}
86
87pub fn get_referred_connection_ids_from_sink(sink: &PbSink) -> HashSet<u32> {
88 let mut connection_ids = HashSet::new();
89 if let Some(format_desc) = &sink.format_desc
90 && let Some(conn_id) = format_desc.connection_id
91 {
92 connection_ids.insert(conn_id);
93 }
94 if let Some(conn_id) = sink.connection_id {
95 connection_ids.insert(conn_id);
96 }
97 connection_ids
98}
99
100pub fn get_referred_secret_ids_from_sink(sink: &PbSink) -> HashSet<u32> {
101 let mut secret_ids = HashSet::new();
102 for secret_ref in sink.get_secret_refs().values() {
103 secret_ids.insert(secret_ref.secret_id);
104 }
105 if let Some(format_desc) = &sink.format_desc {
107 for secret_ref in format_desc.get_secret_refs().values() {
108 secret_ids.insert(secret_ref.secret_id);
109 }
110 }
111 secret_ids
112}