risingwave_expr_impl/aggregate/
jsonb_agg.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::{Datum, JsonbVal};
16use risingwave_common_estimate_size::EstimateSize;
17use risingwave_expr::aggregate::AggStateDyn;
18use risingwave_expr::expr::Context;
19use risingwave_expr::{ExprError, Result, aggregate};
20
21use crate::scalar::ToJsonb;
22
23/// Collects all the input values, including nulls, into a JSON array.
24/// Values are converted to JSON as per `to_jsonb`.
25#[aggregate("jsonb_agg(*) -> jsonb")]
26fn jsonb_agg(
27    state: &mut JsonbArrayState,
28    input: Option<impl ToJsonb>,
29    ctx: &Context,
30) -> Result<()> {
31    input.add_to(&ctx.arg_types[0], &mut state.0)?;
32    Ok(())
33}
34
35/// Collects all the key/value pairs into a JSON object.
36/// // Key arguments are coerced to text;
37/// value arguments are converted as per `to_jsonb`.
38/// Values can be null, but keys cannot.
39#[aggregate("jsonb_object_agg(varchar, *) -> jsonb")]
40fn jsonb_object_agg(
41    state: &mut JsonbObjectState,
42    key: Option<&str>,
43    value: Option<impl ToJsonb>,
44    ctx: &Context,
45) -> Result<()> {
46    let key = key.ok_or(ExprError::FieldNameNull)?;
47    state.0.add_string(key);
48    value.add_to(&ctx.arg_types[1], &mut state.0)?;
49    Ok(())
50}
51
52#[derive(Debug)]
53struct JsonbArrayState(jsonbb::Builder);
54
55impl EstimateSize for JsonbArrayState {
56    fn estimated_heap_size(&self) -> usize {
57        self.0.capacity()
58    }
59}
60
61impl AggStateDyn for JsonbArrayState {}
62
63/// Creates an initial state.
64impl Default for JsonbArrayState {
65    fn default() -> Self {
66        let mut builder = jsonbb::Builder::default();
67        builder.begin_array();
68        Self(builder)
69    }
70}
71
72/// Finishes aggregation and returns the result.
73impl From<&JsonbArrayState> for Datum {
74    fn from(builder: &JsonbArrayState) -> Self {
75        // TODO: avoid clone
76        let mut builder = builder.0.clone();
77        builder.end_array();
78        let jsonb: JsonbVal = builder.finish().into();
79        Some(jsonb.into())
80    }
81}
82
83#[derive(Debug)]
84struct JsonbObjectState(jsonbb::Builder);
85
86impl EstimateSize for JsonbObjectState {
87    fn estimated_heap_size(&self) -> usize {
88        self.0.capacity()
89    }
90}
91
92impl AggStateDyn for JsonbObjectState {}
93
94/// Creates an initial state.
95impl Default for JsonbObjectState {
96    fn default() -> Self {
97        let mut builder = jsonbb::Builder::default();
98        builder.begin_object();
99        Self(builder)
100    }
101}
102
103/// Finishes aggregation and returns the result.
104impl From<&JsonbObjectState> for Datum {
105    fn from(builder: &JsonbObjectState) -> Self {
106        // TODO: avoid clone
107        let mut builder = builder.0.clone();
108        builder.end_object();
109        let jsonb: JsonbVal = builder.finish().into();
110        Some(jsonb.into())
111    }
112}