risingwave_meta/manager/
license.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::atomic::AtomicUsize;
16use std::sync::atomic::Ordering::Relaxed;
17
18use anyhow::Context;
19use notify::Watcher;
20use risingwave_common::system_param::LICENSE_KEY_KEY;
21use thiserror_ext::AsReport;
22use tokio::sync::watch;
23use tokio::task::JoinHandle;
24
25use super::MetaSrvEnv;
26use crate::MetaResult;
27
28/// For test purposes, we count the number of times the license key file is reloaded.
29static RELOAD_TIMES: AtomicUsize = AtomicUsize::new(0);
30
31impl MetaSrvEnv {
32    /// Spawn background tasks to watch the license key file and update the system parameter,
33    /// if configured.
34    pub fn may_start_watch_license_key_file(&self) -> MetaResult<Option<JoinHandle<()>>> {
35        let Some(path) = self.opts.license_key_path.as_ref() else {
36            return Ok(None);
37        };
38
39        let (changed_tx, mut changed_rx) = watch::channel(());
40        // Send an initial event to trigger the initial load.
41        changed_tx.send(()).unwrap();
42
43        let mut watcher =
44            notify::recommended_watcher(move |event: Result<notify::Event, notify::Error>| {
45                match event {
46                    Ok(event) => {
47                        if event.kind.is_access() {
48                            // Ignore access events as they do not indicate changes and will be
49                            // triggered on our own read operations.
50                        } else {
51                            let _ = changed_tx.send(());
52                        }
53                    }
54                    Err(e) => {
55                        tracing::warn!(
56                            error = %e.as_report(),
57                            "error occurred while watching license key file"
58                        );
59                    }
60                }
61            })
62            .context("failed to create license key file watcher")?;
63
64        // This will spawn a new thread to watch the file, so no need to be concerned about blocking.
65        watcher
66            .watch(path, notify::RecursiveMode::NonRecursive)
67            .context("failed to watch license key file")?;
68
69        let updater = {
70            let mgr = self.system_params_manager_impl_ref();
71            let path = path.to_path_buf();
72
73            async move {
74                // Let the watcher live until the end of the updater to prevent dropping (then stopping).
75                let _watcher = watcher;
76
77                // Read the file content and set the system parameter every time the file changes.
78                // Note that `changed()` will immediately resolves on the very first call, so we
79                // will do the initialization then.
80                while changed_rx.changed().await.is_ok() {
81                    tracing::info!(path = %path.display(), "license key file changed, reloading...");
82                    RELOAD_TIMES.fetch_add(1, Relaxed);
83
84                    let content = match tokio::fs::read_to_string(&path).await {
85                        Ok(v) => v,
86                        Err(e) => {
87                            tracing::warn!(
88                                path = %path.display(),
89                                error = %e.as_report(),
90                                "failed to read license key file"
91                            );
92                            continue;
93                        }
94                    };
95
96                    // Trim the content and use it as the new license key value.
97                    //
98                    // It's always a `Some`, meaning that an empty license key file here is equivalent to
99                    // `ALTER SYSTEM SET license_key TO ''`, instead of `... TO DEFAULT`. Please note
100                    // the slight difference in behavior of debug build, where the default value of the
101                    // `license_key` system parameter is a test key but not an empty string.
102                    let value = Some(content.trim().to_owned());
103
104                    let result = mgr.set_param(LICENSE_KEY_KEY, value).await;
105
106                    if let Err(e) = result {
107                        tracing::error!(
108                            error = %e.as_report(),
109                            "failed to set license key from file"
110                        );
111                    }
112                }
113            }
114        };
115
116        let handle = tokio::spawn(updater);
117        Ok(Some(handle))
118    }
119}
120
121#[cfg(test)]
122mod tests {
123    use std::time::Duration;
124
125    use risingwave_license::{License, LicenseManager, Tier};
126
127    use super::*;
128    use crate::manager::MetaOpts;
129
130    // License {
131    //     sub: "rw-test",
132    //     iss: Test,
133    //     tier: Free,              <- difference from the default license in debug build
134    //     cpu_core_limit: None,
135    //     exp: 9999999999,
136    // }
137    const INITIAL_KEY: &str = "eyJhbGciOiJSUzUxMiIsInR5cCI6IkpXVCJ9.\
138         eyJzdWIiOiJydy10ZXN0IiwidGllciI6ImZyZWUiLCJpc3MiOiJ0ZXN0LnJpc2luZ3dhdmUuY29tIiwiZXhwIjo5OTk5OTk5OTk5fQ.\
139         ALC3Kc9LI6u0S-jeMB1YTxg1k8Azxwvc750ihuSZgjA_e1OJC9moxMvpLrHdLZDzCXHjBYi0XJ_1lowmuO_0iPEuPqN5AFpDV1ywmzJvGmMCMtw3A2wuN7hhem9OsWbwe6lzdwrefZLipyo4GZtIkg5ZdwGuHzm33zsM-X5gl_Ns4P6axHKiorNSR6nTAyA6B32YVET_FAM2YJQrXqpwA61wn1XLfarZqpdIQyJ5cgyiC33BFBlUL3lcRXLMLeYe6TjYGeV4K63qARCjM9yeOlsRbbW5ViWeGtR2Yf18pN8ysPXdbaXm_P_IVhl3jCTDJt9ctPh6pUCbkt36FZqO9A";
140
141    #[cfg(not(madsim))] // `notify` will spawn system threads, which is not allowed in madsim
142    #[tokio::test]
143    #[cfg_attr(not(debug_assertions), ignore)] // skip in release build
144    async fn test_watch_license_key_file() {
145        tracing_subscriber::fmt()
146            .with_max_level(tracing::Level::INFO)
147            .init();
148
149        let key_file = tempfile::NamedTempFile::new().unwrap();
150        std::fs::write(key_file.path(), INITIAL_KEY).unwrap();
151
152        let srv = MetaSrvEnv::for_test_opts(MetaOpts {
153            license_key_path: Some(key_file.path().to_path_buf()),
154            ..MetaOpts::test(false)
155        })
156        .await;
157        let _updater_handle = srv.may_start_watch_license_key_file().unwrap().unwrap();
158
159        // Since we've filled the key file with the initial key, the license should be loaded.
160        tokio::time::sleep(Duration::from_secs(1)).await;
161        assert_eq!(RELOAD_TIMES.load(Relaxed), 1);
162        let license = LicenseManager::get().license().unwrap();
163        assert_eq!(license.sub, "rw-test");
164        assert_eq!(license.tier, Tier::Free);
165
166        // Update the key file with an empty content, which should reset the license to the default.
167        std::fs::write(key_file.path(), "").unwrap();
168        tokio::time::sleep(Duration::from_secs(1)).await;
169        assert_eq!(RELOAD_TIMES.load(Relaxed), 2);
170        let license = LicenseManager::get().license().unwrap();
171        assert_eq!(license, License::default());
172
173        // Show that our "access" on the key file does not trigger a reload recursively.
174        tokio::time::sleep(Duration::from_secs(3)).await;
175        assert_eq!(RELOAD_TIMES.load(Relaxed), 2);
176    }
177}