risingwave_connector_codec/decoder/
utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use num_bigint::{BigInt, Sign};
16
17use super::{AccessResult, bail_uncategorized};
18
19pub fn scaled_bigint_to_rust_decimal(
20    value: BigInt,
21    scale: usize,
22) -> AccessResult<rust_decimal::Decimal> {
23    let (sign, bytes) = value.to_bytes_be();
24    let negative = sign == Sign::Minus;
25    let (lo, mid, hi) = extract_decimal(bytes)?;
26
27    Ok(rust_decimal::Decimal::from_parts(
28        lo,
29        mid,
30        hi,
31        negative,
32        scale as u32,
33    ))
34}
35
36/// Converts a Rust Decimal back to a `BigInt` with scale for Avro encoding
37pub fn rust_decimal_to_scaled_bigint(
38    decimal: rust_decimal::Decimal,
39    expect_scale: usize,
40) -> Result<Vec<u8>, String> {
41    let mantissa = decimal.mantissa();
42    let scale = decimal.scale();
43    let big_decimal = bigdecimal::BigDecimal::from((mantissa, scale as i64));
44    let scaled_big_decimal = big_decimal.with_scale(expect_scale as i64);
45    let (scaled_big_int, _) = scaled_big_decimal.as_bigint_and_scale();
46
47    Ok(scaled_big_int.to_signed_bytes_be())
48}
49
50fn extract_decimal(bytes: Vec<u8>) -> AccessResult<(u32, u32, u32)> {
51    match bytes.len() {
52        len @ 0..=4 => {
53            let mut pad = vec![0; 4 - len];
54            pad.extend_from_slice(&bytes);
55            let lo = u32::from_be_bytes(pad.try_into().unwrap());
56            Ok((lo, 0, 0))
57        }
58        len @ 5..=8 => {
59            let zero_len = 8 - len;
60            let mid_end = 4 - zero_len;
61
62            let mut pad = vec![0; zero_len];
63            pad.extend_from_slice(&bytes[..mid_end]);
64            let mid = u32::from_be_bytes(pad.try_into().unwrap());
65
66            let lo = u32::from_be_bytes(bytes[mid_end..].to_owned().try_into().unwrap());
67            Ok((lo, mid, 0))
68        }
69        len @ 9..=12 => {
70            let zero_len = 12 - len;
71            let hi_end = 4 - zero_len;
72            let mid_end = hi_end + 4;
73
74            let mut pad = vec![0; zero_len];
75            pad.extend_from_slice(&bytes[..hi_end]);
76            let hi = u32::from_be_bytes(pad.try_into().unwrap());
77
78            let mid = u32::from_be_bytes(bytes[hi_end..mid_end].to_owned().try_into().unwrap());
79
80            let lo = u32::from_be_bytes(bytes[mid_end..].to_owned().try_into().unwrap());
81            Ok((lo, mid, hi))
82        }
83        _ => bail_uncategorized!("invalid decimal bytes length {}", bytes.len()),
84    }
85}
86
87#[cfg(test)]
88mod tests {
89
90    #[test]
91    fn test_extract_decimal() {
92        let decimal_max = rust_decimal::Decimal::MAX; // 79228162514264337593543950335
93        let decimal_min = rust_decimal::Decimal::MIN; // -79228162514264337593543950335
94
95        println!("decimal_max: {}", decimal_max);
96        println!("decimal_min: {}", decimal_min);
97    }
98}