risingwave_frontend/binder/relation/
gap_fill.rs1use itertools::Itertools;
16use risingwave_common::catalog::Field;
17use risingwave_common::gap_fill_types::FillStrategy;
18use risingwave_common::types::DataType;
19use risingwave_sqlparser::ast::{Expr as AstExpr, FunctionArg, FunctionArgExpr, TableAlias};
20
21use super::{Binder, Relation};
22use crate::binder::BoundFillStrategy;
23use crate::error::{ErrorCode, Result as RwResult};
24use crate::expr::{Expr, ExprImpl, InputRef};
25
26#[derive(Debug, Clone)]
27pub struct BoundGapFill {
28 pub input: Relation,
29 pub time_col: InputRef,
30 pub interval: ExprImpl,
31 pub fill_strategies: Vec<BoundFillStrategy>,
32}
33
34impl Binder {
35 pub(super) fn bind_gap_fill(
36 &mut self,
37 alias: Option<&TableAlias>,
38 args: &[FunctionArg],
39 ) -> RwResult<BoundGapFill> {
40 if args.len() < 3 {
41 return Err(ErrorCode::BindError(
42 "GAP_FILL requires at least 3 arguments: input, time_col, interval".to_owned(),
43 )
44 .into());
45 }
46
47 let mut args_iter = args.iter();
48
49 self.push_context();
50
51 let (input, table_name) = self.bind_relation_by_function_arg(
52 args_iter.next(),
53 "The 1st arg of GAP_FILL should be a table name",
54 )?;
55
56 let time_col = self.bind_column_by_function_args(
57 args_iter.next(),
58 "The 2nd arg of GAP_FILL should be a column name",
59 )?;
60
61 if !matches!(
62 time_col.data_type,
63 DataType::Timestamp | DataType::Timestamptz
64 ) {
65 return Err(ErrorCode::BindError(
66 "The 2nd arg of GAP_FILL should be a column of type timestamp or timestamptz"
67 .to_owned(),
68 )
69 .into());
70 }
71
72 let interval_arg = args_iter.next().unwrap();
73 let interval_exprs = self.bind_function_arg(interval_arg)?;
74 let interval = interval_exprs.into_iter().exactly_one().map_err(|_| {
75 ErrorCode::BindError("The 3rd arg of GAP_FILL should be a single expression".to_owned())
76 })?;
77 if interval.return_type() != DataType::Interval {
78 return Err(ErrorCode::BindError(
79 "The 3rd arg of GAP_FILL should be an interval".to_owned(),
80 )
81 .into());
82 }
83
84 if let ExprImpl::Literal(literal) = &interval
86 && let Some(risingwave_common::types::ScalarImpl::Interval(interval_value)) =
87 literal.get_data()
88 && interval_value.months() == 0
89 && interval_value.days() == 0
90 && interval_value.usecs() == 0
91 {
92 return Err(
93 ErrorCode::BindError("The gap fill interval cannot be zero".to_owned()).into(),
94 );
95 }
96
97 let mut fill_strategies = vec![];
98 for arg in args_iter {
99 let (strategy, target_col) =
100 if let FunctionArg::Unnamed(FunctionArgExpr::Expr(AstExpr::Function(func))) = arg {
101 let name = func.name.0[0].real_value().to_ascii_lowercase();
102
103 let strategy = match name.as_str() {
104 "interpolate" => FillStrategy::Interpolate,
105 "locf" => FillStrategy::Locf,
106 "keepnull" => FillStrategy::Null,
107 _ => {
108 return Err(ErrorCode::BindError(format!(
109 "Unsupported fill strategy: {}",
110 name
111 ))
112 .into());
113 }
114 };
115
116 if func.arg_list.args.len() != 1 {
117 return Err(ErrorCode::BindError(format!(
118 "Fill strategy function {} expects exactly one argument",
119 name
120 ))
121 .into());
122 }
123
124 let arg_exprs = self.bind_function_arg(&func.arg_list.args[0])?;
125 let arg_expr = arg_exprs.into_iter().exactly_one().map_err(|_| {
126 ErrorCode::BindError(
127 "Fill strategy argument should be a single expression".to_owned(),
128 )
129 })?;
130
131 if let ExprImpl::InputRef(input_ref) = arg_expr {
132 if matches!(strategy, FillStrategy::Interpolate) {
134 let data_type = &input_ref.data_type;
135 if !data_type.is_numeric() || matches!(data_type, DataType::Serial) {
136 return Err(ErrorCode::BindError(format!(
137 "INTERPOLATE only supports numeric types, got {}",
138 data_type
139 ))
140 .into());
141 }
142 }
143 (strategy, *input_ref)
144 } else {
145 return Err(ErrorCode::BindError(
146 "Fill strategy argument must be a column reference".to_owned(),
147 )
148 .into());
149 }
150 } else {
151 return Err(ErrorCode::BindError(
152 "Fill strategy must be a function call like LOCF(col)".to_owned(),
153 )
154 .into());
155 };
156 fill_strategies.push(BoundFillStrategy {
157 strategy,
158 target_col,
159 });
160 }
161
162 let base_columns = std::mem::take(&mut self.context.columns);
163 self.pop_context()?;
164
165 let columns = base_columns
166 .into_iter()
167 .map(|c| (c.is_hidden, c.field))
168 .collect::<Vec<(bool, Field)>>();
169
170 let (_, table_name) = Self::resolve_schema_qualified_name(&self.db_name, &table_name)?;
171 self.bind_table_to_context(columns, table_name, alias)?;
172
173 Ok(BoundGapFill {
174 input,
175 time_col: *time_col,
176 interval,
177 fill_strategies,
178 })
179 }
180}