risedev/
service_config.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use serde::{Deserialize, Serialize};
16
17#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
18#[serde(rename_all = "kebab-case")]
19#[serde(deny_unknown_fields)]
20pub struct ComputeNodeConfig {
21    #[serde(rename = "use")]
22    phantom_use: Option<String>,
23    pub id: String,
24
25    pub address: String,
26    #[serde(with = "string")]
27    pub port: u16,
28    pub listen_address: String,
29    pub exporter_port: u16,
30    pub async_stack_trace: String,
31    pub enable_tiered_cache: bool,
32
33    pub provide_minio: Option<Vec<MinioConfig>>,
34    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
35    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
36    pub provide_opendal: Option<Vec<OpendalConfig>>,
37    pub provide_aws_s3: Option<Vec<AwsS3Config>>,
38    pub provide_tempo: Option<Vec<TempoConfig>>,
39    pub user_managed: bool,
40    pub resource_group: String,
41
42    pub total_memory_bytes: usize,
43    pub parallelism: usize,
44    pub role: String,
45}
46
47#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
48#[serde(rename_all = "kebab-case")]
49#[serde(deny_unknown_fields)]
50pub enum MetaBackend {
51    Memory,
52    Sqlite,
53    Postgres,
54    Mysql,
55    Env,
56}
57
58#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
59#[serde(rename_all = "kebab-case")]
60#[serde(deny_unknown_fields)]
61pub struct MetaNodeConfig {
62    #[serde(rename = "use")]
63    phantom_use: Option<String>,
64    pub id: String,
65
66    pub address: String,
67    #[serde(with = "string")]
68    pub port: u16,
69    pub listen_address: String,
70    pub dashboard_port: u16,
71    pub exporter_port: u16,
72
73    pub user_managed: bool,
74
75    pub meta_backend: MetaBackend,
76    pub provide_sqlite_backend: Option<Vec<SqliteConfig>>,
77    pub provide_postgres_backend: Option<Vec<PostgresConfig>>,
78    pub provide_mysql_backend: Option<Vec<MySqlConfig>>,
79    pub provide_prometheus: Option<Vec<PrometheusConfig>>,
80
81    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
82    pub provide_compactor: Option<Vec<CompactorConfig>>,
83
84    pub provide_tempo: Option<Vec<TempoConfig>>,
85
86    pub provide_aws_s3: Option<Vec<AwsS3Config>>,
87    pub provide_minio: Option<Vec<MinioConfig>>,
88    pub provide_opendal: Option<Vec<OpendalConfig>>,
89}
90
91#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
92#[serde(rename_all = "kebab-case")]
93#[serde(deny_unknown_fields)]
94pub struct FrontendConfig {
95    #[serde(rename = "use")]
96    phantom_use: Option<String>,
97    pub id: String,
98
99    pub address: String,
100    #[serde(with = "string")]
101    pub port: u16,
102    pub listen_address: String,
103    pub exporter_port: u16,
104    pub health_check_port: u16,
105
106    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
107    pub provide_tempo: Option<Vec<TempoConfig>>,
108
109    pub user_managed: bool,
110}
111
112#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
113#[serde(rename_all = "kebab-case")]
114#[serde(deny_unknown_fields)]
115pub struct CompactorConfig {
116    #[serde(rename = "use")]
117    phantom_use: Option<String>,
118    pub id: String,
119
120    pub address: String,
121    #[serde(with = "string")]
122    pub port: u16,
123    pub listen_address: String,
124    pub exporter_port: u16,
125
126    pub provide_minio: Option<Vec<MinioConfig>>,
127
128    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
129    pub provide_tempo: Option<Vec<TempoConfig>>,
130
131    pub user_managed: bool,
132    pub compaction_worker_threads_number: Option<usize>,
133
134    pub compactor_mode: String,
135}
136
137#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
138#[serde(rename_all = "kebab-case")]
139#[serde(deny_unknown_fields)]
140pub struct MinioConfig {
141    #[serde(rename = "use")]
142    phantom_use: Option<String>,
143    pub id: String,
144
145    pub address: String,
146    #[serde(with = "string")]
147    pub port: u16,
148    pub listen_address: String,
149
150    pub console_address: String,
151    #[serde(with = "string")]
152    pub console_port: u16,
153
154    pub root_user: String,
155    pub root_password: String,
156    pub hummock_bucket: String,
157
158    pub provide_prometheus: Option<Vec<PrometheusConfig>>,
159
160    // For rate limiting minio in a test environment.
161    pub api_requests_max: usize,
162    pub api_requests_deadline: String,
163}
164
165#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
166#[serde(rename_all = "kebab-case")]
167#[serde(deny_unknown_fields)]
168pub struct SqliteConfig {
169    #[serde(rename = "use")]
170    phantom_use: Option<String>,
171    pub id: String,
172
173    pub file: String,
174}
175
176#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
177#[serde(rename_all = "kebab-case")]
178#[serde(deny_unknown_fields)]
179pub struct PrometheusConfig {
180    #[serde(rename = "use")]
181    phantom_use: Option<String>,
182    pub id: String,
183
184    pub address: String,
185    #[serde(with = "string")]
186    pub port: u16,
187    pub listen_address: String,
188
189    pub remote_write: bool,
190    pub remote_write_region: String,
191    pub remote_write_url: String,
192
193    pub scrape_interval: String,
194
195    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
196    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
197    pub provide_minio: Option<Vec<MinioConfig>>,
198    pub provide_compactor: Option<Vec<CompactorConfig>>,
199    pub provide_frontend: Option<Vec<FrontendConfig>>,
200}
201
202#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
203#[serde(rename_all = "kebab-case")]
204#[serde(deny_unknown_fields)]
205pub struct GrafanaConfig {
206    #[serde(rename = "use")]
207    phantom_use: Option<String>,
208    pub id: String,
209    pub address: String,
210    pub listen_address: String,
211    pub port: u16,
212
213    pub provide_prometheus: Option<Vec<PrometheusConfig>>,
214    pub provide_tempo: Option<Vec<TempoConfig>>,
215}
216
217#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
218#[serde(rename_all = "kebab-case")]
219#[serde(deny_unknown_fields)]
220pub struct TempoConfig {
221    #[serde(rename = "use")]
222    phantom_use: Option<String>,
223    pub id: String,
224
225    pub listen_address: String,
226    pub address: String,
227    pub port: u16,
228    pub otlp_port: u16,
229    pub max_bytes_per_trace: usize,
230}
231
232#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
233#[serde(rename_all = "kebab-case")]
234#[serde(deny_unknown_fields)]
235pub struct AwsS3Config {
236    #[serde(rename = "use")]
237    phantom_use: Option<String>,
238    pub id: String,
239    pub bucket: String,
240}
241
242#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
243#[serde(rename_all = "kebab-case")]
244#[serde(deny_unknown_fields)]
245pub struct OpendalConfig {
246    #[serde(rename = "use")]
247    phantom_use: Option<String>,
248
249    pub id: String,
250    pub engine: String,
251    pub namenode: String,
252    pub bucket: String,
253}
254
255#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
256#[serde(rename_all = "kebab-case")]
257#[serde(deny_unknown_fields)]
258pub struct KafkaConfig {
259    #[serde(rename = "use")]
260    phantom_use: Option<String>,
261    pub id: String,
262
263    /// Advertise address
264    pub address: String,
265    #[serde(with = "string")]
266    pub port: u16,
267    /// Port for other services in docker. They need to connect to `host.docker.internal`, while the host
268    /// need to connect to `localhost`.
269    pub docker_port: u16,
270
271    #[serde(with = "string")]
272    pub controller_port: u16,
273
274    pub image: String,
275    pub persist_data: bool,
276    pub node_id: u32,
277
278    pub user_managed: bool,
279}
280
281#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
282#[serde(rename_all = "kebab-case")]
283#[serde(deny_unknown_fields)]
284pub struct SchemaRegistryConfig {
285    #[serde(rename = "use")]
286    phantom_use: Option<String>,
287
288    pub id: String,
289
290    pub address: String,
291    #[serde(with = "string")]
292    pub port: u16,
293
294    pub provide_kafka: Option<Vec<KafkaConfig>>,
295
296    pub image: String,
297    /// Redpanda supports schema registry natively. You can configure a `user_managed` schema registry
298    /// to use with redpanda.
299    pub user_managed: bool,
300}
301
302#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
303#[serde(rename_all = "kebab-case")]
304#[serde(deny_unknown_fields)]
305pub struct PubsubConfig {
306    #[serde(rename = "use")]
307    phantom_use: Option<String>,
308    pub id: String,
309    #[serde(with = "string")]
310    pub port: u16,
311    pub address: String,
312
313    pub persist_data: bool,
314}
315
316#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
317#[serde(rename_all = "kebab-case")]
318#[serde(deny_unknown_fields)]
319pub struct PulsarConfig {
320    #[serde(rename = "use")]
321    phantom_use: Option<String>,
322    pub id: String,
323
324    pub address: String,
325    pub broker_port: u16,
326    pub http_port: u16,
327
328    pub user_managed: bool,
329    pub image: String,
330    pub persist_data: bool,
331}
332
333#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
334#[serde(rename_all = "kebab-case")]
335#[serde(deny_unknown_fields)]
336pub struct RedisConfig {
337    #[serde(rename = "use")]
338    phantom_use: Option<String>,
339    pub id: String,
340
341    pub port: u16,
342    pub address: String,
343}
344
345#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
346#[serde(rename_all = "kebab-case")]
347#[serde(deny_unknown_fields)]
348pub enum Application {
349    Metastore,
350    Connector,
351}
352
353#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
354#[serde(rename_all = "kebab-case")]
355#[serde(deny_unknown_fields)]
356pub struct MySqlConfig {
357    #[serde(rename = "use")]
358    phantom_use: Option<String>,
359    pub id: String,
360
361    pub port: u16,
362    pub address: String,
363
364    pub user: String,
365    pub password: String,
366    pub database: String,
367
368    pub application: Application,
369    pub image: String,
370    pub user_managed: bool,
371    pub persist_data: bool,
372}
373
374#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
375#[serde(rename_all = "kebab-case")]
376#[serde(deny_unknown_fields)]
377pub struct PostgresConfig {
378    #[serde(rename = "use")]
379    phantom_use: Option<String>,
380    pub id: String,
381
382    pub port: u16,
383    pub address: String,
384
385    pub user: String,
386    pub password: String,
387    pub database: String,
388
389    pub application: Application,
390    pub image: String,
391    pub user_managed: bool,
392    pub persist_data: bool,
393
394    // Inject latency into any network calls to the postgres service.
395    pub latency_ms: u32,
396}
397
398#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
399#[serde(rename_all = "kebab-case")]
400#[serde(deny_unknown_fields)]
401pub struct SqlServerConfig {
402    #[serde(rename = "use")]
403    phantom_use: Option<String>,
404    pub id: String,
405
406    pub port: u16,
407    pub address: String,
408
409    pub user: String,
410    pub password: String,
411    pub database: String,
412
413    pub image: String,
414    pub user_managed: bool,
415    pub persist_data: bool,
416}
417
418/// All service configuration
419#[derive(Clone, Debug, PartialEq)]
420pub enum ServiceConfig {
421    ComputeNode(ComputeNodeConfig),
422    MetaNode(MetaNodeConfig),
423    Frontend(FrontendConfig),
424    Compactor(CompactorConfig),
425    Minio(MinioConfig),
426    Sqlite(SqliteConfig),
427    Prometheus(PrometheusConfig),
428    Grafana(GrafanaConfig),
429    Tempo(TempoConfig),
430    Opendal(OpendalConfig),
431    AwsS3(AwsS3Config),
432    Kafka(KafkaConfig),
433    SchemaRegistry(SchemaRegistryConfig),
434    Pubsub(PubsubConfig),
435    Pulsar(PulsarConfig),
436    Redis(RedisConfig),
437    MySql(MySqlConfig),
438    Postgres(PostgresConfig),
439    SqlServer(SqlServerConfig),
440}
441
/// Group that a service is assigned to by `ServiceConfig::task_group`.
///
/// The enum is fieldless, so it derives `Clone` and `Copy` in addition to the
/// comparison/hash traits: a value can be stored (e.g. as a map key) and still
/// be used afterwards without an explicit clone.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum TaskGroup {
    RisingWave,
    Observability,
    Kafka,
    Pubsub,
    Pulsar,
    MySql,
    Postgres,
    SqlServer,
    Redis,
}
454
455impl ServiceConfig {
456    pub fn id(&self) -> &str {
457        match self {
458            Self::ComputeNode(c) => &c.id,
459            Self::MetaNode(c) => &c.id,
460            Self::Frontend(c) => &c.id,
461            Self::Compactor(c) => &c.id,
462            Self::Minio(c) => &c.id,
463            Self::Sqlite(c) => &c.id,
464            Self::Prometheus(c) => &c.id,
465            Self::Grafana(c) => &c.id,
466            Self::Tempo(c) => &c.id,
467            Self::AwsS3(c) => &c.id,
468            Self::Kafka(c) => &c.id,
469            Self::Pubsub(c) => &c.id,
470            Self::Pulsar(c) => &c.id,
471            Self::Redis(c) => &c.id,
472            Self::Opendal(c) => &c.id,
473            Self::MySql(c) => &c.id,
474            Self::Postgres(c) => &c.id,
475            Self::SqlServer(c) => &c.id,
476            Self::SchemaRegistry(c) => &c.id,
477        }
478    }
479
480    /// Used to check whether the port is occupied before running the service.
481    pub fn port(&self) -> Option<u16> {
482        match self {
483            Self::ComputeNode(c) => Some(c.port),
484            Self::MetaNode(c) => Some(c.port),
485            Self::Frontend(c) => Some(c.port),
486            Self::Compactor(c) => Some(c.port),
487            Self::Minio(c) => Some(c.port),
488            Self::Sqlite(_) => None,
489            Self::Prometheus(c) => Some(c.port),
490            Self::Grafana(c) => Some(c.port),
491            Self::Tempo(c) => Some(c.port),
492            Self::AwsS3(_) => None,
493            Self::Kafka(c) => Some(c.port),
494            Self::Pubsub(c) => Some(c.port),
495            Self::Pulsar(c) => Some(c.http_port),
496            Self::Redis(c) => Some(c.port),
497            Self::Opendal(_) => None,
498            Self::MySql(c) => Some(c.port),
499            Self::Postgres(c) => Some(c.port),
500            Self::SqlServer(c) => Some(c.port),
501            Self::SchemaRegistry(c) => Some(c.port),
502        }
503    }
504
505    pub fn user_managed(&self) -> bool {
506        match self {
507            Self::ComputeNode(c) => c.user_managed,
508            Self::MetaNode(c) => c.user_managed,
509            Self::Frontend(c) => c.user_managed,
510            Self::Compactor(c) => c.user_managed,
511            Self::Minio(_c) => false,
512            Self::Sqlite(_c) => false,
513            Self::Prometheus(_c) => false,
514            Self::Grafana(_c) => false,
515            Self::Tempo(_c) => false,
516            Self::AwsS3(_c) => false,
517            Self::Kafka(c) => c.user_managed,
518            Self::Pubsub(_c) => false,
519            Self::Pulsar(c) => c.user_managed,
520            Self::Redis(_c) => false,
521            Self::Opendal(_c) => false,
522            Self::MySql(c) => c.user_managed,
523            Self::Postgres(c) => c.user_managed,
524            Self::SqlServer(c) => c.user_managed,
525            Self::SchemaRegistry(c) => c.user_managed,
526        }
527    }
528
529    pub fn task_group(&self) -> TaskGroup {
530        use TaskGroup::*;
531        match self {
532            ServiceConfig::ComputeNode(_)
533            | ServiceConfig::MetaNode(_)
534            | ServiceConfig::Frontend(_)
535            | ServiceConfig::Compactor(_)
536            | ServiceConfig::Minio(_)
537            | ServiceConfig::Sqlite(_) => RisingWave,
538            ServiceConfig::Prometheus(_) | ServiceConfig::Grafana(_) | ServiceConfig::Tempo(_) => {
539                Observability
540            }
541            ServiceConfig::Opendal(_) | ServiceConfig::AwsS3(_) => RisingWave,
542            ServiceConfig::Kafka(_) | ServiceConfig::SchemaRegistry(_) => Kafka,
543            ServiceConfig::Pubsub(_) => Pubsub,
544            ServiceConfig::Pulsar(_) => Pulsar,
545            ServiceConfig::Redis(_) => Redis,
546            ServiceConfig::MySql(my_sql_config) => {
547                if matches!(my_sql_config.application, Application::Metastore) {
548                    RisingWave
549                } else {
550                    MySql
551                }
552            }
553            ServiceConfig::Postgres(postgres_config) => {
554                if matches!(postgres_config.application, Application::Metastore) {
555                    RisingWave
556                } else {
557                    Postgres
558                }
559            }
560            ServiceConfig::SqlServer(_) => SqlServer,
561        }
562    }
563}
564
565mod string {
566    use std::fmt::Display;
567    use std::str::FromStr;
568
569    use serde::{Deserialize, Deserializer, Serializer, de};
570
571    pub fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
572    where
573        T: Display,
574        S: Serializer,
575    {
576        serializer.collect_str(value)
577    }
578
579    pub fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
580    where
581        T: FromStr,
582        T::Err: Display,
583        D: Deserializer<'de>,
584    {
585        String::deserialize(deserializer)?
586            .parse()
587            .map_err(de::Error::custom)
588    }
589}