risedev/
service_config.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use serde::{Deserialize, Serialize};
16
17#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
18#[serde(rename_all = "kebab-case")]
19#[serde(deny_unknown_fields)]
20pub struct ComputeNodeConfig {
21    #[serde(rename = "use")]
22    phantom_use: Option<String>,
23    pub id: String,
24
25    pub address: String,
26    #[serde(with = "string")]
27    pub port: u16,
28    pub listen_address: String,
29    pub exporter_port: u16,
30    pub async_stack_trace: String,
31    pub enable_tiered_cache: bool,
32
33    pub provide_minio: Option<Vec<MinioConfig>>,
34    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
35    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
36    pub provide_opendal: Option<Vec<OpendalConfig>>,
37    pub provide_aws_s3: Option<Vec<AwsS3Config>>,
38    pub provide_tempo: Option<Vec<TempoConfig>>,
39    pub user_managed: bool,
40    pub resource_group: String,
41
42    pub total_memory_bytes: usize,
43    pub parallelism: usize,
44    pub role: String,
45}
46
47#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
48#[serde(rename_all = "kebab-case")]
49#[serde(deny_unknown_fields)]
50pub enum MetaBackend {
51    Memory,
52    Sqlite,
53    Postgres,
54    Mysql,
55    Env,
56}
57
58#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
59#[serde(rename_all = "kebab-case")]
60#[serde(deny_unknown_fields)]
61pub struct MetaNodeConfig {
62    #[serde(rename = "use")]
63    phantom_use: Option<String>,
64    pub id: String,
65
66    pub address: String,
67    #[serde(with = "string")]
68    pub port: u16,
69    pub listen_address: String,
70    pub dashboard_port: u16,
71    pub exporter_port: u16,
72
73    pub user_managed: bool,
74
75    pub meta_backend: MetaBackend,
76    pub provide_sqlite_backend: Option<Vec<SqliteConfig>>,
77    pub provide_postgres_backend: Option<Vec<PostgresConfig>>,
78    pub provide_mysql_backend: Option<Vec<MySqlConfig>>,
79    pub provide_prometheus: Option<Vec<PrometheusConfig>>,
80
81    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
82    pub provide_compactor: Option<Vec<CompactorConfig>>,
83
84    pub provide_tempo: Option<Vec<TempoConfig>>,
85
86    pub provide_aws_s3: Option<Vec<AwsS3Config>>,
87    pub provide_minio: Option<Vec<MinioConfig>>,
88    pub provide_opendal: Option<Vec<OpendalConfig>>,
89    pub enable_in_memory_kv_state_backend: bool,
90}
91
92#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
93#[serde(rename_all = "kebab-case")]
94#[serde(deny_unknown_fields)]
95pub struct FrontendConfig {
96    #[serde(rename = "use")]
97    phantom_use: Option<String>,
98    pub id: String,
99
100    pub address: String,
101    #[serde(with = "string")]
102    pub port: u16,
103    pub listen_address: String,
104    pub exporter_port: u16,
105    pub health_check_port: u16,
106
107    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
108    pub provide_tempo: Option<Vec<TempoConfig>>,
109
110    pub user_managed: bool,
111}
112
113#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
114#[serde(rename_all = "kebab-case")]
115#[serde(deny_unknown_fields)]
116pub struct CompactorConfig {
117    #[serde(rename = "use")]
118    phantom_use: Option<String>,
119    pub id: String,
120
121    pub address: String,
122    #[serde(with = "string")]
123    pub port: u16,
124    pub listen_address: String,
125    pub exporter_port: u16,
126
127    pub provide_minio: Option<Vec<MinioConfig>>,
128
129    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
130    pub provide_tempo: Option<Vec<TempoConfig>>,
131
132    pub user_managed: bool,
133    pub compaction_worker_threads_number: Option<usize>,
134}
135
136#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
137#[serde(rename_all = "kebab-case")]
138#[serde(deny_unknown_fields)]
139pub struct MinioConfig {
140    #[serde(rename = "use")]
141    phantom_use: Option<String>,
142    pub id: String,
143
144    pub address: String,
145    #[serde(with = "string")]
146    pub port: u16,
147    pub listen_address: String,
148
149    pub console_address: String,
150    #[serde(with = "string")]
151    pub console_port: u16,
152
153    pub root_user: String,
154    pub root_password: String,
155    pub hummock_bucket: String,
156
157    pub provide_prometheus: Option<Vec<PrometheusConfig>>,
158
159    // For rate limiting minio in a test environment.
160    pub api_requests_max: usize,
161    pub api_requests_deadline: String,
162}
163
164#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
165#[serde(rename_all = "kebab-case")]
166#[serde(deny_unknown_fields)]
167pub struct SqliteConfig {
168    #[serde(rename = "use")]
169    phantom_use: Option<String>,
170    pub id: String,
171
172    pub file: String,
173}
174
175#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
176#[serde(rename_all = "kebab-case")]
177#[serde(deny_unknown_fields)]
178pub struct PrometheusConfig {
179    #[serde(rename = "use")]
180    phantom_use: Option<String>,
181    pub id: String,
182
183    pub address: String,
184    #[serde(with = "string")]
185    pub port: u16,
186    pub listen_address: String,
187
188    pub remote_write: bool,
189    pub remote_write_region: String,
190    pub remote_write_url: String,
191
192    pub scrape_interval: String,
193
194    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
195    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
196    pub provide_minio: Option<Vec<MinioConfig>>,
197    pub provide_compactor: Option<Vec<CompactorConfig>>,
198    pub provide_redpanda: Option<Vec<RedPandaConfig>>,
199    pub provide_frontend: Option<Vec<FrontendConfig>>,
200}
201
202#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
203#[serde(rename_all = "kebab-case")]
204#[serde(deny_unknown_fields)]
205pub struct GrafanaConfig {
206    #[serde(rename = "use")]
207    phantom_use: Option<String>,
208    pub id: String,
209    pub address: String,
210    pub listen_address: String,
211    pub port: u16,
212
213    pub provide_prometheus: Option<Vec<PrometheusConfig>>,
214    pub provide_tempo: Option<Vec<TempoConfig>>,
215}
216
217#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
218#[serde(rename_all = "kebab-case")]
219#[serde(deny_unknown_fields)]
220pub struct TempoConfig {
221    #[serde(rename = "use")]
222    phantom_use: Option<String>,
223    pub id: String,
224
225    pub listen_address: String,
226    pub address: String,
227    pub port: u16,
228    pub otlp_port: u16,
229    pub max_bytes_per_trace: usize,
230}
231
232#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
233#[serde(rename_all = "kebab-case")]
234#[serde(deny_unknown_fields)]
235pub struct AwsS3Config {
236    #[serde(rename = "use")]
237    phantom_use: Option<String>,
238    pub id: String,
239    pub bucket: String,
240}
241
242#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
243#[serde(rename_all = "kebab-case")]
244#[serde(deny_unknown_fields)]
245pub struct OpendalConfig {
246    #[serde(rename = "use")]
247    phantom_use: Option<String>,
248
249    pub id: String,
250    pub engine: String,
251    pub namenode: String,
252    pub bucket: String,
253}
254
255#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
256#[serde(rename_all = "kebab-case")]
257#[serde(deny_unknown_fields)]
258pub struct KafkaConfig {
259    #[serde(rename = "use")]
260    phantom_use: Option<String>,
261    pub id: String,
262
263    /// Advertise address
264    pub address: String,
265    #[serde(with = "string")]
266    pub port: u16,
267    /// Port for other services in docker. They need to connect to `host.docker.internal`, while the host
268    /// need to connect to `localhost`.
269    pub docker_port: u16,
270
271    #[serde(with = "string")]
272    pub controller_port: u16,
273
274    pub image: String,
275    pub persist_data: bool,
276    pub node_id: u32,
277
278    pub user_managed: bool,
279}
280
281#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
282#[serde(rename_all = "kebab-case")]
283#[serde(deny_unknown_fields)]
284pub struct SchemaRegistryConfig {
285    #[serde(rename = "use")]
286    phantom_use: Option<String>,
287
288    pub id: String,
289
290    pub address: String,
291    #[serde(with = "string")]
292    pub port: u16,
293
294    pub provide_kafka: Option<Vec<KafkaConfig>>,
295
296    pub image: String,
297    /// Redpanda supports schema registry natively. You can configure a `user_managed` schema registry
298    /// to use with redpanda.
299    pub user_managed: bool,
300}
301
302#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
303#[serde(rename_all = "kebab-case")]
304#[serde(deny_unknown_fields)]
305pub struct PubsubConfig {
306    #[serde(rename = "use")]
307    phantom_use: Option<String>,
308    pub id: String,
309    #[serde(with = "string")]
310    pub port: u16,
311    pub address: String,
312
313    pub persist_data: bool,
314}
315
316#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
317#[serde(rename_all = "kebab-case")]
318#[serde(deny_unknown_fields)]
319pub struct RedPandaConfig {
320    #[serde(rename = "use")]
321    phantom_use: Option<String>,
322    pub id: String,
323    pub internal_port: u16,
324    pub outside_port: u16,
325    pub address: String,
326    pub cpus: usize,
327    pub memory: String,
328}
329
330#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
331#[serde(rename_all = "kebab-case")]
332#[serde(deny_unknown_fields)]
333pub struct RedisConfig {
334    #[serde(rename = "use")]
335    phantom_use: Option<String>,
336    pub id: String,
337
338    pub port: u16,
339    pub address: String,
340}
341
342#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
343#[serde(rename_all = "kebab-case")]
344#[serde(deny_unknown_fields)]
345pub enum Application {
346    Metastore,
347    Connector,
348}
349
350#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
351#[serde(rename_all = "kebab-case")]
352#[serde(deny_unknown_fields)]
353pub struct MySqlConfig {
354    #[serde(rename = "use")]
355    phantom_use: Option<String>,
356    pub id: String,
357
358    pub port: u16,
359    pub address: String,
360
361    pub user: String,
362    pub password: String,
363    pub database: String,
364
365    pub application: Application,
366    pub image: String,
367    pub user_managed: bool,
368    pub persist_data: bool,
369}
370
371#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
372#[serde(rename_all = "kebab-case")]
373#[serde(deny_unknown_fields)]
374pub struct PostgresConfig {
375    #[serde(rename = "use")]
376    phantom_use: Option<String>,
377    pub id: String,
378
379    pub port: u16,
380    pub address: String,
381
382    pub user: String,
383    pub password: String,
384    pub database: String,
385
386    pub application: Application,
387    pub image: String,
388    pub user_managed: bool,
389    pub persist_data: bool,
390}
391
392#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
393#[serde(rename_all = "kebab-case")]
394#[serde(deny_unknown_fields)]
395pub struct SqlServerConfig {
396    #[serde(rename = "use")]
397    phantom_use: Option<String>,
398    pub id: String,
399
400    pub port: u16,
401    pub address: String,
402
403    pub user: String,
404    pub password: String,
405    pub database: String,
406
407    pub image: String,
408    pub user_managed: bool,
409    pub persist_data: bool,
410}
411
412/// All service configuration
413#[derive(Clone, Debug, PartialEq)]
414pub enum ServiceConfig {
415    ComputeNode(ComputeNodeConfig),
416    MetaNode(MetaNodeConfig),
417    Frontend(FrontendConfig),
418    Compactor(CompactorConfig),
419    Minio(MinioConfig),
420    Sqlite(SqliteConfig),
421    Prometheus(PrometheusConfig),
422    Grafana(GrafanaConfig),
423    Tempo(TempoConfig),
424    Opendal(OpendalConfig),
425    AwsS3(AwsS3Config),
426    Kafka(KafkaConfig),
427    SchemaRegistry(SchemaRegistryConfig),
428    Pubsub(PubsubConfig),
429    Redis(RedisConfig),
430    RedPanda(RedPandaConfig),
431    MySql(MySqlConfig),
432    Postgres(PostgresConfig),
433    SqlServer(SqlServerConfig),
434}
435
436#[derive(PartialEq, Eq, Hash, Debug)]
437pub enum TaskGroup {
438    RisingWave,
439    Observability,
440    Kafka,
441    Pubsub,
442    MySql,
443    Postgres,
444    SqlServer,
445    Redis,
446}
447
448impl ServiceConfig {
449    pub fn id(&self) -> &str {
450        match self {
451            Self::ComputeNode(c) => &c.id,
452            Self::MetaNode(c) => &c.id,
453            Self::Frontend(c) => &c.id,
454            Self::Compactor(c) => &c.id,
455            Self::Minio(c) => &c.id,
456            Self::Sqlite(c) => &c.id,
457            Self::Prometheus(c) => &c.id,
458            Self::Grafana(c) => &c.id,
459            Self::Tempo(c) => &c.id,
460            Self::AwsS3(c) => &c.id,
461            Self::Kafka(c) => &c.id,
462            Self::Pubsub(c) => &c.id,
463            Self::Redis(c) => &c.id,
464            Self::RedPanda(c) => &c.id,
465            Self::Opendal(c) => &c.id,
466            Self::MySql(c) => &c.id,
467            Self::Postgres(c) => &c.id,
468            Self::SqlServer(c) => &c.id,
469            Self::SchemaRegistry(c) => &c.id,
470        }
471    }
472
473    /// Used to check whether the port is occupied before running the service.
474    pub fn port(&self) -> Option<u16> {
475        match self {
476            Self::ComputeNode(c) => Some(c.port),
477            Self::MetaNode(c) => Some(c.port),
478            Self::Frontend(c) => Some(c.port),
479            Self::Compactor(c) => Some(c.port),
480            Self::Minio(c) => Some(c.port),
481            Self::Sqlite(_) => None,
482            Self::Prometheus(c) => Some(c.port),
483            Self::Grafana(c) => Some(c.port),
484            Self::Tempo(c) => Some(c.port),
485            Self::AwsS3(_) => None,
486            Self::Kafka(c) => Some(c.port),
487            Self::Pubsub(c) => Some(c.port),
488            Self::Redis(c) => Some(c.port),
489            Self::RedPanda(_c) => None,
490            Self::Opendal(_) => None,
491            Self::MySql(c) => Some(c.port),
492            Self::Postgres(c) => Some(c.port),
493            Self::SqlServer(c) => Some(c.port),
494            Self::SchemaRegistry(c) => Some(c.port),
495        }
496    }
497
498    pub fn user_managed(&self) -> bool {
499        match self {
500            Self::ComputeNode(c) => c.user_managed,
501            Self::MetaNode(c) => c.user_managed,
502            Self::Frontend(c) => c.user_managed,
503            Self::Compactor(c) => c.user_managed,
504            Self::Minio(_c) => false,
505            Self::Sqlite(_c) => false,
506            Self::Prometheus(_c) => false,
507            Self::Grafana(_c) => false,
508            Self::Tempo(_c) => false,
509            Self::AwsS3(_c) => false,
510            Self::Kafka(c) => c.user_managed,
511            Self::Pubsub(_c) => false,
512            Self::Redis(_c) => false,
513            Self::RedPanda(_c) => false,
514            Self::Opendal(_c) => false,
515            Self::MySql(c) => c.user_managed,
516            Self::Postgres(c) => c.user_managed,
517            Self::SqlServer(c) => c.user_managed,
518            Self::SchemaRegistry(c) => c.user_managed,
519        }
520    }
521
522    pub fn task_group(&self) -> TaskGroup {
523        use TaskGroup::*;
524        match self {
525            ServiceConfig::ComputeNode(_)
526            | ServiceConfig::MetaNode(_)
527            | ServiceConfig::Frontend(_)
528            | ServiceConfig::Compactor(_)
529            | ServiceConfig::Minio(_)
530            | ServiceConfig::Sqlite(_) => RisingWave,
531            ServiceConfig::Prometheus(_) | ServiceConfig::Grafana(_) | ServiceConfig::Tempo(_) => {
532                Observability
533            }
534            ServiceConfig::Opendal(_) | ServiceConfig::AwsS3(_) => RisingWave,
535            ServiceConfig::Kafka(_)
536            | ServiceConfig::SchemaRegistry(_)
537            | ServiceConfig::RedPanda(_) => Kafka,
538            ServiceConfig::Pubsub(_) => Pubsub,
539            ServiceConfig::Redis(_) => Redis,
540            ServiceConfig::MySql(my_sql_config) => {
541                if matches!(my_sql_config.application, Application::Metastore) {
542                    RisingWave
543                } else {
544                    MySql
545                }
546            }
547            ServiceConfig::Postgres(postgres_config) => {
548                if matches!(postgres_config.application, Application::Metastore) {
549                    RisingWave
550                } else {
551                    Postgres
552                }
553            }
554            ServiceConfig::SqlServer(_) => SqlServer,
555        }
556    }
557}
558
559mod string {
560    use std::fmt::Display;
561    use std::str::FromStr;
562
563    use serde::{Deserialize, Deserializer, Serializer, de};
564
565    pub fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
566    where
567        T: Display,
568        S: Serializer,
569    {
570        serializer.collect_str(value)
571    }
572
573    pub fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
574    where
575        T: FromStr,
576        T::Err: Display,
577        D: Deserializer<'de>,
578    {
579        String::deserialize(deserializer)?
580            .parse()
581            .map_err(de::Error::custom)
582    }
583}