1use serde::{Deserialize, Serialize};
16
/// Settings for one compute-node service entry.
///
/// Keys are (de)serialized in kebab-case, and any key that does not map to a
/// field below is rejected during deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct ComputeNodeConfig {
    // Maps the `use` key onto a private field so `deny_unknown_fields` accepts it.
    // NOTE(review): presumably names the service template; confirm with the config loader.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier of this service entry (returned by `ServiceConfig::id`).
    pub id: String,

    pub address: String,
    /// Carried as a string in the serialized form and parsed back into a `u16`
    /// on load (see the `string` module).
    #[serde(with = "string")]
    pub port: u16,
    pub listen_address: String,
    pub exporter_port: u16,
    pub async_stack_trace: String,
    pub enable_tiered_cache: bool,

    // Optional lists of other service configs.
    // NOTE(review): presumably the peer services this node is wired to; confirm
    // where these lists are populated.
    pub provide_minio: Option<Vec<MinioConfig>>,
    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
    pub provide_opendal: Option<Vec<OpendalConfig>>,
    pub provide_aws_s3: Option<Vec<AwsS3Config>>,
    pub provide_moat: Option<Vec<MoatConfig>>,
    pub provide_tempo: Option<Vec<TempoConfig>>,
    /// Surfaced through `ServiceConfig::user_managed`.
    pub user_managed: bool,
    pub resource_group: String,

    pub total_memory_bytes: usize,
    pub parallelism: usize,
    pub role: String,
}
47
/// Kind of metadata backend, used as the type of `MetaNodeConfig::meta_backend`.
///
/// Variant names are (de)serialized in kebab-case (e.g. `memory`, `sqlite`).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub enum MetaBackend {
    Memory,
    Sqlite,
    Postgres,
    Mysql,
    // NOTE(review): presumably means "take the backend from the environment"; confirm.
    Env,
}
58
/// Settings for one meta-node service entry.
///
/// Keys are (de)serialized in kebab-case, and any key that does not map to a
/// field below is rejected during deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct MetaNodeConfig {
    // Maps the `use` key onto a private field so `deny_unknown_fields` accepts it.
    // NOTE(review): presumably names the service template; confirm with the config loader.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier of this service entry (returned by `ServiceConfig::id`).
    pub id: String,

    pub address: String,
    /// Carried as a string in the serialized form and parsed back into a `u16`
    /// on load (see the `string` module).
    #[serde(with = "string")]
    pub port: u16,
    pub listen_address: String,
    pub dashboard_port: u16,
    pub exporter_port: u16,

    /// Surfaced through `ServiceConfig::user_managed`.
    pub user_managed: bool,

    /// Which metadata backend kind this node is configured with.
    pub meta_backend: MetaBackend,
    // Optional lists of backend / monitoring configs.
    // NOTE(review): presumably only the list matching `meta_backend` is relevant; confirm.
    pub provide_sqlite_backend: Option<Vec<SqliteConfig>>,
    pub provide_postgres_backend: Option<Vec<PostgresConfig>>,
    pub provide_mysql_backend: Option<Vec<MySqlConfig>>,
    pub provide_prometheus: Option<Vec<PrometheusConfig>>,

    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
    pub provide_compactor: Option<Vec<CompactorConfig>>,

    pub provide_tempo: Option<Vec<TempoConfig>>,

    // Optional lists of storage-related service configs.
    pub provide_aws_s3: Option<Vec<AwsS3Config>>,
    pub provide_minio: Option<Vec<MinioConfig>>,
    pub provide_opendal: Option<Vec<OpendalConfig>>,
    pub provide_moat: Option<Vec<MoatConfig>>,
}
92
/// Settings for one frontend service entry.
///
/// Keys are (de)serialized in kebab-case, and any key that does not map to a
/// field below is rejected during deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct FrontendConfig {
    // Maps the `use` key onto a private field so `deny_unknown_fields` accepts it.
    // NOTE(review): presumably names the service template; confirm with the config loader.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier of this service entry (returned by `ServiceConfig::id`).
    pub id: String,

    pub address: String,
    /// Carried as a string in the serialized form and parsed back into a `u16`
    /// on load (see the `string` module).
    #[serde(with = "string")]
    pub port: u16,
    pub listen_address: String,
    pub exporter_port: u16,
    pub health_check_port: u16,

    // Optional lists of other service configs.
    // NOTE(review): presumably the services this frontend connects to; confirm.
    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
    pub provide_tempo: Option<Vec<TempoConfig>>,
    pub provide_prometheus: Option<Vec<PrometheusConfig>>,

    /// Surfaced through `ServiceConfig::user_managed`.
    pub user_managed: bool,
}
114
/// Settings for one compactor service entry.
///
/// Keys are (de)serialized in kebab-case, and any key that does not map to a
/// field below is rejected during deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct CompactorConfig {
    // Maps the `use` key onto a private field so `deny_unknown_fields` accepts it.
    // NOTE(review): presumably names the service template; confirm with the config loader.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier of this service entry (returned by `ServiceConfig::id`).
    pub id: String,

    pub address: String,
    /// Carried as a string in the serialized form and parsed back into a `u16`
    /// on load (see the `string` module).
    #[serde(with = "string")]
    pub port: u16,
    pub listen_address: String,
    pub exporter_port: u16,

    // Optional lists of other service configs.
    pub provide_minio: Option<Vec<MinioConfig>>,

    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
    pub provide_tempo: Option<Vec<TempoConfig>>,

    /// Surfaced through `ServiceConfig::user_managed`.
    pub user_managed: bool,
    /// Optional worker-thread count.
    // NOTE(review): behavior when `None` is decided by the consumer; confirm the default.
    pub compaction_worker_threads_number: Option<usize>,

    // Free-form string here; the set of accepted modes is not visible in this file.
    pub compactor_mode: String,
}
139
/// Settings for one MinIO service entry.
///
/// Keys are (de)serialized in kebab-case, and any key that does not map to a
/// field below is rejected during deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct MinioConfig {
    // Maps the `use` key onto a private field so `deny_unknown_fields` accepts it.
    // NOTE(review): presumably names the service template; confirm with the config loader.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier of this service entry (returned by `ServiceConfig::id`).
    pub id: String,

    pub address: String,
    /// Carried as a string in the serialized form and parsed back into a `u16`
    /// on load (see the `string` module).
    #[serde(with = "string")]
    pub port: u16,
    pub listen_address: String,

    pub console_address: String,
    /// Same string-backed encoding as `port`.
    #[serde(with = "string")]
    pub console_port: u16,

    pub root_user: String,
    pub root_password: String,
    pub hummock_bucket: String,

    pub provide_prometheus: Option<Vec<PrometheusConfig>>,

    pub api_requests_max: usize,
    // NOTE(review): presumably a duration string; confirm the accepted format.
    pub api_requests_deadline: String,
}
167
/// Settings for one SQLite entry: an id plus a `file` string.
///
/// Keys are (de)serialized in kebab-case; unknown keys are rejected. This type
/// has no port field, so `ServiceConfig::port` yields `None` for it.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct SqliteConfig {
    // Maps the `use` key onto a private field so `deny_unknown_fields` accepts it.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier of this service entry (returned by `ServiceConfig::id`).
    pub id: String,

    // NOTE(review): presumably the path of the database file; confirm.
    pub file: String,
}
178
/// Settings for one Prometheus service entry.
///
/// Keys are (de)serialized in kebab-case, and any key that does not map to a
/// field below is rejected during deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct PrometheusConfig {
    // Maps the `use` key onto a private field so `deny_unknown_fields` accepts it.
    // NOTE(review): presumably names the service template; confirm with the config loader.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier of this service entry (returned by `ServiceConfig::id`).
    pub id: String,

    pub address: String,
    /// Carried as a string in the serialized form and parsed back into a `u16`
    /// on load (see the `string` module).
    #[serde(with = "string")]
    pub port: u16,
    pub listen_address: String,

    // Remote-write settings; all three keys are required by the schema.
    pub remote_write: bool,
    pub remote_write_region: String,
    pub remote_write_url: String,

    // NOTE(review): presumably a duration string; confirm the accepted format.
    pub scrape_interval: String,

    // Optional lists of other service configs.
    // NOTE(review): presumably the scrape targets; confirm.
    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
    pub provide_minio: Option<Vec<MinioConfig>>,
    pub provide_compactor: Option<Vec<CompactorConfig>>,
    pub provide_frontend: Option<Vec<FrontendConfig>>,
}
204
/// Settings for one Grafana service entry.
///
/// Keys are (de)serialized in kebab-case; unknown keys are rejected.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct GrafanaConfig {
    // Maps the `use` key onto a private field so `deny_unknown_fields` accepts it.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier of this service entry (returned by `ServiceConfig::id`).
    pub id: String,
    pub address: String,
    pub listen_address: String,
    /// Plain numeric port (no string-backed encoding on this field).
    pub port: u16,

    // Optional lists of other service configs.
    // NOTE(review): presumably the data sources; confirm.
    pub provide_prometheus: Option<Vec<PrometheusConfig>>,
    pub provide_tempo: Option<Vec<TempoConfig>>,
}
219
/// Settings for one Tempo service entry.
///
/// Keys are (de)serialized in kebab-case; unknown keys are rejected.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct TempoConfig {
    // Maps the `use` key onto a private field so `deny_unknown_fields` accepts it.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier of this service entry (returned by `ServiceConfig::id`).
    pub id: String,

    pub listen_address: String,
    pub address: String,
    /// The port reported by `ServiceConfig::port` for this service.
    pub port: u16,
    pub otlp_port: u16,
    pub max_bytes_per_trace: usize,
}
234
/// Settings for one AWS S3 entry: an id plus a bucket name.
///
/// Keys are (de)serialized in kebab-case; unknown keys are rejected. This type
/// has no port field, so `ServiceConfig::port` yields `None` for it.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct AwsS3Config {
    // Maps the `use` key onto a private field so `deny_unknown_fields` accepts it.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier of this service entry (returned by `ServiceConfig::id`).
    pub id: String,
    pub bucket: String,
}
244
/// Settings for one OpenDAL entry.
///
/// Keys are (de)serialized in kebab-case; unknown keys are rejected. This type
/// has no port field, so `ServiceConfig::port` yields `None` for it.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct OpendalConfig {
    // Maps the `use` key onto a private field so `deny_unknown_fields` accepts it.
    #[serde(rename = "use")]
    phantom_use: Option<String>,

    /// Identifier of this service entry (returned by `ServiceConfig::id`).
    pub id: String,
    // NOTE(review): presumably selects the storage engine kind; accepted values
    // are not visible in this file.
    pub engine: String,
    pub namenode: String,
    pub bucket: String,
}
257
/// Settings for one Kafka service entry.
///
/// Keys are (de)serialized in kebab-case, and any key that does not map to a
/// field below is rejected during deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct KafkaConfig {
    // Maps the `use` key onto a private field so `deny_unknown_fields` accepts it.
    // NOTE(review): presumably names the service template; confirm with the config loader.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier of this service entry (returned by `ServiceConfig::id`).
    pub id: String,

    pub address: String,
    /// Carried as a string in the serialized form and parsed back into a `u16`
    /// on load (see the `string` module). Reported by `ServiceConfig::port`.
    #[serde(with = "string")]
    pub port: u16,
    // Plain numeric port.
    // NOTE(review): presumably the port used by clients running inside docker; confirm.
    pub docker_port: u16,

    /// Same string-backed encoding as `port`.
    #[serde(with = "string")]
    pub controller_port: u16,

    pub image: String,
    pub persist_data: bool,
    pub node_id: u32,

    /// Surfaced through `ServiceConfig::user_managed`.
    pub user_managed: bool,
}
283
/// Settings for one schema-registry service entry.
///
/// Keys are (de)serialized in kebab-case; unknown keys are rejected.
/// `ServiceConfig::task_group` places this service in the same group as Kafka.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct SchemaRegistryConfig {
    // Maps the `use` key onto a private field so `deny_unknown_fields` accepts it.
    #[serde(rename = "use")]
    phantom_use: Option<String>,

    /// Identifier of this service entry (returned by `ServiceConfig::id`).
    pub id: String,

    pub address: String,
    /// Carried as a string in the serialized form and parsed back into a `u16`
    /// on load (see the `string` module).
    #[serde(with = "string")]
    pub port: u16,

    // Optional list of Kafka configs.
    // NOTE(review): presumably the brokers this registry is attached to; confirm.
    pub provide_kafka: Option<Vec<KafkaConfig>>,

    pub image: String,
    /// Surfaced through `ServiceConfig::user_managed`.
    pub user_managed: bool,
}
304
/// Settings for one Pub/Sub service entry.
///
/// Keys are (de)serialized in kebab-case; unknown keys are rejected.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct PubsubConfig {
    // Maps the `use` key onto a private field so `deny_unknown_fields` accepts it.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier of this service entry (returned by `ServiceConfig::id`).
    pub id: String,
    /// Optional key: falls back to `false` (the `bool` default) when absent.
    #[serde(default)]
    pub user_managed: bool,
    /// Carried as a string in the serialized form and parsed back into a `u16`
    /// on load (see the `string` module).
    #[serde(with = "string")]
    pub port: u16,
    pub address: String,

    pub persist_data: bool,
}
320
/// Settings for one Pulsar service entry.
///
/// Keys are (de)serialized in kebab-case; unknown keys are rejected.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct PulsarConfig {
    // Maps the `use` key onto a private field so `deny_unknown_fields` accepts it.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier of this service entry (returned by `ServiceConfig::id`).
    pub id: String,

    pub address: String,
    pub broker_port: u16,
    /// This is the port that `ServiceConfig::port` reports for Pulsar.
    pub http_port: u16,

    /// Surfaced through `ServiceConfig::user_managed`.
    pub user_managed: bool,
    pub image: String,
    pub persist_data: bool,
}
337
/// Settings for one Redis service entry: id, port and address.
///
/// Keys are (de)serialized in kebab-case; unknown keys are rejected. There is
/// no `user_managed` field here; `ServiceConfig::user_managed` reports `false`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct RedisConfig {
    // Maps the `use` key onto a private field so `deny_unknown_fields` accepts it.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier of this service entry (returned by `ServiceConfig::id`).
    pub id: String,

    pub port: u16,
    pub address: String,
}
349
/// Role assigned to a MySQL or Postgres entry via its `application` field.
///
/// Variant names are (de)serialized in kebab-case (`metastore`, `connector`).
/// `ServiceConfig::task_group` maps `Metastore` databases to
/// `TaskGroup::RisingWave` and the rest to the database's own group.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub enum Application {
    Metastore,
    Connector,
}
357
/// Settings for one MySQL service entry.
///
/// Keys are (de)serialized in kebab-case; unknown keys are rejected.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct MySqlConfig {
    // Maps the `use` key onto a private field so `deny_unknown_fields` accepts it.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier of this service entry (returned by `ServiceConfig::id`).
    pub id: String,

    pub port: u16,
    pub address: String,

    // Connection credentials and database name.
    pub user: String,
    pub password: String,
    pub database: String,

    /// Decides the task group: `Metastore` maps to `TaskGroup::RisingWave`,
    /// anything else to `TaskGroup::MySql` (see `ServiceConfig::task_group`).
    pub application: Application,
    pub image: String,
    /// Surfaced through `ServiceConfig::user_managed`.
    pub user_managed: bool,
    pub persist_data: bool,
}
378
/// Settings for one Postgres service entry.
///
/// Keys are (de)serialized in kebab-case; unknown keys are rejected.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct PostgresConfig {
    // Maps the `use` key onto a private field so `deny_unknown_fields` accepts it.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier of this service entry (returned by `ServiceConfig::id`).
    pub id: String,

    pub port: u16,
    pub address: String,

    // Connection credentials and database name.
    pub user: String,
    pub password: String,
    pub database: String,

    /// Decides the task group: `Metastore` maps to `TaskGroup::RisingWave`,
    /// anything else to `TaskGroup::Postgres` (see `ServiceConfig::task_group`).
    pub application: Application,
    pub image: String,
    /// Surfaced through `ServiceConfig::user_managed`.
    pub user_managed: bool,
    pub persist_data: bool,

    // NOTE(review): presumably an artificial latency in milliseconds; confirm
    // how the consumer applies it.
    pub latency_ms: u32,
}
402
/// Settings for one SQL Server service entry.
///
/// Keys are (de)serialized in kebab-case; unknown keys are rejected. Unlike the
/// MySQL and Postgres configs, there is no `application` field.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct SqlServerConfig {
    // Maps the `use` key onto a private field so `deny_unknown_fields` accepts it.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier of this service entry (returned by `ServiceConfig::id`).
    pub id: String,

    pub port: u16,
    pub address: String,

    // Connection credentials and database name.
    pub user: String,
    pub password: String,
    pub database: String,

    pub image: String,
    /// Surfaced through `ServiceConfig::user_managed`.
    pub user_managed: bool,
    pub persist_data: bool,
}
422
/// Settings for one Lakekeeper service entry.
///
/// Keys are (de)serialized in kebab-case; unknown keys are rejected.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct LakekeeperConfig {
    // Maps the `use` key onto a private field so `deny_unknown_fields` accepts it.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier of this service entry (returned by `ServiceConfig::id`).
    pub id: String,

    pub port: u16,
    pub address: String,

    /// Surfaced through `ServiceConfig::user_managed`.
    pub user_managed: bool,
    pub persist_data: bool,

    // Free-form string; accepted backend names are not visible in this file.
    pub catalog_backend: String,
    pub encryption_key: String,
    // Optional lists of other service configs.
    // NOTE(review): presumably the catalog database and object store; confirm.
    pub provide_postgres_backend: Option<Vec<PostgresConfig>>,
    pub provide_minio: Option<Vec<MinioConfig>>,
}
442
/// Settings for one Moat service entry.
///
/// Keys are (de)serialized in kebab-case; unknown keys are rejected. There is
/// no `user_managed` field here; `ServiceConfig::user_managed` reports `false`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct MoatConfig {
    // Maps the `use` key onto a private field so `deny_unknown_fields` accepts it.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier of this service entry (returned by `ServiceConfig::id`).
    pub id: String,

    pub address: String,
    pub port: u16,

    // Optional list of MinIO configs.
    pub provide_minio: Option<Vec<MinioConfig>>,
}
456
/// Sum type over every service configuration struct, one variant per service.
///
/// Unlike the wrapped structs, this enum derives only `Clone`, `Debug` and
/// `PartialEq`; it has no serde implementation of its own.
#[derive(Clone, Debug, PartialEq)]
pub enum ServiceConfig {
    ComputeNode(ComputeNodeConfig),
    MetaNode(MetaNodeConfig),
    Frontend(FrontendConfig),
    Compactor(CompactorConfig),
    Minio(MinioConfig),
    Sqlite(SqliteConfig),
    Prometheus(PrometheusConfig),
    Grafana(GrafanaConfig),
    Tempo(TempoConfig),
    Opendal(OpendalConfig),
    AwsS3(AwsS3Config),
    Kafka(KafkaConfig),
    SchemaRegistry(SchemaRegistryConfig),
    Pubsub(PubsubConfig),
    Pulsar(PulsarConfig),
    Redis(RedisConfig),
    MySql(MySqlConfig),
    Postgres(PostgresConfig),
    SqlServer(SqlServerConfig),
    Lakekeeper(LakekeeperConfig),
    Moat(MoatConfig),
}
482
/// Coarse grouping assigned to each service by `ServiceConfig::task_group`.
///
/// The enum is fieldless, so it is `Copy`: values can be passed around and
/// reused as map/set keys without moving them out of their owner.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum TaskGroup {
    RisingWave,
    Observability,
    Kafka,
    Pubsub,
    Pulsar,
    MySql,
    Postgres,
    SqlServer,
    Redis,
    Lakekeeper,
    Moat,
}
497
498impl ServiceConfig {
499 pub fn id(&self) -> &str {
500 match self {
501 Self::ComputeNode(c) => &c.id,
502 Self::MetaNode(c) => &c.id,
503 Self::Frontend(c) => &c.id,
504 Self::Compactor(c) => &c.id,
505 Self::Minio(c) => &c.id,
506 Self::Sqlite(c) => &c.id,
507 Self::Prometheus(c) => &c.id,
508 Self::Grafana(c) => &c.id,
509 Self::Tempo(c) => &c.id,
510 Self::AwsS3(c) => &c.id,
511 Self::Kafka(c) => &c.id,
512 Self::Pubsub(c) => &c.id,
513 Self::Pulsar(c) => &c.id,
514 Self::Redis(c) => &c.id,
515 Self::Opendal(c) => &c.id,
516 Self::MySql(c) => &c.id,
517 Self::Postgres(c) => &c.id,
518 Self::SqlServer(c) => &c.id,
519 Self::SchemaRegistry(c) => &c.id,
520 Self::Lakekeeper(c) => &c.id,
521 Self::Moat(c) => &c.id,
522 }
523 }
524
525 pub fn port(&self) -> Option<u16> {
527 match self {
528 Self::ComputeNode(c) => Some(c.port),
529 Self::MetaNode(c) => Some(c.port),
530 Self::Frontend(c) => Some(c.port),
531 Self::Compactor(c) => Some(c.port),
532 Self::Minio(c) => Some(c.port),
533 Self::Sqlite(_) => None,
534 Self::Prometheus(c) => Some(c.port),
535 Self::Grafana(c) => Some(c.port),
536 Self::Tempo(c) => Some(c.port),
537 Self::AwsS3(_) => None,
538 Self::Kafka(c) => Some(c.port),
539 Self::Pubsub(c) => Some(c.port),
540 Self::Pulsar(c) => Some(c.http_port),
541 Self::Redis(c) => Some(c.port),
542 Self::Opendal(_) => None,
543 Self::MySql(c) => Some(c.port),
544 Self::Postgres(c) => Some(c.port),
545 Self::SqlServer(c) => Some(c.port),
546 Self::SchemaRegistry(c) => Some(c.port),
547 Self::Lakekeeper(c) => Some(c.port),
548 Self::Moat(c) => Some(c.port),
549 }
550 }
551
552 pub fn user_managed(&self) -> bool {
553 match self {
554 Self::ComputeNode(c) => c.user_managed,
555 Self::MetaNode(c) => c.user_managed,
556 Self::Frontend(c) => c.user_managed,
557 Self::Compactor(c) => c.user_managed,
558 Self::Minio(_c) => false,
559 Self::Sqlite(_c) => false,
560 Self::Prometheus(_c) => false,
561 Self::Grafana(_c) => false,
562 Self::Tempo(_c) => false,
563 Self::AwsS3(_c) => false,
564 Self::Kafka(c) => c.user_managed,
565 Self::Pubsub(c) => c.user_managed,
566 Self::Pulsar(c) => c.user_managed,
567 Self::Redis(_c) => false,
568 Self::Opendal(_c) => false,
569 Self::MySql(c) => c.user_managed,
570 Self::Postgres(c) => c.user_managed,
571 Self::SqlServer(c) => c.user_managed,
572 Self::SchemaRegistry(c) => c.user_managed,
573 Self::Lakekeeper(c) => c.user_managed,
574 Self::Moat(_c) => false,
575 }
576 }
577
578 pub fn task_group(&self) -> TaskGroup {
579 use TaskGroup::*;
580 match self {
581 ServiceConfig::ComputeNode(_)
582 | ServiceConfig::MetaNode(_)
583 | ServiceConfig::Frontend(_)
584 | ServiceConfig::Compactor(_)
585 | ServiceConfig::Minio(_)
586 | ServiceConfig::Sqlite(_) => RisingWave,
587 ServiceConfig::Prometheus(_) | ServiceConfig::Grafana(_) | ServiceConfig::Tempo(_) => {
588 Observability
589 }
590 ServiceConfig::Opendal(_) | ServiceConfig::AwsS3(_) => RisingWave,
591 ServiceConfig::Kafka(_) | ServiceConfig::SchemaRegistry(_) => Kafka,
592 ServiceConfig::Pubsub(_) => Pubsub,
593 ServiceConfig::Pulsar(_) => Pulsar,
594 ServiceConfig::Redis(_) => Redis,
595 ServiceConfig::MySql(my_sql_config) => {
596 if matches!(my_sql_config.application, Application::Metastore) {
597 RisingWave
598 } else {
599 MySql
600 }
601 }
602 ServiceConfig::Postgres(postgres_config) => {
603 if matches!(postgres_config.application, Application::Metastore) {
604 RisingWave
605 } else {
606 Postgres
607 }
608 }
609 ServiceConfig::SqlServer(_) => SqlServer,
610 ServiceConfig::Lakekeeper(_) => Lakekeeper,
611 ServiceConfig::Moat(_) => Moat,
612 }
613 }
614}
615
616mod string {
617 use std::fmt::Display;
618 use std::str::FromStr;
619
620 use serde::{Deserialize, Deserializer, Serializer, de};
621
622 pub fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
623 where
624 T: Display,
625 S: Serializer,
626 {
627 serializer.collect_str(value)
628 }
629
630 pub fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
631 where
632 T: FromStr,
633 T::Err: Display,
634 D: Deserializer<'de>,
635 {
636 String::deserialize(deserializer)?
637 .parse()
638 .map_err(de::Error::custom)
639 }
640}