risedev/
service_config.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use serde::{Deserialize, Serialize};
16
17#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
18#[serde(rename_all = "kebab-case")]
19#[serde(deny_unknown_fields)]
20pub struct ComputeNodeConfig {
21    #[serde(rename = "use")]
22    phantom_use: Option<String>,
23    pub id: String,
24
25    pub address: String,
26    #[serde(with = "string")]
27    pub port: u16,
28    pub listen_address: String,
29    pub exporter_port: u16,
30    pub async_stack_trace: String,
31    pub enable_tiered_cache: bool,
32
33    pub provide_minio: Option<Vec<MinioConfig>>,
34    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
35    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
36    pub provide_opendal: Option<Vec<OpendalConfig>>,
37    pub provide_aws_s3: Option<Vec<AwsS3Config>>,
38    pub provide_moat: Option<Vec<MoatConfig>>,
39    pub provide_tempo: Option<Vec<TempoConfig>>,
40    pub user_managed: bool,
41    pub resource_group: String,
42
43    pub total_memory_bytes: usize,
44    pub parallelism: usize,
45    pub role: String,
46}
47
48#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
49#[serde(rename_all = "kebab-case")]
50#[serde(deny_unknown_fields)]
51pub enum MetaBackend {
52    Memory,
53    Sqlite,
54    Postgres,
55    Mysql,
56    Env,
57}
58
59#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
60#[serde(rename_all = "kebab-case")]
61#[serde(deny_unknown_fields)]
62pub struct MetaNodeConfig {
63    #[serde(rename = "use")]
64    phantom_use: Option<String>,
65    pub id: String,
66
67    pub address: String,
68    #[serde(with = "string")]
69    pub port: u16,
70    pub listen_address: String,
71    pub dashboard_port: u16,
72    pub exporter_port: u16,
73
74    pub user_managed: bool,
75
76    pub meta_backend: MetaBackend,
77    pub provide_sqlite_backend: Option<Vec<SqliteConfig>>,
78    pub provide_postgres_backend: Option<Vec<PostgresConfig>>,
79    pub provide_mysql_backend: Option<Vec<MySqlConfig>>,
80    pub provide_prometheus: Option<Vec<PrometheusConfig>>,
81
82    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
83    pub provide_compactor: Option<Vec<CompactorConfig>>,
84
85    pub provide_tempo: Option<Vec<TempoConfig>>,
86
87    pub provide_aws_s3: Option<Vec<AwsS3Config>>,
88    pub provide_minio: Option<Vec<MinioConfig>>,
89    pub provide_opendal: Option<Vec<OpendalConfig>>,
90    pub provide_moat: Option<Vec<MoatConfig>>,
91}
92
93#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
94#[serde(rename_all = "kebab-case")]
95#[serde(deny_unknown_fields)]
96pub struct FrontendConfig {
97    #[serde(rename = "use")]
98    phantom_use: Option<String>,
99    pub id: String,
100
101    pub address: String,
102    #[serde(with = "string")]
103    pub port: u16,
104    pub listen_address: String,
105    pub exporter_port: u16,
106    pub health_check_port: u16,
107
108    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
109    pub provide_tempo: Option<Vec<TempoConfig>>,
110    pub provide_prometheus: Option<Vec<PrometheusConfig>>,
111
112    pub user_managed: bool,
113}
114
115#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
116#[serde(rename_all = "kebab-case")]
117#[serde(deny_unknown_fields)]
118pub struct CompactorConfig {
119    #[serde(rename = "use")]
120    phantom_use: Option<String>,
121    pub id: String,
122
123    pub address: String,
124    #[serde(with = "string")]
125    pub port: u16,
126    pub listen_address: String,
127    pub exporter_port: u16,
128
129    pub provide_minio: Option<Vec<MinioConfig>>,
130
131    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
132    pub provide_tempo: Option<Vec<TempoConfig>>,
133
134    pub user_managed: bool,
135    pub compaction_worker_threads_number: Option<usize>,
136
137    pub compactor_mode: String,
138}
139
140#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
141#[serde(rename_all = "kebab-case")]
142#[serde(deny_unknown_fields)]
143pub struct MinioConfig {
144    #[serde(rename = "use")]
145    phantom_use: Option<String>,
146    pub id: String,
147
148    pub address: String,
149    #[serde(with = "string")]
150    pub port: u16,
151    pub listen_address: String,
152
153    pub console_address: String,
154    #[serde(with = "string")]
155    pub console_port: u16,
156
157    pub root_user: String,
158    pub root_password: String,
159    pub hummock_bucket: String,
160
161    pub provide_prometheus: Option<Vec<PrometheusConfig>>,
162
163    // For rate limiting minio in a test environment.
164    pub api_requests_max: usize,
165    pub api_requests_deadline: String,
166}
167
168#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
169#[serde(rename_all = "kebab-case")]
170#[serde(deny_unknown_fields)]
171pub struct SqliteConfig {
172    #[serde(rename = "use")]
173    phantom_use: Option<String>,
174    pub id: String,
175
176    pub file: String,
177}
178
179#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
180#[serde(rename_all = "kebab-case")]
181#[serde(deny_unknown_fields)]
182pub struct PrometheusConfig {
183    #[serde(rename = "use")]
184    phantom_use: Option<String>,
185    pub id: String,
186
187    pub address: String,
188    #[serde(with = "string")]
189    pub port: u16,
190    pub listen_address: String,
191
192    pub remote_write: bool,
193    pub remote_write_region: String,
194    pub remote_write_url: String,
195
196    pub scrape_interval: String,
197
198    pub provide_compute_node: Option<Vec<ComputeNodeConfig>>,
199    pub provide_meta_node: Option<Vec<MetaNodeConfig>>,
200    pub provide_minio: Option<Vec<MinioConfig>>,
201    pub provide_compactor: Option<Vec<CompactorConfig>>,
202    pub provide_frontend: Option<Vec<FrontendConfig>>,
203}
204
205#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
206#[serde(rename_all = "kebab-case")]
207#[serde(deny_unknown_fields)]
208pub struct GrafanaConfig {
209    #[serde(rename = "use")]
210    phantom_use: Option<String>,
211    pub id: String,
212    pub address: String,
213    pub listen_address: String,
214    pub port: u16,
215
216    pub provide_prometheus: Option<Vec<PrometheusConfig>>,
217    pub provide_tempo: Option<Vec<TempoConfig>>,
218}
219
220#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
221#[serde(rename_all = "kebab-case")]
222#[serde(deny_unknown_fields)]
223pub struct TempoConfig {
224    #[serde(rename = "use")]
225    phantom_use: Option<String>,
226    pub id: String,
227
228    pub listen_address: String,
229    pub address: String,
230    pub port: u16,
231    pub otlp_port: u16,
232    pub max_bytes_per_trace: usize,
233}
234
235#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
236#[serde(rename_all = "kebab-case")]
237#[serde(deny_unknown_fields)]
238pub struct AwsS3Config {
239    #[serde(rename = "use")]
240    phantom_use: Option<String>,
241    pub id: String,
242    pub bucket: String,
243}
244
245#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
246#[serde(rename_all = "kebab-case")]
247#[serde(deny_unknown_fields)]
248pub struct OpendalConfig {
249    #[serde(rename = "use")]
250    phantom_use: Option<String>,
251
252    pub id: String,
253    pub engine: String,
254    pub namenode: String,
255    pub bucket: String,
256}
257
258#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
259#[serde(rename_all = "kebab-case")]
260#[serde(deny_unknown_fields)]
261pub struct KafkaConfig {
262    #[serde(rename = "use")]
263    phantom_use: Option<String>,
264    pub id: String,
265
266    /// Advertise address
267    pub address: String,
268    #[serde(with = "string")]
269    pub port: u16,
270    /// Port for other services in docker. They need to connect to `host.docker.internal`, while the host
271    /// need to connect to `localhost`.
272    pub docker_port: u16,
273
274    #[serde(with = "string")]
275    pub controller_port: u16,
276
277    pub image: String,
278    pub persist_data: bool,
279    pub node_id: u32,
280
281    pub user_managed: bool,
282}
283
284#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
285#[serde(rename_all = "kebab-case")]
286#[serde(deny_unknown_fields)]
287pub struct SchemaRegistryConfig {
288    #[serde(rename = "use")]
289    phantom_use: Option<String>,
290
291    pub id: String,
292
293    pub address: String,
294    #[serde(with = "string")]
295    pub port: u16,
296
297    pub provide_kafka: Option<Vec<KafkaConfig>>,
298
299    pub image: String,
300    /// Redpanda supports schema registry natively. You can configure a `user_managed` schema registry
301    /// to use with redpanda.
302    pub user_managed: bool,
303}
304
305#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
306#[serde(rename_all = "kebab-case")]
307#[serde(deny_unknown_fields)]
308pub struct PubsubConfig {
309    #[serde(rename = "use")]
310    phantom_use: Option<String>,
311    pub id: String,
312    #[serde(default)]
313    pub user_managed: bool,
314    #[serde(with = "string")]
315    pub port: u16,
316    pub address: String,
317
318    pub persist_data: bool,
319}
320
321#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
322#[serde(rename_all = "kebab-case")]
323#[serde(deny_unknown_fields)]
324pub struct PulsarConfig {
325    #[serde(rename = "use")]
326    phantom_use: Option<String>,
327    pub id: String,
328
329    pub address: String,
330    pub broker_port: u16,
331    pub http_port: u16,
332
333    pub user_managed: bool,
334    pub image: String,
335    pub persist_data: bool,
336}
337
338#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
339#[serde(rename_all = "kebab-case")]
340#[serde(deny_unknown_fields)]
341pub struct RedisConfig {
342    #[serde(rename = "use")]
343    phantom_use: Option<String>,
344    pub id: String,
345
346    pub port: u16,
347    pub address: String,
348}
349
350#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
351#[serde(rename_all = "kebab-case")]
352#[serde(deny_unknown_fields)]
353pub enum Application {
354    Metastore,
355    Connector,
356}
357
358#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
359#[serde(rename_all = "kebab-case")]
360#[serde(deny_unknown_fields)]
361pub struct MySqlConfig {
362    #[serde(rename = "use")]
363    phantom_use: Option<String>,
364    pub id: String,
365
366    pub port: u16,
367    pub address: String,
368
369    pub user: String,
370    pub password: String,
371    pub database: String,
372
373    pub application: Application,
374    pub image: String,
375    pub user_managed: bool,
376    pub persist_data: bool,
377}
378
379#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
380#[serde(rename_all = "kebab-case")]
381#[serde(deny_unknown_fields)]
382pub struct PostgresConfig {
383    #[serde(rename = "use")]
384    phantom_use: Option<String>,
385    pub id: String,
386
387    pub port: u16,
388    pub address: String,
389
390    pub user: String,
391    pub password: String,
392    pub database: String,
393
394    pub application: Application,
395    pub image: String,
396    pub user_managed: bool,
397    pub persist_data: bool,
398
399    // Inject latency into any network calls to the postgres service.
400    pub latency_ms: u32,
401}
402
403#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
404#[serde(rename_all = "kebab-case")]
405#[serde(deny_unknown_fields)]
406pub struct SqlServerConfig {
407    #[serde(rename = "use")]
408    phantom_use: Option<String>,
409    pub id: String,
410
411    pub port: u16,
412    pub address: String,
413
414    pub user: String,
415    pub password: String,
416    pub database: String,
417
418    pub image: String,
419    pub user_managed: bool,
420    pub persist_data: bool,
421}
422
423#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
424#[serde(rename_all = "kebab-case")]
425#[serde(deny_unknown_fields)]
426pub struct NatsConfig {
427    #[serde(rename = "use")]
428    phantom_use: Option<String>,
429    pub id: String,
430
431    pub address: String,
432    pub port: u16,
433    pub monitor_port: u16,
434
435    pub image: String,
436    pub user_managed: bool,
437    pub persist_data: bool,
438}
439
440#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
441#[serde(rename_all = "kebab-case")]
442#[serde(deny_unknown_fields)]
443pub struct MqttConfig {
444    #[serde(rename = "use")]
445    phantom_use: Option<String>,
446    pub id: String,
447
448    pub address: String,
449    pub port: u16,
450
451    pub image: String,
452    pub user_managed: bool,
453    pub persist_data: bool,
454}
455
456#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
457#[serde(rename_all = "kebab-case")]
458#[serde(deny_unknown_fields)]
459pub struct LakekeeperConfig {
460    #[serde(rename = "use")]
461    phantom_use: Option<String>,
462    pub id: String,
463
464    pub port: u16,
465    pub address: String,
466
467    pub user_managed: bool,
468    pub persist_data: bool,
469
470    pub catalog_backend: String,
471    pub encryption_key: String,
472    pub provide_postgres_backend: Option<Vec<PostgresConfig>>,
473    pub provide_minio: Option<Vec<MinioConfig>>,
474}
475
476#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
477#[serde(rename_all = "kebab-case")]
478#[serde(deny_unknown_fields)]
479pub struct MoatConfig {
480    #[serde(rename = "use")]
481    phantom_use: Option<String>,
482    pub id: String,
483
484    pub address: String,
485    pub port: u16,
486
487    pub provide_minio: Option<Vec<MinioConfig>>,
488}
489
490/// All service configuration
491#[derive(Clone, Debug, PartialEq)]
492pub enum ServiceConfig {
493    ComputeNode(ComputeNodeConfig),
494    MetaNode(MetaNodeConfig),
495    Frontend(FrontendConfig),
496    Compactor(CompactorConfig),
497    Minio(MinioConfig),
498    Sqlite(SqliteConfig),
499    Prometheus(PrometheusConfig),
500    Grafana(GrafanaConfig),
501    Tempo(TempoConfig),
502    Opendal(OpendalConfig),
503    AwsS3(AwsS3Config),
504    Kafka(KafkaConfig),
505    SchemaRegistry(SchemaRegistryConfig),
506    Pubsub(PubsubConfig),
507    Pulsar(PulsarConfig),
508    Redis(RedisConfig),
509    MySql(MySqlConfig),
510    Postgres(PostgresConfig),
511    SqlServer(SqlServerConfig),
512    Nats(NatsConfig),
513    Mqtt(MqttConfig),
514    Lakekeeper(LakekeeperConfig),
515    Moat(MoatConfig),
516}
517
/// Group that `ServiceConfig::task_group` assigns a service to.
///
/// This is a fieldless enum, so it is `Copy`: values can be passed around and
/// reused (for example as hash-map keys) without moving or borrowing.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum TaskGroup {
    RisingWave,
    Observability,
    Kafka,
    Pubsub,
    Pulsar,
    MySql,
    Postgres,
    SqlServer,
    Nats,
    Mqtt,
    Redis,
    Lakekeeper,
    Moat,
}
534
535impl ServiceConfig {
536    pub fn id(&self) -> &str {
537        match self {
538            Self::ComputeNode(c) => &c.id,
539            Self::MetaNode(c) => &c.id,
540            Self::Frontend(c) => &c.id,
541            Self::Compactor(c) => &c.id,
542            Self::Minio(c) => &c.id,
543            Self::Sqlite(c) => &c.id,
544            Self::Prometheus(c) => &c.id,
545            Self::Grafana(c) => &c.id,
546            Self::Tempo(c) => &c.id,
547            Self::AwsS3(c) => &c.id,
548            Self::Kafka(c) => &c.id,
549            Self::Pubsub(c) => &c.id,
550            Self::Pulsar(c) => &c.id,
551            Self::Redis(c) => &c.id,
552            Self::Opendal(c) => &c.id,
553            Self::MySql(c) => &c.id,
554            Self::Postgres(c) => &c.id,
555            Self::SqlServer(c) => &c.id,
556            Self::Nats(c) => &c.id,
557            Self::Mqtt(c) => &c.id,
558            Self::SchemaRegistry(c) => &c.id,
559            Self::Lakekeeper(c) => &c.id,
560            Self::Moat(c) => &c.id,
561        }
562    }
563
564    /// Used to check whether the port is occupied before running the service.
565    pub fn port(&self) -> Option<u16> {
566        match self {
567            Self::ComputeNode(c) => Some(c.port),
568            Self::MetaNode(c) => Some(c.port),
569            Self::Frontend(c) => Some(c.port),
570            Self::Compactor(c) => Some(c.port),
571            Self::Minio(c) => Some(c.port),
572            Self::Sqlite(_) => None,
573            Self::Prometheus(c) => Some(c.port),
574            Self::Grafana(c) => Some(c.port),
575            Self::Tempo(c) => Some(c.port),
576            Self::AwsS3(_) => None,
577            Self::Kafka(c) => Some(c.port),
578            Self::Pubsub(c) => Some(c.port),
579            Self::Pulsar(c) => Some(c.http_port),
580            Self::Redis(c) => Some(c.port),
581            Self::Opendal(_) => None,
582            Self::MySql(c) => Some(c.port),
583            Self::Postgres(c) => Some(c.port),
584            Self::SqlServer(c) => Some(c.port),
585            Self::Nats(c) => Some(c.port),
586            Self::Mqtt(c) => Some(c.port),
587            Self::SchemaRegistry(c) => Some(c.port),
588            Self::Lakekeeper(c) => Some(c.port),
589            Self::Moat(c) => Some(c.port),
590        }
591    }
592
593    pub fn user_managed(&self) -> bool {
594        match self {
595            Self::ComputeNode(c) => c.user_managed,
596            Self::MetaNode(c) => c.user_managed,
597            Self::Frontend(c) => c.user_managed,
598            Self::Compactor(c) => c.user_managed,
599            Self::Minio(_c) => false,
600            Self::Sqlite(_c) => false,
601            Self::Prometheus(_c) => false,
602            Self::Grafana(_c) => false,
603            Self::Tempo(_c) => false,
604            Self::AwsS3(_c) => false,
605            Self::Kafka(c) => c.user_managed,
606            Self::Pubsub(c) => c.user_managed,
607            Self::Pulsar(c) => c.user_managed,
608            Self::Redis(_c) => false,
609            Self::Opendal(_c) => false,
610            Self::MySql(c) => c.user_managed,
611            Self::Postgres(c) => c.user_managed,
612            Self::SqlServer(c) => c.user_managed,
613            Self::Nats(c) => c.user_managed,
614            Self::Mqtt(c) => c.user_managed,
615            Self::SchemaRegistry(c) => c.user_managed,
616            Self::Lakekeeper(c) => c.user_managed,
617            Self::Moat(_c) => false,
618        }
619    }
620
621    pub fn task_group(&self) -> TaskGroup {
622        use TaskGroup::*;
623        match self {
624            ServiceConfig::ComputeNode(_)
625            | ServiceConfig::MetaNode(_)
626            | ServiceConfig::Frontend(_)
627            | ServiceConfig::Compactor(_)
628            | ServiceConfig::Minio(_)
629            | ServiceConfig::Sqlite(_) => RisingWave,
630            ServiceConfig::Prometheus(_) | ServiceConfig::Grafana(_) | ServiceConfig::Tempo(_) => {
631                Observability
632            }
633            ServiceConfig::Opendal(_) | ServiceConfig::AwsS3(_) => RisingWave,
634            ServiceConfig::Kafka(_) | ServiceConfig::SchemaRegistry(_) => Kafka,
635            ServiceConfig::Pubsub(_) => Pubsub,
636            ServiceConfig::Pulsar(_) => Pulsar,
637            ServiceConfig::Redis(_) => Redis,
638            ServiceConfig::MySql(my_sql_config) => {
639                if matches!(my_sql_config.application, Application::Metastore) {
640                    RisingWave
641                } else {
642                    MySql
643                }
644            }
645            ServiceConfig::Postgres(postgres_config) => {
646                if matches!(postgres_config.application, Application::Metastore) {
647                    RisingWave
648                } else {
649                    Postgres
650                }
651            }
652            ServiceConfig::SqlServer(_) => SqlServer,
653            ServiceConfig::Nats(_) => Nats,
654            ServiceConfig::Mqtt(_) => Mqtt,
655            ServiceConfig::Lakekeeper(_) => Lakekeeper,
656            ServiceConfig::Moat(_) => Moat,
657        }
658    }
659}
660
661mod string {
662    use std::fmt::Display;
663    use std::str::FromStr;
664
665    use serde::{Deserialize, Deserializer, Serializer, de};
666
667    pub fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
668    where
669        T: Display,
670        S: Serializer,
671    {
672        serializer.collect_str(value)
673    }
674
675    pub fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
676    where
677        T: FromStr,
678        T::Err: Display,
679        D: Deserializer<'de>,
680    {
681        String::deserialize(deserializer)?
682            .parse()
683            .map_err(de::Error::custom)
684    }
685}