risingwave_common/gap_fill.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use num_integer::Integer;
16
17use crate::types::{Datum, DatumRef, Decimal, ScalarImpl, ScalarRefImpl};
18
19/// Strategy for filling gaps in time series data.
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
21pub enum FillStrategy {
22 Interpolate,
23 Locf,
24 Null,
25}
26
27/// Calculates the step size for interpolation between two values.
28///
29/// # Parameters
30/// - `d1`: The starting value as a `DatumRef`.
31/// - `d2`: The ending value as a `DatumRef`.
32/// - `steps`: The number of steps to interpolate between `d1` and `d2`.
33///
34/// # Returns
35/// Returns a `Datum` representing the step size for each interpolation step,
36/// or `None` if the input values are not compatible or `steps` is zero.
37///
38/// # Calculation
39/// For supported types, computes `(d2 - d1) / steps` and returns the result as a `Datum`.
40pub fn calculate_interpolation_step(d1: DatumRef<'_>, d2: DatumRef<'_>, steps: usize) -> Datum {
41 let (Some(s1), Some(s2)) = (d1, d2) else {
42 return None;
43 };
44 if steps == 0 {
45 return None;
46 }
47 match (s1, s2) {
48 (ScalarRefImpl::Int16(v1), ScalarRefImpl::Int16(v2)) => {
49 // Convert to i64 to avoid overflow, then convert back
50 let diff = (v2 as i64) - (v1 as i64);
51 Some(ScalarImpl::Int16(
52 Integer::div_floor(&diff, &(steps as i64)) as i16,
53 ))
54 }
55 (ScalarRefImpl::Int32(v1), ScalarRefImpl::Int32(v2)) => {
56 // Convert to i64 to avoid overflow, then convert back
57 let diff = (v2 as i64) - (v1 as i64);
58 Some(ScalarImpl::Int32(
59 Integer::div_floor(&diff, &(steps as i64)) as i32,
60 ))
61 }
62 (ScalarRefImpl::Int64(v1), ScalarRefImpl::Int64(v2)) => Some(ScalarImpl::Int64(
63 Integer::div_floor(&(v2 - v1), &(steps as i64)),
64 )),
65 (ScalarRefImpl::Float32(v1), ScalarRefImpl::Float32(v2)) => {
66 Some(ScalarImpl::Float32((v2 - v1) / steps as f32))
67 }
68 (ScalarRefImpl::Float64(v1), ScalarRefImpl::Float64(v2)) => {
69 Some(ScalarImpl::Float64((v2 - v1) / steps as f64))
70 }
71 (ScalarRefImpl::Decimal(v1), ScalarRefImpl::Decimal(v2)) => {
72 Some(ScalarImpl::Decimal((v2 - v1) / Decimal::from(steps)))
73 }
74 _ => None,
75 }
76}
77
78/// Mutates the `current` datum by adding the value of `step` to it.
79///
80/// This function is used during the interpolation process in gap filling,
81/// where it incrementally updates the datum to generate intermediate values
82/// between known data points.
83pub fn apply_interpolation_step(current: &mut Datum, step: &ScalarImpl) {
84 if let Some(curr) = current.as_mut() {
85 match (curr, step) {
86 (ScalarImpl::Int16(v1), &ScalarImpl::Int16(v2)) => *v1 += v2,
87 (ScalarImpl::Int32(v1), &ScalarImpl::Int32(v2)) => *v1 += v2,
88 (ScalarImpl::Int64(v1), &ScalarImpl::Int64(v2)) => *v1 += v2,
89 (ScalarImpl::Float32(v1), &ScalarImpl::Float32(v2)) => *v1 += v2,
90 (ScalarImpl::Float64(v1), &ScalarImpl::Float64(v2)) => *v1 += v2,
91 (ScalarImpl::Decimal(v1), &ScalarImpl::Decimal(v2)) => *v1 = *v1 + v2,
92 _ => (),
93 }
94 }
95}