1use serde::{Deserialize, Serialize};
16
17#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
18#[serde(rename_all = "kebab-case")]
19#[serde(deny_unknown_fields)]
20pub struct ComputeNodeConfig {
21 #[serde(rename = "use")]
22 phantom_use: Option<String>,
23 pub id: String,
24
25 pub address: String,
26 #[serde(with = "string")]
27 pub port: u16,
28 pub listen_address: String,
29 pub exporter_port: u16,
30 pub async_stack_trace: String,
31 pub enable_tiered_cache: bool,
32
33 pub provide_minio: Option<Vec<MinioConfig>>,
34 pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
35 pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
36 pub provide_opendal: Option<Vec<OpendalConfig>>,
37 pub provide_aws_s3: Option<Vec<AwsS3Config>>,
38 pub provide_moat: Option<Vec<MoatConfig>>,
39 pub provide_tempo: Option<Vec<TempoConfig>>,
40 pub user_managed: bool,
41 pub resource_group: String,
42
43 pub total_memory_bytes: usize,
44 pub parallelism: usize,
45 pub role: String,
46}
47
48#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
49#[serde(rename_all = "kebab-case")]
50#[serde(deny_unknown_fields)]
51pub enum MetaBackend {
52 Memory,
53 Sqlite,
54 Postgres,
55 Mysql,
56 Env,
57}
58
59#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
60#[serde(rename_all = "kebab-case")]
61#[serde(deny_unknown_fields)]
62pub struct MetaNodeConfig {
63 #[serde(rename = "use")]
64 phantom_use: Option<String>,
65 pub id: String,
66
67 pub address: String,
68 #[serde(with = "string")]
69 pub port: u16,
70 pub listen_address: String,
71 pub dashboard_port: u16,
72 pub exporter_port: u16,
73
74 pub user_managed: bool,
75
76 pub meta_backend: MetaBackend,
77 pub provide_sqlite_backend: Option<Vec<SqliteConfig>>,
78 pub provide_postgres_backend: Option<Vec<PostgresConfig>>,
79 pub provide_mysql_backend: Option<Vec<MySqlConfig>>,
80 pub provide_prometheus: Option<Vec<PrometheusConfig>>,
81
82 pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
83 pub provide_compactor: Option<Vec<CompactorConfig>>,
84
85 pub provide_tempo: Option<Vec<TempoConfig>>,
86
87 pub provide_aws_s3: Option<Vec<AwsS3Config>>,
88 pub provide_minio: Option<Vec<MinioConfig>>,
89 pub provide_opendal: Option<Vec<OpendalConfig>>,
90 pub provide_moat: Option<Vec<MoatConfig>>,
91}
92
93#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
94#[serde(rename_all = "kebab-case")]
95#[serde(deny_unknown_fields)]
96pub struct FrontendConfig {
97 #[serde(rename = "use")]
98 phantom_use: Option<String>,
99 pub id: String,
100
101 pub address: String,
102 #[serde(with = "string")]
103 pub port: u16,
104 pub listen_address: String,
105 pub exporter_port: u16,
106 pub health_check_port: u16,
107
108 pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
109 pub provide_tempo: Option<Vec<TempoConfig>>,
110
111 pub user_managed: bool,
112}
113
114#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
115#[serde(rename_all = "kebab-case")]
116#[serde(deny_unknown_fields)]
117pub struct CompactorConfig {
118 #[serde(rename = "use")]
119 phantom_use: Option<String>,
120 pub id: String,
121
122 pub address: String,
123 #[serde(with = "string")]
124 pub port: u16,
125 pub listen_address: String,
126 pub exporter_port: u16,
127
128 pub provide_minio: Option<Vec<MinioConfig>>,
129
130 pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
131 pub provide_tempo: Option<Vec<TempoConfig>>,
132
133 pub user_managed: bool,
134 pub compaction_worker_threads_number: Option<usize>,
135
136 pub compactor_mode: String,
137}
138
139#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
140#[serde(rename_all = "kebab-case")]
141#[serde(deny_unknown_fields)]
142pub struct MinioConfig {
143 #[serde(rename = "use")]
144 phantom_use: Option<String>,
145 pub id: String,
146
147 pub address: String,
148 #[serde(with = "string")]
149 pub port: u16,
150 pub listen_address: String,
151
152 pub console_address: String,
153 #[serde(with = "string")]
154 pub console_port: u16,
155
156 pub root_user: String,
157 pub root_password: String,
158 pub hummock_bucket: String,
159
160 pub provide_prometheus: Option<Vec<PrometheusConfig>>,
161
162 pub api_requests_max: usize,
164 pub api_requests_deadline: String,
165}
166
167#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
168#[serde(rename_all = "kebab-case")]
169#[serde(deny_unknown_fields)]
170pub struct SqliteConfig {
171 #[serde(rename = "use")]
172 phantom_use: Option<String>,
173 pub id: String,
174
175 pub file: String,
176}
177
178#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
179#[serde(rename_all = "kebab-case")]
180#[serde(deny_unknown_fields)]
181pub struct PrometheusConfig {
182 #[serde(rename = "use")]
183 phantom_use: Option<String>,
184 pub id: String,
185
186 pub address: String,
187 #[serde(with = "string")]
188 pub port: u16,
189 pub listen_address: String,
190
191 pub remote_write: bool,
192 pub remote_write_region: String,
193 pub remote_write_url: String,
194
195 pub scrape_interval: String,
196
197 pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
198 pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
199 pub provide_minio: Option<Vec<MinioConfig>>,
200 pub provide_compactor: Option<Vec<CompactorConfig>>,
201 pub provide_frontend: Option<Vec<FrontendConfig>>,
202}
203
204#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
205#[serde(rename_all = "kebab-case")]
206#[serde(deny_unknown_fields)]
207pub struct GrafanaConfig {
208 #[serde(rename = "use")]
209 phantom_use: Option<String>,
210 pub id: String,
211 pub address: String,
212 pub listen_address: String,
213 pub port: u16,
214
215 pub provide_prometheus: Option<Vec<PrometheusConfig>>,
216 pub provide_tempo: Option<Vec<TempoConfig>>,
217}
218
219#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
220#[serde(rename_all = "kebab-case")]
221#[serde(deny_unknown_fields)]
222pub struct TempoConfig {
223 #[serde(rename = "use")]
224 phantom_use: Option<String>,
225 pub id: String,
226
227 pub listen_address: String,
228 pub address: String,
229 pub port: u16,
230 pub otlp_port: u16,
231 pub max_bytes_per_trace: usize,
232}
233
234#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
235#[serde(rename_all = "kebab-case")]
236#[serde(deny_unknown_fields)]
237pub struct AwsS3Config {
238 #[serde(rename = "use")]
239 phantom_use: Option<String>,
240 pub id: String,
241 pub bucket: String,
242}
243
244#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
245#[serde(rename_all = "kebab-case")]
246#[serde(deny_unknown_fields)]
247pub struct OpendalConfig {
248 #[serde(rename = "use")]
249 phantom_use: Option<String>,
250
251 pub id: String,
252 pub engine: String,
253 pub namenode: String,
254 pub bucket: String,
255}
256
257#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
258#[serde(rename_all = "kebab-case")]
259#[serde(deny_unknown_fields)]
260pub struct KafkaConfig {
261 #[serde(rename = "use")]
262 phantom_use: Option<String>,
263 pub id: String,
264
265 pub address: String,
267 #[serde(with = "string")]
268 pub port: u16,
269 pub docker_port: u16,
272
273 #[serde(with = "string")]
274 pub controller_port: u16,
275
276 pub image: String,
277 pub persist_data: bool,
278 pub node_id: u32,
279
280 pub user_managed: bool,
281}
282
283#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
284#[serde(rename_all = "kebab-case")]
285#[serde(deny_unknown_fields)]
286pub struct SchemaRegistryConfig {
287 #[serde(rename = "use")]
288 phantom_use: Option<String>,
289
290 pub id: String,
291
292 pub address: String,
293 #[serde(with = "string")]
294 pub port: u16,
295
296 pub provide_kafka: Option<Vec<KafkaConfig>>,
297
298 pub image: String,
299 pub user_managed: bool,
302}
303
304#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
305#[serde(rename_all = "kebab-case")]
306#[serde(deny_unknown_fields)]
307pub struct PubsubConfig {
308 #[serde(rename = "use")]
309 phantom_use: Option<String>,
310 pub id: String,
311 #[serde(with = "string")]
312 pub port: u16,
313 pub address: String,
314
315 pub persist_data: bool,
316}
317
318#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
319#[serde(rename_all = "kebab-case")]
320#[serde(deny_unknown_fields)]
321pub struct PulsarConfig {
322 #[serde(rename = "use")]
323 phantom_use: Option<String>,
324 pub id: String,
325
326 pub address: String,
327 pub broker_port: u16,
328 pub http_port: u16,
329
330 pub user_managed: bool,
331 pub image: String,
332 pub persist_data: bool,
333}
334
335#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
336#[serde(rename_all = "kebab-case")]
337#[serde(deny_unknown_fields)]
338pub struct RedisConfig {
339 #[serde(rename = "use")]
340 phantom_use: Option<String>,
341 pub id: String,
342
343 pub port: u16,
344 pub address: String,
345}
346
347#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
348#[serde(rename_all = "kebab-case")]
349#[serde(deny_unknown_fields)]
350pub enum Application {
351 Metastore,
352 Connector,
353}
354
355#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
356#[serde(rename_all = "kebab-case")]
357#[serde(deny_unknown_fields)]
358pub struct MySqlConfig {
359 #[serde(rename = "use")]
360 phantom_use: Option<String>,
361 pub id: String,
362
363 pub port: u16,
364 pub address: String,
365
366 pub user: String,
367 pub password: String,
368 pub database: String,
369
370 pub application: Application,
371 pub image: String,
372 pub user_managed: bool,
373 pub persist_data: bool,
374}
375
376#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
377#[serde(rename_all = "kebab-case")]
378#[serde(deny_unknown_fields)]
379pub struct PostgresConfig {
380 #[serde(rename = "use")]
381 phantom_use: Option<String>,
382 pub id: String,
383
384 pub port: u16,
385 pub address: String,
386
387 pub user: String,
388 pub password: String,
389 pub database: String,
390
391 pub application: Application,
392 pub image: String,
393 pub user_managed: bool,
394 pub persist_data: bool,
395
396 pub latency_ms: u32,
398}
399
400#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
401#[serde(rename_all = "kebab-case")]
402#[serde(deny_unknown_fields)]
403pub struct SqlServerConfig {
404 #[serde(rename = "use")]
405 phantom_use: Option<String>,
406 pub id: String,
407
408 pub port: u16,
409 pub address: String,
410
411 pub user: String,
412 pub password: String,
413 pub database: String,
414
415 pub image: String,
416 pub user_managed: bool,
417 pub persist_data: bool,
418}
419
420#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
421#[serde(rename_all = "kebab-case")]
422#[serde(deny_unknown_fields)]
423pub struct LakekeeperConfig {
424 #[serde(rename = "use")]
425 phantom_use: Option<String>,
426 pub id: String,
427
428 pub port: u16,
429 pub address: String,
430
431 pub user_managed: bool,
432 pub persist_data: bool,
433
434 pub catalog_backend: String,
435 pub encryption_key: String,
436 pub provide_postgres_backend: Option<Vec<PostgresConfig>>,
437 pub provide_minio: Option<Vec<MinioConfig>>,
438}
439
440#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
441#[serde(rename_all = "kebab-case")]
442#[serde(deny_unknown_fields)]
443pub struct MoatConfig {
444 #[serde(rename = "use")]
445 phantom_use: Option<String>,
446 pub id: String,
447
448 pub address: String,
449 pub port: u16,
450
451 pub provide_minio: Option<Vec<MinioConfig>>,
452}
453
454#[derive(Clone, Debug, PartialEq)]
456pub enum ServiceConfig {
457 ComputeNode(ComputeNodeConfig),
458 MetaNode(MetaNodeConfig),
459 Frontend(FrontendConfig),
460 Compactor(CompactorConfig),
461 Minio(MinioConfig),
462 Sqlite(SqliteConfig),
463 Prometheus(PrometheusConfig),
464 Grafana(GrafanaConfig),
465 Tempo(TempoConfig),
466 Opendal(OpendalConfig),
467 AwsS3(AwsS3Config),
468 Kafka(KafkaConfig),
469 SchemaRegistry(SchemaRegistryConfig),
470 Pubsub(PubsubConfig),
471 Pulsar(PulsarConfig),
472 Redis(RedisConfig),
473 MySql(MySqlConfig),
474 Postgres(PostgresConfig),
475 SqlServer(SqlServerConfig),
476 Lakekeeper(LakekeeperConfig),
477 Moat(MoatConfig),
478}
479
/// Group label assigned to a service by `ServiceConfig::task_group`.
///
/// A fieldless enum, so it is `Copy`; it is also `Eq + Hash` and can therefore be used as a
/// key in hashed collections.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum TaskGroup {
    RisingWave,
    Observability,
    Kafka,
    Pubsub,
    Pulsar,
    MySql,
    Postgres,
    SqlServer,
    Redis,
    Lakekeeper,
    Moat,
}
494
495impl ServiceConfig {
496 pub fn id(&self) -> &str {
497 match self {
498 Self::ComputeNode(c) => &c.id,
499 Self::MetaNode(c) => &c.id,
500 Self::Frontend(c) => &c.id,
501 Self::Compactor(c) => &c.id,
502 Self::Minio(c) => &c.id,
503 Self::Sqlite(c) => &c.id,
504 Self::Prometheus(c) => &c.id,
505 Self::Grafana(c) => &c.id,
506 Self::Tempo(c) => &c.id,
507 Self::AwsS3(c) => &c.id,
508 Self::Kafka(c) => &c.id,
509 Self::Pubsub(c) => &c.id,
510 Self::Pulsar(c) => &c.id,
511 Self::Redis(c) => &c.id,
512 Self::Opendal(c) => &c.id,
513 Self::MySql(c) => &c.id,
514 Self::Postgres(c) => &c.id,
515 Self::SqlServer(c) => &c.id,
516 Self::SchemaRegistry(c) => &c.id,
517 Self::Lakekeeper(c) => &c.id,
518 Self::Moat(c) => &c.id,
519 }
520 }
521
522 pub fn port(&self) -> Option<u16> {
524 match self {
525 Self::ComputeNode(c) => Some(c.port),
526 Self::MetaNode(c) => Some(c.port),
527 Self::Frontend(c) => Some(c.port),
528 Self::Compactor(c) => Some(c.port),
529 Self::Minio(c) => Some(c.port),
530 Self::Sqlite(_) => None,
531 Self::Prometheus(c) => Some(c.port),
532 Self::Grafana(c) => Some(c.port),
533 Self::Tempo(c) => Some(c.port),
534 Self::AwsS3(_) => None,
535 Self::Kafka(c) => Some(c.port),
536 Self::Pubsub(c) => Some(c.port),
537 Self::Pulsar(c) => Some(c.http_port),
538 Self::Redis(c) => Some(c.port),
539 Self::Opendal(_) => None,
540 Self::MySql(c) => Some(c.port),
541 Self::Postgres(c) => Some(c.port),
542 Self::SqlServer(c) => Some(c.port),
543 Self::SchemaRegistry(c) => Some(c.port),
544 Self::Lakekeeper(c) => Some(c.port),
545 Self::Moat(c) => Some(c.port),
546 }
547 }
548
549 pub fn user_managed(&self) -> bool {
550 match self {
551 Self::ComputeNode(c) => c.user_managed,
552 Self::MetaNode(c) => c.user_managed,
553 Self::Frontend(c) => c.user_managed,
554 Self::Compactor(c) => c.user_managed,
555 Self::Minio(_c) => false,
556 Self::Sqlite(_c) => false,
557 Self::Prometheus(_c) => false,
558 Self::Grafana(_c) => false,
559 Self::Tempo(_c) => false,
560 Self::AwsS3(_c) => false,
561 Self::Kafka(c) => c.user_managed,
562 Self::Pubsub(_c) => false,
563 Self::Pulsar(c) => c.user_managed,
564 Self::Redis(_c) => false,
565 Self::Opendal(_c) => false,
566 Self::MySql(c) => c.user_managed,
567 Self::Postgres(c) => c.user_managed,
568 Self::SqlServer(c) => c.user_managed,
569 Self::SchemaRegistry(c) => c.user_managed,
570 Self::Lakekeeper(c) => c.user_managed,
571 Self::Moat(_c) => false,
572 }
573 }
574
575 pub fn task_group(&self) -> TaskGroup {
576 use TaskGroup::*;
577 match self {
578 ServiceConfig::ComputeNode(_)
579 | ServiceConfig::MetaNode(_)
580 | ServiceConfig::Frontend(_)
581 | ServiceConfig::Compactor(_)
582 | ServiceConfig::Minio(_)
583 | ServiceConfig::Sqlite(_) => RisingWave,
584 ServiceConfig::Prometheus(_) | ServiceConfig::Grafana(_) | ServiceConfig::Tempo(_) => {
585 Observability
586 }
587 ServiceConfig::Opendal(_) | ServiceConfig::AwsS3(_) => RisingWave,
588 ServiceConfig::Kafka(_) | ServiceConfig::SchemaRegistry(_) => Kafka,
589 ServiceConfig::Pubsub(_) => Pubsub,
590 ServiceConfig::Pulsar(_) => Pulsar,
591 ServiceConfig::Redis(_) => Redis,
592 ServiceConfig::MySql(my_sql_config) => {
593 if matches!(my_sql_config.application, Application::Metastore) {
594 RisingWave
595 } else {
596 MySql
597 }
598 }
599 ServiceConfig::Postgres(postgres_config) => {
600 if matches!(postgres_config.application, Application::Metastore) {
601 RisingWave
602 } else {
603 Postgres
604 }
605 }
606 ServiceConfig::SqlServer(_) => SqlServer,
607 ServiceConfig::Lakekeeper(_) => Lakekeeper,
608 ServiceConfig::Moat(_) => Moat,
609 }
610 }
611}
612
613mod string {
614 use std::fmt::Display;
615 use std::str::FromStr;
616
617 use serde::{Deserialize, Deserializer, Serializer, de};
618
619 pub fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
620 where
621 T: Display,
622 S: Serializer,
623 {
624 serializer.collect_str(value)
625 }
626
627 pub fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
628 where
629 T: FromStr,
630 T::Err: Display,
631 D: Deserializer<'de>,
632 {
633 String::deserialize(deserializer)?
634 .parse()
635 .map_err(de::Error::custom)
636 }
637}