risingwave_common/system_param/
local_manager.rs1use std::ops::Deref;
16use std::sync::Arc;
17
18use arc_swap::ArcSwap;
19use risingwave_pb::meta::SystemParams;
20use tokio::sync::watch::{Receiver, Sender, channel};
21
22use super::common::CommonHandler;
23use super::diff::SystemParamsDiff;
24use super::reader::SystemParamsReader;
25use super::system_params_for_test;
26
27pub type SystemParamsReaderRef = Arc<ArcSwap<SystemParamsReader>>;
28pub type LocalSystemParamsManagerRef = Arc<LocalSystemParamsManager>;
29
30#[derive(Debug)]
36pub struct LocalSystemParamsManager {
37 params: SystemParamsReaderRef,
39
40 tx: Sender<SystemParamsReaderRef>,
42}
43
44impl LocalSystemParamsManager {
45 pub fn new(initial_params: SystemParamsReader) -> Self {
48 let this = Self::new_inner(initial_params.clone());
49
50 tokio::spawn({
53 let mut rx = this.tx.subscribe();
54 async move {
55 let mut params = initial_params.clone();
56 let handler = CommonHandler::new(initial_params);
57
58 while rx.changed().await.is_ok() {
59 let new_params = (**rx.borrow_and_update().load()).clone();
61 let diff = SystemParamsDiff::diff(params.as_ref(), new_params.as_ref());
62 handler.handle_change(&diff);
63 params = new_params;
64 }
65 }
66 });
67
68 this
69 }
70
71 pub fn for_test() -> Self {
72 Self::new_inner(system_params_for_test().into())
73 }
74
75 fn new_inner(initial_params: SystemParamsReader) -> Self {
76 let params = Arc::new(ArcSwap::from_pointee(initial_params));
77 let (tx, _) = channel(params.clone());
78 Self { params, tx }
79 }
80
81 pub fn get_params(&self) -> SystemParamsReaderRef {
82 self.params.clone()
83 }
84
85 pub fn try_set_params(&self, new_params: SystemParams) {
86 let new_params_reader = SystemParamsReader::from(new_params);
87 if self.params.load().deref().deref() != &new_params_reader {
88 self.params.store(Arc::new(new_params_reader));
89 let _ = self.tx.send(self.params.clone());
91 }
92 }
93
94 pub fn watch_params(&self) -> Receiver<SystemParamsReaderRef> {
95 self.tx.subscribe()
96 }
97}
98
99#[cfg(test)]
100mod tests {
101 use super::*;
102
103 #[tokio::test]
104 async fn test_manager() {
105 let manager = LocalSystemParamsManager::for_test();
106 let shared_params = manager.get_params();
107
108 let new_params = SystemParams {
109 sstable_size_mb: Some(114514),
110 ..Default::default()
111 };
112
113 let mut params_rx = manager.watch_params();
114
115 manager.try_set_params(new_params.clone());
116 params_rx.changed().await.unwrap();
117 assert_eq!(**params_rx.borrow().load(), new_params.clone().into());
118 assert_eq!(**shared_params.load(), new_params.into());
119 }
120}