risingwave_expr_impl/scalar/
array.rs1use risingwave_common::array::{ListValue, StructValue};
16use risingwave_common::row::Row;
17use risingwave_common::types::{
18 DataType, ListRef, MapRef, MapType, MapValue, ScalarRef, ScalarRefImpl, ToOwnedDatum,
19};
20use risingwave_expr::expr::Context;
21use risingwave_expr::{ExprError, function};
22
23use super::array_positions::array_position;
24
25#[function("array(...) -> anyarray", type_infer = "unreachable")]
26fn array(row: impl Row, ctx: &Context) -> ListValue {
27 ListValue::from_datum_iter(ctx.return_type.as_list_element_type(), row.iter())
28}
29
30#[function("row(...) -> struct", type_infer = "unreachable")]
31fn row_(row: impl Row) -> StructValue {
32 StructValue::new(row.iter().map(|d| d.to_owned_datum()).collect())
33}
34
35fn map_from_key_values_type_infer(args: &[DataType]) -> Result<DataType, ExprError> {
36 let map = MapType::try_from_kv(
37 args[0].as_list_element_type().clone(),
38 args[1].as_list_element_type().clone(),
39 )
40 .map_err(ExprError::Custom)?;
41 Ok(map.into())
42}
43
44fn map_from_entries_type_infer(args: &[DataType]) -> Result<DataType, ExprError> {
45 let map = MapType::try_from_entries(args[0].as_list_element_type().clone())
46 .map_err(ExprError::Custom)?;
47 Ok(map.into())
48}
49
50#[function(
64 "map_from_key_values(anyarray, anyarray) -> anymap",
65 type_infer = "map_from_key_values_type_infer"
66)]
67fn map_from_key_values(key: ListRef<'_>, value: ListRef<'_>) -> Result<MapValue, ExprError> {
68 MapValue::try_from_kv(key.to_owned(), value.to_owned()).map_err(ExprError::Custom)
69}
70
71#[function(
72 "map_from_entries(anyarray) -> anymap",
73 type_infer = "map_from_entries_type_infer"
74)]
75fn map_from_entries(entries: ListRef<'_>) -> Result<MapValue, ExprError> {
76 MapValue::try_from_entries(entries.to_owned()).map_err(ExprError::Custom)
77}
78
79#[function("map_access(anymap, any) -> any", type_infer = "unreachable")]
118fn map_access<'a>(
119 map: MapRef<'a>,
120 key: ScalarRefImpl<'_>,
121) -> Result<Option<ScalarRefImpl<'a>>, ExprError> {
122 let (keys, values) = map.into_kv();
125 let idx = array_position(keys, Some(key))?;
126 match idx {
127 Some(idx) => Ok(values.get((idx - 1) as usize).unwrap()),
128 None => Ok(None),
129 }
130}
131
132#[function("map_contains(anymap, any) -> boolean")]
143fn map_contains(map: MapRef<'_>, key: ScalarRefImpl<'_>) -> Result<bool, ExprError> {
144 let (keys, _values) = map.into_kv();
145 let idx = array_position(keys, Some(key))?;
146 Ok(idx.is_some())
147}
148
149#[function("map_length(anymap) -> int4")]
159fn map_length<T: TryFrom<usize>>(map: MapRef<'_>) -> Result<T, ExprError> {
160 map.len().try_into().map_err(|_| ExprError::NumericOverflow)
161}
162
163#[function("map_cat(anymap, anymap) -> anymap")]
183fn map_cat(m1: Option<MapRef<'_>>, m2: Option<MapRef<'_>>) -> Result<Option<MapValue>, ExprError> {
184 match (m1, m2) {
185 (None, None) => Ok(None),
186 (Some(m), None) | (None, Some(m)) => Ok(Some(m.to_owned())),
187 (Some(m1), Some(m2)) => Ok(Some(MapValue::concat(m1, m2))),
188 }
189}
190
191#[function("map_insert(anymap, any, any) -> anymap")]
209fn map_insert(
210 map: MapRef<'_>,
211 key: Option<ScalarRefImpl<'_>>,
212 value: Option<ScalarRefImpl<'_>>,
213) -> MapValue {
214 let Some(key) = key else {
215 return map.to_owned();
216 };
217 MapValue::insert(map, key.into_scalar_impl(), value.to_owned_datum())
218}
219
220#[function("map_delete(anymap, any) -> anymap")]
238fn map_delete(map: MapRef<'_>, key: Option<ScalarRefImpl<'_>>) -> MapValue {
239 let Some(key) = key else {
240 return map.to_owned();
241 };
242 MapValue::delete(map, key)
243}
244
245#[function(
254 "map_keys(anymap) -> anyarray",
255 type_infer = "|args|{
256 Ok(DataType::List(Box::new(args[0].as_map().key().clone())))
257 }"
258)]
259fn map_keys(map: MapRef<'_>) -> ListValue {
260 map.into_kv().0.to_owned_scalar()
261}
262
263#[function(
272 "map_values(anymap) -> anyarray",
273 type_infer = "|args|{
274 Ok(DataType::List(Box::new(args[0].as_map().value().clone())))
275 }"
276)]
277fn map_values(map: MapRef<'_>) -> ListValue {
278 map.into_kv().1.to_owned_scalar()
279}
280
281#[function(
290 "map_entries(anymap) -> anyarray",
291 type_infer = "|args|{
292 Ok(args[0].as_map().clone().into_list())
293 }"
294)]
295fn map_entries(map: MapRef<'_>) -> ListValue {
296 map.into_inner().to_owned()
297}
298
299#[cfg(test)]
300mod tests {
301 use risingwave_common::array::DataChunk;
302 use risingwave_common::row::Row;
303 use risingwave_common::test_prelude::DataChunkTestExt;
304 use risingwave_common::types::ToOwnedDatum;
305 use risingwave_common::util::iter_util::ZipEqDebug;
306 use risingwave_expr::expr::build_from_pretty;
307
308 #[tokio::test]
309 async fn test_row_expr() {
310 let expr = build_from_pretty("(row:struct<a_int4,b_int4,c_int4> $0:int4 $1:int4 $2:int4)");
311 let (input, expected) = DataChunk::from_pretty(
312 "i i i <i,i,i>
313 1 2 3 (1,2,3)
314 4 2 1 (4,2,1)
315 9 1 3 (9,1,3)
316 1 1 1 (1,1,1)",
317 )
318 .split_column_at(3);
319
320 let output = expr.eval(&input).await.unwrap();
322 assert_eq!(&output, expected.column_at(0));
323
324 for (row, expected) in input.rows().zip_eq_debug(expected.rows()) {
326 let result = expr.eval_row(&row.to_owned_row()).await.unwrap();
327 assert_eq!(result, expected.datum_at(0).to_owned_datum());
328 }
329 }
330}