risingwave_expr_impl/aggregate/
first_last_value.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::{Datum, ScalarRefImpl};
16use risingwave_common_estimate_size::EstimateSize;
17use risingwave_expr::aggregate;
18use risingwave_expr::aggregate::AggStateDyn;
19
20/// Note that different from `min` and `max`, `first_value` doesn't ignore `NULL` values.
21///
22/// ```slt
23/// statement ok
24/// create table t(v1 int, ts int);
25///
26/// statement ok
27/// insert into t values (null, 1), (2, 2), (null, 3);
28///
29/// query I
30/// select first_value(v1 order by ts) from t;
31/// ----
32/// NULL
33///
34/// statement ok
35/// drop table t;
36/// ```
37#[aggregate("first_value(any) -> any")]
38fn first_value(state: &mut FirstValueState, input: Option<ScalarRefImpl<'_>>) {
39    if state.0.is_none() {
40        state.0 = Some(input.map(|x| x.into_scalar_impl()));
41    }
42}
43
44#[derive(Debug, Clone, Default, EstimateSize)]
45struct FirstValueState(Option<Datum>);
46
47impl AggStateDyn for FirstValueState {}
48
49impl From<&FirstValueState> for Datum {
50    fn from(state: &FirstValueState) -> Self {
51        if let Some(state) = &state.0 {
52            state.clone()
53        } else {
54            None
55        }
56    }
57}
58
/// Returns the current input, ignoring the first argument.
///
/// Note that, unlike `min` and `max`, `last_value` doesn't ignore `NULL` values.
///
/// ```slt
/// statement ok
/// create table t(v1 int, ts int);
///
/// statement ok
/// insert into t values (null, 1), (2, 2), (null, 3);
///
/// query I
/// select last_value(v1 order by ts) from t;
/// ----
/// NULL
///
/// statement ok
/// drop table t;
/// ```
#[aggregate("last_value(*) -> auto", state = "ref")] // TODO(rc): `last_value(any) -> any`
fn last_value<T>(_: Option<T>, input: Option<T>) -> Option<T> {
    // The first argument (presumably the previous state — confirm against the `#[aggregate]`
    // macro) is discarded; a `None` input is passed through unchanged.
    input
}
80
81#[aggregate("internal_last_seen_value(*) -> auto", state = "ref", internal)]
82fn internal_last_seen_value<T>(state: T, input: T, retract: bool) -> T {
83    if retract { state } else { input }
84}