// risingwave_frontend/utils/overwrite_options.rs
use crate::handler::HandlerArgs;
16
/// Per-statement rate-limit settings, one optional value per limit kind.
///
/// Every field is `None` under [`Default`]. `OverwriteOptions::new` fills each
/// field from the statement's `WITH` options when the matching key is present,
/// and from the session configuration otherwise.
#[derive(Clone, Debug, Default)]
pub struct OverwriteOptions {
    /// Value resolved for the `source_rate_limit` key; `None` when no limit
    /// was resolved (presumably meaning "unlimited" -- confirm with consumers).
    pub source_rate_limit: Option<u32>,
    /// Value resolved for the `backfill_rate_limit` key; `None` when no limit
    /// was resolved.
    pub backfill_rate_limit: Option<u32>,
    /// Value resolved for the `dml_rate_limit` key; `None` when no limit was
    /// resolved.
    pub dml_rate_limit: Option<u32>,
    /// Value resolved for the `sink_rate_limit` key; `None` when no limit was
    /// resolved.
    pub sink_rate_limit: Option<u32>,
}
24
25impl OverwriteOptions {
26 pub(crate) const BACKFILL_RATE_LIMIT_KEY: &'static str = "backfill_rate_limit";
27 pub(crate) const DML_RATE_LIMIT_KEY: &'static str = "dml_rate_limit";
28 pub(crate) const SINK_RATE_LIMIT_KEY: &'static str = "sink_rate_limit";
29 pub(crate) const SOURCE_RATE_LIMIT_KEY: &'static str = "source_rate_limit";
30
31 pub fn new(args: &mut HandlerArgs) -> Self {
32 let source_rate_limit = {
33 if let Some(x) = args.with_options.remove(Self::SOURCE_RATE_LIMIT_KEY) {
34 Some(x.parse::<u32>().unwrap())
36 } else {
37 let rate_limit = args.session.config().source_rate_limit();
38 if rate_limit < 0 {
39 None
40 } else {
41 Some(rate_limit as u32)
42 }
43 }
44 };
45 let backfill_rate_limit = {
46 if let Some(x) = args.with_options.remove(Self::BACKFILL_RATE_LIMIT_KEY) {
47 Some(x.parse::<u32>().unwrap())
49 } else {
50 let rate_limit = args.session.config().backfill_rate_limit();
51 if rate_limit < 0 {
52 None
53 } else {
54 Some(rate_limit as u32)
55 }
56 }
57 };
58 let dml_rate_limit = {
59 if let Some(x) = args.with_options.remove(Self::DML_RATE_LIMIT_KEY) {
60 Some(x.parse::<u32>().unwrap())
62 } else {
63 let rate_limit = args.session.config().dml_rate_limit();
64 if rate_limit < 0 {
65 None
66 } else {
67 Some(rate_limit as u32)
68 }
69 }
70 };
71 let sink_rate_limit = {
72 if let Some(x) = args.with_options.remove(Self::SINK_RATE_LIMIT_KEY) {
73 Some(x.parse::<u32>().unwrap())
75 } else {
76 let rate_limit = args.session.config().sink_rate_limit();
77 if rate_limit < 0 {
78 None
79 } else {
80 Some(rate_limit as u32)
81 }
82 }
83 };
84 Self {
85 source_rate_limit,
86 backfill_rate_limit,
87 dml_rate_limit,
88 sink_rate_limit,
89 }
90 }
91}