risingwave_common/
gap_fill.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use num_integer::Integer;
16
17use crate::types::{Datum, DatumRef, Decimal, ScalarImpl, ScalarRefImpl};
18
19/// Strategy for filling gaps in time series data.
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
21pub enum FillStrategy {
22    Interpolate,
23    Locf,
24    Null,
25}
26
27/// Calculates the step size for interpolation between two values.
28///
29/// # Parameters
30/// - `d1`: The starting value as a `DatumRef`.
31/// - `d2`: The ending value as a `DatumRef`.
32/// - `steps`: The number of steps to interpolate between `d1` and `d2`.
33///
34/// # Returns
35/// Returns a `Datum` representing the step size for each interpolation step,
36/// or `None` if the input values are not compatible or `steps` is zero.
37///
38/// # Calculation
39/// For supported types, computes `(d2 - d1) / steps` and returns the result as a `Datum`.
40pub fn calculate_interpolation_step(d1: DatumRef<'_>, d2: DatumRef<'_>, steps: usize) -> Datum {
41    let (Some(s1), Some(s2)) = (d1, d2) else {
42        return None;
43    };
44    if steps == 0 {
45        return None;
46    }
47    match (s1, s2) {
48        (ScalarRefImpl::Int16(v1), ScalarRefImpl::Int16(v2)) => {
49            // Convert to i64 to avoid overflow, then convert back
50            let diff = (v2 as i64) - (v1 as i64);
51            Some(ScalarImpl::Int16(
52                Integer::div_floor(&diff, &(steps as i64)) as i16,
53            ))
54        }
55        (ScalarRefImpl::Int32(v1), ScalarRefImpl::Int32(v2)) => {
56            // Convert to i64 to avoid overflow, then convert back
57            let diff = (v2 as i64) - (v1 as i64);
58            Some(ScalarImpl::Int32(
59                Integer::div_floor(&diff, &(steps as i64)) as i32,
60            ))
61        }
62        (ScalarRefImpl::Int64(v1), ScalarRefImpl::Int64(v2)) => Some(ScalarImpl::Int64(
63            Integer::div_floor(&(v2 - v1), &(steps as i64)),
64        )),
65        (ScalarRefImpl::Float32(v1), ScalarRefImpl::Float32(v2)) => {
66            Some(ScalarImpl::Float32((v2 - v1) / steps as f32))
67        }
68        (ScalarRefImpl::Float64(v1), ScalarRefImpl::Float64(v2)) => {
69            Some(ScalarImpl::Float64((v2 - v1) / steps as f64))
70        }
71        (ScalarRefImpl::Decimal(v1), ScalarRefImpl::Decimal(v2)) => {
72            Some(ScalarImpl::Decimal((v2 - v1) / Decimal::from(steps)))
73        }
74        _ => None,
75    }
76}
77
78/// Mutates the `current` datum by adding the value of `step` to it.
79///
80/// This function is used during the interpolation process in gap filling,
81/// where it incrementally updates the datum to generate intermediate values
82/// between known data points.
83pub fn apply_interpolation_step(current: &mut Datum, step: &ScalarImpl) {
84    if let Some(curr) = current.as_mut() {
85        match (curr, step) {
86            (ScalarImpl::Int16(v1), &ScalarImpl::Int16(v2)) => *v1 += v2,
87            (ScalarImpl::Int32(v1), &ScalarImpl::Int32(v2)) => *v1 += v2,
88            (ScalarImpl::Int64(v1), &ScalarImpl::Int64(v2)) => *v1 += v2,
89            (ScalarImpl::Float32(v1), &ScalarImpl::Float32(v2)) => *v1 += v2,
90            (ScalarImpl::Float64(v1), &ScalarImpl::Float64(v2)) => *v1 += v2,
91            (ScalarImpl::Decimal(v1), &ScalarImpl::Decimal(v2)) => *v1 = *v1 + v2,
92            _ => (),
93        }
94    }
95}