risingwave_expr_impl/scalar/
array.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::{ListValue, StructValue};
16use risingwave_common::row::Row;
17use risingwave_common::types::{
18    DataType, ListRef, MapRef, MapType, MapValue, ScalarRef, ScalarRefImpl, ToOwnedDatum,
19};
20use risingwave_expr::expr::Context;
21use risingwave_expr::{ExprError, function};
22
23use super::array_positions::array_position;
24
25#[function("array(...) -> anyarray", type_infer = "unreachable")]
26fn array(row: impl Row, ctx: &Context) -> ListValue {
27    ListValue::from_datum_iter(ctx.return_type.as_list_element_type(), row.iter())
28}
29
30#[function("row(...) -> struct", type_infer = "unreachable")]
31fn row_(row: impl Row) -> StructValue {
32    StructValue::new(row.iter().map(|d| d.to_owned_datum()).collect())
33}
34
35fn map_from_key_values_type_infer(args: &[DataType]) -> Result<DataType, ExprError> {
36    let map = MapType::try_from_kv(
37        args[0].as_list_element_type().clone(),
38        args[1].as_list_element_type().clone(),
39    )
40    .map_err(ExprError::Custom)?;
41    Ok(map.into())
42}
43
44fn map_from_entries_type_infer(args: &[DataType]) -> Result<DataType, ExprError> {
45    let map = MapType::try_from_entries(args[0].as_list_element_type().clone())
46        .map_err(ExprError::Custom)?;
47    Ok(map.into())
48}
49
50/// # Example
51///
52/// ```slt
53/// query T
54/// select map_from_key_values(null::int[], array[1,2,3]);
55/// ----
56/// NULL
57///
58/// query T
59/// select map_from_key_values(array['a','b','c'], array[1,2,3]);
60/// ----
61/// {a:1,b:2,c:3}
62/// ```
63#[function(
64    "map_from_key_values(anyarray, anyarray) -> anymap",
65    type_infer = "map_from_key_values_type_infer"
66)]
67fn map_from_key_values(key: ListRef<'_>, value: ListRef<'_>) -> Result<MapValue, ExprError> {
68    MapValue::try_from_kv(key.to_owned(), value.to_owned()).map_err(ExprError::Custom)
69}
70
71#[function(
72    "map_from_entries(anyarray) -> anymap",
73    type_infer = "map_from_entries_type_infer"
74)]
75fn map_from_entries(entries: ListRef<'_>) -> Result<MapValue, ExprError> {
76    MapValue::try_from_entries(entries.to_owned()).map_err(ExprError::Custom)
77}
78
79/// # Example
80///
81/// ```slt
82/// query T
83/// select map_access(map_from_key_values(array[1,2,3], array[100,200,300]), 3);
84/// ----
85/// 300
86///
87/// query T
88/// select map_access(map_from_key_values(array[1,2,3], array[100,200,300]), '3');
89/// ----
90/// 300
91///
92/// query error
93/// select map_access(map_from_key_values(array[1,2,3], array[100,200,300]), 1.0);
94/// ----
95/// db error: ERROR: Failed to run the query
96///
97/// Caused by these errors (recent errors listed first):
98///   1: Failed to bind expression: map_access(map_from_key_values(ARRAY[1, 2, 3], ARRAY[100, 200, 300]), 1.0)
99///   2: Bind error: Cannot access numeric in map(integer,integer)
100///
101///
102/// query T
103/// select map_access(map_from_key_values(array['a','b','c'], array[1,2,3]), 'a');
104/// ----
105/// 1
106///
107/// query T
108/// select map_access(map_from_key_values(array['a','b','c'], array[1,2,3]), 'd');
109/// ----
110/// NULL
111///
112/// query T
113/// select map_access(map_from_key_values(array['a','b','c'], array[1,2,3]), null);
114/// ----
115/// NULL
116/// ```
117#[function("map_access(anymap, any) -> any", type_infer = "unreachable")]
118fn map_access<'a>(
119    map: MapRef<'a>,
120    key: ScalarRefImpl<'_>,
121) -> Result<Option<ScalarRefImpl<'a>>, ExprError> {
122    // FIXME: DatumRef in return value is not support by the macro yet.
123
124    let (keys, values) = map.into_kv();
125    let idx = array_position(keys, Some(key))?;
126    match idx {
127        Some(idx) => Ok(values.get((idx - 1) as usize).unwrap()),
128        None => Ok(None),
129    }
130}
131
132/// ```slt
133/// query T
134/// select
135///     map_contains(MAP{1:1}, 1),
136///     map_contains(MAP{1:1}, 2),
137///     map_contains(MAP{1:1}, NULL::varchar),
138///     map_contains(MAP{1:1}, 1.0)
139/// ----
140/// t f NULL f
141/// ```
142#[function("map_contains(anymap, any) -> boolean")]
143fn map_contains(map: MapRef<'_>, key: ScalarRefImpl<'_>) -> Result<bool, ExprError> {
144    let (keys, _values) = map.into_kv();
145    let idx = array_position(keys, Some(key))?;
146    Ok(idx.is_some())
147}
148
149/// ```slt
150/// query I
151/// select
152///     map_length(NULL::map(int,int)),
153///     map_length(MAP {}::map(int,int)),
154///     map_length(MAP {1:1,2:2}::map(int,int))
155/// ----
156/// NULL 0 2
157/// ```
158#[function("map_length(anymap) -> int4")]
159fn map_length<T: TryFrom<usize>>(map: MapRef<'_>) -> Result<T, ExprError> {
160    map.len().try_into().map_err(|_| ExprError::NumericOverflow)
161}
162
163/// If both `m1` and `m2` have a value with the same key, then the output map contains the value from `m2`.
164///
165/// ```slt
166/// query T
167/// select map_cat(MAP{'a':1,'b':2},null::map(varchar,int));
168/// ----
169/// {a:1,b:2}
170///
171/// query T
172/// select map_cat(MAP{'a':1,'b':2},MAP{'b':3,'c':4});
173/// ----
174/// {a:1,b:3,c:4}
175///
176/// # implicit type cast
177/// query T
178/// select map_cat(MAP{'a':1,'b':2},MAP{'b':3.0,'c':4.0});
179/// ----
180/// {a:1,b:3.0,c:4.0}
181/// ```
182#[function("map_cat(anymap, anymap) -> anymap")]
183fn map_cat(m1: Option<MapRef<'_>>, m2: Option<MapRef<'_>>) -> Result<Option<MapValue>, ExprError> {
184    match (m1, m2) {
185        (None, None) => Ok(None),
186        (Some(m), None) | (None, Some(m)) => Ok(Some(m.to_owned())),
187        (Some(m1), Some(m2)) => Ok(Some(MapValue::concat(m1, m2))),
188    }
189}
190
191/// Inserts a key-value pair into the map. If the key already exists, the value is updated.
192///
193/// # Example
194///
195/// ```slt
196/// query T
197/// select map_insert(map{'a':1, 'b':2}, 'c', 3);
198/// ----
199/// {a:1,b:2,c:3}
200///
201/// query T
202/// select map_insert(map{'a':1, 'b':2}, 'b', 4);
203/// ----
204/// {a:1,b:4}
205/// ```
206///
207/// TODO: support variadic arguments
208#[function("map_insert(anymap, any, any) -> anymap")]
209fn map_insert(
210    map: MapRef<'_>,
211    key: Option<ScalarRefImpl<'_>>,
212    value: Option<ScalarRefImpl<'_>>,
213) -> MapValue {
214    let Some(key) = key else {
215        return map.to_owned();
216    };
217    MapValue::insert(map, key.into_scalar_impl(), value.to_owned_datum())
218}
219
220/// Deletes a key-value pair from the map.
221///
222/// # Example
223///
224/// ```slt
225/// query T
226/// select map_delete(map{'a':1, 'b':2, 'c':3}, 'b');
227/// ----
228/// {a:1,c:3}
229///
230/// query T
231/// select map_delete(map{'a':1, 'b':2, 'c':3}, 'd');
232/// ----
233/// {a:1,b:2,c:3}
234/// ```
235///
236/// TODO: support variadic arguments
237#[function("map_delete(anymap, any) -> anymap")]
238fn map_delete(map: MapRef<'_>, key: Option<ScalarRefImpl<'_>>) -> MapValue {
239    let Some(key) = key else {
240        return map.to_owned();
241    };
242    MapValue::delete(map, key)
243}
244
245/// # Example
246///
247/// ```slt
248/// query T
249/// select map_keys(map{'a':1, 'b':2, 'c':3});
250/// ----
251/// {a,b,c}
252/// ```
253#[function(
254    "map_keys(anymap) -> anyarray",
255    type_infer = "|args|{
256        Ok(DataType::List(Box::new(args[0].as_map().key().clone())))
257    }"
258)]
259fn map_keys(map: MapRef<'_>) -> ListValue {
260    map.into_kv().0.to_owned_scalar()
261}
262
263/// # Example
264///
265/// ```slt
266/// query T
267/// select map_values(map{'a':1, 'b':2, 'c':3});
268/// ----
269/// {1,2,3}
270/// ```
271#[function(
272    "map_values(anymap) -> anyarray",
273    type_infer = "|args|{
274        Ok(DataType::List(Box::new(args[0].as_map().value().clone())))
275    }"
276)]
277fn map_values(map: MapRef<'_>) -> ListValue {
278    map.into_kv().1.to_owned_scalar()
279}
280
281/// # Example
282///
283/// ```slt
284/// query T
285/// select map_entries(map{'a':1, 'b':2, 'c':3});
286/// ----
287/// {"(a,1)","(b,2)","(c,3)"}
288/// ```
289#[function(
290    "map_entries(anymap) -> anyarray",
291    type_infer = "|args|{
292        Ok(args[0].as_map().clone().into_list())
293    }"
294)]
295fn map_entries(map: MapRef<'_>) -> ListValue {
296    map.into_inner().to_owned()
297}
298
#[cfg(test)]
mod tests {
    use risingwave_common::array::DataChunk;
    use risingwave_common::row::Row;
    use risingwave_common::test_prelude::DataChunkTestExt;
    use risingwave_common::types::ToOwnedDatum;
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_expr::expr::build_from_pretty;

