1use std::num::NonZeroU64;
16use std::sync::{LazyLock, RwLock};
17
18use jsonwebtoken::{Algorithm, DecodingKey, Validation};
19use risingwave_pb::common::ClusterResource;
20use serde::{Deserialize, Serialize};
21use thiserror::Error;
22use thiserror_ext::AsReport;
23
24use crate::{Feature, LicenseKeyRef};
25
26#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28#[serde(untagged)]
29pub enum MaybeFeature {
30 Feature(Feature),
32 Unknown(String),
35}
36
37#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
41#[serde(rename_all = "snake_case")]
42pub enum Tier {
43 Free,
45
46 #[serde(rename = "paid")]
48 AllAsOf2_5,
49
50 All,
52
53 #[serde(untagged)]
55 Custom {
56 name: String,
57 features: Vec<MaybeFeature>,
58 },
59}
60
61impl Tier {
62 #[auto_enums::auto_enum(Iterator)]
64 pub fn available_features(&self) -> impl Iterator<Item = Feature> {
65 match self {
66 Tier::Free => std::iter::empty(),
67 Tier::AllAsOf2_5 => Feature::all_as_of_2_5().iter().copied(),
68 Tier::All => Feature::all().iter().copied(),
69 Tier::Custom { features, .. } => features.iter().filter_map(|feature| match feature {
70 MaybeFeature::Feature(feature) => Some(*feature),
71 MaybeFeature::Unknown(_) => None,
72 }),
73 }
74 }
75
76 pub fn name(&self) -> &str {
77 match self {
78 Tier::Free => "free",
79 Tier::AllAsOf2_5 => "paid",
80 Tier::All => "all",
81 Tier::Custom { name, .. } => name,
82 }
83 }
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
91pub enum Issuer {
92 #[serde(rename = "prod.risingwave.com")]
93 Prod,
94
95 #[serde(rename = "test.risingwave.com")]
96 Test,
97
98 #[serde(untagged)]
99 Unknown(String),
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
110#[serde(rename_all = "snake_case")]
111pub struct License {
112 #[allow(dead_code)]
116 pub sub: String,
117
118 #[allow(dead_code)]
122 pub iss: Issuer,
123
124 pub tier: Tier,
126
127 #[serde(alias = "cpu_core_limit")]
132 pub rwu_limit: Option<NonZeroU64>,
133
134 pub exp: u64,
138}
139
140impl License {
141 pub fn cpu_core_limit(&self) -> Option<u64> {
143 self.rwu_limit.map(|limit| limit.get())
144 }
145
146 pub fn memory_limit(&self) -> Option<u64> {
148 const MEMORY_PER_RWU: u64 = 4 * 1024 * 1024 * 1024;
150
151 self.rwu_limit.map(
152 |limit| (limit.get() + 1) * MEMORY_PER_RWU - 1, )
154 }
155
156 pub fn check_cluster_resource(&self, resource: ClusterResource) -> Result<(), LicenseError> {
158 let ClusterResource {
159 total_memory_bytes,
160 total_cpu_cores,
161 } = resource;
162
163 if let Some(limit) = self.cpu_core_limit()
164 && total_cpu_cores > limit
165 {
166 return Err(LicenseError::CpuLimitExceeded {
167 limit,
168 actual: total_cpu_cores,
169 });
170 }
171
172 #[cfg(not(madsim))] if let Some(limit) = self.memory_limit()
174 && total_memory_bytes > limit
175 {
176 return Err(LicenseError::MemoryLimitExceeded {
177 limit,
178 actual: total_memory_bytes,
179 });
180 }
181
182 Ok(())
183 }
184}
185
186impl Default for License {
187 fn default() -> Self {
191 Self {
192 sub: "default".to_owned(),
193 tier: Tier::Free,
194 iss: Issuer::Prod,
195 rwu_limit: None,
196 exp: u64::MAX,
197 }
198 }
199}
200
201#[derive(Debug, Clone, Error)]
203pub enum LicenseError {
204 #[error("invalid license key")]
205 InvalidKey(#[source] jsonwebtoken::errors::Error),
206
207 #[error(
208 "a valid license key is set, but it is currently not effective because the CPU core in the cluster \
209 ({actual}) exceeds the maximum allowed by the license key ({limit}); \
210 consider removing some nodes or acquiring a new license key with a higher limit"
211 )]
212 CpuLimitExceeded { limit: u64, actual: u64 },
213
214 #[error(
215 "a valid license key is set, but it is currently not effective because the memory in the cluster \
216 ({actual}) exceeds the maximum allowed by the license key ({limit}); \
217 consider removing some nodes or acquiring a new license key with a higher limit",
218 actual = humansize::format_size(*actual, humansize::BINARY),
219 limit = humansize::format_size(*limit, humansize::BINARY),
220 )]
221 MemoryLimitExceeded { limit: u64, actual: u64 },
222}
223
224struct Inner {
225 license: Result<License, LicenseError>,
226 cached_cluster_resource: ClusterResource,
227 ignore_resource_limit: bool,
228}
229
230pub struct LicenseManager {
232 inner: RwLock<Inner>,
233}
234
235static PUBLIC_KEY: LazyLock<DecodingKey> = LazyLock::new(|| {
236 DecodingKey::from_rsa_pem(include_bytes!("key.pub"))
237 .expect("invalid public key for license validation")
238});
239
240impl LicenseManager {
241 pub(crate) fn new() -> Self {
243 Self {
244 inner: RwLock::new(Inner {
245 license: Ok(License::default()),
246 cached_cluster_resource: ClusterResource {
247 total_cpu_cores: 0,
248 total_memory_bytes: 0,
249 },
250 ignore_resource_limit: false,
251 }),
252 }
253 }
254
255 pub fn get() -> &'static Self {
257 static INSTANCE: LazyLock<LicenseManager> = LazyLock::new(LicenseManager::new);
258 &INSTANCE
259 }
260
261 pub fn refresh(&self, license_key: LicenseKeyRef<'_>) {
263 let license_key = license_key.0;
264 let mut inner = self.inner.write().unwrap();
265
266 if license_key.is_empty() {
268 inner.license = Ok(License::default());
269 return;
270 }
271
272 let mut validation = Validation::new(Algorithm::RS512);
274 validation.set_issuer(&[
277 "prod.risingwave.com",
278 #[cfg(debug_assertions)]
279 "test.risingwave.com",
280 ]);
281
282 inner.license = match jsonwebtoken::decode(license_key, &PUBLIC_KEY, &validation) {
283 Ok(data) => Ok(data.claims),
284 Err(error) => Err(LicenseError::InvalidKey(error)),
285 };
286
287 match &inner.license {
288 Ok(license) => tracing::info!(?license, "license refreshed"),
289 Err(error) => tracing::warn!(error = %error.as_report(), "invalid license key"),
290 }
291 }
292
293 pub fn update_cluster_resource(&self, resource: ClusterResource) {
295 let mut inner = self.inner.write().unwrap();
296 inner.cached_cluster_resource = resource;
297 }
298
299 pub fn set_ignore_resource_limit(&self, ignore: bool) {
301 let mut inner = self.inner.write().unwrap();
302 inner.ignore_resource_limit = ignore;
303 }
304
305 pub fn license(&self) -> Result<License, LicenseError> {
312 let inner = self.inner.read().unwrap();
313 let mut license = inner.license.clone()?;
314
315 if license.exp < jsonwebtoken::get_current_timestamp() {
317 return Err(LicenseError::InvalidKey(
318 jsonwebtoken::errors::ErrorKind::ExpiredSignature.into(),
319 ));
320 }
321
322 if !inner.ignore_resource_limit {
324 license.check_cluster_resource(inner.cached_cluster_resource)?;
325 } else {
326 license.rwu_limit = None;
328 }
329
330 Ok(license)
331 }
332}
333
// Most keys below are issued by `test.risingwave.com`, which `refresh` only accepts when
// debug assertions are enabled, so the whole module is limited to such builds.
#[cfg(debug_assertions)]
#[cfg(test)]
mod tests {
    use expect_test::expect;

