1use serde::{Deserialize, Serialize};
16
/// Configuration of a compute node service.
///
/// Keys are (de)serialized in kebab-case (e.g. `listen-address`), and any key that
/// does not map to a field is rejected during deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct ComputeNodeConfig {
    /// Private field stored under the key `use`; not read by any code visible here.
    // NOTE(review): presumably names the service template this entry came from — confirm.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,

    pub address: String,
    /// Returned by `ServiceConfig::port`. (De)serialized as a string through the
    /// `string` helper module rather than as a number.
    #[serde(with = "string")]
    pub port: u16,
    pub listen_address: String,
    pub exporter_port: u16,
    pub async_stack_trace: String,
    pub enable_tiered_cache: bool,

    // Optional lists of other service configurations.
    // NOTE(review): presumably the services this node connects to — confirm against the caller.
    pub provide_minio: Option<Vec<MinioConfig>>,
    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
    pub provide_opendal: Option<Vec<OpendalConfig>>,
    pub provide_aws_s3: Option<Vec<AwsS3Config>>,
    pub provide_moat: Option<Vec<MoatConfig>>,
    pub provide_tempo: Option<Vec<TempoConfig>>,
    /// Returned as-is by `ServiceConfig::user_managed`.
    pub user_managed: bool,
    pub resource_group: String,

    pub total_memory_bytes: usize,
    pub parallelism: usize,
    pub role: String,
}
47
/// Backend selector stored in `MetaNodeConfig::meta_backend`.
///
/// Variants are (de)serialized in kebab-case, i.e. as `memory`, `sqlite`,
/// `postgres`, `mysql` and `env`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub enum MetaBackend {
    Memory,
    Sqlite,
    Postgres,
    Mysql,
    // NOTE(review): presumably means "take the backend from the environment" — confirm.
    Env,
}
58
/// Configuration of a meta node service.
///
/// Keys are (de)serialized in kebab-case (e.g. `dashboard-port`), and any key that
/// does not map to a field is rejected during deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct MetaNodeConfig {
    /// Private field stored under the key `use`; not read by any code visible here.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,

    pub address: String,
    /// Returned by `ServiceConfig::port`. (De)serialized as a string through the
    /// `string` helper module rather than as a number.
    #[serde(with = "string")]
    pub port: u16,
    pub listen_address: String,
    pub dashboard_port: u16,
    pub exporter_port: u16,

    /// Returned as-is by `ServiceConfig::user_managed`.
    pub user_managed: bool,

    /// Which backend variant is selected; see `MetaBackend`.
    pub meta_backend: MetaBackend,
    // Optional lists of backend / monitoring service configurations.
    // NOTE(review): presumably only the list matching `meta_backend` is consulted — confirm.
    pub provide_sqlite_backend: Option<Vec<SqliteConfig>>,
    pub provide_postgres_backend: Option<Vec<PostgresConfig>>,
    pub provide_mysql_backend: Option<Vec<MySqlConfig>>,
    pub provide_prometheus: Option<Vec<PrometheusConfig>>,

    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
    pub provide_compactor: Option<Vec<CompactorConfig>>,

    pub provide_tempo: Option<Vec<TempoConfig>>,

    // Optional lists of storage-related service configurations.
    pub provide_aws_s3: Option<Vec<AwsS3Config>>,
    pub provide_minio: Option<Vec<MinioConfig>>,
    pub provide_opendal: Option<Vec<OpendalConfig>>,
    pub provide_moat: Option<Vec<MoatConfig>>,
}
92
/// Configuration of a frontend service.
///
/// Keys are (de)serialized in kebab-case (e.g. `health-check-port`), and any key
/// that does not map to a field is rejected during deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct FrontendConfig {
    /// Private field stored under the key `use`; not read by any code visible here.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,

    pub address: String,
    /// Returned by `ServiceConfig::port`. (De)serialized as a string through the
    /// `string` helper module rather than as a number.
    #[serde(with = "string")]
    pub port: u16,
    pub listen_address: String,
    pub exporter_port: u16,
    pub health_check_port: u16,

    // Optional lists of other service configurations.
    // NOTE(review): presumably the services this frontend connects to — confirm.
    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
    pub provide_tempo: Option<Vec<TempoConfig>>,
    pub provide_prometheus: Option<Vec<PrometheusConfig>>,

    /// Returned as-is by `ServiceConfig::user_managed`.
    pub user_managed: bool,
}
114
/// Configuration of a compactor service.
///
/// Keys are (de)serialized in kebab-case (e.g. `compactor-mode`), and any key that
/// does not map to a field is rejected during deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct CompactorConfig {
    /// Private field stored under the key `use`; not read by any code visible here.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,

    pub address: String,
    /// Returned by `ServiceConfig::port`. (De)serialized as a string through the
    /// `string` helper module rather than as a number.
    #[serde(with = "string")]
    pub port: u16,
    pub listen_address: String,
    pub exporter_port: u16,

    // Optional lists of other service configurations.
    pub provide_minio: Option<Vec<MinioConfig>>,

    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
    pub provide_tempo: Option<Vec<TempoConfig>>,

    /// Returned as-is by `ServiceConfig::user_managed`.
    pub user_managed: bool,
    /// Optional thread count; `None` when the key is absent or null.
    pub compaction_worker_threads_number: Option<usize>,

    // NOTE(review): free-form string here; the accepted values are not visible in this file.
    pub compactor_mode: String,
}
139
/// Configuration of a MinIO service.
///
/// Keys are (de)serialized in kebab-case (e.g. `console-port`), and any key that
/// does not map to a field is rejected during deserialization.
/// `ServiceConfig::user_managed` always reports `false` for this service.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct MinioConfig {
    /// Private field stored under the key `use`; not read by any code visible here.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,

    pub address: String,
    /// Returned by `ServiceConfig::port`. (De)serialized as a string through the
    /// `string` helper module rather than as a number.
    #[serde(with = "string")]
    pub port: u16,
    pub listen_address: String,

    pub console_address: String,
    /// (De)serialized as a string through the `string` helper module.
    #[serde(with = "string")]
    pub console_port: u16,

    pub root_user: String,
    pub root_password: String,
    pub hummock_bucket: String,

    pub provide_prometheus: Option<Vec<PrometheusConfig>>,

    pub api_requests_max: usize,
    // NOTE(review): kept as a string; presumably a duration literal passed through to MinIO — confirm.
    pub api_requests_deadline: String,
}
167
/// Configuration of a SQLite service entry.
///
/// Keys are (de)serialized in kebab-case, and any key that does not map to a field
/// is rejected during deserialization. `ServiceConfig::port` reports `None` and
/// `ServiceConfig::user_managed` reports `false` for this service.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct SqliteConfig {
    /// Private field stored under the key `use`; not read by any code visible here.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,

    // NOTE(review): presumably the path of the database file — confirm.
    pub file: String,
}
178
/// Configuration of a Prometheus service.
///
/// Keys are (de)serialized in kebab-case (e.g. `remote-write-url`), and any key
/// that does not map to a field is rejected during deserialization.
/// `ServiceConfig::user_managed` always reports `false` for this service.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct PrometheusConfig {
    /// Private field stored under the key `use`; not read by any code visible here.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,

    pub address: String,
    /// Returned by `ServiceConfig::port`. (De)serialized as a string through the
    /// `string` helper module rather than as a number.
    #[serde(with = "string")]
    pub port: u16,
    pub listen_address: String,

    pub remote_write: bool,
    pub remote_write_region: String,
    pub remote_write_url: String,

    // NOTE(review): kept as a string; presumably a Prometheus duration such as "15s" — confirm.
    pub scrape_interval: String,

    // Optional lists of other service configurations.
    // NOTE(review): presumably the services to be scraped — confirm.
    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
    pub provide_minio: Option<Vec<MinioConfig>>,
    pub provide_compactor: Option<Vec<CompactorConfig>>,
    pub provide_frontend: Option<Vec<FrontendConfig>>,
}
204
/// Configuration of a Grafana service.
///
/// Keys are (de)serialized in kebab-case, and any key that does not map to a field
/// is rejected during deserialization. `ServiceConfig::user_managed` always
/// reports `false` for this service.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct GrafanaConfig {
    /// Private field stored under the key `use`; not read by any code visible here.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,
    pub address: String,
    pub listen_address: String,
    /// Returned by `ServiceConfig::port`. Unlike several other configs in this file,
    /// this port uses the default numeric (de)serialization, not the `string` helper.
    pub port: u16,

    // Optional lists of other service configurations.
    pub provide_prometheus: Option<Vec<PrometheusConfig>>,
    pub provide_tempo: Option<Vec<TempoConfig>>,
}
219
/// Configuration of a Tempo service.
///
/// Keys are (de)serialized in kebab-case (e.g. `otlp-port`), and any key that does
/// not map to a field is rejected during deserialization.
/// `ServiceConfig::user_managed` always reports `false` for this service.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct TempoConfig {
    /// Private field stored under the key `use`; not read by any code visible here.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,

    pub listen_address: String,
    pub address: String,
    /// Returned by `ServiceConfig::port`.
    pub port: u16,
    pub otlp_port: u16,
    pub max_bytes_per_trace: usize,
}
234
/// Configuration of an AWS S3 entry.
///
/// Keys are (de)serialized in kebab-case, and any key that does not map to a field
/// is rejected during deserialization. `ServiceConfig::port` reports `None` and
/// `ServiceConfig::user_managed` reports `false` for this service.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct AwsS3Config {
    /// Private field stored under the key `use`; not read by any code visible here.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,
    pub bucket: String,
}
244
/// Configuration of an OpenDAL entry.
///
/// Keys are (de)serialized in kebab-case, and any key that does not map to a field
/// is rejected during deserialization. `ServiceConfig::port` reports `None` and
/// `ServiceConfig::user_managed` reports `false` for this service.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct OpendalConfig {
    /// Private field stored under the key `use`; not read by any code visible here.
    #[serde(rename = "use")]
    phantom_use: Option<String>,

    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,
    // NOTE(review): free-form strings; the accepted engine names are not visible in this file.
    pub engine: String,
    pub namenode: String,
    pub bucket: String,
}
257
/// Configuration of a Kafka service.
///
/// Keys are (de)serialized in kebab-case (e.g. `controller-port`), and any key that
/// does not map to a field is rejected during deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct KafkaConfig {
    /// Private field stored under the key `use`; not read by any code visible here.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,

    pub address: String,
    /// Returned by `ServiceConfig::port`. (De)serialized as a string through the
    /// `string` helper module rather than as a number.
    #[serde(with = "string")]
    pub port: u16,
    // NOTE(review): presumably the port used from inside the Docker network — confirm.
    pub docker_port: u16,

    /// (De)serialized as a string through the `string` helper module.
    #[serde(with = "string")]
    pub controller_port: u16,

    pub image: String,
    pub persist_data: bool,
    pub node_id: u32,

    /// Returned as-is by `ServiceConfig::user_managed`.
    pub user_managed: bool,
}
283
/// Configuration of a schema registry service.
///
/// Keys are (de)serialized in kebab-case, and any key that does not map to a field
/// is rejected during deserialization. `ServiceConfig::task_group` places this
/// service in `TaskGroup::Kafka`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct SchemaRegistryConfig {
    /// Private field stored under the key `use`; not read by any code visible here.
    #[serde(rename = "use")]
    phantom_use: Option<String>,

    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,

    pub address: String,
    /// Returned by `ServiceConfig::port`. (De)serialized as a string through the
    /// `string` helper module rather than as a number.
    #[serde(with = "string")]
    pub port: u16,

    // NOTE(review): presumably the Kafka brokers this registry talks to — confirm.
    pub provide_kafka: Option<Vec<KafkaConfig>>,

    pub image: String,
    /// Returned as-is by `ServiceConfig::user_managed`.
    pub user_managed: bool,
}
304
/// Configuration of a Pub/Sub service.
///
/// Keys are (de)serialized in kebab-case, and any key that does not map to a field
/// is rejected during deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct PubsubConfig {
    /// Private field stored under the key `use`; not read by any code visible here.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,
    /// Returned as-is by `ServiceConfig::user_managed`. Falls back to `false`
    /// (the `bool` default) when the key is missing from the input.
    #[serde(default)]
    pub user_managed: bool,
    /// Returned by `ServiceConfig::port`. (De)serialized as a string through the
    /// `string` helper module rather than as a number.
    #[serde(with = "string")]
    pub port: u16,
    pub address: String,

    pub persist_data: bool,
}
320
/// Configuration of a Pulsar service.
///
/// Keys are (de)serialized in kebab-case (e.g. `broker-port`), and any key that
/// does not map to a field is rejected during deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct PulsarConfig {
    /// Private field stored under the key `use`; not read by any code visible here.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,

    pub address: String,
    pub broker_port: u16,
    /// This, not `broker_port`, is the value returned by `ServiceConfig::port`.
    pub http_port: u16,

    /// Returned as-is by `ServiceConfig::user_managed`.
    pub user_managed: bool,
    pub image: String,
    pub persist_data: bool,
}
337
/// Configuration of a Redis service.
///
/// Keys are (de)serialized in kebab-case, and any key that does not map to a field
/// is rejected during deserialization. `ServiceConfig::user_managed` always
/// reports `false` for this service.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct RedisConfig {
    /// Private field stored under the key `use`; not read by any code visible here.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,

    /// Returned by `ServiceConfig::port`.
    pub port: u16,
    pub address: String,
}
349
/// Role tag carried by `MySqlConfig` and `PostgresConfig`.
///
/// Variants are (de)serialized in kebab-case, i.e. as `metastore` and `connector`.
/// `ServiceConfig::task_group` maps a `Metastore` database to
/// `TaskGroup::RisingWave`; any other value maps to the database's own group.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub enum Application {
    Metastore,
    Connector,
}
357
/// Configuration of a MySQL service.
///
/// Keys are (de)serialized in kebab-case, and any key that does not map to a field
/// is rejected during deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct MySqlConfig {
    /// Private field stored under the key `use`; not read by any code visible here.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,

    /// Returned by `ServiceConfig::port`.
    pub port: u16,
    pub address: String,

    pub user: String,
    pub password: String,
    pub database: String,

    /// Decides the task group: `Application::Metastore` yields
    /// `TaskGroup::RisingWave`, anything else yields `TaskGroup::MySql`.
    pub application: Application,
    pub image: String,
    /// Returned as-is by `ServiceConfig::user_managed`.
    pub user_managed: bool,
    pub persist_data: bool,
}
378
/// Configuration of a PostgreSQL service.
///
/// Keys are (de)serialized in kebab-case (e.g. `latency-ms`), and any key that does
/// not map to a field is rejected during deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct PostgresConfig {
    /// Private field stored under the key `use`; not read by any code visible here.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,

    /// Returned by `ServiceConfig::port`.
    pub port: u16,
    pub address: String,

    pub user: String,
    pub password: String,
    pub database: String,

    /// Decides the task group: `Application::Metastore` yields
    /// `TaskGroup::RisingWave`, anything else yields `TaskGroup::Postgres`.
    pub application: Application,
    pub image: String,
    /// Returned as-is by `ServiceConfig::user_managed`.
    pub user_managed: bool,
    pub persist_data: bool,

    // NOTE(review): presumably an artificial latency, in milliseconds, injected for testing — confirm.
    pub latency_ms: u32,
}
402
/// Configuration of a SQL Server service.
///
/// Keys are (de)serialized in kebab-case, and any key that does not map to a field
/// is rejected during deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct SqlServerConfig {
    /// Private field stored under the key `use`; not read by any code visible here.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,

    /// Returned by `ServiceConfig::port`.
    pub port: u16,
    pub address: String,

    pub user: String,
    pub password: String,
    pub database: String,

    pub image: String,
    /// Returned as-is by `ServiceConfig::user_managed`.
    pub user_managed: bool,
    pub persist_data: bool,
}
422
/// Configuration of a NATS service.
///
/// Keys are (de)serialized in kebab-case (e.g. `monitor-port`), and any key that
/// does not map to a field is rejected during deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct NatsConfig {
    /// Private field stored under the key `use`; not read by any code visible here.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,

    pub address: String,
    /// Returned by `ServiceConfig::port`.
    pub port: u16,
    pub monitor_port: u16,

    pub image: String,
    /// Returned as-is by `ServiceConfig::user_managed`.
    pub user_managed: bool,
    pub persist_data: bool,
}
439
/// Configuration of an MQTT service.
///
/// Keys are (de)serialized in kebab-case, and any key that does not map to a field
/// is rejected during deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct MqttConfig {
    /// Private field stored under the key `use`; not read by any code visible here.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,

    pub address: String,
    /// Returned by `ServiceConfig::port`.
    pub port: u16,

    pub image: String,
    /// Returned as-is by `ServiceConfig::user_managed`.
    pub user_managed: bool,
    pub persist_data: bool,
}
455
/// Configuration of a Lakekeeper service.
///
/// Keys are (de)serialized in kebab-case (e.g. `catalog-backend`), and any key that
/// does not map to a field is rejected during deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct LakekeeperConfig {
    /// Private field stored under the key `use`; not read by any code visible here.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,

    /// Returned by `ServiceConfig::port`.
    pub port: u16,
    pub address: String,

    /// Returned as-is by `ServiceConfig::user_managed`.
    pub user_managed: bool,
    pub persist_data: bool,

    // NOTE(review): free-form string; the accepted backend names are not visible in this file.
    pub catalog_backend: String,
    pub encryption_key: String,
    // Optional lists of other service configurations.
    pub provide_postgres_backend: Option<Vec<PostgresConfig>>,
    pub provide_minio: Option<Vec<MinioConfig>>,
}
475
/// Configuration of a Moat service.
///
/// Keys are (de)serialized in kebab-case, and any key that does not map to a field
/// is rejected during deserialization. `ServiceConfig::user_managed` always
/// reports `false` for this service.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct MoatConfig {
    /// Private field stored under the key `use`; not read by any code visible here.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id` for this service.
    pub id: String,

    pub address: String,
    /// Returned by `ServiceConfig::port`.
    pub port: u16,

    // Optional list of MinIO service configurations.
    pub provide_minio: Option<Vec<MinioConfig>>,
}
489
/// Tagged union over every service configuration type declared in this file.
///
/// Each variant wraps exactly one `*Config` struct. The inherent methods on this
/// enum (`id`, `port`, `user_managed`, `task_group`) dispatch on the variant.
/// Note that this enum itself does not derive `Serialize`/`Deserialize`.
#[derive(Clone, Debug, PartialEq)]
pub enum ServiceConfig {
    ComputeNode(ComputeNodeConfig),
    MetaNode(MetaNodeConfig),
    Frontend(FrontendConfig),
    Compactor(CompactorConfig),
    Minio(MinioConfig),
    Sqlite(SqliteConfig),
    Prometheus(PrometheusConfig),
    Grafana(GrafanaConfig),
    Tempo(TempoConfig),
    Opendal(OpendalConfig),
    AwsS3(AwsS3Config),
    Kafka(KafkaConfig),
    SchemaRegistry(SchemaRegistryConfig),
    Pubsub(PubsubConfig),
    Pulsar(PulsarConfig),
    Redis(RedisConfig),
    MySql(MySqlConfig),
    Postgres(PostgresConfig),
    SqlServer(SqlServerConfig),
    Nats(NatsConfig),
    Mqtt(MqttConfig),
    Lakekeeper(LakekeeperConfig),
    Moat(MoatConfig),
}
517
/// Coarse grouping of services, as returned by `ServiceConfig::task_group`.
///
/// The enum is fieldless, so it derives `Clone` and `Copy` in addition to the
/// comparison and hashing traits. A value can therefore be used as a map or set
/// key and still be reused afterwards without borrowing.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum TaskGroup {
    /// Core services, storage entries, and databases tagged `Application::Metastore`.
    RisingWave,
    /// Prometheus, Grafana and Tempo.
    Observability,
    /// Kafka and the schema registry.
    Kafka,
    Pubsub,
    Pulsar,
    /// MySQL instances not tagged `Application::Metastore`.
    MySql,
    /// PostgreSQL instances not tagged `Application::Metastore`.
    Postgres,
    SqlServer,
    Nats,
    Mqtt,
    Redis,
    Lakekeeper,
    Moat,
}
534
535impl ServiceConfig {
536 pub fn id(&self) -> &str {
537 match self {
538 Self::ComputeNode(c) => &c.id,
539 Self::MetaNode(c) => &c.id,
540 Self::Frontend(c) => &c.id,
541 Self::Compactor(c) => &c.id,
542 Self::Minio(c) => &c.id,
543 Self::Sqlite(c) => &c.id,
544 Self::Prometheus(c) => &c.id,
545 Self::Grafana(c) => &c.id,
546 Self::Tempo(c) => &c.id,
547 Self::AwsS3(c) => &c.id,
548 Self::Kafka(c) => &c.id,
549 Self::Pubsub(c) => &c.id,
550 Self::Pulsar(c) => &c.id,
551 Self::Redis(c) => &c.id,
552 Self::Opendal(c) => &c.id,
553 Self::MySql(c) => &c.id,
554 Self::Postgres(c) => &c.id,
555 Self::SqlServer(c) => &c.id,
556 Self::Nats(c) => &c.id,
557 Self::Mqtt(c) => &c.id,
558 Self::SchemaRegistry(c) => &c.id,
559 Self::Lakekeeper(c) => &c.id,
560 Self::Moat(c) => &c.id,
561 }
562 }
563
564 pub fn port(&self) -> Option<u16> {
566 match self {
567 Self::ComputeNode(c) => Some(c.port),
568 Self::MetaNode(c) => Some(c.port),
569 Self::Frontend(c) => Some(c.port),
570 Self::Compactor(c) => Some(c.port),
571 Self::Minio(c) => Some(c.port),
572 Self::Sqlite(_) => None,
573 Self::Prometheus(c) => Some(c.port),
574 Self::Grafana(c) => Some(c.port),
575 Self::Tempo(c) => Some(c.port),
576 Self::AwsS3(_) => None,
577 Self::Kafka(c) => Some(c.port),
578 Self::Pubsub(c) => Some(c.port),
579 Self::Pulsar(c) => Some(c.http_port),
580 Self::Redis(c) => Some(c.port),
581 Self::Opendal(_) => None,
582 Self::MySql(c) => Some(c.port),
583 Self::Postgres(c) => Some(c.port),
584 Self::SqlServer(c) => Some(c.port),
585 Self::Nats(c) => Some(c.port),
586 Self::Mqtt(c) => Some(c.port),
587 Self::SchemaRegistry(c) => Some(c.port),
588 Self::Lakekeeper(c) => Some(c.port),
589 Self::Moat(c) => Some(c.port),
590 }
591 }
592
593 pub fn user_managed(&self) -> bool {
594 match self {
595 Self::ComputeNode(c) => c.user_managed,
596 Self::MetaNode(c) => c.user_managed,
597 Self::Frontend(c) => c.user_managed,
598 Self::Compactor(c) => c.user_managed,
599 Self::Minio(_c) => false,
600 Self::Sqlite(_c) => false,
601 Self::Prometheus(_c) => false,
602 Self::Grafana(_c) => false,
603 Self::Tempo(_c) => false,
604 Self::AwsS3(_c) => false,
605 Self::Kafka(c) => c.user_managed,
606 Self::Pubsub(c) => c.user_managed,
607 Self::Pulsar(c) => c.user_managed,
608 Self::Redis(_c) => false,
609 Self::Opendal(_c) => false,
610 Self::MySql(c) => c.user_managed,
611 Self::Postgres(c) => c.user_managed,
612 Self::SqlServer(c) => c.user_managed,
613 Self::Nats(c) => c.user_managed,
614 Self::Mqtt(c) => c.user_managed,
615 Self::SchemaRegistry(c) => c.user_managed,
616 Self::Lakekeeper(c) => c.user_managed,
617 Self::Moat(_c) => false,
618 }
619 }
620
621 pub fn task_group(&self) -> TaskGroup {
622 use TaskGroup::*;
623 match self {
624 ServiceConfig::ComputeNode(_)
625 | ServiceConfig::MetaNode(_)
626 | ServiceConfig::Frontend(_)
627 | ServiceConfig::Compactor(_)
628 | ServiceConfig::Minio(_)
629 | ServiceConfig::Sqlite(_) => RisingWave,
630 ServiceConfig::Prometheus(_) | ServiceConfig::Grafana(_) | ServiceConfig::Tempo(_) => {
631 Observability
632 }
633 ServiceConfig::Opendal(_) | ServiceConfig::AwsS3(_) => RisingWave,
634 ServiceConfig::Kafka(_) | ServiceConfig::SchemaRegistry(_) => Kafka,
635 ServiceConfig::Pubsub(_) => Pubsub,
636 ServiceConfig::Pulsar(_) => Pulsar,
637 ServiceConfig::Redis(_) => Redis,
638 ServiceConfig::MySql(my_sql_config) => {
639 if matches!(my_sql_config.application, Application::Metastore) {
640 RisingWave
641 } else {
642 MySql
643 }
644 }
645 ServiceConfig::Postgres(postgres_config) => {
646 if matches!(postgres_config.application, Application::Metastore) {
647 RisingWave
648 } else {
649 Postgres
650 }
651 }
652 ServiceConfig::SqlServer(_) => SqlServer,
653 ServiceConfig::Nats(_) => Nats,
654 ServiceConfig::Mqtt(_) => Mqtt,
655 ServiceConfig::Lakekeeper(_) => Lakekeeper,
656 ServiceConfig::Moat(_) => Moat,
657 }
658 }
659}
660
661mod string {
662 use std::fmt::Display;
663 use std::str::FromStr;
664
665 use serde::{Deserialize, Deserializer, Serializer, de};
666
667 pub fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
668 where
669 T: Display,
670 S: Serializer,
671 {
672 serializer.collect_str(value)
673 }
674
675 pub fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
676 where
677 T: FromStr,
678 T::Err: Display,
679 D: Deserializer<'de>,
680 {
681 String::deserialize(deserializer)?
682 .parse()
683 .map_err(de::Error::custom)
684 }
685}