1use serde::{Deserialize, Serialize};
16
17#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
18#[serde(rename_all = "kebab-case")]
19#[serde(deny_unknown_fields)]
20pub struct ComputeNodeConfig {
21 #[serde(rename = "use")]
22 phantom_use: Option<String>,
23 pub id: String,
24
25 pub address: String,
26 #[serde(with = "string")]
27 pub port: u16,
28 pub listen_address: String,
29 pub exporter_port: u16,
30 pub async_stack_trace: String,
31 pub enable_tiered_cache: bool,
32
33 pub provide_minio: Option<Vec<MinioConfig>>,
34 pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
35 pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
36 pub provide_opendal: Option<Vec<OpendalConfig>>,
37 pub provide_aws_s3: Option<Vec<AwsS3Config>>,
38 pub provide_tempo: Option<Vec<TempoConfig>>,
39 pub user_managed: bool,
40 pub resource_group: String,
41
42 pub total_memory_bytes: usize,
43 pub parallelism: usize,
44 pub role: String,
45}
46
47#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
48#[serde(rename_all = "kebab-case")]
49#[serde(deny_unknown_fields)]
50pub enum MetaBackend {
51 Memory,
52 Sqlite,
53 Postgres,
54 Mysql,
55 Env,
56}
57
58#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
59#[serde(rename_all = "kebab-case")]
60#[serde(deny_unknown_fields)]
61pub struct MetaNodeConfig {
62 #[serde(rename = "use")]
63 phantom_use: Option<String>,
64 pub id: String,
65
66 pub address: String,
67 #[serde(with = "string")]
68 pub port: u16,
69 pub listen_address: String,
70 pub dashboard_port: u16,
71 pub exporter_port: u16,
72
73 pub user_managed: bool,
74
75 pub meta_backend: MetaBackend,
76 pub provide_sqlite_backend: Option<Vec<SqliteConfig>>,
77 pub provide_postgres_backend: Option<Vec<PostgresConfig>>,
78 pub provide_mysql_backend: Option<Vec<MySqlConfig>>,
79 pub provide_prometheus: Option<Vec<PrometheusConfig>>,
80
81 pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
82 pub provide_compactor: Option<Vec<CompactorConfig>>,
83
84 pub provide_tempo: Option<Vec<TempoConfig>>,
85
86 pub provide_aws_s3: Option<Vec<AwsS3Config>>,
87 pub provide_minio: Option<Vec<MinioConfig>>,
88 pub provide_opendal: Option<Vec<OpendalConfig>>,
89 pub enable_in_memory_kv_state_backend: bool,
90}
91
92#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
93#[serde(rename_all = "kebab-case")]
94#[serde(deny_unknown_fields)]
95pub struct FrontendConfig {
96 #[serde(rename = "use")]
97 phantom_use: Option<String>,
98 pub id: String,
99
100 pub address: String,
101 #[serde(with = "string")]
102 pub port: u16,
103 pub listen_address: String,
104 pub exporter_port: u16,
105 pub health_check_port: u16,
106
107 pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
108 pub provide_tempo: Option<Vec<TempoConfig>>,
109
110 pub user_managed: bool,
111}
112
113#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
114#[serde(rename_all = "kebab-case")]
115#[serde(deny_unknown_fields)]
116pub struct CompactorConfig {
117 #[serde(rename = "use")]
118 phantom_use: Option<String>,
119 pub id: String,
120
121 pub address: String,
122 #[serde(with = "string")]
123 pub port: u16,
124 pub listen_address: String,
125 pub exporter_port: u16,
126
127 pub provide_minio: Option<Vec<MinioConfig>>,
128
129 pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
130 pub provide_tempo: Option<Vec<TempoConfig>>,
131
132 pub user_managed: bool,
133 pub compaction_worker_threads_number: Option<usize>,
134}
135
136#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
137#[serde(rename_all = "kebab-case")]
138#[serde(deny_unknown_fields)]
139pub struct MinioConfig {
140 #[serde(rename = "use")]
141 phantom_use: Option<String>,
142 pub id: String,
143
144 pub address: String,
145 #[serde(with = "string")]
146 pub port: u16,
147 pub listen_address: String,
148
149 pub console_address: String,
150 #[serde(with = "string")]
151 pub console_port: u16,
152
153 pub root_user: String,
154 pub root_password: String,
155 pub hummock_bucket: String,
156
157 pub provide_prometheus: Option<Vec<PrometheusConfig>>,
158
159 pub api_requests_max: usize,
161 pub api_requests_deadline: String,
162}
163
164#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
165#[serde(rename_all = "kebab-case")]
166#[serde(deny_unknown_fields)]
167pub struct SqliteConfig {
168 #[serde(rename = "use")]
169 phantom_use: Option<String>,
170 pub id: String,
171
172 pub file: String,
173}
174
175#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
176#[serde(rename_all = "kebab-case")]
177#[serde(deny_unknown_fields)]
178pub struct PrometheusConfig {
179 #[serde(rename = "use")]
180 phantom_use: Option<String>,
181 pub id: String,
182
183 pub address: String,
184 #[serde(with = "string")]
185 pub port: u16,
186 pub listen_address: String,
187
188 pub remote_write: bool,
189 pub remote_write_region: String,
190 pub remote_write_url: String,
191
192 pub scrape_interval: String,
193
194 pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
195 pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
196 pub provide_minio: Option<Vec<MinioConfig>>,
197 pub provide_compactor: Option<Vec<CompactorConfig>>,
198 pub provide_redpanda: Option<Vec<RedPandaConfig>>,
199 pub provide_frontend: Option<Vec<FrontendConfig>>,
200}
201
202#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
203#[serde(rename_all = "kebab-case")]
204#[serde(deny_unknown_fields)]
205pub struct GrafanaConfig {
206 #[serde(rename = "use")]
207 phantom_use: Option<String>,
208 pub id: String,
209 pub address: String,
210 pub listen_address: String,
211 pub port: u16,
212
213 pub provide_prometheus: Option<Vec<PrometheusConfig>>,
214 pub provide_tempo: Option<Vec<TempoConfig>>,
215}
216
217#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
218#[serde(rename_all = "kebab-case")]
219#[serde(deny_unknown_fields)]
220pub struct TempoConfig {
221 #[serde(rename = "use")]
222 phantom_use: Option<String>,
223 pub id: String,
224
225 pub listen_address: String,
226 pub address: String,
227 pub port: u16,
228 pub otlp_port: u16,
229 pub max_bytes_per_trace: usize,
230}
231
232#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
233#[serde(rename_all = "kebab-case")]
234#[serde(deny_unknown_fields)]
235pub struct AwsS3Config {
236 #[serde(rename = "use")]
237 phantom_use: Option<String>,
238 pub id: String,
239 pub bucket: String,
240}
241
242#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
243#[serde(rename_all = "kebab-case")]
244#[serde(deny_unknown_fields)]
245pub struct OpendalConfig {
246 #[serde(rename = "use")]
247 phantom_use: Option<String>,
248
249 pub id: String,
250 pub engine: String,
251 pub namenode: String,
252 pub bucket: String,
253}
254
255#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
256#[serde(rename_all = "kebab-case")]
257#[serde(deny_unknown_fields)]
258pub struct KafkaConfig {
259 #[serde(rename = "use")]
260 phantom_use: Option<String>,
261 pub id: String,
262
263 pub address: String,
265 #[serde(with = "string")]
266 pub port: u16,
267 pub docker_port: u16,
270
271 #[serde(with = "string")]
272 pub controller_port: u16,
273
274 pub image: String,
275 pub persist_data: bool,
276 pub node_id: u32,
277
278 pub user_managed: bool,
279}
280
281#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
282#[serde(rename_all = "kebab-case")]
283#[serde(deny_unknown_fields)]
284pub struct SchemaRegistryConfig {
285 #[serde(rename = "use")]
286 phantom_use: Option<String>,
287
288 pub id: String,
289
290 pub address: String,
291 #[serde(with = "string")]
292 pub port: u16,
293
294 pub provide_kafka: Option<Vec<KafkaConfig>>,
295
296 pub image: String,
297 pub user_managed: bool,
300}
301
302#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
303#[serde(rename_all = "kebab-case")]
304#[serde(deny_unknown_fields)]
305pub struct PubsubConfig {
306 #[serde(rename = "use")]
307 phantom_use: Option<String>,
308 pub id: String,
309 #[serde(with = "string")]
310 pub port: u16,
311 pub address: String,
312
313 pub persist_data: bool,
314}
315
316#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
317#[serde(rename_all = "kebab-case")]
318#[serde(deny_unknown_fields)]
319pub struct RedPandaConfig {
320 #[serde(rename = "use")]
321 phantom_use: Option<String>,
322 pub id: String,
323 pub internal_port: u16,
324 pub outside_port: u16,
325 pub address: String,
326 pub cpus: usize,
327 pub memory: String,
328}
329
330#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
331#[serde(rename_all = "kebab-case")]
332#[serde(deny_unknown_fields)]
333pub struct RedisConfig {
334 #[serde(rename = "use")]
335 phantom_use: Option<String>,
336 pub id: String,
337
338 pub port: u16,
339 pub address: String,
340}
341
342#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
343#[serde(rename_all = "kebab-case")]
344#[serde(deny_unknown_fields)]
345pub enum Application {
346 Metastore,
347 Connector,
348}
349
350#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
351#[serde(rename_all = "kebab-case")]
352#[serde(deny_unknown_fields)]
353pub struct MySqlConfig {
354 #[serde(rename = "use")]
355 phantom_use: Option<String>,
356 pub id: String,
357
358 pub port: u16,
359 pub address: String,
360
361 pub user: String,
362 pub password: String,
363 pub database: String,
364
365 pub application: Application,
366 pub image: String,
367 pub user_managed: bool,
368 pub persist_data: bool,
369}
370
371#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
372#[serde(rename_all = "kebab-case")]
373#[serde(deny_unknown_fields)]
374pub struct PostgresConfig {
375 #[serde(rename = "use")]
376 phantom_use: Option<String>,
377 pub id: String,
378
379 pub port: u16,
380 pub address: String,
381
382 pub user: String,
383 pub password: String,
384 pub database: String,
385
386 pub application: Application,
387 pub image: String,
388 pub user_managed: bool,
389 pub persist_data: bool,
390}
391
392#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
393#[serde(rename_all = "kebab-case")]
394#[serde(deny_unknown_fields)]
395pub struct SqlServerConfig {
396 #[serde(rename = "use")]
397 phantom_use: Option<String>,
398 pub id: String,
399
400 pub port: u16,
401 pub address: String,
402
403 pub user: String,
404 pub password: String,
405 pub database: String,
406
407 pub image: String,
408 pub user_managed: bool,
409 pub persist_data: bool,
410}
411
412#[derive(Clone, Debug, PartialEq)]
414pub enum ServiceConfig {
415 ComputeNode(ComputeNodeConfig),
416 MetaNode(MetaNodeConfig),
417 Frontend(FrontendConfig),
418 Compactor(CompactorConfig),
419 Minio(MinioConfig),
420 Sqlite(SqliteConfig),
421 Prometheus(PrometheusConfig),
422 Grafana(GrafanaConfig),
423 Tempo(TempoConfig),
424 Opendal(OpendalConfig),
425 AwsS3(AwsS3Config),
426 Kafka(KafkaConfig),
427 SchemaRegistry(SchemaRegistryConfig),
428 Pubsub(PubsubConfig),
429 Redis(RedisConfig),
430 RedPanda(RedPandaConfig),
431 MySql(MySqlConfig),
432 Postgres(PostgresConfig),
433 SqlServer(SqlServerConfig),
434}
435
436#[derive(PartialEq, Eq, Hash, Debug)]
437pub enum TaskGroup {
438 RisingWave,
439 Observability,
440 Kafka,
441 Pubsub,
442 MySql,
443 Postgres,
444 SqlServer,
445 Redis,
446}
447
448impl ServiceConfig {
449 pub fn id(&self) -> &str {
450 match self {
451 Self::ComputeNode(c) => &c.id,
452 Self::MetaNode(c) => &c.id,
453 Self::Frontend(c) => &c.id,
454 Self::Compactor(c) => &c.id,
455 Self::Minio(c) => &c.id,
456 Self::Sqlite(c) => &c.id,
457 Self::Prometheus(c) => &c.id,
458 Self::Grafana(c) => &c.id,
459 Self::Tempo(c) => &c.id,
460 Self::AwsS3(c) => &c.id,
461 Self::Kafka(c) => &c.id,
462 Self::Pubsub(c) => &c.id,
463 Self::Redis(c) => &c.id,
464 Self::RedPanda(c) => &c.id,
465 Self::Opendal(c) => &c.id,
466 Self::MySql(c) => &c.id,
467 Self::Postgres(c) => &c.id,
468 Self::SqlServer(c) => &c.id,
469 Self::SchemaRegistry(c) => &c.id,
470 }
471 }
472
473 pub fn port(&self) -> Option<u16> {
475 match self {
476 Self::ComputeNode(c) => Some(c.port),
477 Self::MetaNode(c) => Some(c.port),
478 Self::Frontend(c) => Some(c.port),
479 Self::Compactor(c) => Some(c.port),
480 Self::Minio(c) => Some(c.port),
481 Self::Sqlite(_) => None,
482 Self::Prometheus(c) => Some(c.port),
483 Self::Grafana(c) => Some(c.port),
484 Self::Tempo(c) => Some(c.port),
485 Self::AwsS3(_) => None,
486 Self::Kafka(c) => Some(c.port),
487 Self::Pubsub(c) => Some(c.port),
488 Self::Redis(c) => Some(c.port),
489 Self::RedPanda(_c) => None,
490 Self::Opendal(_) => None,
491 Self::MySql(c) => Some(c.port),
492 Self::Postgres(c) => Some(c.port),
493 Self::SqlServer(c) => Some(c.port),
494 Self::SchemaRegistry(c) => Some(c.port),
495 }
496 }
497
498 pub fn user_managed(&self) -> bool {
499 match self {
500 Self::ComputeNode(c) => c.user_managed,
501 Self::MetaNode(c) => c.user_managed,
502 Self::Frontend(c) => c.user_managed,
503 Self::Compactor(c) => c.user_managed,
504 Self::Minio(_c) => false,
505 Self::Sqlite(_c) => false,
506 Self::Prometheus(_c) => false,
507 Self::Grafana(_c) => false,
508 Self::Tempo(_c) => false,
509 Self::AwsS3(_c) => false,
510 Self::Kafka(c) => c.user_managed,
511 Self::Pubsub(_c) => false,
512 Self::Redis(_c) => false,
513 Self::RedPanda(_c) => false,
514 Self::Opendal(_c) => false,
515 Self::MySql(c) => c.user_managed,
516 Self::Postgres(c) => c.user_managed,
517 Self::SqlServer(c) => c.user_managed,
518 Self::SchemaRegistry(c) => c.user_managed,
519 }
520 }
521
522 pub fn task_group(&self) -> TaskGroup {
523 use TaskGroup::*;
524 match self {
525 ServiceConfig::ComputeNode(_)
526 | ServiceConfig::MetaNode(_)
527 | ServiceConfig::Frontend(_)
528 | ServiceConfig::Compactor(_)
529 | ServiceConfig::Minio(_)
530 | ServiceConfig::Sqlite(_) => RisingWave,
531 ServiceConfig::Prometheus(_) | ServiceConfig::Grafana(_) | ServiceConfig::Tempo(_) => {
532 Observability
533 }
534 ServiceConfig::Opendal(_) | ServiceConfig::AwsS3(_) => RisingWave,
535 ServiceConfig::Kafka(_)
536 | ServiceConfig::SchemaRegistry(_)
537 | ServiceConfig::RedPanda(_) => Kafka,
538 ServiceConfig::Pubsub(_) => Pubsub,
539 ServiceConfig::Redis(_) => Redis,
540 ServiceConfig::MySql(my_sql_config) => {
541 if matches!(my_sql_config.application, Application::Metastore) {
542 RisingWave
543 } else {
544 MySql
545 }
546 }
547 ServiceConfig::Postgres(postgres_config) => {
548 if matches!(postgres_config.application, Application::Metastore) {
549 RisingWave
550 } else {
551 Postgres
552 }
553 }
554 ServiceConfig::SqlServer(_) => SqlServer,
555 }
556 }
557}
558
559mod string {
560 use std::fmt::Display;
561 use std::str::FromStr;
562
563 use serde::{Deserialize, Deserializer, Serializer, de};
564
565 pub fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
566 where
567 T: Display,
568 S: Serializer,
569 {
570 serializer.collect_str(value)
571 }
572
573 pub fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
574 where
575 T: FromStr,
576 T::Err: Display,
577 D: Deserializer<'de>,
578 {
579 String::deserialize(deserializer)?
580 .parse()
581 .map_err(de::Error::custom)
582 }
583}