risingwave_expr_impl/aggregate/
string_agg.rs1use risingwave_expr::aggregate;
16
17#[aggregate("string_agg(varchar, varchar) -> varchar")]
18fn string_agg(
19 state: Option<Box<str>>,
20 value: Option<&str>,
21 delimiter: Option<&str>,
22) -> Option<Box<str>> {
23 let Some(value) = value else { return state };
24 let Some(state) = state else {
25 return Some(value.into());
26 };
27 let mut state = String::from(state);
28 state += delimiter.unwrap_or("");
29 state += value;
30 Some(state.into())
31}
32
#[cfg(test)]
mod tests {
    use risingwave_common::array::*;
    use risingwave_expr::Result;
    use risingwave_expr::aggregate::{AggCall, build_append_only};

    /// Four rows that all carry a value and a `,` delimiter are joined into a
    /// single comma-separated string, with no leading or trailing delimiter.
    #[tokio::test]
    async fn test_string_agg_basic() -> Result<()> {
        let agg = build_append_only(&AggCall::from_pretty(
            "(string_agg:varchar $0:varchar $1:varchar)",
        ))?;
        let input = StreamChunk::from_pretty(
            " T T
            + aaa ,
            + bbb ,
            + ccc ,
            + ddd ,",
        );

        let mut acc = agg.create_state()?;
        agg.update(&mut acc, &input).await?;
        let joined = agg.get_result(&acc).await?;

        assert_eq!(joined, Some("aaa,bbb,ccc,ddd".into()));
        Ok(())
    }

    /// Mixes per-row delimiters with `.` cells. Judging by the expected
    /// output, `.` is the pretty-format spelling of NULL: the second row
    /// (no value) is skipped entirely, and the last row (no delimiter) is
    /// appended without any separator.
    #[tokio::test]
    async fn test_string_agg_complex() -> Result<()> {
        let agg = build_append_only(&AggCall::from_pretty(
            "(string_agg:varchar $0:varchar $1:varchar)",
        ))?;
        let input = StreamChunk::from_pretty(
            " T T
            + aaa ,
            + . _
            + ccc _
            + ddd .",
        );

        let mut acc = agg.create_state()?;
        agg.update(&mut acc, &input).await?;
        let joined = agg.get_result(&acc).await?;

        assert_eq!(joined, Some("aaa_cccddd".into()));
        Ok(())
    }
}