risingwave_meta/manager/
license.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Context;
use notify::Watcher;
use risingwave_common::system_param::LICENSE_KEY_KEY;
use thiserror_ext::AsReport;
use tokio::sync::watch;
use tokio::task::JoinHandle;

use super::MetaSrvEnv;
use crate::MetaResult;

impl MetaSrvEnv {
    /// Spawn background tasks to watch the license key file and update the system parameter,
    /// if configured.
    pub fn may_start_watch_license_key_file(&self) -> MetaResult<Option<JoinHandle<()>>> {
        let Some(path) = self.opts.license_key_path.as_ref() else {
            return Ok(None);
        };

        let (changed_tx, mut changed_rx) = watch::channel(());
        // Send an initial event to trigger the initial load.
        changed_tx.send(()).unwrap();

        let mut watcher =
            notify::recommended_watcher(move |event: Result<notify::Event, notify::Error>| {
                if let Err(e) = event {
                    tracing::warn!(
                        error = %e.as_report(),
                        "error occurred while watching license key file"
                    );
                    return;
                }
                // We don't check the event type but always notify the updater for simplicity.
                let _ = changed_tx.send(());
            })
            .context("failed to create license key file watcher")?;

        // This will spawn a new thread to watch the file, so no need to be concerned about blocking.
        watcher
            .watch(path, notify::RecursiveMode::NonRecursive)
            .context("failed to watch license key file")?;

        let updater = {
            let mgr = self.system_params_manager_impl_ref();
            let path = path.to_path_buf();
            async move {
                // Let the watcher live until the end of the updater to prevent dropping (then stopping).
                let _watcher = watcher;

                // Read the file content and set the system parameter every time the file changes.
                // Note that `changed()` will immediately resolves on the very first call, so we
                // will do the initialization then.
                while changed_rx.changed().await.is_ok() {
                    tracing::info!(path = %path.display(), "license key file changed, reloading...");

                    let content = match tokio::fs::read_to_string(&path).await {
                        Ok(v) => v,
                        Err(e) => {
                            tracing::warn!(
                                path = %path.display(),
                                error = %e.as_report(),
                                "failed to read license key file"
                            );
                            continue;
                        }
                    };

                    // Trim the content and use it as the new license key value.
                    //
                    // It's always a `Some`, meaning that an empty license key file here is equivalent to
                    // `ALTER SYSTEM SET license_key TO ''`, instead of `... TO DEFAULT`. Please note
                    // the slight difference in behavior of debug build, where the default value of the
                    // `license_key` system parameter is a test key but not an empty string.
                    let value = Some(content.trim().to_owned());

                    let result = mgr.set_param(LICENSE_KEY_KEY, value).await;

                    if let Err(e) = result {
                        tracing::error!(
                            error = %e.as_report(),
                            "failed to set license key from file"
                        );
                    }
                }
            }
        };

        let handle = tokio::spawn(updater);
        Ok(Some(handle))
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use risingwave_license::{License, LicenseManager, Tier};

    use super::*;
    use crate::manager::MetaOpts;

    // License {
    //     sub: "rw-test",
    //     iss: Test,
    //     tier: Free,              <- difference from the default license in debug build
    //     cpu_core_limit: None,
    //     exp: 9999999999,
    // }
    const INITIAL_KEY: &str =
        "eyJhbGciOiJSUzUxMiIsInR5cCI6IkpXVCJ9.\
         eyJzdWIiOiJydy10ZXN0IiwidGllciI6ImZyZWUiLCJpc3MiOiJ0ZXN0LnJpc2luZ3dhdmUuY29tIiwiZXhwIjo5OTk5OTk5OTk5fQ.\
         ALC3Kc9LI6u0S-jeMB1YTxg1k8Azxwvc750ihuSZgjA_e1OJC9moxMvpLrHdLZDzCXHjBYi0XJ_1lowmuO_0iPEuPqN5AFpDV1ywmzJvGmMCMtw3A2wuN7hhem9OsWbwe6lzdwrefZLipyo4GZtIkg5ZdwGuHzm33zsM-X5gl_Ns4P6axHKiorNSR6nTAyA6B32YVET_FAM2YJQrXqpwA61wn1XLfarZqpdIQyJ5cgyiC33BFBlUL3lcRXLMLeYe6TjYGeV4K63qARCjM9yeOlsRbbW5ViWeGtR2Yf18pN8ysPXdbaXm_P_IVhl3jCTDJt9ctPh6pUCbkt36FZqO9A";

    #[cfg(not(madsim))] // `notify` will spawn system threads, which is not allowed in madsim
    #[tokio::test]
    #[cfg_attr(not(debug_assertions), ignore)] // skip in release build
    async fn test_watch_license_key_file() {
        tracing_subscriber::fmt()
            .with_max_level(tracing::Level::INFO)
            .init();

        let key_file = tempfile::NamedTempFile::new().unwrap();
        std::fs::write(key_file.path(), INITIAL_KEY).unwrap();

        let srv = MetaSrvEnv::for_test_opts(MetaOpts {
            license_key_path: Some(key_file.path().to_path_buf()),
            ..MetaOpts::test(false)
        })
        .await;
        let _updater_handle = srv.may_start_watch_license_key_file().unwrap().unwrap();

        // Since we've filled the key file with the initial key, the license should be loaded.
        tokio::time::sleep(Duration::from_secs(1)).await;
        let license = LicenseManager::get().license().unwrap();
        assert_eq!(license.sub, "rw-test");
        assert_eq!(license.tier, Tier::Free);

        // Update the key file with an empty content, which should reset the license to the default.
        std::fs::write(key_file.path(), "").unwrap();
        tokio::time::sleep(Duration::from_secs(1)).await;
        let license = LicenseManager::get().license().unwrap();
        assert_eq!(license, License::default());
    }
}