1use serde::{Deserialize, Serialize};
16
17#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
18#[serde(rename_all = "kebab-case")]
19#[serde(deny_unknown_fields)]
20pub struct ComputeNodeConfig {
21 #[serde(rename = "use")]
22 phantom_use: Option<String>,
23 pub id: String,
24
25 pub address: String,
26 #[serde(with = "string")]
27 pub port: u16,
28 pub listen_address: String,
29 pub exporter_port: u16,
30 pub async_stack_trace: String,
31 pub enable_tiered_cache: bool,
32
33 pub provide_minio: Option<Vec<MinioConfig>>,
34 pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
35 pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
36 pub provide_opendal: Option<Vec<OpendalConfig>>,
37 pub provide_aws_s3: Option<Vec<AwsS3Config>>,
38 pub provide_moat: Option<Vec<MoatConfig>>,
39 pub provide_tempo: Option<Vec<TempoConfig>>,
40 pub user_managed: bool,
41 pub resource_group: String,
42
43 pub total_memory_bytes: usize,
44 pub parallelism: usize,
45 pub role: String,
46}
47
48#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
49#[serde(rename_all = "kebab-case")]
50#[serde(deny_unknown_fields)]
51pub enum MetaBackend {
52 Memory,
53 Sqlite,
54 Postgres,
55 Mysql,
56 Env,
57}
58
59#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
60#[serde(rename_all = "kebab-case")]
61#[serde(deny_unknown_fields)]
62pub struct MetaNodeConfig {
63 #[serde(rename = "use")]
64 phantom_use: Option<String>,
65 pub id: String,
66
67 pub address: String,
68 #[serde(with = "string")]
69 pub port: u16,
70 pub listen_address: String,
71 pub dashboard_port: u16,
72 pub exporter_port: u16,
73
74 pub user_managed: bool,
75
76 pub meta_backend: MetaBackend,
77 pub provide_sqlite_backend: Option<Vec<SqliteConfig>>,
78 pub provide_postgres_backend: Option<Vec<PostgresConfig>>,
79 pub provide_mysql_backend: Option<Vec<MySqlConfig>>,
80 pub provide_prometheus: Option<Vec<PrometheusConfig>>,
81
82 pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
83 pub provide_compactor: Option<Vec<CompactorConfig>>,
84
85 pub provide_tempo: Option<Vec<TempoConfig>>,
86
87 pub provide_aws_s3: Option<Vec<AwsS3Config>>,
88 pub provide_minio: Option<Vec<MinioConfig>>,
89 pub provide_opendal: Option<Vec<OpendalConfig>>,
90 pub provide_moat: Option<Vec<MoatConfig>>,
91}
92
93#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
94#[serde(rename_all = "kebab-case")]
95#[serde(deny_unknown_fields)]
96pub struct FrontendConfig {
97 #[serde(rename = "use")]
98 phantom_use: Option<String>,
99 pub id: String,
100
101 pub address: String,
102 #[serde(with = "string")]
103 pub port: u16,
104 pub listen_address: String,
105 pub exporter_port: u16,
106 pub health_check_port: u16,
107
108 pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
109 pub provide_tempo: Option<Vec<TempoConfig>>,
110 pub provide_prometheus: Option<Vec<PrometheusConfig>>,
111
112 pub user_managed: bool,
113}
114
115#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
116#[serde(rename_all = "kebab-case")]
117#[serde(deny_unknown_fields)]
118pub struct CompactorConfig {
119 #[serde(rename = "use")]
120 phantom_use: Option<String>,
121 pub id: String,
122
123 pub address: String,
124 #[serde(with = "string")]
125 pub port: u16,
126 pub listen_address: String,
127 pub exporter_port: u16,
128
129 pub provide_minio: Option<Vec<MinioConfig>>,
130
131 pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
132 pub provide_tempo: Option<Vec<TempoConfig>>,
133
134 pub user_managed: bool,
135 pub compaction_worker_threads_number: Option<usize>,
136
137 pub compactor_mode: String,
138}
139
140#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
141#[serde(rename_all = "kebab-case")]
142#[serde(deny_unknown_fields)]
143pub struct MinioConfig {
144 #[serde(rename = "use")]
145 phantom_use: Option<String>,
146 pub id: String,
147
148 pub address: String,
149 #[serde(with = "string")]
150 pub port: u16,
151 pub listen_address: String,
152
153 pub console_address: String,
154 #[serde(with = "string")]
155 pub console_port: u16,
156
157 pub root_user: String,
158 pub root_password: String,
159 pub hummock_bucket: String,
160
161 pub provide_prometheus: Option<Vec<PrometheusConfig>>,
162
163 pub api_requests_max: usize,
165 pub api_requests_deadline: String,
166}
167
168#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
169#[serde(rename_all = "kebab-case")]
170#[serde(deny_unknown_fields)]
171pub struct SqliteConfig {
172 #[serde(rename = "use")]
173 phantom_use: Option<String>,
174 pub id: String,
175
176 pub file: String,
177}
178
179#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
180#[serde(rename_all = "kebab-case")]
181#[serde(deny_unknown_fields)]
182pub struct PrometheusConfig {
183 #[serde(rename = "use")]
184 phantom_use: Option<String>,
185 pub id: String,
186
187 pub address: String,
188 #[serde(with = "string")]
189 pub port: u16,
190 pub listen_address: String,
191
192 pub remote_write: bool,
193 pub remote_write_region: String,
194 pub remote_write_url: String,
195
196 pub scrape_interval: String,
197
198 pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
199 pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
200 pub provide_minio: Option<Vec<MinioConfig>>,
201 pub provide_compactor: Option<Vec<CompactorConfig>>,
202 pub provide_frontend: Option<Vec<FrontendConfig>>,
203}
204
205#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
206#[serde(rename_all = "kebab-case")]
207#[serde(deny_unknown_fields)]
208pub struct GrafanaConfig {
209 #[serde(rename = "use")]
210 phantom_use: Option<String>,
211 pub id: String,
212 pub address: String,
213 pub listen_address: String,
214 pub port: u16,
215
216 pub provide_prometheus: Option<Vec<PrometheusConfig>>,
217 pub provide_tempo: Option<Vec<TempoConfig>>,
218}
219
220#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
221#[serde(rename_all = "kebab-case")]
222#[serde(deny_unknown_fields)]
223pub struct TempoConfig {
224 #[serde(rename = "use")]
225 phantom_use: Option<String>,
226 pub id: String,
227
228 pub listen_address: String,
229 pub address: String,
230 pub port: u16,
231 pub otlp_port: u16,
232 pub max_bytes_per_trace: usize,
233}
234
235#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
236#[serde(rename_all = "kebab-case")]
237#[serde(deny_unknown_fields)]
238pub struct AwsS3Config {
239 #[serde(rename = "use")]
240 phantom_use: Option<String>,
241 pub id: String,
242 pub bucket: String,
243}
244
245#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
246#[serde(rename_all = "kebab-case")]
247#[serde(deny_unknown_fields)]
248pub struct OpendalConfig {
249 #[serde(rename = "use")]
250 phantom_use: Option<String>,
251
252 pub id: String,
253 pub engine: String,
254 pub namenode: String,
255 pub bucket: String,
256}
257
258#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
259#[serde(rename_all = "kebab-case")]
260#[serde(deny_unknown_fields)]
261pub struct KafkaConfig {
262 #[serde(rename = "use")]
263 phantom_use: Option<String>,
264 pub id: String,
265
266 pub address: String,
268 #[serde(with = "string")]
269 pub port: u16,
270 pub docker_port: u16,
273
274 #[serde(with = "string")]
275 pub controller_port: u16,
276
277 pub image: String,
278 pub persist_data: bool,
279 pub node_id: u32,
280
281 pub user_managed: bool,
282}
283
284#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
285#[serde(rename_all = "kebab-case")]
286#[serde(deny_unknown_fields)]
287pub struct SchemaRegistryConfig {
288 #[serde(rename = "use")]
289 phantom_use: Option<String>,
290
291 pub id: String,
292
293 pub address: String,
294 #[serde(with = "string")]
295 pub port: u16,
296
297 pub provide_kafka: Option<Vec<KafkaConfig>>,
298
299 pub image: String,
300 pub user_managed: bool,
303}
304
305#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
306#[serde(rename_all = "kebab-case")]
307#[serde(deny_unknown_fields)]
308pub struct PubsubConfig {
309 #[serde(rename = "use")]
310 phantom_use: Option<String>,
311 pub id: String,
312 #[serde(with = "string")]
313 pub port: u16,
314 pub address: String,
315
316 pub persist_data: bool,
317}
318
319#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
320#[serde(rename_all = "kebab-case")]
321#[serde(deny_unknown_fields)]
322pub struct PulsarConfig {
323 #[serde(rename = "use")]
324 phantom_use: Option<String>,
325 pub id: String,
326
327 pub address: String,
328 pub broker_port: u16,
329 pub http_port: u16,
330
331 pub user_managed: bool,
332 pub image: String,
333 pub persist_data: bool,
334}
335
336#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
337#[serde(rename_all = "kebab-case")]
338#[serde(deny_unknown_fields)]
339pub struct RedisConfig {
340 #[serde(rename = "use")]
341 phantom_use: Option<String>,
342 pub id: String,
343
344 pub port: u16,
345 pub address: String,
346}
347
348#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
349#[serde(rename_all = "kebab-case")]
350#[serde(deny_unknown_fields)]
351pub enum Application {
352 Metastore,
353 Connector,
354}
355
356#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
357#[serde(rename_all = "kebab-case")]
358#[serde(deny_unknown_fields)]
359pub struct MySqlConfig {
360 #[serde(rename = "use")]
361 phantom_use: Option<String>,
362 pub id: String,
363
364 pub port: u16,
365 pub address: String,
366
367 pub user: String,
368 pub password: String,
369 pub database: String,
370
371 pub application: Application,
372 pub image: String,
373 pub user_managed: bool,
374 pub persist_data: bool,
375}
376
377#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
378#[serde(rename_all = "kebab-case")]
379#[serde(deny_unknown_fields)]
380pub struct PostgresConfig {
381 #[serde(rename = "use")]
382 phantom_use: Option<String>,
383 pub id: String,
384
385 pub port: u16,
386 pub address: String,
387
388 pub user: String,
389 pub password: String,
390 pub database: String,
391
392 pub application: Application,
393 pub image: String,
394 pub user_managed: bool,
395 pub persist_data: bool,
396
397 pub latency_ms: u32,
399}
400
401#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
402#[serde(rename_all = "kebab-case")]
403#[serde(deny_unknown_fields)]
404pub struct SqlServerConfig {
405 #[serde(rename = "use")]
406 phantom_use: Option<String>,
407 pub id: String,
408
409 pub port: u16,
410 pub address: String,
411
412 pub user: String,
413 pub password: String,
414 pub database: String,
415
416 pub image: String,
417 pub user_managed: bool,
418 pub persist_data: bool,
419}
420
421#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
422#[serde(rename_all = "kebab-case")]
423#[serde(deny_unknown_fields)]
424pub struct LakekeeperConfig {
425 #[serde(rename = "use")]
426 phantom_use: Option<String>,
427 pub id: String,
428
429 pub port: u16,
430 pub address: String,
431
432 pub user_managed: bool,
433 pub persist_data: bool,
434
435 pub catalog_backend: String,
436 pub encryption_key: String,
437 pub provide_postgres_backend: Option<Vec<PostgresConfig>>,
438 pub provide_minio: Option<Vec<MinioConfig>>,
439}
440
441#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
442#[serde(rename_all = "kebab-case")]
443#[serde(deny_unknown_fields)]
444pub struct MoatConfig {
445 #[serde(rename = "use")]
446 phantom_use: Option<String>,
447 pub id: String,
448
449 pub address: String,
450 pub port: u16,
451
452 pub provide_minio: Option<Vec<MinioConfig>>,
453}
454
455#[derive(Clone, Debug, PartialEq)]
457pub enum ServiceConfig {
458 ComputeNode(ComputeNodeConfig),
459 MetaNode(MetaNodeConfig),
460 Frontend(FrontendConfig),
461 Compactor(CompactorConfig),
462 Minio(MinioConfig),
463 Sqlite(SqliteConfig),
464 Prometheus(PrometheusConfig),
465 Grafana(GrafanaConfig),
466 Tempo(TempoConfig),
467 Opendal(OpendalConfig),
468 AwsS3(AwsS3Config),
469 Kafka(KafkaConfig),
470 SchemaRegistry(SchemaRegistryConfig),
471 Pubsub(PubsubConfig),
472 Pulsar(PulsarConfig),
473 Redis(RedisConfig),
474 MySql(MySqlConfig),
475 Postgres(PostgresConfig),
476 SqlServer(SqlServerConfig),
477 Lakekeeper(LakekeeperConfig),
478 Moat(MoatConfig),
479}
480
481#[derive(PartialEq, Eq, Hash, Debug)]
482pub enum TaskGroup {
483 RisingWave,
484 Observability,
485 Kafka,
486 Pubsub,
487 Pulsar,
488 MySql,
489 Postgres,
490 SqlServer,
491 Redis,
492 Lakekeeper,
493 Moat,
494}
495
496impl ServiceConfig {
497 pub fn id(&self) -> &str {
498 match self {
499 Self::ComputeNode(c) => &c.id,
500 Self::MetaNode(c) => &c.id,
501 Self::Frontend(c) => &c.id,
502 Self::Compactor(c) => &c.id,
503 Self::Minio(c) => &c.id,
504 Self::Sqlite(c) => &c.id,
505 Self::Prometheus(c) => &c.id,
506 Self::Grafana(c) => &c.id,
507 Self::Tempo(c) => &c.id,
508 Self::AwsS3(c) => &c.id,
509 Self::Kafka(c) => &c.id,
510 Self::Pubsub(c) => &c.id,
511 Self::Pulsar(c) => &c.id,
512 Self::Redis(c) => &c.id,
513 Self::Opendal(c) => &c.id,
514 Self::MySql(c) => &c.id,
515 Self::Postgres(c) => &c.id,
516 Self::SqlServer(c) => &c.id,
517 Self::SchemaRegistry(c) => &c.id,
518 Self::Lakekeeper(c) => &c.id,
519 Self::Moat(c) => &c.id,
520 }
521 }
522
523 pub fn port(&self) -> Option<u16> {
525 match self {
526 Self::ComputeNode(c) => Some(c.port),
527 Self::MetaNode(c) => Some(c.port),
528 Self::Frontend(c) => Some(c.port),
529 Self::Compactor(c) => Some(c.port),
530 Self::Minio(c) => Some(c.port),
531 Self::Sqlite(_) => None,
532 Self::Prometheus(c) => Some(c.port),
533 Self::Grafana(c) => Some(c.port),
534 Self::Tempo(c) => Some(c.port),
535 Self::AwsS3(_) => None,
536 Self::Kafka(c) => Some(c.port),
537 Self::Pubsub(c) => Some(c.port),
538 Self::Pulsar(c) => Some(c.http_port),
539 Self::Redis(c) => Some(c.port),
540 Self::Opendal(_) => None,
541 Self::MySql(c) => Some(c.port),
542 Self::Postgres(c) => Some(c.port),
543 Self::SqlServer(c) => Some(c.port),
544 Self::SchemaRegistry(c) => Some(c.port),
545 Self::Lakekeeper(c) => Some(c.port),
546 Self::Moat(c) => Some(c.port),
547 }
548 }
549
550 pub fn user_managed(&self) -> bool {
551 match self {
552 Self::ComputeNode(c) => c.user_managed,
553 Self::MetaNode(c) => c.user_managed,
554 Self::Frontend(c) => c.user_managed,
555 Self::Compactor(c) => c.user_managed,
556 Self::Minio(_c) => false,
557 Self::Sqlite(_c) => false,
558 Self::Prometheus(_c) => false,
559 Self::Grafana(_c) => false,
560 Self::Tempo(_c) => false,
561 Self::AwsS3(_c) => false,
562 Self::Kafka(c) => c.user_managed,
563 Self::Pubsub(_c) => false,
564 Self::Pulsar(c) => c.user_managed,
565 Self::Redis(_c) => false,
566 Self::Opendal(_c) => false,
567 Self::MySql(c) => c.user_managed,
568 Self::Postgres(c) => c.user_managed,
569 Self::SqlServer(c) => c.user_managed,
570 Self::SchemaRegistry(c) => c.user_managed,
571 Self::Lakekeeper(c) => c.user_managed,
572 Self::Moat(_c) => false,
573 }
574 }
575
576 pub fn task_group(&self) -> TaskGroup {
577 use TaskGroup::*;
578 match self {
579 ServiceConfig::ComputeNode(_)
580 | ServiceConfig::MetaNode(_)
581 | ServiceConfig::Frontend(_)
582 | ServiceConfig::Compactor(_)
583 | ServiceConfig::Minio(_)
584 | ServiceConfig::Sqlite(_) => RisingWave,
585 ServiceConfig::Prometheus(_) | ServiceConfig::Grafana(_) | ServiceConfig::Tempo(_) => {
586 Observability
587 }
588 ServiceConfig::Opendal(_) | ServiceConfig::AwsS3(_) => RisingWave,
589 ServiceConfig::Kafka(_) | ServiceConfig::SchemaRegistry(_) => Kafka,
590 ServiceConfig::Pubsub(_) => Pubsub,
591 ServiceConfig::Pulsar(_) => Pulsar,
592 ServiceConfig::Redis(_) => Redis,
593 ServiceConfig::MySql(my_sql_config) => {
594 if matches!(my_sql_config.application, Application::Metastore) {
595 RisingWave
596 } else {
597 MySql
598 }
599 }
600 ServiceConfig::Postgres(postgres_config) => {
601 if matches!(postgres_config.application, Application::Metastore) {
602 RisingWave
603 } else {
604 Postgres
605 }
606 }
607 ServiceConfig::SqlServer(_) => SqlServer,
608 ServiceConfig::Lakekeeper(_) => Lakekeeper,
609 ServiceConfig::Moat(_) => Moat,
610 }
611 }
612}
613
614mod string {
615 use std::fmt::Display;
616 use std::str::FromStr;
617
618 use serde::{Deserialize, Deserializer, Serializer, de};
619
620 pub fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
621 where
622 T: Display,
623 S: Serializer,
624 {
625 serializer.collect_str(value)
626 }
627
628 pub fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
629 where
630 T: FromStr,
631 T::Err: Display,
632 D: Deserializer<'de>,
633 {
634 String::deserialize(deserializer)?
635 .parse()
636 .map_err(de::Error::custom)
637 }
638}