use std::collections::HashSet;

use itertools::Itertools;
use risingwave_common::catalog::Field;
use risingwave_common::gap_fill::FillStrategy;
use risingwave_common::types::{DataType, Interval, ScalarImpl};
use risingwave_sqlparser::ast::{Expr as AstExpr, FunctionArg, FunctionArgExpr, TableAlias};

use super::{Binder, Relation};
use crate::binder::BoundFillStrategy;
use crate::error::{ErrorCode, Result as RwResult};
use crate::expr::{Expr, ExprImpl, InputRef};

26#[derive(Debug, Clone)]
27pub struct BoundGapFill {
28 pub input: Relation,
29 pub time_col: InputRef,
30 pub interval: ExprImpl,
31 pub fill_strategies: Vec<BoundFillStrategy>,
32 pub partition_by_cols: Vec<InputRef>,
33}
34
35impl Binder {
36 pub(super) fn bind_gap_fill(
37 &mut self,
38 alias: Option<&TableAlias>,
39 args: &[FunctionArg],
40 ) -> RwResult<BoundGapFill> {
41 if args.len() < 3 {
42 return Err(ErrorCode::BindError(
43 "GAP_FILL requires at least 3 arguments: input, time_col, interval".to_owned(),
44 )
45 .into());
46 }
47
48 let mut args_iter = args.iter();
49
50 self.push_context();
51
52 let (input, table_name) = self.bind_relation_by_function_arg(
53 args_iter.next(),
54 "The 1st arg of GAP_FILL should be a table name",
55 )?;
56
57 let time_col = self.bind_column_by_function_args(
58 args_iter.next(),
59 "The 2nd arg of GAP_FILL should be a column name",
60 )?;
61
62 if !matches!(
63 time_col.data_type,
64 DataType::Timestamp | DataType::Timestamptz
65 ) {
66 return Err(ErrorCode::BindError(
67 "The 2nd arg of GAP_FILL should be a column of type timestamp or timestamptz"
68 .to_owned(),
69 )
70 .into());
71 }
72
73 let interval_arg = args_iter.next().unwrap();
74 let interval_exprs = self.bind_function_arg(interval_arg)?;
75 let interval = Itertools::exactly_one(interval_exprs.into_iter()).map_err(|_| {
76 ErrorCode::BindError("The 3rd arg of GAP_FILL should be a single expression".to_owned())
77 })?;
78 if interval.return_type() != DataType::Interval {
79 return Err(ErrorCode::BindError(
80 "The 3rd arg of GAP_FILL should be an interval".to_owned(),
81 )
82 .into());
83 }
84
85 if let Some(folded) = interval.try_fold_const()
89 && let Some(risingwave_common::types::ScalarImpl::Interval(interval_value)) = folded?
90 && interval_value <= Interval::from_month_day_usec(0, 0, 0)
91 {
92 return Err(
93 ErrorCode::BindError("The gap fill interval must be positive".to_owned()).into(),
94 );
95 }
96
97 let mut fill_strategies = vec![];
98 let mut partition_by_cols = vec![];
99 let mut seen_partition_by_cols = std::collections::HashSet::new();
100 for arg in args_iter {
101 if let FunctionArg::Unnamed(FunctionArgExpr::Expr(AstExpr::Function(func))) = arg {
102 let name = func.name.0[0].real_value().to_ascii_lowercase();
103
104 if name == "partition_by" {
106 if func.arg_list.args.is_empty() {
107 return Err(ErrorCode::BindError(
108 "PARTITION_BY requires at least one column argument".to_owned(),
109 )
110 .into());
111 }
112 for partition_arg in &func.arg_list.args {
113 let arg_exprs = self.bind_function_arg(partition_arg)?;
114 let arg_expr =
115 Itertools::exactly_one(arg_exprs.into_iter()).map_err(|_| {
116 ErrorCode::BindError(
117 "PARTITION_BY argument should be a single column reference"
118 .to_owned(),
119 )
120 })?;
121 if let ExprImpl::InputRef(input_ref) = arg_expr {
122 if input_ref.index() == time_col.index() {
123 return Err(ErrorCode::BindError(
124 "PARTITION_BY cannot include the time column".to_owned(),
125 )
126 .into());
127 }
128 if !seen_partition_by_cols.insert(input_ref.index()) {
131 continue;
132 }
133 partition_by_cols.push(*input_ref);
134 } else {
135 return Err(ErrorCode::BindError(
136 "PARTITION_BY argument must be a column reference".to_owned(),
137 )
138 .into());
139 }
140 }
141 continue;
142 }
143
144 let strategy = match name.as_str() {
145 "interpolate" => FillStrategy::Interpolate,
146 "locf" => FillStrategy::Locf,
147 "keepnull" => FillStrategy::Null,
148 _ => {
149 return Err(ErrorCode::BindError(format!(
150 "Unsupported fill strategy: {}",
151 name
152 ))
153 .into());
154 }
155 };
156
157 if func.arg_list.args.len() != 1 {
158 return Err(ErrorCode::BindError(format!(
159 "Fill strategy function {} expects exactly one argument",
160 name
161 ))
162 .into());
163 }
164
165 let arg_exprs = self.bind_function_arg(&func.arg_list.args[0])?;
166 let arg_expr = Itertools::exactly_one(arg_exprs.into_iter()).map_err(|_| {
167 ErrorCode::BindError(
168 "Fill strategy argument should be a single expression".to_owned(),
169 )
170 })?;
171
172 if let ExprImpl::InputRef(input_ref) = arg_expr {
173 if input_ref.index() == time_col.index() {
174 return Err(ErrorCode::BindError(
175 "Cannot apply a fill strategy to the time column".to_owned(),
176 )
177 .into());
178 }
179 if matches!(strategy, FillStrategy::Interpolate) {
181 let data_type = &input_ref.data_type;
182 if !data_type.is_numeric() || matches!(data_type, DataType::Serial) {
183 return Err(ErrorCode::BindError(format!(
184 "INTERPOLATE only supports numeric types, got {}",
185 data_type
186 ))
187 .into());
188 }
189 }
190 fill_strategies.push(BoundFillStrategy {
191 strategy,
192 target_col: *input_ref,
193 });
194 } else {
195 return Err(ErrorCode::BindError(
196 "Fill strategy argument must be a column reference".to_owned(),
197 )
198 .into());
199 }
200 } else {
201 return Err(ErrorCode::BindError(
202 "Fill strategy must be a function call like LOCF(col) or PARTITION_BY(col)"
203 .to_owned(),
204 )
205 .into());
206 }
207 }
208
209 for strategy in &fill_strategies {
212 if partition_by_cols
213 .iter()
214 .any(|c| c.index() == strategy.target_col.index())
215 {
216 return Err(ErrorCode::BindError(
217 "Cannot apply a fill strategy to a PARTITION_BY column".to_owned(),
218 )
219 .into());
220 }
221 }
222
223 let base_columns = std::mem::take(&mut self.context.columns);
224 self.pop_context()?;
225
226 let columns = base_columns
227 .into_iter()
228 .map(|c| (c.is_hidden, c.field))
229 .collect::<Vec<(bool, Field)>>();
230
231 let (schema_name, table_name) =
233 Self::resolve_schema_qualified_name(&self.db_name, &table_name)?;
234 self.bind_table_to_context(columns, table_name, schema_name, alias)?;
235
236 Ok(BoundGapFill {
237 input,
238 time_col: *time_col,
239 interval,
240 fill_strategies,
241 partition_by_cols,
242 })
243 }
244}