risingwave_common_service/
observer_manager.rs1use std::time::Duration;
16
17use risingwave_pb::meta::subscribe_response::Info;
18use risingwave_pb::meta::{SubscribeResponse, SubscribeType};
19use risingwave_rpc_client::MetaClient;
20use risingwave_rpc_client::error::RpcError;
21use thiserror_ext::AsReport;
22use tokio::task::JoinHandle;
23use tonic::{Status, Streaming};
24
25pub struct ObserverManager<T: NotificationClient, S: ObserverState> {
29 rx: T::Channel,
30 client: T,
31 observer_states: S,
32}
33
34pub trait ObserverState: Send + 'static {
35 fn subscribe_type() -> SubscribeType;
36 fn handle_notification(&mut self, resp: SubscribeResponse);
38
39 fn handle_initialization_notification(&mut self, resp: SubscribeResponse);
41}
42
43impl<S: ObserverState> ObserverManager<RpcNotificationClient, S> {
44 pub async fn new_with_meta_client(meta_client: MetaClient, observer_states: S) -> Self {
45 let client = RpcNotificationClient { meta_client };
46 Self::new(client, observer_states).await
47 }
48}
49
50#[derive(thiserror::Error, Debug)]
52pub enum ObserverError {
53 #[error("notification channel closed")]
54 ChannelClosed,
55
56 #[error(transparent)]
57 Rpc(
58 #[from]
59 #[backtrace]
60 RpcError,
61 ),
62}
63
64impl From<tonic::Status> for ObserverError {
65 fn from(status: tonic::Status) -> Self {
66 Self::Rpc(RpcError::from_meta_status(status))
67 }
68}
69
70impl<T, S> ObserverManager<T, S>
71where
72 T: NotificationClient,
73 S: ObserverState,
74{
75 pub async fn new(client: T, observer_states: S) -> Self {
76 let rx = client.subscribe(S::subscribe_type()).await.unwrap();
77 Self {
78 rx,
79 client,
80 observer_states,
81 }
82 }
83
84 async fn wait_init_notification(&mut self) -> Result<(), ObserverError> {
85 let mut notification_vec = Vec::new();
86 let init_notification = loop {
87 match self.rx.message().await? {
89 Some(notification) => {
90 if !matches!(notification.info.as_ref().unwrap(), &Info::Snapshot(_)) {
91 notification_vec.push(notification);
92 } else {
93 break notification;
94 }
95 }
96 None => return Err(ObserverError::ChannelClosed),
97 }
98 };
99
100 let Info::Snapshot(info) = init_notification.info.as_ref().unwrap() else {
101 unreachable!();
102 };
103
104 notification_vec.retain_mut(|notification| match notification.info.as_ref().unwrap() {
105 Info::Database(_)
106 | Info::Schema(_)
107 | Info::ObjectGroup(_)
108 | Info::User(_)
109 | Info::Connection(_)
110 | Info::Secret(_)
111 | Info::Function(_) => {
112 notification.version > info.version.as_ref().unwrap().catalog_version
113 }
114 Info::Node(_) => {
115 notification.version > info.version.as_ref().unwrap().worker_node_version
116 }
117 Info::HummockVersionDeltas(version_delta) => {
118 version_delta.version_deltas[0].id > info.hummock_version.as_ref().unwrap().id
119 }
120 Info::MetaBackupManifestId(_) => true,
121 Info::SystemParams(_) | Info::SessionParam(_) => true,
122 Info::Snapshot(_) | Info::HummockWriteLimits(_) => unreachable!(),
123 Info::HummockStats(_) => true,
124 Info::Recovery(_) => true,
125 Info::ComputeNodeTotalCpuCount(_) => true,
126 Info::StreamingWorkerSlotMapping(_) => {
127 notification.version
128 > info
129 .version
130 .as_ref()
131 .unwrap()
132 .streaming_worker_slot_mapping_version
133 }
134 Info::ServingWorkerSlotMappings(_) => true,
135 });
136
137 self.observer_states
138 .handle_initialization_notification(init_notification);
139
140 for notification in notification_vec {
141 self.observer_states.handle_notification(notification);
142 }
143
144 Ok(())
145 }
146
147 pub async fn start(mut self) -> JoinHandle<()> {
150 if let Err(err) = self.wait_init_notification().await {
151 tracing::warn!(error = %err.as_report(), "Receives meta's notification err");
152 self.re_subscribe().await;
153 }
154
155 tokio::spawn(async move {
156 loop {
157 match self.rx.message().await {
158 Ok(resp) => {
159 if resp.is_none() {
160 tracing::warn!("Stream of notification terminated.");
161 self.re_subscribe().await;
162 continue;
163 }
164 self.observer_states.handle_notification(resp.unwrap());
165 }
166 Err(err) => {
167 tracing::warn!(error = %err.as_report(), "Receives meta's notification err");
168 self.re_subscribe().await;
169 }
170 }
171 }
172 })
173 }
174
175 async fn re_subscribe(&mut self) {
177 loop {
178 match self.client.subscribe(S::subscribe_type()).await {
179 Ok(rx) => {
180 tracing::debug!("re-subscribe success");
181 self.rx = rx;
182 match self.wait_init_notification().await {
183 Err(err) => {
184 tracing::warn!(error = %err.as_report(), "Receives meta's notification err");
185 continue;
186 }
187 _ => {
188 break;
189 }
190 }
191 }
192 Err(_) => {
193 tokio::time::sleep(RE_SUBSCRIBE_RETRY_INTERVAL).await;
194 }
195 }
196 }
197 }
198}
199
/// Delay between consecutive re-subscription attempts.
const RE_SUBSCRIBE_RETRY_INTERVAL: Duration = Duration::from_millis(100);
201
202#[async_trait::async_trait]
203pub trait Channel: Send + 'static {
204 type Item;
205 async fn message(&mut self) -> std::result::Result<Option<Self::Item>, Status>;
206}
207
208#[async_trait::async_trait]
209impl<T: Send + 'static> Channel for Streaming<T> {
210 type Item = T;
211
212 async fn message(&mut self) -> std::result::Result<Option<T>, Status> {
213 self.message().await
214 }
215}
216
217#[async_trait::async_trait]
218pub trait NotificationClient: Send + Sync + 'static {
219 type Channel: Channel<Item = SubscribeResponse>;
220 async fn subscribe(
221 &self,
222 subscribe_type: SubscribeType,
223 ) -> Result<Self::Channel, ObserverError>;
224}
225
226pub struct RpcNotificationClient {
227 meta_client: MetaClient,
228}
229
230impl RpcNotificationClient {
231 pub fn new(meta_client: MetaClient) -> Self {
232 Self { meta_client }
233 }
234}
235
236#[async_trait::async_trait]
237impl NotificationClient for RpcNotificationClient {
238 type Channel = Streaming<SubscribeResponse>;
239
240 async fn subscribe(
241 &self,
242 subscribe_type: SubscribeType,
243 ) -> Result<Self::Channel, ObserverError> {
244 self.meta_client
245 .subscribe(subscribe_type)
246 .await
247 .map_err(Into::into)
248 }
249}