risedev/
service_config.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use serde::{Deserialize, Serialize};
16
17#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
18#[serde(rename_all = "kebab-case")]
19#[serde(deny_unknown_fields)]
20pub struct ComputeNodeConfig {
21    #[serde(rename = "use")]
22    phantom_use: Option<String>,
23    pub id: String,
24
25    pub address: String,
26    #[serde(with = "string")]
27    pub port: u16,
28    pub listen_address: String,
29    pub exporter_port: u16,
30    pub async_stack_trace: String,
31    pub enable_tiered_cache: bool,
32
33    pub provide_minio: Option<Vec<MinioConfig>>,
34    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
35    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
36    pub provide_opendal: Option<Vec<OpendalConfig>>,
37    pub provide_aws_s3: Option<Vec<AwsS3Config>>,
38    pub provide_moat: Option<Vec<MoatConfig>>,
39    pub provide_tempo: Option<Vec<TempoConfig>>,
40    pub user_managed: bool,
41    pub resource_group: String,
42
43    pub total_memory_bytes: usize,
44    pub parallelism: usize,
45    pub role: String,
46}
47
48#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
49#[serde(rename_all = "kebab-case")]
50#[serde(deny_unknown_fields)]
51pub enum MetaBackend {
52    Memory,
53    Sqlite,
54    Postgres,
55    Mysql,
56    Env,
57}
58
59#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
60#[serde(rename_all = "kebab-case")]
61#[serde(deny_unknown_fields)]
62pub struct MetaNodeConfig {
63    #[serde(rename = "use")]
64    phantom_use: Option<String>,
65    pub id: String,
66
67    pub address: String,
68    #[serde(with = "string")]
69    pub port: u16,
70    pub listen_address: String,
71    pub dashboard_port: u16,
72    pub exporter_port: u16,
73
74    pub user_managed: bool,
75
76    pub meta_backend: MetaBackend,
77    pub provide_sqlite_backend: Option<Vec<SqliteConfig>>,
78    pub provide_postgres_backend: Option<Vec<PostgresConfig>>,
79    pub provide_mysql_backend: Option<Vec<MySqlConfig>>,
80    pub provide_prometheus: Option<Vec<PrometheusConfig>>,
81
82    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
83    pub provide_compactor: Option<Vec<CompactorConfig>>,
84
85    pub provide_tempo: Option<Vec<TempoConfig>>,
86
87    pub provide_aws_s3: Option<Vec<AwsS3Config>>,
88    pub provide_minio: Option<Vec<MinioConfig>>,
89    pub provide_opendal: Option<Vec<OpendalConfig>>,
90    pub provide_moat: Option<Vec<MoatConfig>>,
91}
92
93#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
94#[serde(rename_all = "kebab-case")]
95#[serde(deny_unknown_fields)]
96pub struct FrontendConfig {
97    #[serde(rename = "use")]
98    phantom_use: Option<String>,
99    pub id: String,
100
101    pub address: String,
102    #[serde(with = "string")]
103    pub port: u16,
104    pub listen_address: String,
105    pub exporter_port: u16,
106    pub health_check_port: u16,
107
108    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
109    pub provide_tempo: Option<Vec<TempoConfig>>,
110
111    pub user_managed: bool,
112}
113
114#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
115#[serde(rename_all = "kebab-case")]
116#[serde(deny_unknown_fields)]
117pub struct CompactorConfig {
118    #[serde(rename = "use")]
119    phantom_use: Option<String>,
120    pub id: String,
121
122    pub address: String,
123    #[serde(with = "string")]
124    pub port: u16,
125    pub listen_address: String,
126    pub exporter_port: u16,
127
128    pub provide_minio: Option<Vec<MinioConfig>>,
129
130    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
131    pub provide_tempo: Option<Vec<TempoConfig>>,
132
133    pub user_managed: bool,
134    pub compaction_worker_threads_number: Option<usize>,
135
136    pub compactor_mode: String,
137}
138
139#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
140#[serde(rename_all = "kebab-case")]
141#[serde(deny_unknown_fields)]
142pub struct MinioConfig {
143    #[serde(rename = "use")]
144    phantom_use: Option<String>,
145    pub id: String,
146
147    pub address: String,
148    #[serde(with = "string")]
149    pub port: u16,
150    pub listen_address: String,
151
152    pub console_address: String,
153    #[serde(with = "string")]
154    pub console_port: u16,
155
156    pub root_user: String,
157    pub root_password: String,
158    pub hummock_bucket: String,
159
160    pub provide_prometheus: Option<Vec<PrometheusConfig>>,
161
162    // For rate limiting minio in a test environment.
163    pub api_requests_max: usize,
164    pub api_requests_deadline: String,
165}
166
167#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
168#[serde(rename_all = "kebab-case")]
169#[serde(deny_unknown_fields)]
170pub struct SqliteConfig {
171    #[serde(rename = "use")]
172    phantom_use: Option<String>,
173    pub id: String,
174
175    pub file: String,
176}
177
178#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
179#[serde(rename_all = "kebab-case")]
180#[serde(deny_unknown_fields)]
181pub struct PrometheusConfig {
182    #[serde(rename = "use")]
183    phantom_use: Option<String>,
184    pub id: String,
185
186    pub address: String,
187    #[serde(with = "string")]
188    pub port: u16,
189    pub listen_address: String,
190
191    pub remote_write: bool,
192    pub remote_write_region: String,
193    pub remote_write_url: String,
194
195    pub scrape_interval: String,
196
197    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
198    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
199    pub provide_minio: Option<Vec<MinioConfig>>,
200    pub provide_compactor: Option<Vec<CompactorConfig>>,
201    pub provide_frontend: Option<Vec<FrontendConfig>>,
202}
203
204#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
205#[serde(rename_all = "kebab-case")]
206#[serde(deny_unknown_fields)]
207pub struct GrafanaConfig {
208    #[serde(rename = "use")]
209    phantom_use: Option<String>,
210    pub id: String,
211    pub address: String,
212    pub listen_address: String,
213    pub port: u16,
214
215    pub provide_prometheus: Option<Vec<PrometheusConfig>>,
216    pub provide_tempo: Option<Vec<TempoConfig>>,
217}
218
219#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
220#[serde(rename_all = "kebab-case")]
221#[serde(deny_unknown_fields)]
222pub struct TempoConfig {
223    #[serde(rename = "use")]
224    phantom_use: Option<String>,
225    pub id: String,
226
227    pub listen_address: String,
228    pub address: String,
229    pub port: u16,
230    pub otlp_port: u16,
231    pub max_bytes_per_trace: usize,
232}
233
234#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
235#[serde(rename_all = "kebab-case")]
236#[serde(deny_unknown_fields)]
237pub struct AwsS3Config {
238    #[serde(rename = "use")]
239    phantom_use: Option<String>,
240    pub id: String,
241    pub bucket: String,
242}
243
244#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
245#[serde(rename_all = "kebab-case")]
246#[serde(deny_unknown_fields)]
247pub struct OpendalConfig {
248    #[serde(rename = "use")]
249    phantom_use: Option<String>,
250
251    pub id: String,
252    pub engine: String,
253    pub namenode: String,
254    pub bucket: String,
255}
256
257#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
258#[serde(rename_all = "kebab-case")]
259#[serde(deny_unknown_fields)]
260pub struct KafkaConfig {
261    #[serde(rename = "use")]
262    phantom_use: Option<String>,
263    pub id: String,
264
265    /// Advertise address
266    pub address: String,
267    #[serde(with = "string")]
268    pub port: u16,
269    /// Port for other services in docker. They need to connect to `host.docker.internal`, while the host
270    /// need to connect to `localhost`.
271    pub docker_port: u16,
272
273    #[serde(with = "string")]
274    pub controller_port: u16,
275
276    pub image: String,
277    pub persist_data: bool,
278    pub node_id: u32,
279
280    pub user_managed: bool,
281}
282
283#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
284#[serde(rename_all = "kebab-case")]
285#[serde(deny_unknown_fields)]
286pub struct SchemaRegistryConfig {
287    #[serde(rename = "use")]
288    phantom_use: Option<String>,
289
290    pub id: String,
291
292    pub address: String,
293    #[serde(with = "string")]
294    pub port: u16,
295
296    pub provide_kafka: Option<Vec<KafkaConfig>>,
297
298    pub image: String,
299    /// Redpanda supports schema registry natively. You can configure a `user_managed` schema registry
300    /// to use with redpanda.
301    pub user_managed: bool,
302}
303
304#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
305#[serde(rename_all = "kebab-case")]
306#[serde(deny_unknown_fields)]
307pub struct PubsubConfig {
308    #[serde(rename = "use")]
309    phantom_use: Option<String>,
310    pub id: String,
311    #[serde(with = "string")]
312    pub port: u16,
313    pub address: String,
314
315    pub persist_data: bool,
316}
317
318#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
319#[serde(rename_all = "kebab-case")]
320#[serde(deny_unknown_fields)]
321pub struct PulsarConfig {
322    #[serde(rename = "use")]
323    phantom_use: Option<String>,
324    pub id: String,
325
326    pub address: String,
327    pub broker_port: u16,
328    pub http_port: u16,
329
330    pub user_managed: bool,
331    pub image: String,
332    pub persist_data: bool,
333}
334
335#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
336#[serde(rename_all = "kebab-case")]
337#[serde(deny_unknown_fields)]
338pub struct RedisConfig {
339    #[serde(rename = "use")]
340    phantom_use: Option<String>,
341    pub id: String,
342
343    pub port: u16,
344    pub address: String,
345}
346
347#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
348#[serde(rename_all = "kebab-case")]
349#[serde(deny_unknown_fields)]
350pub enum Application {
351    Metastore,
352    Connector,
353}
354
355#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
356#[serde(rename_all = "kebab-case")]
357#[serde(deny_unknown_fields)]
358pub struct MySqlConfig {
359    #[serde(rename = "use")]
360    phantom_use: Option<String>,
361    pub id: String,
362
363    pub port: u16,
364    pub address: String,
365
366    pub user: String,
367    pub password: String,
368    pub database: String,
369
370    pub application: Application,
371    pub image: String,
372    pub user_managed: bool,
373    pub persist_data: bool,
374}
375
376#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
377#[serde(rename_all = "kebab-case")]
378#[serde(deny_unknown_fields)]
379pub struct PostgresConfig {
380    #[serde(rename = "use")]
381    phantom_use: Option<String>,
382    pub id: String,
383
384    pub port: u16,
385    pub address: String,
386
387    pub user: String,
388    pub password: String,
389    pub database: String,
390
391    pub application: Application,
392    pub image: String,
393    pub user_managed: bool,
394    pub persist_data: bool,
395
396    // Inject latency into any network calls to the postgres service.
397    pub latency_ms: u32,
398}
399
400#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
401#[serde(rename_all = "kebab-case")]
402#[serde(deny_unknown_fields)]
403pub struct SqlServerConfig {
404    #[serde(rename = "use")]
405    phantom_use: Option<String>,
406    pub id: String,
407
408    pub port: u16,
409    pub address: String,
410
411    pub user: String,
412    pub password: String,
413    pub database: String,
414
415    pub image: String,
416    pub user_managed: bool,
417    pub persist_data: bool,
418}
419
420#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
421#[serde(rename_all = "kebab-case")]
422#[serde(deny_unknown_fields)]
423pub struct LakekeeperConfig {
424    #[serde(rename = "use")]
425    phantom_use: Option<String>,
426    pub id: String,
427
428    pub port: u16,
429    pub address: String,
430
431    pub user_managed: bool,
432    pub persist_data: bool,
433
434    pub catalog_backend: String,
435    pub encryption_key: String,
436    pub provide_postgres_backend: Option<Vec<PostgresConfig>>,
437    pub provide_minio: Option<Vec<MinioConfig>>,
438}
439
440#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
441#[serde(rename_all = "kebab-case")]
442#[serde(deny_unknown_fields)]
443pub struct MoatConfig {
444    #[serde(rename = "use")]
445    phantom_use: Option<String>,
446    pub id: String,
447
448    pub address: String,
449    pub port: u16,
450
451    pub provide_minio: Option<Vec<MinioConfig>>,
452}
453
454/// All service configuration
455#[derive(Clone, Debug, PartialEq)]
456pub enum ServiceConfig {
457    ComputeNode(ComputeNodeConfig),
458    MetaNode(MetaNodeConfig),
459    Frontend(FrontendConfig),
460    Compactor(CompactorConfig),
461    Minio(MinioConfig),
462    Sqlite(SqliteConfig),
463    Prometheus(PrometheusConfig),
464    Grafana(GrafanaConfig),
465    Tempo(TempoConfig),
466    Opendal(OpendalConfig),
467    AwsS3(AwsS3Config),
468    Kafka(KafkaConfig),
469    SchemaRegistry(SchemaRegistryConfig),
470    Pubsub(PubsubConfig),
471    Pulsar(PulsarConfig),
472    Redis(RedisConfig),
473    MySql(MySqlConfig),
474    Postgres(PostgresConfig),
475    SqlServer(SqlServerConfig),
476    Lakekeeper(LakekeeperConfig),
477    Moat(MoatConfig),
478}
479
/// Group a service is assigned to by `ServiceConfig::task_group`.
///
/// The enum is fieldless, so it derives `Clone` and `Copy` in addition to the comparison and
/// hashing traits: a group can be used as a map/set key and still be reused afterwards without
/// being moved.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum TaskGroup {
    RisingWave,
    Observability,
    Kafka,
    Pubsub,
    Pulsar,
    MySql,
    Postgres,
    SqlServer,
    Redis,
    Lakekeeper,
    Moat,
}
494
495impl ServiceConfig {
496    pub fn id(&self) -> &str {
497        match self {
498            Self::ComputeNode(c) => &c.id,
499            Self::MetaNode(c) => &c.id,
500            Self::Frontend(c) => &c.id,
501            Self::Compactor(c) => &c.id,
502            Self::Minio(c) => &c.id,
503            Self::Sqlite(c) => &c.id,
504            Self::Prometheus(c) => &c.id,
505            Self::Grafana(c) => &c.id,
506            Self::Tempo(c) => &c.id,
507            Self::AwsS3(c) => &c.id,
508            Self::Kafka(c) => &c.id,
509            Self::Pubsub(c) => &c.id,
510            Self::Pulsar(c) => &c.id,
511            Self::Redis(c) => &c.id,
512            Self::Opendal(c) => &c.id,
513            Self::MySql(c) => &c.id,
514            Self::Postgres(c) => &c.id,
515            Self::SqlServer(c) => &c.id,
516            Self::SchemaRegistry(c) => &c.id,
517            Self::Lakekeeper(c) => &c.id,
518            Self::Moat(c) => &c.id,
519        }
520    }
521
522    /// Used to check whether the port is occupied before running the service.
523    pub fn port(&self) -> Option<u16> {
524        match self {
525            Self::ComputeNode(c) => Some(c.port),
526            Self::MetaNode(c) => Some(c.port),
527            Self::Frontend(c) => Some(c.port),
528            Self::Compactor(c) => Some(c.port),
529            Self::Minio(c) => Some(c.port),
530            Self::Sqlite(_) => None,
531            Self::Prometheus(c) => Some(c.port),
532            Self::Grafana(c) => Some(c.port),
533            Self::Tempo(c) => Some(c.port),
534            Self::AwsS3(_) => None,
535            Self::Kafka(c) => Some(c.port),
536            Self::Pubsub(c) => Some(c.port),
537            Self::Pulsar(c) => Some(c.http_port),
538            Self::Redis(c) => Some(c.port),
539            Self::Opendal(_) => None,
540            Self::MySql(c) => Some(c.port),
541            Self::Postgres(c) => Some(c.port),
542            Self::SqlServer(c) => Some(c.port),
543            Self::SchemaRegistry(c) => Some(c.port),
544            Self::Lakekeeper(c) => Some(c.port),
545            Self::Moat(c) => Some(c.port),
546        }
547    }
548
549    pub fn user_managed(&self) -> bool {
550        match self {
551            Self::ComputeNode(c) => c.user_managed,
552            Self::MetaNode(c) => c.user_managed,
553            Self::Frontend(c) => c.user_managed,
554            Self::Compactor(c) => c.user_managed,
555            Self::Minio(_c) => false,
556            Self::Sqlite(_c) => false,
557            Self::Prometheus(_c) => false,
558            Self::Grafana(_c) => false,
559            Self::Tempo(_c) => false,
560            Self::AwsS3(_c) => false,
561            Self::Kafka(c) => c.user_managed,
562            Self::Pubsub(_c) => false,
563            Self::Pulsar(c) => c.user_managed,
564            Self::Redis(_c) => false,
565            Self::Opendal(_c) => false,
566            Self::MySql(c) => c.user_managed,
567            Self::Postgres(c) => c.user_managed,
568            Self::SqlServer(c) => c.user_managed,
569            Self::SchemaRegistry(c) => c.user_managed,
570            Self::Lakekeeper(c) => c.user_managed,
571            Self::Moat(_c) => false,
572        }
573    }
574
575    pub fn task_group(&self) -> TaskGroup {
576        use TaskGroup::*;
577        match self {
578            ServiceConfig::ComputeNode(_)
579            | ServiceConfig::MetaNode(_)
580            | ServiceConfig::Frontend(_)
581            | ServiceConfig::Compactor(_)
582            | ServiceConfig::Minio(_)
583            | ServiceConfig::Sqlite(_) => RisingWave,
584            ServiceConfig::Prometheus(_) | ServiceConfig::Grafana(_) | ServiceConfig::Tempo(_) => {
585                Observability
586            }
587            ServiceConfig::Opendal(_) | ServiceConfig::AwsS3(_) => RisingWave,
588            ServiceConfig::Kafka(_) | ServiceConfig::SchemaRegistry(_) => Kafka,
589            ServiceConfig::Pubsub(_) => Pubsub,
590            ServiceConfig::Pulsar(_) => Pulsar,
591            ServiceConfig::Redis(_) => Redis,
592            ServiceConfig::MySql(my_sql_config) => {
593                if matches!(my_sql_config.application, Application::Metastore) {
594                    RisingWave
595                } else {
596                    MySql
597                }
598            }
599            ServiceConfig::Postgres(postgres_config) => {
600                if matches!(postgres_config.application, Application::Metastore) {
601                    RisingWave
602                } else {
603                    Postgres
604                }
605            }
606            ServiceConfig::SqlServer(_) => SqlServer,
607            ServiceConfig::Lakekeeper(_) => Lakekeeper,
608            ServiceConfig::Moat(_) => Moat,
609        }
610    }
611}
612
613mod string {
614    use std::fmt::Display;
615    use std::str::FromStr;
616
617    use serde::{Deserialize, Deserializer, Serializer, de};
618
619    pub fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
620    where
621        T: Display,
622        S: Serializer,
623    {
624        serializer.collect_str(value)
625    }
626
627    pub fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
628    where
629        T: FromStr,
630        T::Err: Display,
631        D: Deserializer<'de>,
632    {
633        String::deserialize(deserializer)?
634            .parse()
635            .map_err(de::Error::custom)
636    }
637}