Skip to main content

risingwave_frontend/binder/relation/
gap_fill.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::catalog::Field;
17use risingwave_common::gap_fill::FillStrategy;
18use risingwave_common::types::{DataType, Interval};
19use risingwave_sqlparser::ast::{Expr as AstExpr, FunctionArg, FunctionArgExpr, TableAlias};
20
21use super::{Binder, Relation};
22use crate::binder::BoundFillStrategy;
23use crate::error::{ErrorCode, Result as RwResult};
24use crate::expr::{Expr, ExprImpl, InputRef};
25
26#[derive(Debug, Clone)]
27pub struct BoundGapFill {
28    pub input: Relation,
29    pub time_col: InputRef,
30    pub interval: ExprImpl,
31    pub fill_strategies: Vec<BoundFillStrategy>,
32    pub partition_by_cols: Vec<InputRef>,
33}
34
35impl Binder {
36    pub(super) fn bind_gap_fill(
37        &mut self,
38        alias: Option<&TableAlias>,
39        args: &[FunctionArg],
40    ) -> RwResult<BoundGapFill> {
41        if args.len() < 3 {
42            return Err(ErrorCode::BindError(
43                "GAP_FILL requires at least 3 arguments: input, time_col, interval".to_owned(),
44            )
45            .into());
46        }
47
48        let mut args_iter = args.iter();
49
50        self.push_context();
51
52        let (input, table_name) = self.bind_relation_by_function_arg(
53            args_iter.next(),
54            "The 1st arg of GAP_FILL should be a table name",
55        )?;
56
57        let time_col = self.bind_column_by_function_args(
58            args_iter.next(),
59            "The 2nd arg of GAP_FILL should be a column name",
60        )?;
61
62        if !matches!(
63            time_col.data_type,
64            DataType::Timestamp | DataType::Timestamptz
65        ) {
66            return Err(ErrorCode::BindError(
67                "The 2nd arg of GAP_FILL should be a column of type timestamp or timestamptz"
68                    .to_owned(),
69            )
70            .into());
71        }
72
73        let interval_arg = args_iter.next().unwrap();
74        let interval_exprs = self.bind_function_arg(interval_arg)?;
75        let interval = Itertools::exactly_one(interval_exprs.into_iter()).map_err(|_| {
76            ErrorCode::BindError("The 3rd arg of GAP_FILL should be a single expression".to_owned())
77        })?;
78        if interval.return_type() != DataType::Interval {
79            return Err(ErrorCode::BindError(
80                "The 3rd arg of GAP_FILL should be an interval".to_owned(),
81            )
82            .into());
83        }
84
85        // Reject non-positive intervals. Fold constant expressions
86        // (e.g. `INTERVAL '2' MINUTE - INTERVAL '3' MINUTE`) so the check is not limited to bare
87        // literals.
88        if let Some(folded) = interval.try_fold_const()
89            && let Some(risingwave_common::types::ScalarImpl::Interval(interval_value)) = folded?
90            && interval_value <= Interval::from_month_day_usec(0, 0, 0)
91        {
92            return Err(
93                ErrorCode::BindError("The gap fill interval must be positive".to_owned()).into(),
94            );
95        }
96
97        let mut fill_strategies = vec![];
98        let mut partition_by_cols = vec![];
99        let mut seen_partition_by_cols = std::collections::HashSet::new();
100        for arg in args_iter {
101            if let FunctionArg::Unnamed(FunctionArgExpr::Expr(AstExpr::Function(func))) = arg {
102                let name = func.name.0[0].real_value().to_ascii_lowercase();
103
104                // Handle PARTITION_BY(col1, col2, ...) as a special directive
105                if name == "partition_by" {
106                    if func.arg_list.args.is_empty() {
107                        return Err(ErrorCode::BindError(
108                            "PARTITION_BY requires at least one column argument".to_owned(),
109                        )
110                        .into());
111                    }
112                    for partition_arg in &func.arg_list.args {
113                        let arg_exprs = self.bind_function_arg(partition_arg)?;
114                        let arg_expr =
115                            Itertools::exactly_one(arg_exprs.into_iter()).map_err(|_| {
116                                ErrorCode::BindError(
117                                    "PARTITION_BY argument should be a single column reference"
118                                        .to_owned(),
119                                )
120                            })?;
121                        if let ExprImpl::InputRef(input_ref) = arg_expr {
122                            if input_ref.index() == time_col.index() {
123                                return Err(ErrorCode::BindError(
124                                    "PARTITION_BY cannot include the time column".to_owned(),
125                                )
126                                .into());
127                            }
128                            // Canonicalize duplicate partition columns eagerly so downstream
129                            // planning always sees a unique partition key list.
130                            if !seen_partition_by_cols.insert(input_ref.index()) {
131                                continue;
132                            }
133                            partition_by_cols.push(*input_ref);
134                        } else {
135                            return Err(ErrorCode::BindError(
136                                "PARTITION_BY argument must be a column reference".to_owned(),
137                            )
138                            .into());
139                        }
140                    }
141                    continue;
142                }
143
144                let strategy = match name.as_str() {
145                    "interpolate" => FillStrategy::Interpolate,
146                    "locf" => FillStrategy::Locf,
147                    "keepnull" => FillStrategy::Null,
148                    _ => {
149                        return Err(ErrorCode::BindError(format!(
150                            "Unsupported fill strategy: {}",
151                            name
152                        ))
153                        .into());
154                    }
155                };
156
157                if func.arg_list.args.len() != 1 {
158                    return Err(ErrorCode::BindError(format!(
159                        "Fill strategy function {} expects exactly one argument",
160                        name
161                    ))
162                    .into());
163                }
164
165                let arg_exprs = self.bind_function_arg(&func.arg_list.args[0])?;
166                let arg_expr = Itertools::exactly_one(arg_exprs.into_iter()).map_err(|_| {
167                    ErrorCode::BindError(
168                        "Fill strategy argument should be a single expression".to_owned(),
169                    )
170                })?;
171
172                if let ExprImpl::InputRef(input_ref) = arg_expr {
173                    if input_ref.index() == time_col.index() {
174                        return Err(ErrorCode::BindError(
175                            "Cannot apply a fill strategy to the time column".to_owned(),
176                        )
177                        .into());
178                    }
179                    // Check datatype for interpolate
180                    if matches!(strategy, FillStrategy::Interpolate) {
181                        let data_type = &input_ref.data_type;
182                        if !data_type.is_numeric() || matches!(data_type, DataType::Serial) {
183                            return Err(ErrorCode::BindError(format!(
184                                "INTERPOLATE only supports numeric types, got {}",
185                                data_type
186                            ))
187                            .into());
188                        }
189                    }
190                    fill_strategies.push(BoundFillStrategy {
191                        strategy,
192                        target_col: *input_ref,
193                    });
194                } else {
195                    return Err(ErrorCode::BindError(
196                        "Fill strategy argument must be a column reference".to_owned(),
197                    )
198                    .into());
199                }
200            } else {
201                return Err(ErrorCode::BindError(
202                    "Fill strategy must be a function call like LOCF(col) or PARTITION_BY(col)"
203                        .to_owned(),
204                )
205                .into());
206            }
207        }
208
209        // A PARTITION_BY column defines the time-series identity and is always carried into
210        // generated rows, so a fill strategy on it would be silently ignored. Reject it.
211        for strategy in &fill_strategies {
212            if partition_by_cols
213                .iter()
214                .any(|c| c.index() == strategy.target_col.index())
215            {
216                return Err(ErrorCode::BindError(
217                    "Cannot apply a fill strategy to a PARTITION_BY column".to_owned(),
218                )
219                .into());
220            }
221        }
222
223        let base_columns = std::mem::take(&mut self.context.columns);
224        self.pop_context()?;
225
226        let columns = base_columns
227            .into_iter()
228            .map(|c| (c.is_hidden, c.field))
229            .collect::<Vec<(bool, Field)>>();
230
231        // Parse into the relation argument of the `gap_fill` function.
232        let (schema_name, table_name) =
233            Self::resolve_schema_qualified_name(&self.db_name, &table_name)?;
234        self.bind_table_to_context(columns, table_name, schema_name, alias)?;
235
236        Ok(BoundGapFill {
237            input,
238            time_col: *time_col,
239            interval,
240            fill_strategies,
241            partition_by_cols,
242        })
243    }
244}