    /// Checks that `row(...)` packs three `int4` columns into a struct, both through
    /// chunk-level `eval` and row-level `eval_row`.
    #[tokio::test]
    async fn test_row_expr() {
        let row_expr =
            build_from_pretty("(row:struct<a_int4,b_int4,c_int4> $0:int4 $1:int4 $2:int4)");
        // Columns 0..3 are the inputs; the remaining column holds the expected struct.
        let (input_chunk, expected_chunk) = DataChunk::from_pretty(
            "i i i <i,i,i>
             1 2 3 (1,2,3)
             4 2 1 (4,2,1)
             9 1 3 (9,1,3)
             1 1 1 (1,1,1)",
        )
        .split_column_at(3);

        // chunk-level evaluation
        let actual_column = row_expr.eval(&input_chunk).await.unwrap();
        assert_eq!(&actual_column, expected_chunk.column_at(0));

        // row-level evaluation must agree with the expected struct for every row
        let row_pairs = input_chunk.rows().zip_eq_debug(expected_chunk.rows());
        for (input_row, expected_row) in row_pairs {
            let actual_datum = row_expr
                .eval_row(&input_row.to_owned_row())
                .await
                .unwrap();
            assert_eq!(actual_datum, expected_row.datum_at(0).to_owned_datum());
        }
    }
}