risingwave_meta/manager/
mod.rs1pub mod diagnose;
16mod env;
17pub mod event_log;
18pub mod iceberg_compaction;
19mod idle;
20mod license;
21mod metadata;
22mod notification;
23mod notification_version;
24pub mod sink_coordination;
25mod streaming_job;
26
27use std::collections::HashSet;
28use std::hash::{Hash, Hasher};
29
30pub use env::{MetaSrvEnv, *};
31pub use event_log::EventLogManagerRef;
32pub use idle::*;
33pub use metadata::*;
34pub use notification::{LocalNotification, MessageStatus, NotificationManagerRef, *};
35pub use risingwave_meta_model::prelude;
36use risingwave_pb::catalog::{PbSink, PbSource};
37use risingwave_pb::common::PbHostAddress;
38pub use streaming_job::*;
39
40use crate::MetaResult;
41
42#[derive(Clone, Debug)]
43pub struct WorkerKey(pub PbHostAddress);
44
45impl PartialEq<Self> for WorkerKey {
46 fn eq(&self, other: &Self) -> bool {
47 self.0.eq(&other.0)
48 }
49}
50
51impl Eq for WorkerKey {}
52
53impl Hash for WorkerKey {
54 fn hash<H: Hasher>(&self, state: &mut H) {
55 self.0.host.hash(state);
56 self.0.port.hash(state);
57 }
58}
59
60pub const META_NODE_ID: u32 = 0;
62
63pub fn get_referred_secret_ids_from_source(source: &PbSource) -> MetaResult<HashSet<u32>> {
64 let mut secret_ids = HashSet::new();
65 for secret_ref in source.get_secret_refs().values() {
66 secret_ids.insert(secret_ref.secret_id);
67 }
68 for secret_ref in source.get_info()?.get_format_encode_secret_refs().values() {
70 secret_ids.insert(secret_ref.secret_id);
71 }
72 Ok(secret_ids)
73}
74
75pub fn get_referred_connection_ids_from_source(source: &PbSource) -> HashSet<u32> {
76 let mut connection_ids = HashSet::new();
77 if let Some(conn_id) = source.connection_id {
78 connection_ids.insert(conn_id);
79 }
80 if let Some(info) = &source.info
81 && let Some(conn_id) = info.connection_id
82 {
83 connection_ids.insert(conn_id);
84 }
85 connection_ids
86}
87
88pub fn get_referred_connection_ids_from_sink(sink: &PbSink) -> HashSet<u32> {
89 let mut connection_ids = HashSet::new();
90 if let Some(format_desc) = &sink.format_desc
91 && let Some(conn_id) = format_desc.connection_id
92 {
93 connection_ids.insert(conn_id);
94 }
95 if let Some(conn_id) = sink.connection_id {
96 connection_ids.insert(conn_id);
97 }
98 connection_ids
99}
100
101pub fn get_referred_secret_ids_from_sink(sink: &PbSink) -> HashSet<u32> {
102 let mut secret_ids = HashSet::new();
103 for secret_ref in sink.get_secret_refs().values() {
104 secret_ids.insert(secret_ref.secret_id);
105 }
106 if let Some(format_desc) = &sink.format_desc {
108 for secret_ref in format_desc.get_secret_refs().values() {
109 secret_ids.insert(secret_ref.secret_id);
110 }
111 }
112 secret_ids
113}