risingwave_common/system_param/
local_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Deref;
16use std::sync::Arc;
17
18use arc_swap::ArcSwap;
19use risingwave_pb::meta::SystemParams;
20use tokio::sync::watch::{Receiver, Sender, channel};
21
22use super::common::CommonHandler;
23use super::diff::SystemParamsDiff;
24use super::reader::SystemParamsReader;
25use super::system_params_for_test;
26
27pub type SystemParamsReaderRef = Arc<ArcSwap<SystemParamsReader>>;
28pub type LocalSystemParamsManagerRef = Arc<LocalSystemParamsManager>;
29
30/// The system parameter manager on worker nodes. It provides two methods for other components to
31/// read the latest system parameters:
32/// - `get_params` returns a reference to the latest parameters that is atomically updated.
33/// - `watch_params` returns a channel on which calling `recv` will get the latest parameters.
34///   Compared with `get_params`, the caller can be explicitly notified of parameter change.
35#[derive(Debug)]
36pub struct LocalSystemParamsManager {
37    /// The latest parameters.
38    params: SystemParamsReaderRef,
39
40    /// Sender of the latest parameters.
41    tx: Sender<SystemParamsReaderRef>,
42}
43
44impl LocalSystemParamsManager {
45    /// Create a new instance of `LocalSystemParamsManager` and spawn a task to run
46    /// the common handler.
47    pub fn new(initial_params: SystemParamsReader) -> Self {
48        let this = Self::new_inner(initial_params.clone());
49
50        // Spawn a task to run the common handler.
51        // TODO(bugen): this may be spawned multiple times under standalone deployment, though idempotent.
52        tokio::spawn({
53            let mut rx = this.tx.subscribe();
54            async move {
55                let mut params = initial_params.clone();
56                let handler = CommonHandler::new(initial_params);
57
58                while rx.changed().await.is_ok() {
59                    // TODO: directly watch the changes instead of diffing ourselves.
60                    let new_params = (**rx.borrow_and_update().load()).clone();
61                    let diff = SystemParamsDiff::diff(params.as_ref(), new_params.as_ref());
62                    handler.handle_change(&diff);
63                    params = new_params;
64                }
65            }
66        });
67
68        this
69    }
70
71    pub fn for_test() -> Self {
72        Self::new_inner(system_params_for_test().into())
73    }
74
75    fn new_inner(initial_params: SystemParamsReader) -> Self {
76        let params = Arc::new(ArcSwap::from_pointee(initial_params));
77        let (tx, _) = channel(params.clone());
78        Self { params, tx }
79    }
80
81    pub fn get_params(&self) -> SystemParamsReaderRef {
82        self.params.clone()
83    }
84
85    pub fn try_set_params(&self, new_params: SystemParams) {
86        let new_params_reader = SystemParamsReader::from(new_params);
87        if self.params.load().deref().deref() != &new_params_reader {
88            self.params.store(Arc::new(new_params_reader));
89            // Ignore no active receiver.
90            let _ = self.tx.send(self.params.clone());
91        }
92    }
93
94    pub fn watch_params(&self) -> Receiver<SystemParamsReaderRef> {
95        self.tx.subscribe()
96    }
97}
98
99#[cfg(test)]
100mod tests {
101    use super::*;
102
103    #[tokio::test]
104    async fn test_manager() {
105        let manager = LocalSystemParamsManager::for_test();
106        let shared_params = manager.get_params();
107
108        let new_params = SystemParams {
109            sstable_size_mb: Some(114514),
110            ..Default::default()
111        };
112
113        let mut params_rx = manager.watch_params();
114
115        manager.try_set_params(new_params.clone());
116        params_rx.changed().await.unwrap();
117        assert_eq!(**params_rx.borrow().load(), new_params.clone().into());
118        assert_eq!(**shared_params.load(), new_params.into());
119    }
120}