risingwave_expr_impl/aggregate/first_last_value.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::{Datum, ScalarRefImpl};
16use risingwave_common_estimate_size::EstimateSize;
17use risingwave_expr::aggregate::AggStateDyn;
18use risingwave_expr::{ExprError, aggregate};
19
20/// Note that different from `min` and `max`, `first_value` doesn't ignore `NULL` values.
21///
22/// ```slt
23/// statement ok
24/// create table t(v1 int, ts int);
25///
26/// statement ok
27/// insert into t values (null, 1), (2, 2), (null, 3);
28///
29/// query I
30/// select first_value(v1 order by ts) from t;
31/// ----
32/// NULL
33///
34/// statement ok
35/// drop table t;
36/// ```
37#[aggregate("first_value(any) -> any")]
38fn first_value(state: &mut FirstValueState, input: Option<ScalarRefImpl<'_>>) {
39 if state.0.is_none() {
40 state.0 = Some(input.map(|x| x.into_scalar_impl()));
41 }
42}
43
44#[derive(Debug, Clone, Default, EstimateSize)]
45struct FirstValueState(Option<Datum>);
46
47impl AggStateDyn for FirstValueState {}
48
49impl From<&FirstValueState> for Datum {
50 fn from(state: &FirstValueState) -> Self {
51 if let Some(state) = &state.0 {
52 state.clone()
53 } else {
54 None
55 }
56 }
57}
58
59/// Note that different from `min` and `max`, `last_value` doesn't ignore `NULL` values.
60///
61/// ```slt
62/// statement ok
63/// create table t(v1 int, ts int);
64///
65/// statement ok
66/// insert into t values (null, 1), (2, 2), (null, 3);
67///
68/// query I
69/// select last_value(v1 order by ts) from t;
70/// ----
71/// NULL
72///
73/// statement ok
74/// drop table t;
75/// ```
76#[aggregate(
77 "last_value(*) -> auto",
78 state = "ref",
79 type_infer = "|args| Ok(args[0].clone())"
80)] // TODO(rc): `last_value(any) -> any`
81fn last_value<T>(_: Option<T>, input: Option<T>) -> Option<T> {
82 input
83}
84
85#[aggregate(
86 "internal_last_seen_value(*) -> auto",
87 state = "ref",
88 internal,
89 type_infer = "|args| Ok(args[0].clone())"
90)]
91fn internal_last_seen_value<T>(state: T, input: T, retract: bool) -> T {
92 if retract { state } else { input }
93}
94
95#[aggregate("arg_min(any, any) -> any", rewritten)]
96fn _arg_min() {}
97
98#[aggregate("arg_max(any, any) -> any", rewritten)]
99fn _arg_max() {}