risedev/
service_config.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use serde::{Deserialize, Serialize};
16
/// Configuration of a compute node service (wrapped by `ServiceConfig::ComputeNode`).
///
/// Serialized keys are kebab-case (e.g. `listen_address` <-> `listen-address`), and
/// deserialization fails on any key that is not declared here.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct ComputeNodeConfig {
    /// Maps to the `use` key. Private and not read by the code visible in this file.
    // NOTE(review): presumably kept only so the `use` key passes `deny_unknown_fields` — confirm.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id`.
    pub id: String,

    pub address: String,
    /// Written as a string and parsed back from a string (see the `string` module).
    /// This is the port reported by `ServiceConfig::port`.
    #[serde(with = "string")]
    pub port: u16,
    pub listen_address: String,
    pub exporter_port: u16,
    pub async_stack_trace: String,
    pub enable_tiered_cache: bool,

    // The `provide_*` fields are optional lists of other services' configurations.
    // NOTE(review): presumably filled in with the services this node depends on — confirm
    // against the code that builds these configs.
    pub provide_minio: Option<Vec<MinioConfig>>,
    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
    pub provide_opendal: Option<Vec<OpendalConfig>>,
    pub provide_aws_s3: Option<Vec<AwsS3Config>>,
    pub provide_moat: Option<Vec<MoatConfig>>,
    pub provide_tempo: Option<Vec<TempoConfig>>,
    /// Value returned by `ServiceConfig::user_managed` for this service.
    pub user_managed: bool,
    pub resource_group: String,

    pub total_memory_bytes: usize,
    pub parallelism: usize,
    pub role: String,
}
47
/// Value type of `MetaNodeConfig::meta_backend`.
///
/// With `rename_all = "kebab-case"` the variants are written as `memory`, `sqlite`,
/// `postgres`, `mysql` and `env`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub enum MetaBackend {
    Memory,
    Sqlite,
    Postgres,
    Mysql,
    // NOTE(review): presumably means "take the backend from the environment" — confirm with the consumer.
    Env,
}
58
/// Configuration of a meta node service (wrapped by `ServiceConfig::MetaNode`).
///
/// Serialized keys are kebab-case (e.g. `dashboard_port` <-> `dashboard-port`), and
/// deserialization fails on any key that is not declared here.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct MetaNodeConfig {
    /// Maps to the `use` key. Private and not read by the code visible in this file.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id`.
    pub id: String,

    pub address: String,
    /// Written as a string and parsed back from a string (see the `string` module).
    /// This is the port reported by `ServiceConfig::port`.
    #[serde(with = "string")]
    pub port: u16,
    pub listen_address: String,
    pub dashboard_port: u16,
    pub exporter_port: u16,

    /// Value returned by `ServiceConfig::user_managed` for this service.
    pub user_managed: bool,

    /// Which kind of meta backend is selected; see `MetaBackend`.
    pub meta_backend: MetaBackend,
    // The `provide_*` fields are optional lists of other services' configurations.
    // NOTE(review): presumably the backend-specific lists are only relevant for the matching
    // `meta_backend` value — confirm against the consumer.
    pub provide_sqlite_backend: Option<Vec<SqliteConfig>>,
    pub provide_postgres_backend: Option<Vec<PostgresConfig>>,
    pub provide_mysql_backend: Option<Vec<MySqlConfig>>,
    pub provide_prometheus: Option<Vec<PrometheusConfig>>,

    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
    pub provide_compactor: Option<Vec<CompactorConfig>>,

    pub provide_tempo: Option<Vec<TempoConfig>>,

    pub provide_aws_s3: Option<Vec<AwsS3Config>>,
    pub provide_minio: Option<Vec<MinioConfig>>,
    pub provide_opendal: Option<Vec<OpendalConfig>>,
    pub provide_moat: Option<Vec<MoatConfig>>,
}
92
/// Configuration of a frontend service (wrapped by `ServiceConfig::Frontend`).
///
/// Serialized keys are kebab-case (e.g. `health_check_port` <-> `health-check-port`), and
/// deserialization fails on any key that is not declared here.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct FrontendConfig {
    /// Maps to the `use` key. Private and not read by the code visible in this file.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id`.
    pub id: String,

    pub address: String,
    /// Written as a string and parsed back from a string (see the `string` module).
    /// This is the port reported by `ServiceConfig::port`.
    #[serde(with = "string")]
    pub port: u16,
    pub listen_address: String,
    pub exporter_port: u16,
    pub health_check_port: u16,

    // Optional lists of other services' configurations.
    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
    pub provide_tempo: Option<Vec<TempoConfig>>,
    pub provide_prometheus: Option<Vec<PrometheusConfig>>,

    /// Value returned by `ServiceConfig::user_managed` for this service.
    pub user_managed: bool,
}
114
/// Configuration of a compactor service (wrapped by `ServiceConfig::Compactor`).
///
/// Serialized keys are kebab-case (e.g. `compactor_mode` <-> `compactor-mode`), and
/// deserialization fails on any key that is not declared here.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct CompactorConfig {
    /// Maps to the `use` key. Private and not read by the code visible in this file.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id`.
    pub id: String,

    pub address: String,
    /// Written as a string and parsed back from a string (see the `string` module).
    /// This is the port reported by `ServiceConfig::port`.
    #[serde(with = "string")]
    pub port: u16,
    pub listen_address: String,
    pub exporter_port: u16,

    // Optional lists of other services' configurations.
    pub provide_minio: Option<Vec<MinioConfig>>,

    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
    pub provide_tempo: Option<Vec<TempoConfig>>,

    /// Value returned by `ServiceConfig::user_managed` for this service.
    pub user_managed: bool,
    /// Optional; the key may be absent, in which case this is `None`.
    pub compaction_worker_threads_number: Option<usize>,

    // NOTE(review): free-form string here; the accepted values are defined by the consumer — confirm.
    pub compactor_mode: String,
}
139
/// Configuration of a MinIO service (wrapped by `ServiceConfig::Minio`).
///
/// Serialized keys are kebab-case (e.g. `console_port` <-> `console-port`), and
/// deserialization fails on any key that is not declared here. There is no `user_managed`
/// field; `ServiceConfig::user_managed` always reports `false` for this service.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct MinioConfig {
    /// Maps to the `use` key. Private and not read by the code visible in this file.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id`.
    pub id: String,

    pub address: String,
    /// Written as a string and parsed back from a string (see the `string` module).
    /// This is the port reported by `ServiceConfig::port`.
    #[serde(with = "string")]
    pub port: u16,
    pub listen_address: String,

    pub console_address: String,
    /// Written as a string and parsed back from a string (see the `string` module).
    #[serde(with = "string")]
    pub console_port: u16,

    pub root_user: String,
    pub root_password: String,
    pub hummock_bucket: String,

    // Optional list of Prometheus configurations.
    pub provide_prometheus: Option<Vec<PrometheusConfig>>,

    // For rate limiting minio in a test environment.
    pub api_requests_max: usize,
    pub api_requests_deadline: String,
}
167
/// Configuration of a SQLite service entry (wrapped by `ServiceConfig::Sqlite`).
///
/// Keys are kebab-case and unknown keys are rejected on deserialization. There is no port
/// field; `ServiceConfig::port` returns `None` for this service.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct SqliteConfig {
    /// Maps to the `use` key. Private and not read by the code visible in this file.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id`.
    pub id: String,

    // NOTE(review): presumably the database file name or path — confirm against the consumer.
    pub file: String,
}
178
/// Configuration of a Prometheus service (wrapped by `ServiceConfig::Prometheus`).
///
/// Serialized keys are kebab-case (e.g. `remote_write_url` <-> `remote-write-url`), and
/// deserialization fails on any key that is not declared here.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct PrometheusConfig {
    /// Maps to the `use` key. Private and not read by the code visible in this file.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id`.
    pub id: String,

    pub address: String,
    /// Written as a string and parsed back from a string (see the `string` module).
    /// This is the port reported by `ServiceConfig::port`.
    #[serde(with = "string")]
    pub port: u16,
    pub listen_address: String,

    pub remote_write: bool,
    pub remote_write_region: String,
    pub remote_write_url: String,

    // NOTE(review): kept as a string; presumably a duration literal understood by Prometheus — confirm.
    pub scrape_interval: String,

    // Optional lists of other services' configurations.
    // NOTE(review): presumably the services whose metrics are scraped — confirm.
    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
    pub provide_minio: Option<Vec<MinioConfig>>,
    pub provide_compactor: Option<Vec<CompactorConfig>>,
    pub provide_frontend: Option<Vec<FrontendConfig>>,
}
204
/// Configuration of a Grafana service (wrapped by `ServiceConfig::Grafana`).
///
/// Keys are kebab-case (e.g. `listen_address` <-> `listen-address`) and unknown keys are
/// rejected on deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct GrafanaConfig {
    /// Maps to the `use` key. Private and not read by the code visible in this file.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id`.
    pub id: String,
    pub address: String,
    pub listen_address: String,
    /// Port reported by `ServiceConfig::port`. Unlike several other configs, this field has
    /// no `with = "string"` attribute, so it uses the default `u16` representation.
    pub port: u16,

    // Optional lists of other services' configurations.
    pub provide_prometheus: Option<Vec<PrometheusConfig>>,
    pub provide_tempo: Option<Vec<TempoConfig>>,
}
219
/// Configuration of a Tempo service (wrapped by `ServiceConfig::Tempo`).
///
/// Keys are kebab-case (e.g. `otlp_port` <-> `otlp-port`) and unknown keys are rejected on
/// deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct TempoConfig {
    /// Maps to the `use` key. Private and not read by the code visible in this file.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id`.
    pub id: String,

    pub listen_address: String,
    pub address: String,
    /// Port reported by `ServiceConfig::port`.
    pub port: u16,
    pub otlp_port: u16,
    pub max_bytes_per_trace: usize,
}
234
/// Configuration of an AWS S3 entry (wrapped by `ServiceConfig::AwsS3`).
///
/// Keys are kebab-case and unknown keys are rejected on deserialization. There is no port
/// field; `ServiceConfig::port` returns `None` for this service.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct AwsS3Config {
    /// Maps to the `use` key. Private and not read by the code visible in this file.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id`.
    pub id: String,
    pub bucket: String,
}
244
/// Configuration of an OpenDAL entry (wrapped by `ServiceConfig::Opendal`).
///
/// Keys are kebab-case and unknown keys are rejected on deserialization. There is no port
/// field; `ServiceConfig::port` returns `None` for this service.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct OpendalConfig {
    /// Maps to the `use` key. Private and not read by the code visible in this file.
    #[serde(rename = "use")]
    phantom_use: Option<String>,

    /// Identifier returned by `ServiceConfig::id`.
    pub id: String,
    // NOTE(review): `engine`, `namenode` and `bucket` are free-form strings here; which
    // combinations are meaningful is decided by the consumer — confirm.
    pub engine: String,
    pub namenode: String,
    pub bucket: String,
}
257
/// Configuration of a Kafka service (wrapped by `ServiceConfig::Kafka`).
///
/// Keys are kebab-case (e.g. `controller_port` <-> `controller-port`) and unknown keys are
/// rejected on deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct KafkaConfig {
    /// Maps to the `use` key. Private and not read by the code visible in this file.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id`.
    pub id: String,

    /// Advertise address
    pub address: String,
    /// Written as a string and parsed back from a string (see the `string` module).
    /// This is the port reported by `ServiceConfig::port`.
    #[serde(with = "string")]
    pub port: u16,
    /// Port for other services in docker. They need to connect to `host.docker.internal`, while the host
    /// needs to connect to `localhost`.
    pub docker_port: u16,

    /// Written as a string and parsed back from a string (see the `string` module).
    #[serde(with = "string")]
    pub controller_port: u16,

    pub image: String,
    pub persist_data: bool,
    pub node_id: u32,

    /// Value returned by `ServiceConfig::user_managed` for this service.
    pub user_managed: bool,
}
283
/// Configuration of a schema registry service (wrapped by `ServiceConfig::SchemaRegistry`).
///
/// Keys are kebab-case and unknown keys are rejected on deserialization.
/// `ServiceConfig::task_group` places this service in the same group as Kafka.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct SchemaRegistryConfig {
    /// Maps to the `use` key. Private and not read by the code visible in this file.
    #[serde(rename = "use")]
    phantom_use: Option<String>,

    /// Identifier returned by `ServiceConfig::id`.
    pub id: String,

    pub address: String,
    /// Written as a string and parsed back from a string (see the `string` module).
    /// This is the port reported by `ServiceConfig::port`.
    #[serde(with = "string")]
    pub port: u16,

    // Optional list of Kafka configurations.
    pub provide_kafka: Option<Vec<KafkaConfig>>,

    pub image: String,
    /// Redpanda supports schema registry natively. You can configure a `user_managed` schema registry
    /// to use with redpanda.
    pub user_managed: bool,
}
304
/// Configuration of a Pub/Sub service (wrapped by `ServiceConfig::Pubsub`).
///
/// Keys are kebab-case (e.g. `persist_data` <-> `persist-data`) and unknown keys are
/// rejected on deserialization. There is no `user_managed` field;
/// `ServiceConfig::user_managed` always reports `false` for this service.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct PubsubConfig {
    /// Maps to the `use` key. Private and not read by the code visible in this file.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id`.
    pub id: String,
    /// Written as a string and parsed back from a string (see the `string` module).
    /// This is the port reported by `ServiceConfig::port`.
    #[serde(with = "string")]
    pub port: u16,
    pub address: String,

    pub persist_data: bool,
}
318
/// Configuration of a Pulsar service (wrapped by `ServiceConfig::Pulsar`).
///
/// Keys are kebab-case (e.g. `broker_port` <-> `broker-port`) and unknown keys are rejected
/// on deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct PulsarConfig {
    /// Maps to the `use` key. Private and not read by the code visible in this file.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id`.
    pub id: String,

    pub address: String,
    pub broker_port: u16,
    /// This (not `broker_port`) is the port reported by `ServiceConfig::port`.
    pub http_port: u16,

    /// Value returned by `ServiceConfig::user_managed` for this service.
    pub user_managed: bool,
    pub image: String,
    pub persist_data: bool,
}
335
/// Configuration of a Redis service (wrapped by `ServiceConfig::Redis`).
///
/// Keys are kebab-case and unknown keys are rejected on deserialization. There is no
/// `user_managed` field; `ServiceConfig::user_managed` always reports `false` for this service.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct RedisConfig {
    /// Maps to the `use` key. Private and not read by the code visible in this file.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id`.
    pub id: String,

    /// Port reported by `ServiceConfig::port`.
    pub port: u16,
    pub address: String,
}
347
/// Value type of the `application` field of `MySqlConfig` and `PostgresConfig`.
///
/// With `rename_all = "kebab-case"` the variants are written as `metastore` and `connector`.
/// `ServiceConfig::task_group` assigns a `Metastore` database to `TaskGroup::RisingWave` and
/// any other one to the database's own task group.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub enum Application {
    Metastore,
    Connector,
}
355
/// Configuration of a MySQL service (wrapped by `ServiceConfig::MySql`).
///
/// Keys are kebab-case (e.g. `persist_data` <-> `persist-data`) and unknown keys are
/// rejected on deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct MySqlConfig {
    /// Maps to the `use` key. Private and not read by the code visible in this file.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id`.
    pub id: String,

    /// Port reported by `ServiceConfig::port`.
    pub port: u16,
    pub address: String,

    pub user: String,
    pub password: String,
    pub database: String,

    /// Decides the task group: `Metastore` maps to `TaskGroup::RisingWave`, otherwise
    /// `TaskGroup::MySql` (see `ServiceConfig::task_group`).
    pub application: Application,
    pub image: String,
    /// Value returned by `ServiceConfig::user_managed` for this service.
    pub user_managed: bool,
    pub persist_data: bool,
}
376
/// Configuration of a PostgreSQL service (wrapped by `ServiceConfig::Postgres`).
///
/// Keys are kebab-case (e.g. `latency_ms` <-> `latency-ms`) and unknown keys are rejected
/// on deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct PostgresConfig {
    /// Maps to the `use` key. Private and not read by the code visible in this file.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id`.
    pub id: String,

    /// Port reported by `ServiceConfig::port`.
    pub port: u16,
    pub address: String,

    pub user: String,
    pub password: String,
    pub database: String,

    /// Decides the task group: `Metastore` maps to `TaskGroup::RisingWave`, otherwise
    /// `TaskGroup::Postgres` (see `ServiceConfig::task_group`).
    pub application: Application,
    pub image: String,
    /// Value returned by `ServiceConfig::user_managed` for this service.
    pub user_managed: bool,
    pub persist_data: bool,

    // Inject latency into any network calls to the postgres service.
    // NOTE(review): unit is milliseconds judging by the field name — confirm with the consumer.
    pub latency_ms: u32,
}
400
/// Configuration of a SQL Server service (wrapped by `ServiceConfig::SqlServer`).
///
/// Keys are kebab-case (e.g. `user_managed` <-> `user-managed`) and unknown keys are
/// rejected on deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct SqlServerConfig {
    /// Maps to the `use` key. Private and not read by the code visible in this file.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id`.
    pub id: String,

    /// Port reported by `ServiceConfig::port`.
    pub port: u16,
    pub address: String,

    pub user: String,
    pub password: String,
    pub database: String,

    pub image: String,
    /// Value returned by `ServiceConfig::user_managed` for this service.
    pub user_managed: bool,
    pub persist_data: bool,
}
420
/// Configuration of a Lakekeeper service (wrapped by `ServiceConfig::Lakekeeper`).
///
/// Keys are kebab-case (e.g. `catalog_backend` <-> `catalog-backend`) and unknown keys are
/// rejected on deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct LakekeeperConfig {
    /// Maps to the `use` key. Private and not read by the code visible in this file.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id`.
    pub id: String,

    /// Port reported by `ServiceConfig::port`.
    pub port: u16,
    pub address: String,

    /// Value returned by `ServiceConfig::user_managed` for this service.
    pub user_managed: bool,
    pub persist_data: bool,

    // NOTE(review): free-form string here; accepted values are defined by the consumer — confirm.
    pub catalog_backend: String,
    pub encryption_key: String,
    // Optional lists of other services' configurations.
    pub provide_postgres_backend: Option<Vec<PostgresConfig>>,
    pub provide_minio: Option<Vec<MinioConfig>>,
}
440
/// Configuration of a Moat service (wrapped by `ServiceConfig::Moat`).
///
/// Keys are kebab-case and unknown keys are rejected on deserialization. There is no
/// `user_managed` field; `ServiceConfig::user_managed` always reports `false` for this service.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct MoatConfig {
    /// Maps to the `use` key. Private and not read by the code visible in this file.
    #[serde(rename = "use")]
    phantom_use: Option<String>,
    /// Identifier returned by `ServiceConfig::id`.
    pub id: String,

    pub address: String,
    /// Port reported by `ServiceConfig::port`.
    pub port: u16,

    // Optional list of MinIO configurations.
    pub provide_minio: Option<Vec<MinioConfig>>,
}
454
/// All service configuration
///
/// One variant per supported service kind, each wrapping that service's config struct.
/// Unlike the wrapped structs, this enum does not derive `Serialize`/`Deserialize`.
/// The inherent methods (`id`, `port`, `user_managed`, `task_group`) dispatch on the variant.
#[derive(Clone, Debug, PartialEq)]
pub enum ServiceConfig {
    ComputeNode(ComputeNodeConfig),
    MetaNode(MetaNodeConfig),
    Frontend(FrontendConfig),
    Compactor(CompactorConfig),
    Minio(MinioConfig),
    Sqlite(SqliteConfig),
    Prometheus(PrometheusConfig),
    Grafana(GrafanaConfig),
    Tempo(TempoConfig),
    Opendal(OpendalConfig),
    AwsS3(AwsS3Config),
    Kafka(KafkaConfig),
    SchemaRegistry(SchemaRegistryConfig),
    Pubsub(PubsubConfig),
    Pulsar(PulsarConfig),
    Redis(RedisConfig),
    MySql(MySqlConfig),
    Postgres(PostgresConfig),
    SqlServer(SqlServerConfig),
    Lakekeeper(LakekeeperConfig),
    Moat(MoatConfig),
}
480
/// Group a service is assigned to by `ServiceConfig::task_group`.
///
/// The enum is fieldless, so it derives `Clone` and `Copy` in addition to the comparison
/// and hashing traits. Values can then be passed by value and reused (for example as a
/// `HashMap`/`HashSet` key) without moving the original.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum TaskGroup {
    /// Core services, object-store entries, and databases used as a metastore.
    RisingWave,
    /// Prometheus, Grafana and Tempo.
    Observability,
    /// Kafka and the schema registry.
    Kafka,
    Pubsub,
    Pulsar,
    /// MySQL instances whose application is not `Metastore`.
    MySql,
    /// PostgreSQL instances whose application is not `Metastore`.
    Postgres,
    SqlServer,
    Redis,
    Lakekeeper,
    Moat,
}
495
496impl ServiceConfig {
497    pub fn id(&self) -> &str {
498        match self {
499            Self::ComputeNode(c) => &c.id,
500            Self::MetaNode(c) => &c.id,
501            Self::Frontend(c) => &c.id,
502            Self::Compactor(c) => &c.id,
503            Self::Minio(c) => &c.id,
504            Self::Sqlite(c) => &c.id,
505            Self::Prometheus(c) => &c.id,
506            Self::Grafana(c) => &c.id,
507            Self::Tempo(c) => &c.id,
508            Self::AwsS3(c) => &c.id,
509            Self::Kafka(c) => &c.id,
510            Self::Pubsub(c) => &c.id,
511            Self::Pulsar(c) => &c.id,
512            Self::Redis(c) => &c.id,
513            Self::Opendal(c) => &c.id,
514            Self::MySql(c) => &c.id,
515            Self::Postgres(c) => &c.id,
516            Self::SqlServer(c) => &c.id,
517            Self::SchemaRegistry(c) => &c.id,
518            Self::Lakekeeper(c) => &c.id,
519            Self::Moat(c) => &c.id,
520        }
521    }
522
523    /// Used to check whether the port is occupied before running the service.
524    pub fn port(&self) -> Option<u16> {
525        match self {
526            Self::ComputeNode(c) => Some(c.port),
527            Self::MetaNode(c) => Some(c.port),
528            Self::Frontend(c) => Some(c.port),
529            Self::Compactor(c) => Some(c.port),
530            Self::Minio(c) => Some(c.port),
531            Self::Sqlite(_) => None,
532            Self::Prometheus(c) => Some(c.port),
533            Self::Grafana(c) => Some(c.port),
534            Self::Tempo(c) => Some(c.port),
535            Self::AwsS3(_) => None,
536            Self::Kafka(c) => Some(c.port),
537            Self::Pubsub(c) => Some(c.port),
538            Self::Pulsar(c) => Some(c.http_port),
539            Self::Redis(c) => Some(c.port),
540            Self::Opendal(_) => None,
541            Self::MySql(c) => Some(c.port),
542            Self::Postgres(c) => Some(c.port),
543            Self::SqlServer(c) => Some(c.port),
544            Self::SchemaRegistry(c) => Some(c.port),
545            Self::Lakekeeper(c) => Some(c.port),
546            Self::Moat(c) => Some(c.port),
547        }
548    }
549
550    pub fn user_managed(&self) -> bool {
551        match self {
552            Self::ComputeNode(c) => c.user_managed,
553            Self::MetaNode(c) => c.user_managed,
554            Self::Frontend(c) => c.user_managed,
555            Self::Compactor(c) => c.user_managed,
556            Self::Minio(_c) => false,
557            Self::Sqlite(_c) => false,
558            Self::Prometheus(_c) => false,
559            Self::Grafana(_c) => false,
560            Self::Tempo(_c) => false,
561            Self::AwsS3(_c) => false,
562            Self::Kafka(c) => c.user_managed,
563            Self::Pubsub(_c) => false,
564            Self::Pulsar(c) => c.user_managed,
565            Self::Redis(_c) => false,
566            Self::Opendal(_c) => false,
567            Self::MySql(c) => c.user_managed,
568            Self::Postgres(c) => c.user_managed,
569            Self::SqlServer(c) => c.user_managed,
570            Self::SchemaRegistry(c) => c.user_managed,
571            Self::Lakekeeper(c) => c.user_managed,
572            Self::Moat(_c) => false,
573        }
574    }
575
576    pub fn task_group(&self) -> TaskGroup {
577        use TaskGroup::*;
578        match self {
579            ServiceConfig::ComputeNode(_)
580            | ServiceConfig::MetaNode(_)
581            | ServiceConfig::Frontend(_)
582            | ServiceConfig::Compactor(_)
583            | ServiceConfig::Minio(_)
584            | ServiceConfig::Sqlite(_) => RisingWave,
585            ServiceConfig::Prometheus(_) | ServiceConfig::Grafana(_) | ServiceConfig::Tempo(_) => {
586                Observability
587            }
588            ServiceConfig::Opendal(_) | ServiceConfig::AwsS3(_) => RisingWave,
589            ServiceConfig::Kafka(_) | ServiceConfig::SchemaRegistry(_) => Kafka,
590            ServiceConfig::Pubsub(_) => Pubsub,
591            ServiceConfig::Pulsar(_) => Pulsar,
592            ServiceConfig::Redis(_) => Redis,
593            ServiceConfig::MySql(my_sql_config) => {
594                if matches!(my_sql_config.application, Application::Metastore) {
595                    RisingWave
596                } else {
597                    MySql
598                }
599            }
600            ServiceConfig::Postgres(postgres_config) => {
601                if matches!(postgres_config.application, Application::Metastore) {
602                    RisingWave
603                } else {
604                    Postgres
605                }
606            }
607            ServiceConfig::SqlServer(_) => SqlServer,
608            ServiceConfig::Lakekeeper(_) => Lakekeeper,
609            ServiceConfig::Moat(_) => Moat,
610        }
611    }
612}
613
614mod string {
615    use std::fmt::Display;
616    use std::str::FromStr;
617
618    use serde::{Deserialize, Deserializer, Serializer, de};
619
620    pub fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
621    where
622        T: Display,
623        S: Serializer,
624    {
625        serializer.collect_str(value)
626    }
627
628    pub fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
629    where
630        T: FromStr,
631        T::Err: Display,
632        D: Deserializer<'de>,
633    {
634        String::deserialize(deserializer)?
635            .parse()
636            .map_err(de::Error::custom)
637    }
638}