    use super::*;
    use crate::{LicenseKey, PROD_ALL_4_CORE_LICENSE_KEY_CONTENT, TEST_ALL_LICENSE_KEY_CONTENT};

    /// Refreshes a fresh manager with `key` and snapshots the outcome: the pretty
    /// `Debug` form of the license on success, or the error report string on failure.
    /// Returns the manager so callers can make further assertions.
    fn do_test(key: &str, expect: expect_test::Expect) -> LicenseManager {
        let manager = LicenseManager::new();
        manager.refresh(LicenseKey(key));

        match manager.license() {
            Ok(license) => expect.assert_debug_eq(&license),
            Err(error) => expect.assert_eq(&error.to_report_string()),
        }

        manager
    }

    #[test]
    fn test_all_license_key() {
        do_test(
            TEST_ALL_LICENSE_KEY_CONTENT,
            expect![[r#"
                License {
                    sub: "rw-test-all",
                    iss: Test,
                    tier: All,
                    rwu_limit: None,
                    exp: 10000627200,
                }
            "#]],
        );
    }

    #[test]
    fn test_prod_all_4_core_license_key() {
        do_test(
            PROD_ALL_4_CORE_LICENSE_KEY_CONTENT,
            expect![[r#"
                License {
                    sub: "rw-default-all-4-core",
                    iss: Prod,
                    tier: All,
                    rwu_limit: Some(
                        4,
                    ),
                    exp: 10000627200,
                }
            "#]],
        );
    }

    #[test]
    fn test_free_license_key() {
        const KEY: &str = "eyJhbGciOiJSUzUxMiIsInR5cCI6IkpXVCJ9.\
          eyJzdWIiOiJydy10ZXN0IiwidGllciI6ImZyZWUiLCJpc3MiOiJ0ZXN0LnJpc2luZ3dhdmUuY29tIiwiZXhwIjo5OTk5OTk5OTk5fQ.\
          ALC3Kc9LI6u0S-jeMB1YTxg1k8Azxwvc750ihuSZgjA_e1OJC9moxMvpLrHdLZDzCXHjBYi0XJ_1lowmuO_0iPEuPqN5AFpDV1ywmzJvGmMCMtw3A2wuN7hhem9OsWbwe6lzdwrefZLipyo4GZtIkg5ZdwGuHzm33zsM-X5gl_Ns4P6axHKiorNSR6nTAyA6B32YVET_FAM2YJQrXqpwA61wn1XLfarZqpdIQyJ5cgyiC33BFBlUL3lcRXLMLeYe6TjYGeV4K63qARCjM9yeOlsRbbW5ViWeGtR2Yf18pN8ysPXdbaXm_P_IVhl3jCTDJt9ctPh6pUCbkt36FZqO9A";

        do_test(
            KEY,
            expect![[r#"
                License {
                    sub: "rw-test",
                    iss: Test,
                    tier: Free,
                    rwu_limit: None,
                    exp: 9999999999,
                }
            "#]],
        );
    }

    #[test]
    fn test_empty_license_key() {
        do_test(
            // An empty key resets the manager to the default license.
            "",
            expect![[r#"
                License {
                    sub: "default",
                    iss: Prod,
                    tier: Free,
                    rwu_limit: None,
                    exp: 18446744073709551615,
                }
            "#]],
        );
    }

    #[test]
    fn test_custom_tier_license_key() {
        const KEY: &str = "eyJhbGciOiJSUzUxMiIsInR5cCI6IkpXVCJ9.\
          eyJzdWIiOiJydy11bml0LXRlc3QiLCJpc3MiOiJ0ZXN0LnJpc2luZ3dhdmUuY29tIiwiZXhwIjoxMDAwMDYyNzIwMCwiaWF0IjoxNzUxODY5OTI3LCJ0aWVyIjp7Im5hbWUiOiJzZWNyZXQtb25seSIsImZlYXR1cmVzIjpbIlNlY3JldE1hbmFnZW1lbnQiXX19.\
          iZKNyAHxz1l24Qh4F9wauuQEtDPLAeOmW2m_ttlew2ENmP_XcGWCx_O8r50NXRDaKv66z-ibteWVL5XOUqaVgJw9EnCyfVuFNoKtFQu18Vt0l52Yw2zNh3iHNQFKwHuCUki5FlOHw-K57a5f414-gzfwAwQkO1bAYoyAFhtoX6QQ2jdbxctFg0NxTQqpjnP-h0k2myZ_IhxA8fKrQIAqBj5Y8tGljuxjTLJpoK7X0ESXNh7bhA8njEf2Hm4QCymI1uo8OYRaR1Siw87r0aZykw9wW15Q8VK58VxIQLS7b7gQOmDToBjJt9yIF3MT6YMfqMX_l3Dtn9bOS_htVd1bjQ";

        let manager = do_test(
            KEY,
            expect![[r#"
                License {
                    sub: "rw-unit-test",
                    iss: Test,
                    tier: Custom {
                        name: "secret-only",
                        features: [
                            Feature(
                                SecretManagement,
                            ),
                        ],
                    },
                    rwu_limit: None,
                    exp: 10000627200,
                }
            "#]],
        );

        // Only the feature listed in the custom tier is available.
        let tier = manager.license().unwrap().tier;
        assert_eq!(
            tier.available_features().collect::<Vec<_>>(),
            vec![Feature::SecretManagement]
        );
        assert!(
            Feature::SecretManagement
                .check_available_with(&manager)
                .is_ok()
        );
        assert!(
            Feature::BigQuerySink
                .check_available_with(&manager)
                .is_err()
        );
    }

    /// Exceeding the licensed CPU cores makes the license ineffective, unless resource
    /// limits are ignored, in which case the limit is cleared from the returned license.
    #[test]
    fn test_cpu_limit_exceeded_and_ignored() {
        let manager = LicenseManager::new();
        manager.refresh(LicenseKey(PROD_ALL_4_CORE_LICENSE_KEY_CONTENT));

        manager.update_cluster_resource(ClusterResource {
            total_cpu_cores: 5,
            total_memory_bytes: 0,
        });
        assert!(matches!(
            manager.license(),
            Err(LicenseError::CpuLimitExceeded {
                limit: 4,
                actual: 5
            })
        ));

        manager.set_ignore_resource_limit(true);
        assert_eq!(manager.license().unwrap().rwu_limit, None);
    }

    #[test]
    fn test_invalid_license_key() {
        const KEY: &str = "invalid";

        do_test(KEY, expect!["invalid license key: InvalidToken"]);
    }

    #[test]
    fn test_expired_license_key() {
        // The payload of this key carries `"exp": 0`.
        const KEY: &str = "eyJhbGciOiJSUzUxMiIsInR5cCI6IkpXVCJ9.\
          eyJzdWIiOiJydy10ZXN0IiwidGllciI6InBhaWQiLCJpc3MiOiJ0ZXN0LnJpc2luZ3dhdmUuY29tIiwiZXhwIjowfQ.\
          TyYmoT5Gw9-FN7DWDbeg3myW8g_3Xlc90i4M9bGuPf2WLv9zRMJy2r9J7sl1BO7t6F1uGgyrvNxsVRVZ2XF_WAs6uNlluYBnd4Cqvsj6Xny1XJCCo8II3RIea-ZlRjp6tc1saaoe-_eTtqDH8NIIWe73vVtBeBTBU4zAiN2vCtU_Si2XuoTLBKJMIjtn0HjLNhb6-DX2P3SCzp75tMyWzr49qcsBgratyKdu_v2kqBM1qw_dTaRg2ZeNNO6scSOBwu4YHHJTL4nUaZO2yEodI_OKUztIPLYuO2A33Fb5OE57S7LTgSzmxZLf7e23Vrck7Os14AfBQr7p9ncUeyIXhA";

        do_test(KEY, expect!["invalid license key: ExpiredSignature"]);
    }

    #[test]
    fn test_invalid_issuer() {
        // The payload of this key carries `"iss": "bad.risingwave.com"`.
        const KEY: &str = "eyJhbGciOiJSUzUxMiIsInR5cCI6IkpXVCJ9.\
          eyJzdWIiOiJydy10ZXN0IiwidGllciI6ImZyZWUiLCJpc3MiOiJiYWQucmlzaW5nd2F2ZS5jb20iLCJleHAiOjk5OTk5OTk5OTl9.\
          SUbDJTri902FbGgIoe5L3LG4edTXoR42BQCIu_KLyW41OK47bMnD2aK7JggyJmWyGtN7b_596hxM9HjU58oQtHePUo_zHi5li5IcRaMi8gqHae7CJGqOGAUo9vYOWCP5OjEuDfozJhpgcHBLzDRnSwYnWhLKtsrzb3UcpOXEqRVK7EDShBNx6kNqfYs2LlFI7ASsgFRLhoRuOTR5LeVDjj6NZfkZGsdMe1VyrODWoGT9kcAF--hBpUd1ZJ5mZ67A0_948VPFBYDbDPcTRnw1-5MvdibO-jKX49rJ0rlPXcAbqKPE_yYUaqUaORUzb3PaPgCT_quO9PWPuAFIgAb_fg";

        do_test(KEY, expect!["invalid license key: InvalidIssuer"]);
    }

    #[test]
    fn test_invalid_signature() {
        const KEY: &str = "eyJhbGciOiJSUzUxMiIsInR5cCI6IkpXVCJ9.\
          eyJzdWIiOiJydy10ZXN0IiwidGllciI6ImZyZWUiLCJpc3MiOiJ0ZXN0LnJpc2luZ3dhdmUuY29tIiwiZXhwIjo5OTk5OTk5OTk5fQ.\
          InvalidSignatureoe5L3LG4edTXoR42BQCIu_KLyW41OK47bMnD2aK7JggyJmWyGtN7b_596hxM9HjU58oQtHePUo_zHi5li5IcRaMi8gqHae7CJGqOGAUo9vYOWCP5OjEuDfozJhpgcHBLzDRnSwYnWhLKtsrzb3UcpOXEqRVK7EDShBNx6kNqfYs2LlFI7ASsgFRLhoRuOTR5LeVDjj6NZfkZGsdMe1VyrODWoGT9kcAF--hBpUd1ZJ5mZ67A0_948VPFBYDbDPcTRnw1-5MvdibO-jKX49rJ0rlPXcAbqKPE_yYUaqUaORUzb3PaPgCT_quO9PWPuAFIgAb_fg";

        do_test(KEY, expect!["invalid license key: InvalidSignature"]);
    }
}