risingwave_frontend/utils/
overwrite_options.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::handler::HandlerArgs;
16
/// Per-statement rate-limit settings resolved by [`OverwriteOptions::new`].
///
/// Each field is `Some(limit)` when a limit applies. A field is `None` when
/// no explicit option was supplied and the session-level value is negative.
#[derive(Clone, Debug, Default)]
pub struct OverwriteOptions {
    /// Rate limit looked up under the `source_rate_limit` key.
    pub source_rate_limit: Option<u32>,
    /// Rate limit looked up under the `backfill_rate_limit` key.
    pub backfill_rate_limit: Option<u32>,
    /// Rate limit looked up under the `dml_rate_limit` key.
    pub dml_rate_limit: Option<u32>,
    /// Rate limit looked up under the `sink_rate_limit` key.
    pub sink_rate_limit: Option<u32>,
}
24
25impl OverwriteOptions {
26    pub(crate) const BACKFILL_RATE_LIMIT_KEY: &'static str = "backfill_rate_limit";
27    pub(crate) const DML_RATE_LIMIT_KEY: &'static str = "dml_rate_limit";
28    pub(crate) const SINK_RATE_LIMIT_KEY: &'static str = "sink_rate_limit";
29    pub(crate) const SOURCE_RATE_LIMIT_KEY: &'static str = "source_rate_limit";
30
31    pub fn new(args: &mut HandlerArgs) -> Self {
32        let source_rate_limit = {
33            if let Some(x) = args.with_options.remove(Self::SOURCE_RATE_LIMIT_KEY) {
34                // FIXME(tabVersion): validate the value
35                Some(x.parse::<u32>().unwrap())
36            } else {
37                let rate_limit = args.session.config().source_rate_limit();
38                if rate_limit < 0 {
39                    None
40                } else {
41                    Some(rate_limit as u32)
42                }
43            }
44        };
45        let backfill_rate_limit = {
46            if let Some(x) = args.with_options.remove(Self::BACKFILL_RATE_LIMIT_KEY) {
47                // FIXME(tabVersion): validate the value
48                Some(x.parse::<u32>().unwrap())
49            } else {
50                let rate_limit = args.session.config().backfill_rate_limit();
51                if rate_limit < 0 {
52                    None
53                } else {
54                    Some(rate_limit as u32)
55                }
56            }
57        };
58        let dml_rate_limit = {
59            if let Some(x) = args.with_options.remove(Self::DML_RATE_LIMIT_KEY) {
60                // FIXME(tabVersion): validate the value
61                Some(x.parse::<u32>().unwrap())
62            } else {
63                let rate_limit = args.session.config().dml_rate_limit();
64                if rate_limit < 0 {
65                    None
66                } else {
67                    Some(rate_limit as u32)
68                }
69            }
70        };
71        let sink_rate_limit = {
72            if let Some(x) = args.with_options.remove(Self::SINK_RATE_LIMIT_KEY) {
73                // FIXME(tabVersion): validate the value
74                Some(x.parse::<u32>().unwrap())
75            } else {
76                let rate_limit = args.session.config().sink_rate_limit();
77                if rate_limit < 0 {
78                    None
79                } else {
80                    Some(rate_limit as u32)
81                }
82            }
83        };
84        Self {
85            source_rate_limit,
86            backfill_rate_limit,
87            dml_rate_limit,
88            sink_rate_limit,
89        }
90    }
91}