risingwave_expr_impl/scalar/
array.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::{ListValue, StructValue};
16use risingwave_common::row::Row;
17use risingwave_common::types::{
18    DataType, ListRef, MapRef, MapType, MapValue, ScalarRef, ScalarRefImpl, ToOwnedDatum,
19};
20use risingwave_expr::expr::Context;
21use risingwave_expr::{ExprError, function};
22
23use super::array_positions::array_position;
24
25#[function("array(...) -> anyarray", type_infer = "unreachable")]
26fn array(row: impl Row, ctx: &Context) -> ListValue {
27    ListValue::from_datum_iter(ctx.return_type.as_list(), row.iter())
28}
29
30#[function("row(...) -> struct", type_infer = "unreachable")]
31fn row_(row: impl Row) -> StructValue {
32    StructValue::new(row.iter().map(|d| d.to_owned_datum()).collect())
33}
34
35fn map_from_key_values_type_infer(args: &[DataType]) -> Result<DataType, ExprError> {
36    let map = MapType::try_from_kv(args[0].as_list().clone(), args[1].as_list().clone())
37        .map_err(ExprError::Custom)?;
38    Ok(map.into())
39}
40
41fn map_from_entries_type_infer(args: &[DataType]) -> Result<DataType, ExprError> {
42    let map = MapType::try_from_entries(args[0].as_list().clone()).map_err(ExprError::Custom)?;
43    Ok(map.into())
44}
45
46/// # Example
47///
48/// ```slt
49/// query T
50/// select map_from_key_values(null::int[], array[1,2,3]);
51/// ----
52/// NULL
53///
54/// query T
55/// select map_from_key_values(array['a','b','c'], array[1,2,3]);
56/// ----
57/// {a:1,b:2,c:3}
58/// ```
59#[function(
60    "map_from_key_values(anyarray, anyarray) -> anymap",
61    type_infer = "map_from_key_values_type_infer"
62)]
63fn map_from_key_values(key: ListRef<'_>, value: ListRef<'_>) -> Result<MapValue, ExprError> {
64    MapValue::try_from_kv(key.to_owned(), value.to_owned()).map_err(ExprError::Custom)
65}
66
67#[function(
68    "map_from_entries(anyarray) -> anymap",
69    type_infer = "map_from_entries_type_infer"
70)]
71fn map_from_entries(entries: ListRef<'_>) -> Result<MapValue, ExprError> {
72    MapValue::try_from_entries(entries.to_owned()).map_err(ExprError::Custom)
73}
74
75/// # Example
76///
77/// ```slt
78/// query T
79/// select map_access(map_from_key_values(array[1,2,3], array[100,200,300]), 3);
80/// ----
81/// 300
82///
83/// query T
84/// select map_access(map_from_key_values(array[1,2,3], array[100,200,300]), '3');
85/// ----
86/// 300
87///
88/// query error
89/// select map_access(map_from_key_values(array[1,2,3], array[100,200,300]), 1.0);
90/// ----
91/// db error: ERROR: Failed to run the query
92///
93/// Caused by these errors (recent errors listed first):
94///   1: Failed to bind expression: map_access(map_from_key_values(ARRAY[1, 2, 3], ARRAY[100, 200, 300]), 1.0)
95///   2: Bind error: Cannot access numeric in map(integer,integer)
96///
97///
98/// query T
99/// select map_access(map_from_key_values(array['a','b','c'], array[1,2,3]), 'a');
100/// ----
101/// 1
102///
103/// query T
104/// select map_access(map_from_key_values(array['a','b','c'], array[1,2,3]), 'd');
105/// ----
106/// NULL
107///
108/// query T
109/// select map_access(map_from_key_values(array['a','b','c'], array[1,2,3]), null);
110/// ----
111/// NULL
112/// ```
113#[function("map_access(anymap, any) -> any", type_infer = "unreachable")]
114fn map_access<'a>(
115    map: MapRef<'a>,
116    key: ScalarRefImpl<'_>,
117) -> Result<Option<ScalarRefImpl<'a>>, ExprError> {
118    // FIXME: DatumRef in return value is not support by the macro yet.
119
120    let (keys, values) = map.into_kv();
121    let idx = array_position(keys, Some(key))?;
122    match idx {
123        Some(idx) => Ok(values.get((idx - 1) as usize).unwrap()),
124        None => Ok(None),
125    }
126}
127
128/// ```slt
129/// query T
130/// select
131///     map_contains(MAP{1:1}, 1),
132///     map_contains(MAP{1:1}, 2),
133///     map_contains(MAP{1:1}, NULL::varchar),
134///     map_contains(MAP{1:1}, 1.0)
135/// ----
136/// t f NULL f
137/// ```
138#[function("map_contains(anymap, any) -> boolean")]
139fn map_contains(map: MapRef<'_>, key: ScalarRefImpl<'_>) -> Result<bool, ExprError> {
140    let (keys, _values) = map.into_kv();
141    let idx = array_position(keys, Some(key))?;
142    Ok(idx.is_some())
143}
144
145/// ```slt
146/// query I
147/// select
148///     map_length(NULL::map(int,int)),
149///     map_length(MAP {}::map(int,int)),
150///     map_length(MAP {1:1,2:2}::map(int,int))
151/// ----
152/// NULL 0 2
153/// ```
154#[function("map_length(anymap) -> int4")]
155fn map_length<T: TryFrom<usize>>(map: MapRef<'_>) -> Result<T, ExprError> {
156    map.len().try_into().map_err(|_| ExprError::NumericOverflow)
157}
158
159/// If both `m1` and `m2` have a value with the same key, then the output map contains the value from `m2`.
160///
161/// ```slt
162/// query T
163/// select map_cat(MAP{'a':1,'b':2},null::map(varchar,int));
164/// ----
165/// {a:1,b:2}
166///
167/// query T
168/// select map_cat(MAP{'a':1,'b':2},MAP{'b':3,'c':4});
169/// ----
170/// {a:1,b:3,c:4}
171///
172/// # implicit type cast
173/// query T
174/// select map_cat(MAP{'a':1,'b':2},MAP{'b':3.0,'c':4.0});
175/// ----
176/// {a:1,b:3.0,c:4.0}
177/// ```
178#[function("map_cat(anymap, anymap) -> anymap")]
179fn map_cat(m1: Option<MapRef<'_>>, m2: Option<MapRef<'_>>) -> Result<Option<MapValue>, ExprError> {
180    match (m1, m2) {
181        (None, None) => Ok(None),
182        (Some(m), None) | (None, Some(m)) => Ok(Some(m.to_owned())),
183        (Some(m1), Some(m2)) => Ok(Some(MapValue::concat(m1, m2))),
184    }
185}
186
187/// Inserts a key-value pair into the map. If the key already exists, the value is updated.
188///
189/// # Example
190///
191/// ```slt
192/// query T
193/// select map_insert(map{'a':1, 'b':2}, 'c', 3);
194/// ----
195/// {a:1,b:2,c:3}
196///
197/// query T
198/// select map_insert(map{'a':1, 'b':2}, 'b', 4);
199/// ----
200/// {a:1,b:4}
201/// ```
202///
203/// TODO: support variadic arguments
204#[function("map_insert(anymap, any, any) -> anymap")]
205fn map_insert(
206    map: MapRef<'_>,
207    key: Option<ScalarRefImpl<'_>>,
208    value: Option<ScalarRefImpl<'_>>,
209) -> MapValue {
210    let Some(key) = key else {
211        return map.to_owned();
212    };
213    MapValue::insert(map, key.into_scalar_impl(), value.to_owned_datum())
214}
215
216/// Deletes a key-value pair from the map.
217///
218/// # Example
219///
220/// ```slt
221/// query T
222/// select map_delete(map{'a':1, 'b':2, 'c':3}, 'b');
223/// ----
224/// {a:1,c:3}
225///
226/// query T
227/// select map_delete(map{'a':1, 'b':2, 'c':3}, 'd');
228/// ----
229/// {a:1,b:2,c:3}
230/// ```
231///
232/// TODO: support variadic arguments
233#[function("map_delete(anymap, any) -> anymap")]
234fn map_delete(map: MapRef<'_>, key: Option<ScalarRefImpl<'_>>) -> MapValue {
235    let Some(key) = key else {
236        return map.to_owned();
237    };
238    MapValue::delete(map, key)
239}
240
241/// # Example
242///
243/// ```slt
244/// query T
245/// select map_keys(map{'a':1, 'b':2, 'c':3});
246/// ----
247/// {a,b,c}
248/// ```
249#[function(
250    "map_keys(anymap) -> anyarray",
251    type_infer = "|args|{
252        Ok(DataType::List(Box::new(args[0].as_map().key().clone())))
253    }"
254)]
255fn map_keys(map: MapRef<'_>) -> ListValue {
256    map.into_kv().0.to_owned_scalar()
257}
258
259/// # Example
260///
261/// ```slt
262/// query T
263/// select map_values(map{'a':1, 'b':2, 'c':3});
264/// ----
265/// {1,2,3}
266/// ```
267#[function(
268    "map_values(anymap) -> anyarray",
269    type_infer = "|args|{
270        Ok(DataType::List(Box::new(args[0].as_map().value().clone())))
271    }"
272)]
273fn map_values(map: MapRef<'_>) -> ListValue {
274    map.into_kv().1.to_owned_scalar()
275}
276
277/// # Example
278///
279/// ```slt
280/// query T
281/// select map_entries(map{'a':1, 'b':2, 'c':3});
282/// ----
283/// {"(a,1)","(b,2)","(c,3)"}
284/// ```
285#[function(
286    "map_entries(anymap) -> anyarray",
287    type_infer = "|args|{
288        Ok(args[0].as_map().clone().into_list())
289    }"
290)]
291fn map_entries(map: MapRef<'_>) -> ListValue {
292    map.into_inner().to_owned()
293}
294
295#[cfg(test)]
296mod tests {
297    use risingwave_common::array::DataChunk;
298    use risingwave_common::row::Row;
299    use risingwave_common::test_prelude::DataChunkTestExt;
300    use risingwave_common::types::ToOwnedDatum;
301    use risingwave_common::util::iter_util::ZipEqDebug;
302    use risingwave_expr::expr::build_from_pretty;
303
304    #[tokio::test]
305    async fn test_row_expr() {
306        let expr = build_from_pretty("(row:struct<a_int4,b_int4,c_int4> $0:int4 $1:int4 $2:int4)");
307        let (input, expected) = DataChunk::from_pretty(
308            "i i i <i,i,i>
309             1 2 3 (1,2,3)
310             4 2 1 (4,2,1)
311             9 1 3 (9,1,3)
312             1 1 1 (1,1,1)",
313        )
314        .split_column_at(3);
315
316        // test eval
317        let output = expr.eval(&input).await.unwrap();
318        assert_eq!(&output, expected.column_at(0));
319
320        // test eval_row
321        for (row, expected) in input.rows().zip_eq_debug(expected.rows()) {
322            let result = expr.eval_row(&row.to_owned_row()).await.unwrap();
323            assert_eq!(result, expected.datum_at(0).to_owned_datum());
324        }
325    }
326}