risedev/
service_config.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use serde::{Deserialize, Serialize};
16
17#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
18#[serde(rename_all = "kebab-case")]
19#[serde(deny_unknown_fields)]
20pub struct ComputeNodeConfig {
21    #[serde(rename = "use")]
22    phantom_use: Option<String>,
23    pub id: String,
24
25    pub address: String,
26    #[serde(with = "string")]
27    pub port: u16,
28    pub listen_address: String,
29    pub exporter_port: u16,
30    pub async_stack_trace: String,
31    pub enable_tiered_cache: bool,
32
33    pub provide_minio: Option<Vec<MinioConfig>>,
34    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
35    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
36    pub provide_opendal: Option<Vec<OpendalConfig>>,
37    pub provide_aws_s3: Option<Vec<AwsS3Config>>,
38    pub provide_moat: Option<Vec<MoatConfig>>,
39    pub provide_tempo: Option<Vec<TempoConfig>>,
40    pub user_managed: bool,
41    pub resource_group: String,
42
43    pub total_memory_bytes: usize,
44    pub parallelism: usize,
45    pub role: String,
46}
47
48#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
49#[serde(rename_all = "kebab-case")]
50#[serde(deny_unknown_fields)]
51pub enum MetaBackend {
52    Memory,
53    Sqlite,
54    Postgres,
55    Mysql,
56    Env,
57}
58
59#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
60#[serde(rename_all = "kebab-case")]
61#[serde(deny_unknown_fields)]
62pub struct MetaNodeConfig {
63    #[serde(rename = "use")]
64    phantom_use: Option<String>,
65    pub id: String,
66
67    pub address: String,
68    #[serde(with = "string")]
69    pub port: u16,
70    pub listen_address: String,
71    pub dashboard_port: u16,
72    pub exporter_port: u16,
73
74    pub user_managed: bool,
75
76    pub meta_backend: MetaBackend,
77    pub provide_sqlite_backend: Option<Vec<SqliteConfig>>,
78    pub provide_postgres_backend: Option<Vec<PostgresConfig>>,
79    pub provide_mysql_backend: Option<Vec<MySqlConfig>>,
80    pub provide_prometheus: Option<Vec<PrometheusConfig>>,
81
82    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
83    pub provide_compactor: Option<Vec<CompactorConfig>>,
84
85    pub provide_tempo: Option<Vec<TempoConfig>>,
86
87    pub provide_aws_s3: Option<Vec<AwsS3Config>>,
88    pub provide_minio: Option<Vec<MinioConfig>>,
89    pub provide_opendal: Option<Vec<OpendalConfig>>,
90    pub provide_moat: Option<Vec<MoatConfig>>,
91}
92
93#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
94#[serde(rename_all = "kebab-case")]
95#[serde(deny_unknown_fields)]
96pub struct FrontendConfig {
97    #[serde(rename = "use")]
98    phantom_use: Option<String>,
99    pub id: String,
100
101    pub address: String,
102    #[serde(with = "string")]
103    pub port: u16,
104    pub listen_address: String,
105    pub exporter_port: u16,
106    pub health_check_port: u16,
107
108    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
109    pub provide_tempo: Option<Vec<TempoConfig>>,
110    pub provide_prometheus: Option<Vec<PrometheusConfig>>,
111
112    pub user_managed: bool,
113}
114
115#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
116#[serde(rename_all = "kebab-case")]
117#[serde(deny_unknown_fields)]
118pub struct CompactorConfig {
119    #[serde(rename = "use")]
120    phantom_use: Option<String>,
121    pub id: String,
122
123    pub address: String,
124    #[serde(with = "string")]
125    pub port: u16,
126    pub listen_address: String,
127    pub exporter_port: u16,
128
129    pub provide_minio: Option<Vec<MinioConfig>>,
130
131    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
132    pub provide_tempo: Option<Vec<TempoConfig>>,
133
134    pub user_managed: bool,
135    pub compaction_worker_threads_number: Option<usize>,
136
137    pub compactor_mode: String,
138}
139
140#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
141#[serde(rename_all = "kebab-case")]
142#[serde(deny_unknown_fields)]
143pub struct MinioConfig {
144    #[serde(rename = "use")]
145    phantom_use: Option<String>,
146    pub id: String,
147
148    pub address: String,
149    #[serde(with = "string")]
150    pub port: u16,
151    pub listen_address: String,
152
153    pub console_address: String,
154    #[serde(with = "string")]
155    pub console_port: u16,
156
157    pub root_user: String,
158    pub root_password: String,
159    pub hummock_bucket: String,
160
161    pub provide_prometheus: Option<Vec<PrometheusConfig>>,
162
163    // For rate limiting minio in a test environment.
164    pub api_requests_max: usize,
165    pub api_requests_deadline: String,
166}
167
168#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
169#[serde(rename_all = "kebab-case")]
170#[serde(deny_unknown_fields)]
171pub struct SqliteConfig {
172    #[serde(rename = "use")]
173    phantom_use: Option<String>,
174    pub id: String,
175
176    pub file: String,
177}
178
179#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
180#[serde(rename_all = "kebab-case")]
181#[serde(deny_unknown_fields)]
182pub struct PrometheusConfig {
183    #[serde(rename = "use")]
184    phantom_use: Option<String>,
185    pub id: String,
186
187    pub address: String,
188    #[serde(with = "string")]
189    pub port: u16,
190    pub listen_address: String,
191
192    pub remote_write: bool,
193    pub remote_write_region: String,
194    pub remote_write_url: String,
195
196    pub scrape_interval: String,
197
198    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
199    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
200    pub provide_minio: Option<Vec<MinioConfig>>,
201    pub provide_compactor: Option<Vec<CompactorConfig>>,
202    pub provide_frontend: Option<Vec<FrontendConfig>>,
203}
204
205#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
206#[serde(rename_all = "kebab-case")]
207#[serde(deny_unknown_fields)]
208pub struct GrafanaConfig {
209    #[serde(rename = "use")]
210    phantom_use: Option<String>,
211    pub id: String,
212    pub address: String,
213    pub listen_address: String,
214    pub port: u16,
215
216    pub provide_prometheus: Option<Vec<PrometheusConfig>>,
217    pub provide_tempo: Option<Vec<TempoConfig>>,
218}
219
220#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
221#[serde(rename_all = "kebab-case")]
222#[serde(deny_unknown_fields)]
223pub struct TempoConfig {
224    #[serde(rename = "use")]
225    phantom_use: Option<String>,
226    pub id: String,
227
228    pub listen_address: String,
229    pub address: String,
230    pub port: u16,
231    pub otlp_port: u16,
232    pub max_bytes_per_trace: usize,
233}
234
235#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
236#[serde(rename_all = "kebab-case")]
237#[serde(deny_unknown_fields)]
238pub struct AwsS3Config {
239    #[serde(rename = "use")]
240    phantom_use: Option<String>,
241    pub id: String,
242    pub bucket: String,
243}
244
245#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
246#[serde(rename_all = "kebab-case")]
247#[serde(deny_unknown_fields)]
248pub struct OpendalConfig {
249    #[serde(rename = "use")]
250    phantom_use: Option<String>,
251
252    pub id: String,
253    pub engine: String,
254    pub namenode: String,
255    pub bucket: String,
256}
257
258#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
259#[serde(rename_all = "kebab-case")]
260#[serde(deny_unknown_fields)]
261pub struct KafkaConfig {
262    #[serde(rename = "use")]
263    phantom_use: Option<String>,
264    pub id: String,
265
266    /// Advertise address
267    pub address: String,
268    #[serde(with = "string")]
269    pub port: u16,
270    /// Port for other services in docker. They need to connect to `host.docker.internal`, while the host
271    /// need to connect to `localhost`.
272    pub docker_port: u16,
273
274    #[serde(with = "string")]
275    pub controller_port: u16,
276
277    pub image: String,
278    pub persist_data: bool,
279    pub node_id: u32,
280
281    pub user_managed: bool,
282}
283
284#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
285#[serde(rename_all = "kebab-case")]
286#[serde(deny_unknown_fields)]
287pub struct SchemaRegistryConfig {
288    #[serde(rename = "use")]
289    phantom_use: Option<String>,
290
291    pub id: String,
292
293    pub address: String,
294    #[serde(with = "string")]
295    pub port: u16,
296
297    pub provide_kafka: Option<Vec<KafkaConfig>>,
298
299    pub image: String,
300    /// Redpanda supports schema registry natively. You can configure a `user_managed` schema registry
301    /// to use with redpanda.
302    pub user_managed: bool,
303}
304
305#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
306#[serde(rename_all = "kebab-case")]
307#[serde(deny_unknown_fields)]
308pub struct PubsubConfig {
309    #[serde(rename = "use")]
310    phantom_use: Option<String>,
311    pub id: String,
312    #[serde(default)]
313    pub user_managed: bool,
314    #[serde(with = "string")]
315    pub port: u16,
316    pub address: String,
317
318    pub persist_data: bool,
319}
320
321#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
322#[serde(rename_all = "kebab-case")]
323#[serde(deny_unknown_fields)]
324pub struct PulsarConfig {
325    #[serde(rename = "use")]
326    phantom_use: Option<String>,
327    pub id: String,
328
329    pub address: String,
330    pub broker_port: u16,
331    pub http_port: u16,
332
333    pub user_managed: bool,
334    pub image: String,
335    pub persist_data: bool,
336}
337
338#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
339#[serde(rename_all = "kebab-case")]
340#[serde(deny_unknown_fields)]
341pub struct RedisConfig {
342    #[serde(rename = "use")]
343    phantom_use: Option<String>,
344    pub id: String,
345
346    pub port: u16,
347    pub address: String,
348}
349
350#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
351#[serde(rename_all = "kebab-case")]
352#[serde(deny_unknown_fields)]
353pub enum Application {
354    Metastore,
355    Connector,
356}
357
358#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
359#[serde(rename_all = "kebab-case")]
360#[serde(deny_unknown_fields)]
361pub struct MySqlConfig {
362    #[serde(rename = "use")]
363    phantom_use: Option<String>,
364    pub id: String,
365
366    pub port: u16,
367    pub address: String,
368
369    pub user: String,
370    pub password: String,
371    pub database: String,
372
373    pub application: Application,
374    pub image: String,
375    pub user_managed: bool,
376    pub persist_data: bool,
377}
378
379#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
380#[serde(rename_all = "kebab-case")]
381#[serde(deny_unknown_fields)]
382pub struct PostgresConfig {
383    #[serde(rename = "use")]
384    phantom_use: Option<String>,
385    pub id: String,
386
387    pub port: u16,
388    pub address: String,
389
390    pub user: String,
391    pub password: String,
392    pub database: String,
393
394    pub application: Application,
395    pub image: String,
396    pub user_managed: bool,
397    pub persist_data: bool,
398
399    // Inject latency into any network calls to the postgres service.
400    pub latency_ms: u32,
401}
402
403#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
404#[serde(rename_all = "kebab-case")]
405#[serde(deny_unknown_fields)]
406pub struct SqlServerConfig {
407    #[serde(rename = "use")]
408    phantom_use: Option<String>,
409    pub id: String,
410
411    pub port: u16,
412    pub address: String,
413
414    pub user: String,
415    pub password: String,
416    pub database: String,
417
418    pub image: String,
419    pub user_managed: bool,
420    pub persist_data: bool,
421}
422
423#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
424#[serde(rename_all = "kebab-case")]
425#[serde(deny_unknown_fields)]
426pub struct LakekeeperConfig {
427    #[serde(rename = "use")]
428    phantom_use: Option<String>,
429    pub id: String,
430
431    pub port: u16,
432    pub address: String,
433
434    pub user_managed: bool,
435    pub persist_data: bool,
436
437    pub catalog_backend: String,
438    pub encryption_key: String,
439    pub provide_postgres_backend: Option<Vec<PostgresConfig>>,
440    pub provide_minio: Option<Vec<MinioConfig>>,
441}
442
443#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
444#[serde(rename_all = "kebab-case")]
445#[serde(deny_unknown_fields)]
446pub struct MoatConfig {
447    #[serde(rename = "use")]
448    phantom_use: Option<String>,
449    pub id: String,
450
451    pub address: String,
452    pub port: u16,
453
454    pub provide_minio: Option<Vec<MinioConfig>>,
455}
456
457/// All service configuration
458#[derive(Clone, Debug, PartialEq)]
459pub enum ServiceConfig {
460    ComputeNode(ComputeNodeConfig),
461    MetaNode(MetaNodeConfig),
462    Frontend(FrontendConfig),
463    Compactor(CompactorConfig),
464    Minio(MinioConfig),
465    Sqlite(SqliteConfig),
466    Prometheus(PrometheusConfig),
467    Grafana(GrafanaConfig),
468    Tempo(TempoConfig),
469    Opendal(OpendalConfig),
470    AwsS3(AwsS3Config),
471    Kafka(KafkaConfig),
472    SchemaRegistry(SchemaRegistryConfig),
473    Pubsub(PubsubConfig),
474    Pulsar(PulsarConfig),
475    Redis(RedisConfig),
476    MySql(MySqlConfig),
477    Postgres(PostgresConfig),
478    SqlServer(SqlServerConfig),
479    Lakekeeper(LakekeeperConfig),
480    Moat(MoatConfig),
481}
482
483#[derive(PartialEq, Eq, Hash, Debug)]
484pub enum TaskGroup {
485    RisingWave,
486    Observability,
487    Kafka,
488    Pubsub,
489    Pulsar,
490    MySql,
491    Postgres,
492    SqlServer,
493    Redis,
494    Lakekeeper,
495    Moat,
496}
497
498impl ServiceConfig {
499    pub fn id(&self) -> &str {
500        match self {
501            Self::ComputeNode(c) => &c.id,
502            Self::MetaNode(c) => &c.id,
503            Self::Frontend(c) => &c.id,
504            Self::Compactor(c) => &c.id,
505            Self::Minio(c) => &c.id,
506            Self::Sqlite(c) => &c.id,
507            Self::Prometheus(c) => &c.id,
508            Self::Grafana(c) => &c.id,
509            Self::Tempo(c) => &c.id,
510            Self::AwsS3(c) => &c.id,
511            Self::Kafka(c) => &c.id,
512            Self::Pubsub(c) => &c.id,
513            Self::Pulsar(c) => &c.id,
514            Self::Redis(c) => &c.id,
515            Self::Opendal(c) => &c.id,
516            Self::MySql(c) => &c.id,
517            Self::Postgres(c) => &c.id,
518            Self::SqlServer(c) => &c.id,
519            Self::SchemaRegistry(c) => &c.id,
520            Self::Lakekeeper(c) => &c.id,
521            Self::Moat(c) => &c.id,
522        }
523    }
524
525    /// Used to check whether the port is occupied before running the service.
526    pub fn port(&self) -> Option<u16> {
527        match self {
528            Self::ComputeNode(c) => Some(c.port),
529            Self::MetaNode(c) => Some(c.port),
530            Self::Frontend(c) => Some(c.port),
531            Self::Compactor(c) => Some(c.port),
532            Self::Minio(c) => Some(c.port),
533            Self::Sqlite(_) => None,
534            Self::Prometheus(c) => Some(c.port),
535            Self::Grafana(c) => Some(c.port),
536            Self::Tempo(c) => Some(c.port),
537            Self::AwsS3(_) => None,
538            Self::Kafka(c) => Some(c.port),
539            Self::Pubsub(c) => Some(c.port),
540            Self::Pulsar(c) => Some(c.http_port),
541            Self::Redis(c) => Some(c.port),
542            Self::Opendal(_) => None,
543            Self::MySql(c) => Some(c.port),
544            Self::Postgres(c) => Some(c.port),
545            Self::SqlServer(c) => Some(c.port),
546            Self::SchemaRegistry(c) => Some(c.port),
547            Self::Lakekeeper(c) => Some(c.port),
548            Self::Moat(c) => Some(c.port),
549        }
550    }
551
552    pub fn user_managed(&self) -> bool {
553        match self {
554            Self::ComputeNode(c) => c.user_managed,
555            Self::MetaNode(c) => c.user_managed,
556            Self::Frontend(c) => c.user_managed,
557            Self::Compactor(c) => c.user_managed,
558            Self::Minio(_c) => false,
559            Self::Sqlite(_c) => false,
560            Self::Prometheus(_c) => false,
561            Self::Grafana(_c) => false,
562            Self::Tempo(_c) => false,
563            Self::AwsS3(_c) => false,
564            Self::Kafka(c) => c.user_managed,
565            Self::Pubsub(c) => c.user_managed,
566            Self::Pulsar(c) => c.user_managed,
567            Self::Redis(_c) => false,
568            Self::Opendal(_c) => false,
569            Self::MySql(c) => c.user_managed,
570            Self::Postgres(c) => c.user_managed,
571            Self::SqlServer(c) => c.user_managed,
572            Self::SchemaRegistry(c) => c.user_managed,
573            Self::Lakekeeper(c) => c.user_managed,
574            Self::Moat(_c) => false,
575        }
576    }
577
578    pub fn task_group(&self) -> TaskGroup {
579        use TaskGroup::*;
580        match self {
581            ServiceConfig::ComputeNode(_)
582            | ServiceConfig::MetaNode(_)
583            | ServiceConfig::Frontend(_)
584            | ServiceConfig::Compactor(_)
585            | ServiceConfig::Minio(_)
586            | ServiceConfig::Sqlite(_) => RisingWave,
587            ServiceConfig::Prometheus(_) | ServiceConfig::Grafana(_) | ServiceConfig::Tempo(_) => {
588                Observability
589            }
590            ServiceConfig::Opendal(_) | ServiceConfig::AwsS3(_) => RisingWave,
591            ServiceConfig::Kafka(_) | ServiceConfig::SchemaRegistry(_) => Kafka,
592            ServiceConfig::Pubsub(_) => Pubsub,
593            ServiceConfig::Pulsar(_) => Pulsar,
594            ServiceConfig::Redis(_) => Redis,
595            ServiceConfig::MySql(my_sql_config) => {
596                if matches!(my_sql_config.application, Application::Metastore) {
597                    RisingWave
598                } else {
599                    MySql
600                }
601            }
602            ServiceConfig::Postgres(postgres_config) => {
603                if matches!(postgres_config.application, Application::Metastore) {
604                    RisingWave
605                } else {
606                    Postgres
607                }
608            }
609            ServiceConfig::SqlServer(_) => SqlServer,
610            ServiceConfig::Lakekeeper(_) => Lakekeeper,
611            ServiceConfig::Moat(_) => Moat,
612        }
613    }
614}
615
616mod string {
617    use std::fmt::Display;
618    use std::str::FromStr;
619
620    use serde::{Deserialize, Deserializer, Serializer, de};
621
622    pub fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
623    where
624        T: Display,
625        S: Serializer,
626    {
627        serializer.collect_str(value)
628    }
629
630    pub fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
631    where
632        T: FromStr,
633        T::Err: Display,
634        D: Deserializer<'de>,
635    {
636        String::deserialize(deserializer)?
637            .parse()
638            .map_err(de::Error::custom)
639    }
640}