risingwave_frontend/utils/
overwrite_options.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::handler::HandlerArgs;
16
17/// Some options can be configured both in `WITH` clause and session variables.
18/// The config in `WITH` clause has higher priority.
19#[derive(Debug, Clone, Default)]
20pub struct OverwriteOptions {
21    pub source_rate_limit: Option<u32>,
22    pub backfill_rate_limit: Option<u32>,
23    pub dml_rate_limit: Option<u32>,
24    pub sink_rate_limit: Option<u32>,
25}
26
27impl OverwriteOptions {
28    pub(crate) const BACKFILL_RATE_LIMIT_KEY: &'static str = "backfill_rate_limit";
29    pub(crate) const DML_RATE_LIMIT_KEY: &'static str = "dml_rate_limit";
30    pub(crate) const SINK_RATE_LIMIT_KEY: &'static str = "sink_rate_limit";
31    pub(crate) const SOURCE_RATE_LIMIT_KEY: &'static str = "source_rate_limit";
32
33    pub fn new(args: &mut HandlerArgs) -> Self {
34        let source_rate_limit = {
35            if let Some(x) = args.with_options.remove(Self::SOURCE_RATE_LIMIT_KEY) {
36                // FIXME(tabVersion): validate the value
37                Some(x.parse::<u32>().unwrap())
38            } else {
39                let rate_limit = args.session.config().source_rate_limit();
40                if rate_limit < 0 {
41                    None
42                } else {
43                    Some(rate_limit as u32)
44                }
45            }
46        };
47        let backfill_rate_limit = {
48            if let Some(x) = args.with_options.remove(Self::BACKFILL_RATE_LIMIT_KEY) {
49                // FIXME(tabVersion): validate the value
50                Some(x.parse::<u32>().unwrap())
51            } else {
52                let rate_limit = args.session.config().backfill_rate_limit();
53                if rate_limit < 0 {
54                    None
55                } else {
56                    Some(rate_limit as u32)
57                }
58            }
59        };
60        let dml_rate_limit = {
61            if let Some(x) = args.with_options.remove(Self::DML_RATE_LIMIT_KEY) {
62                // FIXME(tabVersion): validate the value
63                Some(x.parse::<u32>().unwrap())
64            } else {
65                let rate_limit = args.session.config().dml_rate_limit();
66                if rate_limit < 0 {
67                    None
68                } else {
69                    Some(rate_limit as u32)
70                }
71            }
72        };
73        let sink_rate_limit = {
74            if let Some(x) = args.with_options.remove(Self::SINK_RATE_LIMIT_KEY) {
75                // FIXME(tabVersion): validate the value
76                Some(x.parse::<u32>().unwrap())
77            } else {
78                let rate_limit = args.session.config().sink_rate_limit();
79                if rate_limit < 0 {
80                    None
81                } else {
82                    Some(rate_limit as u32)
83                }
84            }
85        };
86        Self {
87            source_rate_limit,
88            backfill_rate_limit,
89            dml_rate_limit,
90            sink_rate_limit,
91        }
92    }
93}