1use serde::{Deserialize, Serialize};
16
17#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
18#[serde(rename_all = "kebab-case")]
19#[serde(deny_unknown_fields)]
20pub struct ComputeNodeConfig {
21 #[serde(rename = "use")]
22 phantom_use: Option<String>,
23 pub id: String,
24
25 pub address: String,
26 #[serde(with = "string")]
27 pub port: u16,
28 pub listen_address: String,
29 pub exporter_port: u16,
30 pub async_stack_trace: String,
31 pub enable_tiered_cache: bool,
32
33 pub provide_minio: Option<Vec<MinioConfig>>,
34 pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
35 pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
36 pub provide_opendal: Option<Vec<OpendalConfig>>,
37 pub provide_aws_s3: Option<Vec<AwsS3Config>>,
38 pub provide_tempo: Option<Vec<TempoConfig>>,
39 pub user_managed: bool,
40 pub resource_group: String,
41
42 pub total_memory_bytes: usize,
43 pub parallelism: usize,
44 pub role: String,
45}
46
47#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
48#[serde(rename_all = "kebab-case")]
49#[serde(deny_unknown_fields)]
50pub enum MetaBackend {
51 Memory,
52 Sqlite,
53 Postgres,
54 Mysql,
55 Env,
56}
57
58#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
59#[serde(rename_all = "kebab-case")]
60#[serde(deny_unknown_fields)]
61pub struct MetaNodeConfig {
62 #[serde(rename = "use")]
63 phantom_use: Option<String>,
64 pub id: String,
65
66 pub address: String,
67 #[serde(with = "string")]
68 pub port: u16,
69 pub listen_address: String,
70 pub dashboard_port: u16,
71 pub exporter_port: u16,
72
73 pub user_managed: bool,
74
75 pub meta_backend: MetaBackend,
76 pub provide_sqlite_backend: Option<Vec<SqliteConfig>>,
77 pub provide_postgres_backend: Option<Vec<PostgresConfig>>,
78 pub provide_mysql_backend: Option<Vec<MySqlConfig>>,
79 pub provide_prometheus: Option<Vec<PrometheusConfig>>,
80
81 pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
82 pub provide_compactor: Option<Vec<CompactorConfig>>,
83
84 pub provide_tempo: Option<Vec<TempoConfig>>,
85
86 pub provide_aws_s3: Option<Vec<AwsS3Config>>,
87 pub provide_minio: Option<Vec<MinioConfig>>,
88 pub provide_opendal: Option<Vec<OpendalConfig>>,
89}
90
91#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
92#[serde(rename_all = "kebab-case")]
93#[serde(deny_unknown_fields)]
94pub struct FrontendConfig {
95 #[serde(rename = "use")]
96 phantom_use: Option<String>,
97 pub id: String,
98
99 pub address: String,
100 #[serde(with = "string")]
101 pub port: u16,
102 pub listen_address: String,
103 pub exporter_port: u16,
104 pub health_check_port: u16,
105
106 pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
107 pub provide_tempo: Option<Vec<TempoConfig>>,
108
109 pub user_managed: bool,
110}
111
112#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
113#[serde(rename_all = "kebab-case")]
114#[serde(deny_unknown_fields)]
115pub struct CompactorConfig {
116 #[serde(rename = "use")]
117 phantom_use: Option<String>,
118 pub id: String,
119
120 pub address: String,
121 #[serde(with = "string")]
122 pub port: u16,
123 pub listen_address: String,
124 pub exporter_port: u16,
125
126 pub provide_minio: Option<Vec<MinioConfig>>,
127
128 pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
129 pub provide_tempo: Option<Vec<TempoConfig>>,
130
131 pub user_managed: bool,
132 pub compaction_worker_threads_number: Option<usize>,
133
134 pub compactor_mode: String,
135}
136
137#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
138#[serde(rename_all = "kebab-case")]
139#[serde(deny_unknown_fields)]
140pub struct MinioConfig {
141 #[serde(rename = "use")]
142 phantom_use: Option<String>,
143 pub id: String,
144
145 pub address: String,
146 #[serde(with = "string")]
147 pub port: u16,
148 pub listen_address: String,
149
150 pub console_address: String,
151 #[serde(with = "string")]
152 pub console_port: u16,
153
154 pub root_user: String,
155 pub root_password: String,
156 pub hummock_bucket: String,
157
158 pub provide_prometheus: Option<Vec<PrometheusConfig>>,
159
160 pub api_requests_max: usize,
162 pub api_requests_deadline: String,
163}
164
165#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
166#[serde(rename_all = "kebab-case")]
167#[serde(deny_unknown_fields)]
168pub struct SqliteConfig {
169 #[serde(rename = "use")]
170 phantom_use: Option<String>,
171 pub id: String,
172
173 pub file: String,
174}
175
176#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
177#[serde(rename_all = "kebab-case")]
178#[serde(deny_unknown_fields)]
179pub struct PrometheusConfig {
180 #[serde(rename = "use")]
181 phantom_use: Option<String>,
182 pub id: String,
183
184 pub address: String,
185 #[serde(with = "string")]
186 pub port: u16,
187 pub listen_address: String,
188
189 pub remote_write: bool,
190 pub remote_write_region: String,
191 pub remote_write_url: String,
192
193 pub scrape_interval: String,
194
195 pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
196 pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
197 pub provide_minio: Option<Vec<MinioConfig>>,
198 pub provide_compactor: Option<Vec<CompactorConfig>>,
199 pub provide_frontend: Option<Vec<FrontendConfig>>,
200}
201
202#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
203#[serde(rename_all = "kebab-case")]
204#[serde(deny_unknown_fields)]
205pub struct GrafanaConfig {
206 #[serde(rename = "use")]
207 phantom_use: Option<String>,
208 pub id: String,
209 pub address: String,
210 pub listen_address: String,
211 pub port: u16,
212
213 pub provide_prometheus: Option<Vec<PrometheusConfig>>,
214 pub provide_tempo: Option<Vec<TempoConfig>>,
215}
216
217#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
218#[serde(rename_all = "kebab-case")]
219#[serde(deny_unknown_fields)]
220pub struct TempoConfig {
221 #[serde(rename = "use")]
222 phantom_use: Option<String>,
223 pub id: String,
224
225 pub listen_address: String,
226 pub address: String,
227 pub port: u16,
228 pub otlp_port: u16,
229 pub max_bytes_per_trace: usize,
230}
231
232#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
233#[serde(rename_all = "kebab-case")]
234#[serde(deny_unknown_fields)]
235pub struct AwsS3Config {
236 #[serde(rename = "use")]
237 phantom_use: Option<String>,
238 pub id: String,
239 pub bucket: String,
240}
241
242#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
243#[serde(rename_all = "kebab-case")]
244#[serde(deny_unknown_fields)]
245pub struct OpendalConfig {
246 #[serde(rename = "use")]
247 phantom_use: Option<String>,
248
249 pub id: String,
250 pub engine: String,
251 pub namenode: String,
252 pub bucket: String,
253}
254
255#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
256#[serde(rename_all = "kebab-case")]
257#[serde(deny_unknown_fields)]
258pub struct KafkaConfig {
259 #[serde(rename = "use")]
260 phantom_use: Option<String>,
261 pub id: String,
262
263 pub address: String,
265 #[serde(with = "string")]
266 pub port: u16,
267 pub docker_port: u16,
270
271 #[serde(with = "string")]
272 pub controller_port: u16,
273
274 pub image: String,
275 pub persist_data: bool,
276 pub node_id: u32,
277
278 pub user_managed: bool,
279}
280
281#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
282#[serde(rename_all = "kebab-case")]
283#[serde(deny_unknown_fields)]
284pub struct SchemaRegistryConfig {
285 #[serde(rename = "use")]
286 phantom_use: Option<String>,
287
288 pub id: String,
289
290 pub address: String,
291 #[serde(with = "string")]
292 pub port: u16,
293
294 pub provide_kafka: Option<Vec<KafkaConfig>>,
295
296 pub image: String,
297 pub user_managed: bool,
300}
301
302#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
303#[serde(rename_all = "kebab-case")]
304#[serde(deny_unknown_fields)]
305pub struct PubsubConfig {
306 #[serde(rename = "use")]
307 phantom_use: Option<String>,
308 pub id: String,
309 #[serde(with = "string")]
310 pub port: u16,
311 pub address: String,
312
313 pub persist_data: bool,
314}
315
316#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
317#[serde(rename_all = "kebab-case")]
318#[serde(deny_unknown_fields)]
319pub struct PulsarConfig {
320 #[serde(rename = "use")]
321 phantom_use: Option<String>,
322 pub id: String,
323
324 pub address: String,
325 pub broker_port: u16,
326 pub http_port: u16,
327
328 pub user_managed: bool,
329 pub image: String,
330 pub persist_data: bool,
331}
332
333#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
334#[serde(rename_all = "kebab-case")]
335#[serde(deny_unknown_fields)]
336pub struct RedisConfig {
337 #[serde(rename = "use")]
338 phantom_use: Option<String>,
339 pub id: String,
340
341 pub port: u16,
342 pub address: String,
343}
344
345#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
346#[serde(rename_all = "kebab-case")]
347#[serde(deny_unknown_fields)]
348pub enum Application {
349 Metastore,
350 Connector,
351}
352
353#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
354#[serde(rename_all = "kebab-case")]
355#[serde(deny_unknown_fields)]
356pub struct MySqlConfig {
357 #[serde(rename = "use")]
358 phantom_use: Option<String>,
359 pub id: String,
360
361 pub port: u16,
362 pub address: String,
363
364 pub user: String,
365 pub password: String,
366 pub database: String,
367
368 pub application: Application,
369 pub image: String,
370 pub user_managed: bool,
371 pub persist_data: bool,
372}
373
374#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
375#[serde(rename_all = "kebab-case")]
376#[serde(deny_unknown_fields)]
377pub struct PostgresConfig {
378 #[serde(rename = "use")]
379 phantom_use: Option<String>,
380 pub id: String,
381
382 pub port: u16,
383 pub address: String,
384
385 pub user: String,
386 pub password: String,
387 pub database: String,
388
389 pub application: Application,
390 pub image: String,
391 pub user_managed: bool,
392 pub persist_data: bool,
393
394 pub latency_ms: u32,
396}
397
398#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
399#[serde(rename_all = "kebab-case")]
400#[serde(deny_unknown_fields)]
401pub struct SqlServerConfig {
402 #[serde(rename = "use")]
403 phantom_use: Option<String>,
404 pub id: String,
405
406 pub port: u16,
407 pub address: String,
408
409 pub user: String,
410 pub password: String,
411 pub database: String,
412
413 pub image: String,
414 pub user_managed: bool,
415 pub persist_data: bool,
416}
417
418#[derive(Clone, Debug, PartialEq)]
420pub enum ServiceConfig {
421 ComputeNode(ComputeNodeConfig),
422 MetaNode(MetaNodeConfig),
423 Frontend(FrontendConfig),
424 Compactor(CompactorConfig),
425 Minio(MinioConfig),
426 Sqlite(SqliteConfig),
427 Prometheus(PrometheusConfig),
428 Grafana(GrafanaConfig),
429 Tempo(TempoConfig),
430 Opendal(OpendalConfig),
431 AwsS3(AwsS3Config),
432 Kafka(KafkaConfig),
433 SchemaRegistry(SchemaRegistryConfig),
434 Pubsub(PubsubConfig),
435 Pulsar(PulsarConfig),
436 Redis(RedisConfig),
437 MySql(MySqlConfig),
438 Postgres(PostgresConfig),
439 SqlServer(SqlServerConfig),
440}
441
/// Group that `ServiceConfig::task_group` assigns a service to.
///
/// The enum is fieldless, so it derives `Clone` and `Copy` and values can be
/// passed around by value and reused freely. It also derives `Eq` and `Hash`,
/// so it can serve as a set element or map key.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum TaskGroup {
    RisingWave,
    Observability,
    Kafka,
    Pubsub,
    Pulsar,
    MySql,
    Postgres,
    SqlServer,
    Redis,
}
454
455impl ServiceConfig {
456 pub fn id(&self) -> &str {
457 match self {
458 Self::ComputeNode(c) => &c.id,
459 Self::MetaNode(c) => &c.id,
460 Self::Frontend(c) => &c.id,
461 Self::Compactor(c) => &c.id,
462 Self::Minio(c) => &c.id,
463 Self::Sqlite(c) => &c.id,
464 Self::Prometheus(c) => &c.id,
465 Self::Grafana(c) => &c.id,
466 Self::Tempo(c) => &c.id,
467 Self::AwsS3(c) => &c.id,
468 Self::Kafka(c) => &c.id,
469 Self::Pubsub(c) => &c.id,
470 Self::Pulsar(c) => &c.id,
471 Self::Redis(c) => &c.id,
472 Self::Opendal(c) => &c.id,
473 Self::MySql(c) => &c.id,
474 Self::Postgres(c) => &c.id,
475 Self::SqlServer(c) => &c.id,
476 Self::SchemaRegistry(c) => &c.id,
477 }
478 }
479
480 pub fn port(&self) -> Option<u16> {
482 match self {
483 Self::ComputeNode(c) => Some(c.port),
484 Self::MetaNode(c) => Some(c.port),
485 Self::Frontend(c) => Some(c.port),
486 Self::Compactor(c) => Some(c.port),
487 Self::Minio(c) => Some(c.port),
488 Self::Sqlite(_) => None,
489 Self::Prometheus(c) => Some(c.port),
490 Self::Grafana(c) => Some(c.port),
491 Self::Tempo(c) => Some(c.port),
492 Self::AwsS3(_) => None,
493 Self::Kafka(c) => Some(c.port),
494 Self::Pubsub(c) => Some(c.port),
495 Self::Pulsar(c) => Some(c.http_port),
496 Self::Redis(c) => Some(c.port),
497 Self::Opendal(_) => None,
498 Self::MySql(c) => Some(c.port),
499 Self::Postgres(c) => Some(c.port),
500 Self::SqlServer(c) => Some(c.port),
501 Self::SchemaRegistry(c) => Some(c.port),
502 }
503 }
504
505 pub fn user_managed(&self) -> bool {
506 match self {
507 Self::ComputeNode(c) => c.user_managed,
508 Self::MetaNode(c) => c.user_managed,
509 Self::Frontend(c) => c.user_managed,
510 Self::Compactor(c) => c.user_managed,
511 Self::Minio(_c) => false,
512 Self::Sqlite(_c) => false,
513 Self::Prometheus(_c) => false,
514 Self::Grafana(_c) => false,
515 Self::Tempo(_c) => false,
516 Self::AwsS3(_c) => false,
517 Self::Kafka(c) => c.user_managed,
518 Self::Pubsub(_c) => false,
519 Self::Pulsar(c) => c.user_managed,
520 Self::Redis(_c) => false,
521 Self::Opendal(_c) => false,
522 Self::MySql(c) => c.user_managed,
523 Self::Postgres(c) => c.user_managed,
524 Self::SqlServer(c) => c.user_managed,
525 Self::SchemaRegistry(c) => c.user_managed,
526 }
527 }
528
529 pub fn task_group(&self) -> TaskGroup {
530 use TaskGroup::*;
531 match self {
532 ServiceConfig::ComputeNode(_)
533 | ServiceConfig::MetaNode(_)
534 | ServiceConfig::Frontend(_)
535 | ServiceConfig::Compactor(_)
536 | ServiceConfig::Minio(_)
537 | ServiceConfig::Sqlite(_) => RisingWave,
538 ServiceConfig::Prometheus(_) | ServiceConfig::Grafana(_) | ServiceConfig::Tempo(_) => {
539 Observability
540 }
541 ServiceConfig::Opendal(_) | ServiceConfig::AwsS3(_) => RisingWave,
542 ServiceConfig::Kafka(_) | ServiceConfig::SchemaRegistry(_) => Kafka,
543 ServiceConfig::Pubsub(_) => Pubsub,
544 ServiceConfig::Pulsar(_) => Pulsar,
545 ServiceConfig::Redis(_) => Redis,
546 ServiceConfig::MySql(my_sql_config) => {
547 if matches!(my_sql_config.application, Application::Metastore) {
548 RisingWave
549 } else {
550 MySql
551 }
552 }
553 ServiceConfig::Postgres(postgres_config) => {
554 if matches!(postgres_config.application, Application::Metastore) {
555 RisingWave
556 } else {
557 Postgres
558 }
559 }
560 ServiceConfig::SqlServer(_) => SqlServer,
561 }
562 }
563}
564
565mod string {
566 use std::fmt::Display;
567 use std::str::FromStr;
568
569 use serde::{Deserialize, Deserializer, Serializer, de};
570
571 pub fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
572 where
573 T: Display,
574 S: Serializer,
575 {
576 serializer.collect_str(value)
577 }
578
579 pub fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
580 where
581 T: FromStr,
582 T::Err: Display,
583 D: Deserializer<'de>,
584 {
585 String::deserialize(deserializer)?
586 .parse()
587 .map_err(de::Error::custom)
588 }
589}