risingwave_expr_impl/aggregate/
first_last_value.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::{Datum, ScalarRefImpl};
16use risingwave_common_estimate_size::EstimateSize;
17use risingwave_expr::aggregate::AggStateDyn;
18use risingwave_expr::{ExprError, aggregate};
19
20/// Note that different from `min` and `max`, `first_value` doesn't ignore `NULL` values.
21///
22/// ```slt
23/// statement ok
24/// create table t(v1 int, ts int);
25///
26/// statement ok
27/// insert into t values (null, 1), (2, 2), (null, 3);
28///
29/// query I
30/// select first_value(v1 order by ts) from t;
31/// ----
32/// NULL
33///
34/// statement ok
35/// drop table t;
36/// ```
37#[aggregate("first_value(any) -> any")]
38fn first_value(state: &mut FirstValueState, input: Option<ScalarRefImpl<'_>>) {
39    if state.0.is_none() {
40        state.0 = Some(input.map(|x| x.into_scalar_impl()));
41    }
42}
43
/// Aggregation state for `first_value`.
///
/// The outer `Option` records whether any input has been captured yet (`None` means nothing
/// has been seen); the inner `Datum` is the captured first value, which may itself be `NULL`.
#[derive(Debug, Clone, Default, EstimateSize)]
struct FirstValueState(Option<Datum>);
46
// No trait items are overridden here; the impl only marks `FirstValueState` as an `AggStateDyn`.
impl AggStateDyn for FirstValueState {}
48
49impl From<&FirstValueState> for Datum {
50    fn from(state: &FirstValueState) -> Self {
51        if let Some(state) = &state.0 {
52            state.clone()
53        } else {
54            None
55        }
56    }
57}
58
/// Returns the last value fed to the aggregate, in input order.
///
/// Unlike `min` and `max`, `last_value` does not skip `NULL` inputs: if the last input is
/// `NULL`, the result is `NULL`.
///
/// ```slt
/// statement ok
/// create table t(v1 int, ts int);
///
/// statement ok
/// insert into t values (null, 1), (2, 2), (null, 3);
///
/// query I
/// select last_value(v1 order by ts) from t;
/// ----
/// NULL
///
/// statement ok
/// drop table t;
/// ```
#[aggregate(
    "last_value(*) -> auto",
    state = "ref",
    type_infer = "|args| Ok(args[0].clone())"
)] // TODO(rc): `last_value(any) -> any`
fn last_value<T>(_: Option<T>, input: Option<T>) -> Option<T> {
    // The previous state (first argument) is ignored; the newest input, `NULL` included,
    // always becomes the new state.
    input
}
84
85#[aggregate(
86    "internal_last_seen_value(*) -> auto",
87    state = "ref",
88    internal,
89    type_infer = "|args| Ok(args[0].clone())"
90)]
91fn internal_last_seen_value<T>(state: T, input: T, retract: bool) -> T {
92    if retract { state } else { input }
93}
94
/// Declares the `arg_min(any, any) -> any` aggregate signature; the function body is empty.
///
/// NOTE(review): the `rewritten` flag suggests this aggregate is rewritten into another form
/// elsewhere rather than evaluated by this function — confirm against the `#[aggregate]` macro.
#[aggregate("arg_min(any, any) -> any", rewritten)]
fn _arg_min() {}
97
/// Declares the `arg_max(any, any) -> any` aggregate signature; the function body is empty.
///
/// NOTE(review): the `rewritten` flag suggests this aggregate is rewritten into another form
/// elsewhere rather than evaluated by this function — confirm against the `#[aggregate]` macro.
#[aggregate("arg_max(any, any) -> any", rewritten)]
fn _arg_max() {}