1use std::num::NonZeroU64;
16use std::sync::{LazyLock, RwLock};
17
18use jsonwebtoken::{Algorithm, DecodingKey, Validation};
19use risingwave_pb::common::ClusterResource;
20use serde::{Deserialize, Serialize};
21use thiserror::Error;
22use thiserror_ext::AsReport;
23
24use crate::{Feature, LicenseKeyRef};
25
26#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28#[serde(untagged)]
29pub enum MaybeFeature {
30 Feature(Feature),
32 Unknown(String),
35}
36
37#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
41#[serde(rename_all = "snake_case")]
42pub enum Tier {
43 Free,
45
46 #[serde(rename = "paid")]
48 AllAsOf2_5,
49
50 All,
52
53 #[serde(untagged)]
55 Custom {
56 name: String,
57 features: Vec<MaybeFeature>,
58 },
59}
60
61impl Tier {
62 #[auto_enums::auto_enum(Iterator)]
64 pub fn available_features(&self) -> impl Iterator<Item = Feature> {
65 match self {
66 Tier::Free => std::iter::empty(),
67 Tier::AllAsOf2_5 => Feature::all_as_of_2_5().iter().copied(),
68 Tier::All => Feature::all().iter().copied(),
69 Tier::Custom { features, .. } => features.iter().filter_map(|feature| match feature {
70 MaybeFeature::Feature(feature) => Some(*feature),
71 MaybeFeature::Unknown(_) => None,
72 }),
73 }
74 }
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
82pub enum Issuer {
83 #[serde(rename = "prod.risingwave.com")]
84 Prod,
85
86 #[serde(rename = "test.risingwave.com")]
87 Test,
88
89 #[serde(untagged)]
90 Unknown(String),
91}
92
93#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
101#[serde(rename_all = "snake_case")]
102pub struct License {
103 #[allow(dead_code)]
107 pub sub: String,
108
109 #[allow(dead_code)]
113 pub iss: Issuer,
114
115 pub tier: Tier,
117
118 #[serde(alias = "cpu_core_limit")]
123 pub rwu_limit: Option<NonZeroU64>,
124
125 pub exp: u64,
129}
130
131impl License {
132 pub fn cpu_core_limit(&self) -> Option<u64> {
134 self.rwu_limit.map(|limit| limit.get())
135 }
136
137 pub fn memory_limit(&self) -> Option<u64> {
139 const MEMORY_PER_RWU: u64 = 4 * 1024 * 1024 * 1024;
141
142 self.rwu_limit.map(
143 |limit| (limit.get() + 1) * MEMORY_PER_RWU - 1, )
145 }
146
147 pub fn check_cluster_resource(&self, resource: ClusterResource) -> Result<(), LicenseError> {
149 let ClusterResource {
150 total_memory_bytes,
151 total_cpu_cores,
152 } = resource;
153
154 if let Some(limit) = self.cpu_core_limit()
155 && total_cpu_cores > limit
156 {
157 return Err(LicenseError::CpuLimitExceeded {
158 limit,
159 actual: total_cpu_cores,
160 });
161 }
162
163 #[cfg(not(madsim))] if let Some(limit) = self.memory_limit()
165 && total_memory_bytes > limit
166 {
167 return Err(LicenseError::MemoryLimitExceeded {
168 limit,
169 actual: total_memory_bytes,
170 });
171 }
172
173 Ok(())
174 }
175}
176
177impl Default for License {
178 fn default() -> Self {
182 Self {
183 sub: "default".to_owned(),
184 tier: Tier::Free,
185 iss: Issuer::Prod,
186 rwu_limit: None,
187 exp: u64::MAX,
188 }
189 }
190}
191
192#[derive(Debug, Clone, Error)]
194pub enum LicenseError {
195 #[error("invalid license key")]
196 InvalidKey(#[source] jsonwebtoken::errors::Error),
197
198 #[error(
199 "a valid license key is set, but it is currently not effective because the CPU core in the cluster \
200 ({actual}) exceeds the maximum allowed by the license key ({limit}); \
201 consider removing some nodes or acquiring a new license key with a higher limit"
202 )]
203 CpuLimitExceeded { limit: u64, actual: u64 },
204
205 #[error(
206 "a valid license key is set, but it is currently not effective because the memory in the cluster \
207 ({actual}) exceeds the maximum allowed by the license key ({limit}); \
208 consider removing some nodes or acquiring a new license key with a higher limit",
209 actual = humansize::format_size(*actual, humansize::BINARY),
210 limit = humansize::format_size(*limit, humansize::BINARY),
211 )]
212 MemoryLimitExceeded { limit: u64, actual: u64 },
213}
214
215struct Inner {
216 license: Result<License, LicenseError>,
217 cached_cluster_resource: ClusterResource,
218}
219
220pub struct LicenseManager {
222 inner: RwLock<Inner>,
223}
224
225static PUBLIC_KEY: LazyLock<DecodingKey> = LazyLock::new(|| {
226 DecodingKey::from_rsa_pem(include_bytes!("key.pub"))
227 .expect("invalid public key for license validation")
228});
229
230impl LicenseManager {
231 pub(crate) fn new() -> Self {
233 Self {
234 inner: RwLock::new(Inner {
235 license: Ok(License::default()),
236 cached_cluster_resource: ClusterResource {
237 total_cpu_cores: 0,
238 total_memory_bytes: 0,
239 },
240 }),
241 }
242 }
243
244 pub fn get() -> &'static Self {
246 static INSTANCE: LazyLock<LicenseManager> = LazyLock::new(LicenseManager::new);
247 &INSTANCE
248 }
249
250 pub fn refresh(&self, license_key: LicenseKeyRef<'_>) {
252 let license_key = license_key.0;
253 let mut inner = self.inner.write().unwrap();
254
255 if license_key.is_empty() {
257 inner.license = Ok(License::default());
258 return;
259 }
260
261 let mut validation = Validation::new(Algorithm::RS512);
263 validation.set_issuer(&[
266 "prod.risingwave.com",
267 #[cfg(debug_assertions)]
268 "test.risingwave.com",
269 ]);
270
271 inner.license = match jsonwebtoken::decode(license_key, &PUBLIC_KEY, &validation) {
272 Ok(data) => Ok(data.claims),
273 Err(error) => Err(LicenseError::InvalidKey(error)),
274 };
275
276 match &inner.license {
277 Ok(license) => tracing::info!(?license, "license refreshed"),
278 Err(error) => tracing::warn!(error = %error.as_report(), "invalid license key"),
279 }
280 }
281
282 pub fn update_cluster_resource(&self, resource: ClusterResource) {
284 let mut inner = self.inner.write().unwrap();
285 inner.cached_cluster_resource = resource;
286 }
287
288 pub fn license(&self) -> Result<License, LicenseError> {
295 let inner = self.inner.read().unwrap();
296 let license = inner.license.clone()?;
297
298 if license.exp < jsonwebtoken::get_current_timestamp() {
300 return Err(LicenseError::InvalidKey(
301 jsonwebtoken::errors::ErrorKind::ExpiredSignature.into(),
302 ));
303 }
304
305 license.check_cluster_resource(inner.cached_cluster_resource)?;
307
308 Ok(license)
309 }
310}
311
312#[cfg(debug_assertions)]
314#[cfg(test)]
315mod tests {
316 use expect_test::expect;
317
318 use super::*;
319 use crate::{LicenseKey, PROD_ALL_4_CORE_LICENSE_KEY_CONTENT, TEST_ALL_LICENSE_KEY_CONTENT};
320
321 fn do_test(key: &str, expect: expect_test::Expect) -> LicenseManager {
322 let manager = LicenseManager::new();
323 manager.refresh(LicenseKey(key));
324
325 match manager.license() {
326 Ok(license) => expect.assert_debug_eq(&license),
327 Err(error) => expect.assert_eq(&error.to_report_string()),
328 }
329
330 manager
331 }
332
333 #[test]
334 fn test_all_license_key() {
335 do_test(
336 TEST_ALL_LICENSE_KEY_CONTENT,
337 expect![[r#"
338 License {
339 sub: "rw-test-all",
340 iss: Test,
341 tier: All,
342 rwu_limit: None,
343 exp: 10000627200,
344 }
345 "#]],
346 );
347 }
348
349 #[test]
350 fn test_prod_all_4_core_license_key() {
351 do_test(
352 PROD_ALL_4_CORE_LICENSE_KEY_CONTENT,
353 expect![[r#"
354 License {
355 sub: "rw-default-all-4-core",
356 iss: Prod,
357 tier: All,
358 rwu_limit: Some(
359 4,
360 ),
361 exp: 10000627200,
362 }
363 "#]],
364 );
365 }
366
367 #[test]
368 fn test_free_license_key() {
369 const KEY: &str = "eyJhbGciOiJSUzUxMiIsInR5cCI6IkpXVCJ9.\
370 eyJzdWIiOiJydy10ZXN0IiwidGllciI6ImZyZWUiLCJpc3MiOiJ0ZXN0LnJpc2luZ3dhdmUuY29tIiwiZXhwIjo5OTk5OTk5OTk5fQ.\
371 ALC3Kc9LI6u0S-jeMB1YTxg1k8Azxwvc750ihuSZgjA_e1OJC9moxMvpLrHdLZDzCXHjBYi0XJ_1lowmuO_0iPEuPqN5AFpDV1ywmzJvGmMCMtw3A2wuN7hhem9OsWbwe6lzdwrefZLipyo4GZtIkg5ZdwGuHzm33zsM-X5gl_Ns4P6axHKiorNSR6nTAyA6B32YVET_FAM2YJQrXqpwA61wn1XLfarZqpdIQyJ5cgyiC33BFBlUL3lcRXLMLeYe6TjYGeV4K63qARCjM9yeOlsRbbW5ViWeGtR2Yf18pN8ysPXdbaXm_P_IVhl3jCTDJt9ctPh6pUCbkt36FZqO9A";
372
373 do_test(
374 KEY,
375 expect![[r#"
376 License {
377 sub: "rw-test",
378 iss: Test,
379 tier: Free,
380 rwu_limit: None,
381 exp: 9999999999,
382 }
383 "#]],
384 );
385 }
386
387 #[test]
388 fn test_empty_license_key() {
389 do_test(
391 "",
392 expect![[r#"
393 License {
394 sub: "default",
395 iss: Prod,
396 tier: Free,
397 rwu_limit: None,
398 exp: 18446744073709551615,
399 }
400 "#]],
401 );
402 }
403
404 #[test]
405 fn test_custom_tier_license_key() {
406 const KEY: &str = "eyJhbGciOiJSUzUxMiIsInR5cCI6IkpXVCJ9.\
407 eyJzdWIiOiJydy11bml0LXRlc3QiLCJpc3MiOiJ0ZXN0LnJpc2luZ3dhdmUuY29tIiwiZXhwIjoxMDAwMDYyNzIwMCwiaWF0IjoxNzUxODY5OTI3LCJ0aWVyIjp7Im5hbWUiOiJzZWNyZXQtb25seSIsImZlYXR1cmVzIjpbIlNlY3JldE1hbmFnZW1lbnQiXX19.\
408 iZKNyAHxz1l24Qh4F9wauuQEtDPLAeOmW2m_ttlew2ENmP_XcGWCx_O8r50NXRDaKv66z-ibteWVL5XOUqaVgJw9EnCyfVuFNoKtFQu18Vt0l52Yw2zNh3iHNQFKwHuCUki5FlOHw-K57a5f414-gzfwAwQkO1bAYoyAFhtoX6QQ2jdbxctFg0NxTQqpjnP-h0k2myZ_IhxA8fKrQIAqBj5Y8tGljuxjTLJpoK7X0ESXNh7bhA8njEf2Hm4QCymI1uo8OYRaR1Siw87r0aZykw9wW15Q8VK58VxIQLS7b7gQOmDToBjJt9yIF3MT6YMfqMX_l3Dtn9bOS_htVd1bjQ";
409
410 let manager = do_test(
411 KEY,
412 expect![[r#"
413 License {
414 sub: "rw-unit-test",
415 iss: Test,
416 tier: Custom {
417 name: "secret-only",
418 features: [
419 Feature(
420 SecretManagement,
421 ),
422 ],
423 },
424 rwu_limit: None,
425 exp: 10000627200,
426 }
427 "#]],
428 );
429
430 let tier = manager.license().unwrap().tier;
431 assert_eq!(
432 tier.available_features().collect::<Vec<_>>(),
433 vec![Feature::SecretManagement]
434 );
435 assert!(
436 Feature::SecretManagement
437 .check_available_with(&manager)
438 .is_ok()
439 );
440 assert!(
441 Feature::BigQuerySink
442 .check_available_with(&manager)
443 .is_err()
444 );
445 }
446
447 #[test]
448 fn test_invalid_license_key() {
449 const KEY: &str = "invalid";
450
451 do_test(KEY, expect!["invalid license key: InvalidToken"]);
452 }
453
454 #[test]
455 fn test_expired_license_key() {
456 const KEY: &str = "eyJhbGciOiJSUzUxMiIsInR5cCI6IkpXVCJ9.\
458 eyJzdWIiOiJydy10ZXN0IiwidGllciI6InBhaWQiLCJpc3MiOiJ0ZXN0LnJpc2luZ3dhdmUuY29tIiwiZXhwIjowfQ.\
459 TyYmoT5Gw9-FN7DWDbeg3myW8g_3Xlc90i4M9bGuPf2WLv9zRMJy2r9J7sl1BO7t6F1uGgyrvNxsVRVZ2XF_WAs6uNlluYBnd4Cqvsj6Xny1XJCCo8II3RIea-ZlRjp6tc1saaoe-_eTtqDH8NIIWe73vVtBeBTBU4zAiN2vCtU_Si2XuoTLBKJMIjtn0HjLNhb6-DX2P3SCzp75tMyWzr49qcsBgratyKdu_v2kqBM1qw_dTaRg2ZeNNO6scSOBwu4YHHJTL4nUaZO2yEodI_OKUztIPLYuO2A33Fb5OE57S7LTgSzmxZLf7e23Vrck7Os14AfBQr7p9ncUeyIXhA";
460
461 do_test(KEY, expect!["invalid license key: ExpiredSignature"]);
462 }
463
464 #[test]
465 fn test_invalid_issuer() {
466 const KEY: &str = "eyJhbGciOiJSUzUxMiIsInR5cCI6IkpXVCJ9.\
468 eyJzdWIiOiJydy10ZXN0IiwidGllciI6ImZyZWUiLCJpc3MiOiJiYWQucmlzaW5nd2F2ZS5jb20iLCJleHAiOjk5OTk5OTk5OTl9.\
469 SUbDJTri902FbGgIoe5L3LG4edTXoR42BQCIu_KLyW41OK47bMnD2aK7JggyJmWyGtN7b_596hxM9HjU58oQtHePUo_zHi5li5IcRaMi8gqHae7CJGqOGAUo9vYOWCP5OjEuDfozJhpgcHBLzDRnSwYnWhLKtsrzb3UcpOXEqRVK7EDShBNx6kNqfYs2LlFI7ASsgFRLhoRuOTR5LeVDjj6NZfkZGsdMe1VyrODWoGT9kcAF--hBpUd1ZJ5mZ67A0_948VPFBYDbDPcTRnw1-5MvdibO-jKX49rJ0rlPXcAbqKPE_yYUaqUaORUzb3PaPgCT_quO9PWPuAFIgAb_fg";
470
471 do_test(KEY, expect!["invalid license key: InvalidIssuer"]);
472 }
473
474 #[test]
475 fn test_invalid_signature() {
476 const KEY: &str = "eyJhbGciOiJSUzUxMiIsInR5cCI6IkpXVCJ9.\
477 eyJzdWIiOiJydy10ZXN0IiwidGllciI6ImZyZWUiLCJpc3MiOiJ0ZXN0LnJpc2luZ3dhdmUuY29tIiwiZXhwIjo5OTk5OTk5OTk5fQ.\
478 InvalidSignatureoe5L3LG4edTXoR42BQCIu_KLyW41OK47bMnD2aK7JggyJmWyGtN7b_596hxM9HjU58oQtHePUo_zHi5li5IcRaMi8gqHae7CJGqOGAUo9vYOWCP5OjEuDfozJhpgcHBLzDRnSwYnWhLKtsrzb3UcpOXEqRVK7EDShBNx6kNqfYs2LlFI7ASsgFRLhoRuOTR5LeVDjj6NZfkZGsdMe1VyrODWoGT9kcAF--hBpUd1ZJ5mZ67A0_948VPFBYDbDPcTRnw1-5MvdibO-jKX49rJ0rlPXcAbqKPE_yYUaqUaORUzb3PaPgCT_quO9PWPuAFIgAb_fg";
479
480 do_test(KEY, expect!["invalid license key: InvalidSignature"]);
481 }
482}