risingwave_frontend/binder/relation/
gap_fill.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::catalog::Field;
17use risingwave_common::gap_fill_types::FillStrategy;
18use risingwave_common::types::DataType;
19use risingwave_sqlparser::ast::{Expr as AstExpr, FunctionArg, FunctionArgExpr, TableAlias};
20
21use super::{Binder, Relation};
22use crate::binder::BoundFillStrategy;
23use crate::error::{ErrorCode, Result as RwResult};
24use crate::expr::{Expr, ExprImpl, InputRef};
25
26#[derive(Debug, Clone)]
27pub struct BoundGapFill {
28    pub input: Relation,
29    pub time_col: InputRef,
30    pub interval: ExprImpl,
31    pub fill_strategies: Vec<BoundFillStrategy>,
32}
33
34impl Binder {
35    pub(super) fn bind_gap_fill(
36        &mut self,
37        alias: Option<&TableAlias>,
38        args: &[FunctionArg],
39    ) -> RwResult<BoundGapFill> {
40        if args.len() < 3 {
41            return Err(ErrorCode::BindError(
42                "GAP_FILL requires at least 3 arguments: input, time_col, interval".to_owned(),
43            )
44            .into());
45        }
46
47        let mut args_iter = args.iter();
48
49        self.push_context();
50
51        let (input, table_name) = self.bind_relation_by_function_arg(
52            args_iter.next(),
53            "The 1st arg of GAP_FILL should be a table name",
54        )?;
55
56        let time_col = self.bind_column_by_function_args(
57            args_iter.next(),
58            "The 2nd arg of GAP_FILL should be a column name",
59        )?;
60
61        if !matches!(
62            time_col.data_type,
63            DataType::Timestamp | DataType::Timestamptz
64        ) {
65            return Err(ErrorCode::BindError(
66                "The 2nd arg of GAP_FILL should be a column of type timestamp or timestamptz"
67                    .to_owned(),
68            )
69            .into());
70        }
71
72        let interval_arg = args_iter.next().unwrap();
73        let interval_exprs = self.bind_function_arg(interval_arg)?;
74        let interval = interval_exprs.into_iter().exactly_one().map_err(|_| {
75            ErrorCode::BindError("The 3rd arg of GAP_FILL should be a single expression".to_owned())
76        })?;
77        if interval.return_type() != DataType::Interval {
78            return Err(ErrorCode::BindError(
79                "The 3rd arg of GAP_FILL should be an interval".to_owned(),
80            )
81            .into());
82        }
83
84        // Validate that the interval is not zero (only works for constant intervals)
85        if let ExprImpl::Literal(literal) = &interval
86            && let Some(risingwave_common::types::ScalarImpl::Interval(interval_value)) =
87                literal.get_data()
88            && interval_value.months() == 0
89            && interval_value.days() == 0
90            && interval_value.usecs() == 0
91        {
92            return Err(
93                ErrorCode::BindError("The gap fill interval cannot be zero".to_owned()).into(),
94            );
95        }
96
97        let mut fill_strategies = vec![];
98        for arg in args_iter {
99            let (strategy, target_col) =
100                if let FunctionArg::Unnamed(FunctionArgExpr::Expr(AstExpr::Function(func))) = arg {
101                    let name = func.name.0[0].real_value().to_ascii_lowercase();
102
103                    let strategy = match name.as_str() {
104                        "interpolate" => FillStrategy::Interpolate,
105                        "locf" => FillStrategy::Locf,
106                        "keepnull" => FillStrategy::Null,
107                        _ => {
108                            return Err(ErrorCode::BindError(format!(
109                                "Unsupported fill strategy: {}",
110                                name
111                            ))
112                            .into());
113                        }
114                    };
115
116                    if func.arg_list.args.len() != 1 {
117                        return Err(ErrorCode::BindError(format!(
118                            "Fill strategy function {} expects exactly one argument",
119                            name
120                        ))
121                        .into());
122                    }
123
124                    let arg_exprs = self.bind_function_arg(&func.arg_list.args[0])?;
125                    let arg_expr = arg_exprs.into_iter().exactly_one().map_err(|_| {
126                        ErrorCode::BindError(
127                            "Fill strategy argument should be a single expression".to_owned(),
128                        )
129                    })?;
130
131                    if let ExprImpl::InputRef(input_ref) = arg_expr {
132                        // Check datatype for interpolate
133                        if matches!(strategy, FillStrategy::Interpolate) {
134                            let data_type = &input_ref.data_type;
135                            if !data_type.is_numeric() || matches!(data_type, DataType::Serial) {
136                                return Err(ErrorCode::BindError(format!(
137                                    "INTERPOLATE only supports numeric types, got {}",
138                                    data_type
139                                ))
140                                .into());
141                            }
142                        }
143                        (strategy, *input_ref)
144                    } else {
145                        return Err(ErrorCode::BindError(
146                            "Fill strategy argument must be a column reference".to_owned(),
147                        )
148                        .into());
149                    }
150                } else {
151                    return Err(ErrorCode::BindError(
152                        "Fill strategy must be a function call like LOCF(col)".to_owned(),
153                    )
154                    .into());
155                };
156            fill_strategies.push(BoundFillStrategy {
157                strategy,
158                target_col,
159            });
160        }
161
162        let base_columns = std::mem::take(&mut self.context.columns);
163        self.pop_context()?;
164
165        let columns = base_columns
166            .into_iter()
167            .map(|c| (c.is_hidden, c.field))
168            .collect::<Vec<(bool, Field)>>();
169
170        let (_, table_name) = Self::resolve_schema_qualified_name(&self.db_name, &table_name)?;
171        self.bind_table_to_context(columns, table_name, alias)?;
172
173        Ok(BoundGapFill {
174            input,
175            time_col: *time_col,
176            interval,
177            fill_strategies,
178        })
179    }
180}