risingwave_expr_impl/scalar/
array.rs1use risingwave_common::array::{ListValue, StructValue};
16use risingwave_common::row::Row;
17use risingwave_common::types::{
18 DataType, ListRef, MapRef, MapType, MapValue, ScalarRef, ScalarRefImpl, ToOwnedDatum,
19};
20use risingwave_expr::expr::Context;
21use risingwave_expr::{ExprError, function};
22
23use super::array_positions::array_position;
24
25#[function("array(...) -> anyarray", type_infer = "unreachable")]
26fn array(row: impl Row, ctx: &Context) -> ListValue {
27 ListValue::from_datum_iter(ctx.return_type.as_list(), row.iter())
28}
29
30#[function("row(...) -> struct", type_infer = "unreachable")]
31fn row_(row: impl Row) -> StructValue {
32 StructValue::new(row.iter().map(|d| d.to_owned_datum()).collect())
33}
34
35fn map_from_key_values_type_infer(args: &[DataType]) -> Result<DataType, ExprError> {
36 let map = MapType::try_from_kv(args[0].as_list().clone(), args[1].as_list().clone())
37 .map_err(ExprError::Custom)?;
38 Ok(map.into())
39}
40
41fn map_from_entries_type_infer(args: &[DataType]) -> Result<DataType, ExprError> {
42 let map = MapType::try_from_entries(args[0].as_list().clone()).map_err(ExprError::Custom)?;
43 Ok(map.into())
44}
45
46#[function(
60 "map_from_key_values(anyarray, anyarray) -> anymap",
61 type_infer = "map_from_key_values_type_infer"
62)]
63fn map_from_key_values(key: ListRef<'_>, value: ListRef<'_>) -> Result<MapValue, ExprError> {
64 MapValue::try_from_kv(key.to_owned(), value.to_owned()).map_err(ExprError::Custom)
65}
66
67#[function(
68 "map_from_entries(anyarray) -> anymap",
69 type_infer = "map_from_entries_type_infer"
70)]
71fn map_from_entries(entries: ListRef<'_>) -> Result<MapValue, ExprError> {
72 MapValue::try_from_entries(entries.to_owned()).map_err(ExprError::Custom)
73}
74
75#[function("map_access(anymap, any) -> any", type_infer = "unreachable")]
114fn map_access<'a>(
115 map: MapRef<'a>,
116 key: ScalarRefImpl<'_>,
117) -> Result<Option<ScalarRefImpl<'a>>, ExprError> {
118 let (keys, values) = map.into_kv();
121 let idx = array_position(keys, Some(key))?;
122 match idx {
123 Some(idx) => Ok(values.get((idx - 1) as usize).unwrap()),
124 None => Ok(None),
125 }
126}
127
128#[function("map_contains(anymap, any) -> boolean")]
139fn map_contains(map: MapRef<'_>, key: ScalarRefImpl<'_>) -> Result<bool, ExprError> {
140 let (keys, _values) = map.into_kv();
141 let idx = array_position(keys, Some(key))?;
142 Ok(idx.is_some())
143}
144
145#[function("map_length(anymap) -> int4")]
155fn map_length<T: TryFrom<usize>>(map: MapRef<'_>) -> Result<T, ExprError> {
156 map.len().try_into().map_err(|_| ExprError::NumericOverflow)
157}
158
159#[function("map_cat(anymap, anymap) -> anymap")]
179fn map_cat(m1: Option<MapRef<'_>>, m2: Option<MapRef<'_>>) -> Result<Option<MapValue>, ExprError> {
180 match (m1, m2) {
181 (None, None) => Ok(None),
182 (Some(m), None) | (None, Some(m)) => Ok(Some(m.to_owned())),
183 (Some(m1), Some(m2)) => Ok(Some(MapValue::concat(m1, m2))),
184 }
185}
186
187#[function("map_insert(anymap, any, any) -> anymap")]
205fn map_insert(
206 map: MapRef<'_>,
207 key: Option<ScalarRefImpl<'_>>,
208 value: Option<ScalarRefImpl<'_>>,
209) -> MapValue {
210 let Some(key) = key else {
211 return map.to_owned();
212 };
213 MapValue::insert(map, key.into_scalar_impl(), value.to_owned_datum())
214}
215
216#[function("map_delete(anymap, any) -> anymap")]
234fn map_delete(map: MapRef<'_>, key: Option<ScalarRefImpl<'_>>) -> MapValue {
235 let Some(key) = key else {
236 return map.to_owned();
237 };
238 MapValue::delete(map, key)
239}
240
241#[function(
250 "map_keys(anymap) -> anyarray",
251 type_infer = "|args|{
252 Ok(DataType::List(Box::new(args[0].as_map().key().clone())))
253 }"
254)]
255fn map_keys(map: MapRef<'_>) -> ListValue {
256 map.into_kv().0.to_owned_scalar()
257}
258
259#[function(
268 "map_values(anymap) -> anyarray",
269 type_infer = "|args|{
270 Ok(DataType::List(Box::new(args[0].as_map().value().clone())))
271 }"
272)]
273fn map_values(map: MapRef<'_>) -> ListValue {
274 map.into_kv().1.to_owned_scalar()
275}
276
277#[function(
286 "map_entries(anymap) -> anyarray",
287 type_infer = "|args|{
288 Ok(args[0].as_map().clone().into_list())
289 }"
290)]
291fn map_entries(map: MapRef<'_>) -> ListValue {
292 map.into_inner().to_owned()
293}
294
295#[cfg(test)]
296mod tests {
297 use risingwave_common::array::DataChunk;
298 use risingwave_common::row::Row;
299 use risingwave_common::test_prelude::DataChunkTestExt;
300 use risingwave_common::types::ToOwnedDatum;
301 use risingwave_common::util::iter_util::ZipEqDebug;
302 use risingwave_expr::expr::build_from_pretty;
303
304 #[tokio::test]
305 async fn test_row_expr() {
306 let expr = build_from_pretty("(row:struct<a_int4,b_int4,c_int4> $0:int4 $1:int4 $2:int4)");
307 let (input, expected) = DataChunk::from_pretty(
308 "i i i <i,i,i>
309 1 2 3 (1,2,3)
310 4 2 1 (4,2,1)
311 9 1 3 (9,1,3)
312 1 1 1 (1,1,1)",
313 )
314 .split_column_at(3);
315
316 let output = expr.eval(&input).await.unwrap();
318 assert_eq!(&output, expected.column_at(0));
319
320 for (row, expected) in input.rows().zip_eq_debug(expected.rows()) {
322 let result = expr.eval_row(&row.to_owned_row()).await.unwrap();
323 assert_eq!(result, expected.datum_at(0).to_owned_datum());
324 }
325 }
326}