risingwave_common/system_param/
local_manager.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Deref;
use std::sync::Arc;

use arc_swap::ArcSwap;
use risingwave_pb::meta::SystemParams;
use tokio::sync::watch::{channel, Receiver, Sender};

use super::common::CommonHandler;
use super::diff::SystemParamsDiff;
use super::reader::SystemParamsReader;
use super::system_params_for_test;

pub type SystemParamsReaderRef = Arc<ArcSwap<SystemParamsReader>>;
pub type LocalSystemParamsManagerRef = Arc<LocalSystemParamsManager>;

/// The system parameter manager on worker nodes. It provides two methods for other components to
/// read the latest system parameters:
/// - `get_params` returns a reference to the latest parameters that is atomically updated.
/// - `watch_params` returns a channel on which calling `recv` will get the latest parameters.
///   Compared with `get_params`, the caller can be explicitly notified of parameter change.
#[derive(Debug)]
pub struct LocalSystemParamsManager {
    /// The latest parameters.
    params: SystemParamsReaderRef,

    /// Sender of the latest parameters.
    tx: Sender<SystemParamsReaderRef>,
}

impl LocalSystemParamsManager {
    /// Create a new instance of `LocalSystemParamsManager` and spawn a task to run
    /// the common handler.
    pub fn new(initial_params: SystemParamsReader) -> Self {
        let this = Self::new_inner(initial_params.clone());

        // Spawn a task to run the common handler.
        // TODO(bugen): this may be spawned multiple times under standalone deployment, though idempotent.
        tokio::spawn({
            let mut rx = this.tx.subscribe();
            async move {
                let mut params = initial_params.clone();
                let handler = CommonHandler::new(initial_params);

                while rx.changed().await.is_ok() {
                    // TODO: directly watch the changes instead of diffing ourselves.
                    let new_params = (**rx.borrow_and_update().load()).clone();
                    let diff = SystemParamsDiff::diff(params.as_ref(), new_params.as_ref());
                    handler.handle_change(&diff);
                    params = new_params;
                }
            }
        });

        this
    }

    pub fn for_test() -> Self {
        Self::new_inner(system_params_for_test().into())
    }

    fn new_inner(initial_params: SystemParamsReader) -> Self {
        let params = Arc::new(ArcSwap::from_pointee(initial_params));
        let (tx, _) = channel(params.clone());
        Self { params, tx }
    }

    pub fn get_params(&self) -> SystemParamsReaderRef {
        self.params.clone()
    }

    pub fn try_set_params(&self, new_params: SystemParams) {
        let new_params_reader = SystemParamsReader::from(new_params);
        if self.params.load().deref().deref() != &new_params_reader {
            self.params.store(Arc::new(new_params_reader));
            // Ignore no active receiver.
            let _ = self.tx.send(self.params.clone());
        }
    }

    pub fn watch_params(&self) -> Receiver<SystemParamsReaderRef> {
        self.tx.subscribe()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_manager() {
        let manager = LocalSystemParamsManager::for_test();
        let shared_params = manager.get_params();

        let new_params = SystemParams {
            sstable_size_mb: Some(114514),
            ..Default::default()
        };

        let mut params_rx = manager.watch_params();

        manager.try_set_params(new_params.clone());
        params_rx.changed().await.unwrap();
        assert_eq!(**params_rx.borrow().load(), new_params.clone().into());
        assert_eq!(**shared_params.load(), new_params.into());
    }
}