1use std::fmt::Write;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use futures_util::FutureExt;
20use itertools::Itertools;
21use risingwave_common::array::{
22 ArrayImpl, DataChunk, ListRef, ListValue, StructRef, StructValue, VectorVal,
23};
24use risingwave_common::cast;
25use risingwave_common::row::OwnedRow;
26use risingwave_common::types::{DataType, F64, Int256, JsonbRef, MapRef, MapValue, ToText};
27use risingwave_common::util::iter_util::ZipEqFast;
28use risingwave_expr::expr::{Context, ExpressionBoxExt, InputRefExpression, build_func};
29use risingwave_expr::{ExprError, Result, function};
30use risingwave_pb::expr::expr_node::PbType;
31use thiserror_ext::AsReport;
32
33#[function("cast(varchar) -> *int")]
34#[function("cast(varchar) -> decimal")]
35#[function("cast(varchar) -> *float")]
36#[function("cast(varchar) -> int256")]
37#[function("cast(varchar) -> date")]
38#[function("cast(varchar) -> time")]
39#[function("cast(varchar) -> timestamp")]
40#[function("cast(varchar) -> interval")]
41#[function("cast(varchar) -> jsonb")]
42pub fn str_parse<T>(elem: &str, ctx: &Context) -> Result<T>
43where
44 T: FromStr,
45 <T as FromStr>::Err: std::fmt::Display,
46{
47 elem.trim().parse().map_err(|err: <T as FromStr>::Err| {
48 ExprError::Parse(format!("{} {}", ctx.return_type, err).into())
49 })
50}
51
52#[function("pgwire_recv(bytea) -> int8")]
54pub fn pgwire_recv(elem: &[u8]) -> Result<i64> {
55 let fixed_length =
56 <[u8; 8]>::try_from(elem).map_err(|e| ExprError::Parse(e.to_report_string().into()))?;
57 Ok(i64::from_be_bytes(fixed_length))
58}
59
60#[function("cast(int2) -> int256")]
61#[function("cast(int4) -> int256")]
62#[function("cast(int8) -> int256")]
63pub fn to_int256<T: TryInto<Int256>>(elem: T) -> Result<Int256> {
64 elem.try_into()
65 .map_err(|_| ExprError::CastOutOfRange("int256"))
66}
67
68#[function("cast(jsonb) -> boolean")]
69pub fn jsonb_to_bool(v: JsonbRef<'_>) -> Result<bool> {
70 v.as_bool().map_err(|e| ExprError::Parse(e.into()))
71}
72
73#[function("cast(jsonb) -> int2")]
76#[function("cast(jsonb) -> int4")]
77#[function("cast(jsonb) -> int8")]
78#[function("cast(jsonb) -> decimal")]
79#[function("cast(jsonb) -> float4")]
80#[function("cast(jsonb) -> float8")]
81pub fn jsonb_to_number<T: TryFrom<F64>>(v: JsonbRef<'_>) -> Result<T> {
82 v.as_number()
83 .map_err(|e| ExprError::Parse(e.into()))?
84 .try_into()
85 .map_err(|_| ExprError::NumericOutOfRange)
86}
87
88#[function("cast(int4) -> int2")]
89#[function("cast(int8) -> int2")]
90#[function("cast(int8) -> int4")]
91#[function("cast(int8) -> serial")]
92#[function("cast(serial) -> int8")]
93#[function("cast(float4) -> int2")]
94#[function("cast(float8) -> int2")]
95#[function("cast(float4) -> int4")]
96#[function("cast(float8) -> int4")]
97#[function("cast(float4) -> int8")]
98#[function("cast(float8) -> int8")]
99#[function("cast(float8) -> float4")]
100#[function("cast(decimal) -> int2")]
101#[function("cast(decimal) -> int4")]
102#[function("cast(decimal) -> int8")]
103#[function("cast(decimal) -> float4")]
104#[function("cast(decimal) -> float8")]
105#[function("cast(float4) -> decimal")]
106#[function("cast(float8) -> decimal")]
107pub fn try_cast<T1, T2>(elem: T1) -> Result<T2>
108where
109 T1: TryInto<T2> + std::fmt::Debug + Copy,
110{
111 elem.try_into()
112 .map_err(|_| ExprError::CastOutOfRange(std::any::type_name::<T2>()))
113}
114
115#[function("cast(boolean) -> int4")]
116#[function("cast(int2) -> int4")]
117#[function("cast(int2) -> int8")]
118#[function("cast(int2) -> float4")]
119#[function("cast(int2) -> float8")]
120#[function("cast(int2) -> decimal")]
121#[function("cast(int4) -> int8")]
122#[function("cast(int4) -> float4")]
123#[function("cast(int4) -> float8")]
124#[function("cast(int4) -> decimal")]
125#[function("cast(int8) -> float4")]
126#[function("cast(int8) -> float8")]
127#[function("cast(int8) -> decimal")]
128#[function("cast(float4) -> float8")]
129#[function("cast(date) -> timestamp")]
130#[function("cast(time) -> interval")]
131#[function("cast(timestamp) -> date")]
132#[function("cast(timestamp) -> time")]
133#[function("cast(interval) -> time")]
134#[function("cast(varchar) -> varchar")]
135#[function("cast(int256) -> float8")]
136pub fn cast<T1, T2>(elem: T1) -> T2
137where
138 T1: Into<T2>,
139{
140 elem.into()
141}
142
143#[function("cast(varchar) -> boolean")]
144pub fn str_to_bool(input: &str) -> Result<bool> {
145 cast::str_to_bool(input).map_err(|err| ExprError::Parse(err.into()))
146}
147
148#[function("cast(int4) -> boolean")]
149pub fn int_to_bool(input: i32) -> bool {
150 input != 0
151}
152
153#[function("cast(*int) -> varchar")]
156#[function("cast(decimal) -> varchar")]
157#[function("cast(*float) -> varchar")]
158#[function("cast(int256) -> varchar")]
159#[function("cast(time) -> varchar")]
160#[function("cast(date) -> varchar")]
161#[function("cast(interval) -> varchar")]
162#[function("cast(timestamp) -> varchar")]
163#[function("cast(jsonb) -> varchar")]
164#[function("cast(bytea) -> varchar")]
165#[function("cast(anyarray) -> varchar")]
166#[function("cast(vector) -> varchar")]
167pub fn general_to_text(elem: impl ToText, mut writer: &mut impl Write) {
168 elem.write(&mut writer).unwrap();
169}
170
171#[function("pgwire_send(int8) -> bytea")]
173fn pgwire_send(elem: i64) -> Box<[u8]> {
174 elem.to_be_bytes().into()
175}
176
177#[function("cast(boolean) -> varchar")]
178pub fn bool_to_varchar(input: bool, writer: &mut impl Write) {
179 writer
180 .write_str(if input { "true" } else { "false" })
181 .unwrap();
182}
183
184#[function("bool_out(boolean) -> varchar")]
187pub fn bool_out(input: bool, writer: &mut impl Write) {
188 writer.write_str(if input { "t" } else { "f" }).unwrap();
189}
190
191#[function("cast(varchar) -> bytea")]
192pub fn str_to_bytea(elem: &str) -> Result<Box<[u8]>> {
193 cast::str_to_bytea(elem).map_err(|err| ExprError::Parse(err.into()))
194}
195
196#[function("cast(varchar) -> anyarray", type_infer = "unreachable")]
197fn str_to_list(input: &str, ctx: &Context) -> Result<ListValue> {
198 ListValue::from_str(input, &ctx.return_type).map_err(|err| ExprError::Parse(err.into()))
199}
200
201#[function("cast(varchar) -> vector", type_infer = "unreachable")]
202fn str_to_vector(input: &str, ctx: &Context) -> Result<VectorVal> {
203 let DataType::Vector(size) = &ctx.return_type else {
204 unreachable!()
205 };
206 VectorVal::from_text(input, *size).map_err(|err| ExprError::Parse(err.into()))
207}
208
209#[function("cast(anyarray) -> anyarray", type_infer = "unreachable")]
211fn list_cast(input: ListRef<'_>, ctx: &Context) -> Result<ListValue> {
212 let cast = build_func(
213 PbType::Cast,
214 ctx.return_type.as_list_element_type().clone(),
215 vec![InputRefExpression::new(ctx.arg_types[0].as_list_element_type().clone(), 0).boxed()],
216 )
217 .unwrap();
218 let items = Arc::new(ArrayImpl::from(input.to_owned()));
219 let len = items.len();
220 let list = cast
221 .eval(&DataChunk::new(vec![items], len))
222 .now_or_never()
223 .unwrap()?;
224 Ok(ListValue::new(Arc::try_unwrap(list).unwrap()))
225}
226
227#[function("cast(struct) -> struct", type_infer = "unreachable")]
229fn struct_cast(input: StructRef<'_>, ctx: &Context) -> Result<StructValue> {
230 let fields = (input.iter_fields_ref())
231 .zip_eq_fast(ctx.arg_types[0].as_struct().types())
232 .zip_eq_fast(ctx.return_type.as_struct().types())
233 .map(|((datum_ref, source_field_type), target_field_type)| {
234 if source_field_type == target_field_type {
235 return Ok(datum_ref.map(|scalar_ref| scalar_ref.into_scalar_impl()));
236 }
237 let cast = build_func(
238 PbType::Cast,
239 target_field_type.clone(),
240 vec![InputRefExpression::new(source_field_type.clone(), 0).boxed()],
241 )
242 .unwrap();
243 let value = match datum_ref {
244 Some(scalar_ref) => cast
245 .eval_row(&OwnedRow::new(vec![Some(scalar_ref.into_scalar_impl())]))
246 .now_or_never()
247 .unwrap()?,
248 None => None,
249 };
250 Ok(value) as Result<_>
251 })
252 .try_collect()?;
253 Ok(StructValue::new(fields))
254}
255
256#[function("cast(anymap) -> anymap", type_infer = "unreachable")]
258fn map_cast(map: MapRef<'_>, ctx: &Context) -> Result<MapValue> {
259 let new_ctx = Context {
260 arg_types: vec![ctx.arg_types[0].clone().as_map().clone().into_list()],
261 return_type: ctx.return_type.as_map().clone().into_list(),
262 variadic: ctx.variadic,
263 };
264 list_cast(map.into_inner(), &new_ctx).map(MapValue::from_entries)
265}
266
267#[cfg(test)]
268mod tests {
269 use chrono::NaiveDateTime;
270 use risingwave_common::array::*;
271 use risingwave_common::types::*;
272 use risingwave_expr::expr::build_from_pretty;
273
274 use super::*;
275
276 #[test]
277 fn integer_cast_to_bool() {
278 assert!(int_to_bool(32));
279 assert!(int_to_bool(-32));
280 assert!(!int_to_bool(0));
281 }
282
283 #[test]
284 fn number_to_string() {
285 macro_rules! test {
286 ($fn:ident($value:expr), $right:literal) => {
287 let mut writer = String::new();
288 $fn($value, &mut writer);
289 assert_eq!(writer, $right);
290 };
291 }
292
293 test!(bool_to_varchar(true), "true");
294 test!(bool_to_varchar(true), "true");
295 test!(bool_to_varchar(false), "false");
296
297 test!(general_to_text(32), "32");
298 test!(general_to_text(-32), "-32");
299 test!(general_to_text(i32::MIN), "-2147483648");
300 test!(general_to_text(i32::MAX), "2147483647");
301
302 test!(general_to_text(i16::MIN), "-32768");
303 test!(general_to_text(i16::MAX), "32767");
304
305 test!(general_to_text(i64::MIN), "-9223372036854775808");
306 test!(general_to_text(i64::MAX), "9223372036854775807");
307
308 test!(general_to_text(F64::from(32.12)), "32.12");
309 test!(general_to_text(F64::from(-32.14)), "-32.14");
310
311 test!(general_to_text(F32::from(32.12_f32)), "32.12");
312 test!(general_to_text(F32::from(-32.14_f32)), "-32.14");
313
314 test!(general_to_text(Decimal::try_from(1.222).unwrap()), "1.222");
315
316 test!(general_to_text(Decimal::NaN), "NaN");
317 }
318
319 #[test]
320 fn test_str_to_list() {
321 let ctx = Context {
323 arg_types: vec![DataType::Varchar],
324 return_type: DataType::from_str("int[]").unwrap(),
325 variadic: false,
326 };
327 assert_eq!(
328 str_to_list("{}", &ctx).unwrap(),
329 ListValue::empty(&DataType::Varchar)
330 );
331
332 let list123 = ListValue::from_iter([1, 2, 3]);
333
334 let ctx = Context {
336 arg_types: vec![DataType::Varchar],
337 return_type: DataType::from_str("int[]").unwrap(),
338 variadic: false,
339 };
340 assert_eq!(str_to_list("{1, 2, 3}", &ctx).unwrap(), list123);
341
342 let nested_list123 = ListValue::from_iter([list123]);
344 let ctx = Context {
345 arg_types: vec![DataType::Varchar],
346 return_type: DataType::from_str("int[][]").unwrap(),
347 variadic: false,
348 };
349 assert_eq!(str_to_list("{{1, 2, 3}}", &ctx).unwrap(), nested_list123);
350
351 let nested_list445566 = ListValue::from_iter([ListValue::from_iter([44, 55, 66])]);
352
353 let double_nested_list123_445566 =
354 ListValue::from_iter([nested_list123.clone(), nested_list445566.clone()]);
355
356 let ctx = Context {
358 arg_types: vec![DataType::Varchar],
359 return_type: DataType::from_str("int[][][]").unwrap(),
360 variadic: false,
361 };
362 assert_eq!(
363 str_to_list("{{{1, 2, 3}}, {{44, 55, 66}}}", &ctx).unwrap(),
364 double_nested_list123_445566
365 );
366
367 let ctx = Context {
369 arg_types: vec![DataType::from_str("int[][]").unwrap()],
370 return_type: DataType::from_str("varchar[][]").unwrap(),
371 variadic: false,
372 };
373 let double_nested_varchar_list123_445566 = ListValue::from_iter([
374 list_cast(nested_list123.as_scalar_ref(), &ctx).unwrap(),
375 list_cast(nested_list445566.as_scalar_ref(), &ctx).unwrap(),
376 ]);
377
378 let ctx = Context {
380 arg_types: vec![DataType::Varchar],
381 return_type: DataType::from_str("varchar[][][]").unwrap(),
382 variadic: false,
383 };
384 assert_eq!(
385 str_to_list("{{{1, 2, 3}}, {{44, 55, 66}}}", &ctx).unwrap(),
386 double_nested_varchar_list123_445566
387 );
388 }
389
390 #[test]
391 fn test_invalid_str_to_list() {
392 let ctx = Context {
394 arg_types: vec![DataType::Varchar],
395 return_type: DataType::from_str("int[]").unwrap(),
396 variadic: false,
397 };
398 assert!(str_to_list("{{}", &ctx).is_err());
399 assert!(str_to_list("{}}", &ctx).is_err());
400 assert!(str_to_list("{{1, 2, 3}, {4, 5, 6}", &ctx).is_err());
401 assert!(str_to_list("{{1, 2, 3}, 4, 5, 6}}", &ctx).is_err());
402 }
403
404 #[test]
405 fn test_struct_cast() {
406 let ctx = Context {
407 arg_types: vec![DataType::Struct(StructType::new(vec![
408 ("a", DataType::Varchar),
409 ("b", DataType::Float32),
410 ]))],
411 return_type: DataType::Struct(StructType::new(vec![
412 ("a", DataType::Int32),
413 ("b", DataType::Int32),
414 ])),
415 variadic: false,
416 };
417 assert_eq!(
418 struct_cast(
419 StructValue::new(vec![
420 Some("1".into()),
421 Some(F32::from(0.0).to_scalar_value()),
422 ])
423 .as_scalar_ref(),
424 &ctx,
425 )
426 .unwrap(),
427 StructValue::new(vec![
428 Some(1i32.to_scalar_value()),
429 Some(0i32.to_scalar_value()),
430 ])
431 );
432 }
433
434 #[test]
435 fn test_timestamp() {
436 assert_eq!(
437 try_cast::<_, Timestamp>(Date::from_ymd_uncheck(1994, 1, 1)).unwrap(),
438 Timestamp::new(
439 NaiveDateTime::parse_from_str("1994-1-1 0:0:0", "%Y-%m-%d %H:%M:%S").unwrap()
440 )
441 )
442 }
443
444 #[tokio::test]
445 async fn test_unary() {
446 test_unary_bool::<BoolArray, _>(|x| !x, PbType::Not).await;
447 test_unary_date::<TimestampArray, _>(|x| try_cast(x).unwrap(), PbType::Cast).await;
448 let ctx_str_to_int16 = Context {
449 arg_types: vec![DataType::Varchar],
450 return_type: DataType::Int16,
451 variadic: false,
452 };
453 test_str_to_int16::<I16Array, _>(|x| str_parse(x, &ctx_str_to_int16).unwrap()).await;
454 }
455
456 #[tokio::test]
457 async fn test_i16_to_i32() {
458 let mut input = Vec::<Option<i16>>::new();
459 let mut target = Vec::<Option<i32>>::new();
460 for i in 0..100i16 {
461 if i % 2 == 0 {
462 target.push(Some(i as i32));
463 input.push(Some(i));
464 } else {
465 input.push(None);
466 target.push(None);
467 }
468 }
469 let col1 = I16Array::from_iter(&input).into_ref();
470 let data_chunk = DataChunk::new(vec![col1], 100);
471 let expr = build_from_pretty("(cast:int4 $0:int2)");
472 let res = expr.eval(&data_chunk).await.unwrap();
473 let arr: &I32Array = res.as_ref().into();
474 for (idx, item) in arr.iter().enumerate() {
475 let x = target[idx].as_ref().map(|x| x.as_scalar_ref());
476 assert_eq!(x, item);
477 }
478
479 for i in 0..input.len() {
480 let row = OwnedRow::new(vec![input[i].map(|int| int.to_scalar_value())]);
481 let result = expr.eval_row(&row).await.unwrap();
482 let expected = target[i].map(|int| int.to_scalar_value());
483 assert_eq!(result, expected);
484 }
485 }
486
487 #[tokio::test]
488 async fn test_neg() {
489 let input = [Some(1), Some(0), Some(-1)];
490 let target = [Some(-1), Some(0), Some(1)];
491
492 let col1 = I32Array::from_iter(&input).into_ref();
493 let data_chunk = DataChunk::new(vec![col1], 3);
494 let expr = build_from_pretty("(neg:int4 $0:int4)");
495 let res = expr.eval(&data_chunk).await.unwrap();
496 let arr: &I32Array = res.as_ref().into();
497 for (idx, item) in arr.iter().enumerate() {
498 let x = target[idx].as_ref().map(|x| x.as_scalar_ref());
499 assert_eq!(x, item);
500 }
501
502 for i in 0..input.len() {
503 let row = OwnedRow::new(vec![input[i].map(|int| int.to_scalar_value())]);
504 let result = expr.eval_row(&row).await.unwrap();
505 let expected = target[i].map(|int| int.to_scalar_value());
506 assert_eq!(result, expected);
507 }
508 }
509
    // Checks the `(cast:int2 $0:varchar)` expression against the reference
    // conversion `f`, first over a whole chunk and then row by row.
    //
    // `A` is the array type the evaluated column is viewed as; `f` produces the
    // expected owned item for each input string.
    async fn test_str_to_int16<A, F>(f: F)
    where
        A: Array,
        for<'a> &'a A: std::convert::From<&'a ArrayImpl>,
        for<'a> <A as Array>::RefItem<'a>: PartialEq,
        F: Fn(&str) -> <A as Array>::OwnedItem,
    {
        let mut input = Vec::<Option<Box<str>>>::new();
        let mut target = Vec::<Option<<A as Array>::OwnedItem>>::new();
        // The range holds only `0`, so a single non-null row ("0") is produced
        // and the `else` branch below is never taken.
        for i in 0..1u32 {
            if i % 2 == 0 {
                let s = i.to_string().into_boxed_str();
                target.push(Some(f(&s)));
                input.push(Some(s));
            } else {
                input.push(None);
                target.push(None);
            }
        }
        // Borrow the boxed strings as `Option<&str>` to build the input column.
        let col1_data = &input.iter().map(|x| x.as_ref().map(|x| &**x)).collect_vec();
        let col1 = Utf8Array::from_iter(col1_data).into_ref();
        let data_chunk = DataChunk::new(vec![col1], 1);
        let expr = build_from_pretty("(cast:int2 $0:varchar)");
        // Chunk-at-a-time evaluation.
        let res = expr.eval(&data_chunk).await.unwrap();
        let arr: &A = res.as_ref().into();
        for (idx, item) in arr.iter().enumerate() {
            let x = target[idx].as_ref().map(|x| x.as_scalar_ref());
            assert_eq!(x, item);
        }

        // Row-at-a-time evaluation.
        for i in 0..input.len() {
            let row = OwnedRow::new(vec![
                input[i].as_ref().cloned().map(|str| str.to_scalar_value()),
            ]);
            let result = expr.eval_row(&row).await.unwrap();
            let expected = target[i].as_ref().cloned().map(|x| x.to_scalar_value());
            assert_eq!(result, expected);
        }
    }
549
    // Checks the unary boolean expression `kind` against the reference function
    // `f`, first over a whole chunk and then row by row.
    //
    // `A` is the array type the evaluated column is viewed as; `f` produces the
    // expected owned item for each non-null input.
    async fn test_unary_bool<A, F>(f: F, kind: PbType)
    where
        A: Array,
        for<'a> &'a A: std::convert::From<&'a ArrayImpl>,
        for<'a> <A as Array>::RefItem<'a>: PartialEq,
        F: Fn(bool) -> <A as Array>::OwnedItem,
    {
        let mut input = Vec::<Option<bool>>::new();
        let mut target = Vec::<Option<<A as Array>::OwnedItem>>::new();
        // 100 rows: `true` at even indices, `false` at odd multiples of three,
        // NULL everywhere else.
        for i in 0..100 {
            if i % 2 == 0 {
                input.push(Some(true));
                target.push(Some(f(true)));
            } else if i % 3 == 0 {
                input.push(Some(false));
                target.push(Some(f(false)));
            } else {
                input.push(None);
                target.push(None);
            }
        }

        let col1 = BoolArray::from_iter(&input).into_ref();
        let data_chunk = DataChunk::new(vec![col1], 100);
        // The expression is built from the `Debug` name of `kind`.
        let expr = build_from_pretty(format!("({kind:?}:boolean $0:boolean)"));
        // Chunk-at-a-time evaluation.
        let res = expr.eval(&data_chunk).await.unwrap();
        let arr: &A = res.as_ref().into();
        for (idx, item) in arr.iter().enumerate() {
            let x = target[idx].as_ref().map(|x| x.as_scalar_ref());
            assert_eq!(x, item);
        }

        // Row-at-a-time evaluation.
        for i in 0..input.len() {
            let row = OwnedRow::new(vec![input[i].map(|b| b.to_scalar_value())]);
            let result = expr.eval_row(&row).await.unwrap();
            let expected = target[i].as_ref().cloned().map(|x| x.to_scalar_value());
            assert_eq!(result, expected);
        }
    }
589
    // Checks the unary expression `kind` from `date` to `timestamp` against the
    // reference function `f`, first over a whole chunk and then row by row.
    //
    // `A` is the array type the evaluated column is viewed as; `f` produces the
    // expected owned item for each non-null input date.
    async fn test_unary_date<A, F>(f: F, kind: PbType)
    where
        A: Array,
        for<'a> &'a A: std::convert::From<&'a ArrayImpl>,
        for<'a> <A as Array>::RefItem<'a>: PartialEq,
        F: Fn(Date) -> <A as Array>::OwnedItem,
    {
        let mut input = Vec::<Option<Date>>::new();
        let mut target = Vec::<Option<<A as Array>::OwnedItem>>::new();
        // 100 rows: a date derived from the index at even positions, NULL at
        // odd positions.
        for i in 0..100 {
            if i % 2 == 0 {
                let date = Date::from_num_days_from_ce_uncheck(i);
                input.push(Some(date));
                target.push(Some(f(date)));
            } else {
                input.push(None);
                target.push(None);
            }
        }

        let col1 = DateArray::from_iter(&input).into_ref();
        let data_chunk = DataChunk::new(vec![col1], 100);
        // The expression is built from the `Debug` name of `kind`.
        let expr = build_from_pretty(format!("({kind:?}:timestamp $0:date)"));
        // Chunk-at-a-time evaluation.
        let res = expr.eval(&data_chunk).await.unwrap();
        let arr: &A = res.as_ref().into();
        for (idx, item) in arr.iter().enumerate() {
            let x = target[idx].as_ref().map(|x| x.as_scalar_ref());
            assert_eq!(x, item);
        }

        // Row-at-a-time evaluation.
        for i in 0..input.len() {
            let row = OwnedRow::new(vec![input[i].map(|d| d.to_scalar_value())]);
            let result = expr.eval_row(&row).await.unwrap();
            let expected = target[i].as_ref().cloned().map(|x| x.to_scalar_value());
            assert_eq!(result, expected);
        }
    }
627}