risingwave_expr_impl/aggregate/
string_agg.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_expr::aggregate;
16
17#[aggregate("string_agg(varchar, varchar) -> varchar")]
/// Folds one input row into the running `string_agg` result.
///
/// * `state` - the text accumulated so far; `None` until the first non-`None`
///   `value` has been seen.
/// * `value` - the text of the current row. A `None` value leaves `state`
///   untouched, and the row's delimiter is not appended either.
/// * `delimiter` - the separator supplied with the current row. It is placed
///   between the accumulated text and `value`; `None` behaves like an empty
///   separator. It is not used for the very first value, because there is
///   nothing to separate yet.
///
/// Returns the updated accumulated text.
fn string_agg(
    state: Option<Box<str>>,
    value: Option<&str>,
    delimiter: Option<&str>,
) -> Option<Box<str>> {
    match (state, value) {
        // Nothing to add: hand the accumulator back unchanged.
        (acc, None) => acc,
        // First value: it becomes the result as-is, without a delimiter.
        (None, Some(first)) => Some(Box::from(first)),
        // Subsequent value: append `<delimiter><value>` to the accumulated text.
        (Some(acc), Some(next)) => {
            let mut joined = acc.into_string();
            if let Some(sep) = delimiter {
                joined.push_str(sep);
            }
            joined.push_str(next);
            Some(joined.into_boxed_str())
        }
    }
}
32
#[cfg(test)]
mod tests {
    use risingwave_common::array::*;
    use risingwave_expr::Result;
    use risingwave_expr::aggregate::{AggCall, build_append_only};

    /// Four rows, each with a value and a `,` delimiter: the values are joined
    /// with `,` and no delimiter precedes the first one.
    #[tokio::test]
    async fn test_string_agg_basic() -> Result<()> {
        let input = StreamChunk::from_pretty(
            " T   T
            + aaa ,
            + bbb ,
            + ccc ,
            + ddd ,",
        );
        let agg = build_append_only(&AggCall::from_pretty(
            "(string_agg:varchar $0:varchar $1:varchar)",
        ))?;
        let mut state = agg.create_state()?;
        agg.update(&mut state, &input).await?;

        let result = agg.get_result(&state).await?;
        assert_eq!(result, Some("aaa,bbb,ccc,ddd".into()));
        Ok(())
    }

    /// Mixes per-row delimiters with `.` cells (presumably NULL in the pretty
    /// chunk format — confirm against `StreamChunk::from_pretty`):
    /// the row with a `.` value is skipped entirely, and the row with a `.`
    /// delimiter is appended without any separator.
    #[tokio::test]
    async fn test_string_agg_complex() -> Result<()> {
        let input = StreamChunk::from_pretty(
            " T   T
            + aaa ,
            + .   _
            + ccc _
            + ddd .",
        );
        let agg = build_append_only(&AggCall::from_pretty(
            "(string_agg:varchar $0:varchar $1:varchar)",
        ))?;
        let mut state = agg.create_state()?;
        agg.update(&mut state, &input).await?;

        let result = agg.get_result(&state).await?;
        assert_eq!(result, Some("aaa_cccddd".into()));
        Ok(())
    }
}