risingwave_common_service/
observer_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use risingwave_pb::meta::subscribe_response::Info;
18use risingwave_pb::meta::{SubscribeResponse, SubscribeType};
19use risingwave_rpc_client::MetaClient;
20use risingwave_rpc_client::error::RpcError;
21use thiserror_ext::AsReport;
22use tokio::task::JoinHandle;
23use tonic::{Status, Streaming};
24
25/// `ObserverManager` is used to update data based on notification from meta.
26/// Call `start` to spawn a new asynchronous task
27/// We can write the notification logic by implementing `ObserverNodeImpl`.
28pub struct ObserverManager<T: NotificationClient, S: ObserverState> {
29    rx: T::Channel,
30    client: T,
31    observer_states: S,
32}
33
34pub trait ObserverState: Send + 'static {
35    fn subscribe_type() -> SubscribeType;
36    /// modify data after receiving notification from meta
37    fn handle_notification(&mut self, resp: SubscribeResponse);
38
39    /// Initialize data from the meta. It will be called at start or resubscribe
40    fn handle_initialization_notification(&mut self, resp: SubscribeResponse);
41}
42
43impl<S: ObserverState> ObserverManager<RpcNotificationClient, S> {
44    pub async fn new_with_meta_client(meta_client: MetaClient, observer_states: S) -> Self {
45        let client = RpcNotificationClient { meta_client };
46        Self::new(client, observer_states).await
47    }
48}
49
50/// Error type for [`ObserverManager`].
51#[derive(thiserror::Error, Debug)]
52pub enum ObserverError {
53    #[error("notification channel closed")]
54    ChannelClosed,
55
56    #[error(transparent)]
57    Rpc(
58        #[from]
59        #[backtrace]
60        RpcError,
61    ),
62}
63
64impl From<tonic::Status> for ObserverError {
65    fn from(status: tonic::Status) -> Self {
66        Self::Rpc(RpcError::from_meta_status(status))
67    }
68}
69
70impl<T, S> ObserverManager<T, S>
71where
72    T: NotificationClient,
73    S: ObserverState,
74{
75    pub async fn new(client: T, observer_states: S) -> Self {
76        let rx = client.subscribe(S::subscribe_type()).await.unwrap();
77        Self {
78            rx,
79            client,
80            observer_states,
81        }
82    }
83
84    async fn wait_init_notification(&mut self) -> Result<(), ObserverError> {
85        let mut notification_vec = Vec::new();
86        let init_notification = loop {
87            // notification before init notification must be received successfully.
88            match self.rx.message().await? {
89                Some(notification) => {
90                    if !matches!(notification.info.as_ref().unwrap(), &Info::Snapshot(_)) {
91                        notification_vec.push(notification);
92                    } else {
93                        break notification;
94                    }
95                }
96                None => return Err(ObserverError::ChannelClosed),
97            }
98        };
99
100        let Info::Snapshot(info) = init_notification.info.as_ref().unwrap() else {
101            unreachable!();
102        };
103
104        notification_vec.retain_mut(|notification| match notification.info.as_ref().unwrap() {
105            Info::Database(_)
106            | Info::Schema(_)
107            | Info::ObjectGroup(_)
108            | Info::User(_)
109            | Info::Connection(_)
110            | Info::Secret(_)
111            | Info::Function(_) => {
112                notification.version > info.version.as_ref().unwrap().catalog_version
113            }
114            Info::Node(_) => {
115                notification.version > info.version.as_ref().unwrap().worker_node_version
116            }
117            Info::HummockVersionDeltas(version_delta) => {
118                version_delta.version_deltas[0].id > info.hummock_version.as_ref().unwrap().id
119            }
120            Info::MetaBackupManifestId(_) => true,
121            Info::SystemParams(_) | Info::SessionParam(_) => true,
122            Info::Snapshot(_) | Info::HummockWriteLimits(_) => unreachable!(),
123            Info::HummockStats(_) => true,
124            Info::Recovery(_) => true,
125            Info::ComputeNodeTotalCpuCount(_) => true,
126            Info::StreamingWorkerSlotMapping(_) => {
127                notification.version
128                    > info
129                        .version
130                        .as_ref()
131                        .unwrap()
132                        .streaming_worker_slot_mapping_version
133            }
134            Info::ServingWorkerSlotMappings(_) => true,
135        });
136
137        self.observer_states
138            .handle_initialization_notification(init_notification);
139
140        for notification in notification_vec {
141            self.observer_states.handle_notification(notification);
142        }
143
144        Ok(())
145    }
146
147    /// `start` is used to spawn a new asynchronous task which receives meta's notification and
148    /// call the `handle_initialization_notification` and `handle_notification` to update node data.
149    pub async fn start(mut self) -> JoinHandle<()> {
150        if let Err(err) = self.wait_init_notification().await {
151            tracing::warn!(error = %err.as_report(), "Receives meta's notification err");
152            self.re_subscribe().await;
153        }
154
155        tokio::spawn(async move {
156            loop {
157                match self.rx.message().await {
158                    Ok(resp) => {
159                        if resp.is_none() {
160                            tracing::warn!("Stream of notification terminated.");
161                            self.re_subscribe().await;
162                            continue;
163                        }
164                        self.observer_states.handle_notification(resp.unwrap());
165                    }
166                    Err(err) => {
167                        tracing::warn!(error = %err.as_report(), "Receives meta's notification err");
168                        self.re_subscribe().await;
169                    }
170                }
171            }
172        })
173    }
174
175    /// `re_subscribe` is used to re-subscribe to the meta's notification.
176    async fn re_subscribe(&mut self) {
177        loop {
178            match self.client.subscribe(S::subscribe_type()).await {
179                Ok(rx) => {
180                    tracing::debug!("re-subscribe success");
181                    self.rx = rx;
182                    match self.wait_init_notification().await {
183                        Err(err) => {
184                            tracing::warn!(error = %err.as_report(), "Receives meta's notification err");
185                            continue;
186                        }
187                        _ => {
188                            break;
189                        }
190                    }
191                }
192                Err(_) => {
193                    tokio::time::sleep(RE_SUBSCRIBE_RETRY_INTERVAL).await;
194                }
195            }
196        }
197    }
198}
199
/// How long `re_subscribe` sleeps before retrying.
const RE_SUBSCRIBE_RETRY_INTERVAL: Duration = Duration::from_millis(100);
201
202#[async_trait::async_trait]
203pub trait Channel: Send + 'static {
204    type Item;
205    async fn message(&mut self) -> std::result::Result<Option<Self::Item>, Status>;
206}
207
208#[async_trait::async_trait]
209impl<T: Send + 'static> Channel for Streaming<T> {
210    type Item = T;
211
212    async fn message(&mut self) -> std::result::Result<Option<T>, Status> {
213        self.message().await
214    }
215}
216
217#[async_trait::async_trait]
218pub trait NotificationClient: Send + Sync + 'static {
219    type Channel: Channel<Item = SubscribeResponse>;
220    async fn subscribe(
221        &self,
222        subscribe_type: SubscribeType,
223    ) -> Result<Self::Channel, ObserverError>;
224}
225
226pub struct RpcNotificationClient {
227    meta_client: MetaClient,
228}
229
230impl RpcNotificationClient {
231    pub fn new(meta_client: MetaClient) -> Self {
232        Self { meta_client }
233    }
234}
235
236#[async_trait::async_trait]
237impl NotificationClient for RpcNotificationClient {
238    type Channel = Streaming<SubscribeResponse>;
239
240    async fn subscribe(
241        &self,
242        subscribe_type: SubscribeType,
243    ) -> Result<Self::Channel, ObserverError> {
244        self.meta_client
245            .subscribe(subscribe_type)
246            .await
247            .map_err(Into::into)
248    }
249}