1use std::str::FromStr;
16use std::sync::Arc;
17
18use futures_util::FutureExt;
19use itertools::Itertools;
20use risingwave_common::array::{DataChunk, ListRef, ListValue, StructRef, StructValue, VectorVal};
21use risingwave_common::cast;
22use risingwave_common::row::OwnedRow;
23use risingwave_common::types::{
24 DataType, F64, Int256, JsonbRef, MapRef, MapValue, ScalarRef as _, ToText,
25};
26use risingwave_common::util::iter_util::ZipEqFast;
27use risingwave_expr::expr::{Context, ExpressionBoxExt, InputRefExpression, build_func};
28use risingwave_expr::{ExprError, Result, function};
29use risingwave_pb::expr::expr_node::PbType;
30use thiserror_ext::AsReport;
31
32#[function("cast(varchar) -> *int")]
33#[function("cast(varchar) -> decimal")]
34#[function("cast(varchar) -> *float")]
35#[function("cast(varchar) -> int256")]
36#[function("cast(varchar) -> date")]
37#[function("cast(varchar) -> time")]
38#[function("cast(varchar) -> timestamp")]
39#[function("cast(varchar) -> interval")]
40#[function("cast(varchar) -> jsonb")]
41pub fn str_parse<T>(elem: &str, ctx: &Context) -> Result<T>
42where
43 T: FromStr,
44 <T as FromStr>::Err: std::fmt::Display,
45{
46 elem.trim().parse().map_err(|err: <T as FromStr>::Err| {
47 ExprError::Parse(format!("{} {}", ctx.return_type, err).into())
48 })
49}
50
51#[function("pgwire_recv(bytea) -> int8")]
53pub fn pgwire_recv(elem: &[u8]) -> Result<i64> {
54 let fixed_length =
55 <[u8; 8]>::try_from(elem).map_err(|e| ExprError::Parse(e.to_report_string().into()))?;
56 Ok(i64::from_be_bytes(fixed_length))
57}
58
59#[function("cast(int2) -> int256")]
60#[function("cast(int4) -> int256")]
61#[function("cast(int8) -> int256")]
62pub fn to_int256<T: TryInto<Int256>>(elem: T) -> Result<Int256> {
63 elem.try_into()
64 .map_err(|_| ExprError::CastOutOfRange("int256"))
65}
66
67#[function("cast(jsonb) -> boolean")]
68pub fn jsonb_to_bool(v: JsonbRef<'_>) -> Result<bool> {
69 v.as_bool().map_err(|e| ExprError::Parse(e.into()))
70}
71
72#[function("cast(jsonb) -> int2")]
75#[function("cast(jsonb) -> int4")]
76#[function("cast(jsonb) -> int8")]
77#[function("cast(jsonb) -> decimal")]
78#[function("cast(jsonb) -> float4")]
79#[function("cast(jsonb) -> float8")]
80pub fn jsonb_to_number<T: TryFrom<F64>>(v: JsonbRef<'_>) -> Result<T> {
81 v.as_number()
82 .map_err(|e| ExprError::Parse(e.into()))?
83 .try_into()
84 .map_err(|_| ExprError::NumericOutOfRange)
85}
86
87#[function("cast(int4) -> int2")]
88#[function("cast(int8) -> int2")]
89#[function("cast(int8) -> int4")]
90#[function("cast(int8) -> serial")]
91#[function("cast(serial) -> int8")]
92#[function("cast(float4) -> int2")]
93#[function("cast(float8) -> int2")]
94#[function("cast(float4) -> int4")]
95#[function("cast(float8) -> int4")]
96#[function("cast(float4) -> int8")]
97#[function("cast(float8) -> int8")]
98#[function("cast(float8) -> float4")]
99#[function("cast(decimal) -> int2")]
100#[function("cast(decimal) -> int4")]
101#[function("cast(decimal) -> int8")]
102#[function("cast(decimal) -> float4")]
103#[function("cast(decimal) -> float8")]
104#[function("cast(float4) -> decimal")]
105#[function("cast(float8) -> decimal")]
106pub fn try_cast<T1, T2>(elem: T1) -> Result<T2>
107where
108 T1: TryInto<T2> + std::fmt::Debug + Copy,
109{
110 elem.try_into()
111 .map_err(|_| ExprError::CastOutOfRange(std::any::type_name::<T2>()))
112}
113
114#[function("cast(boolean) -> int4")]
115#[function("cast(int2) -> int4")]
116#[function("cast(int2) -> int8")]
117#[function("cast(int2) -> float4")]
118#[function("cast(int2) -> float8")]
119#[function("cast(int2) -> decimal")]
120#[function("cast(int4) -> int8")]
121#[function("cast(int4) -> float4")]
122#[function("cast(int4) -> float8")]
123#[function("cast(int4) -> decimal")]
124#[function("cast(int8) -> float4")]
125#[function("cast(int8) -> float8")]
126#[function("cast(int8) -> decimal")]
127#[function("cast(float4) -> float8")]
128#[function("cast(date) -> timestamp")]
129#[function("cast(time) -> interval")]
130#[function("cast(timestamp) -> date")]
131#[function("cast(timestamp) -> time")]
132#[function("cast(interval) -> time")]
133#[function("cast(varchar) -> varchar")]
134#[function("cast(int256) -> float8")]
135pub fn cast<T1, T2>(elem: T1) -> T2
136where
137 T1: Into<T2>,
138{
139 elem.into()
140}
141
142#[function("cast(varchar) -> boolean")]
143pub fn str_to_bool(input: &str) -> Result<bool> {
144 cast::str_to_bool(input).map_err(|err| ExprError::Parse(err.into()))
145}
146
147#[function("cast(int4) -> boolean")]
148pub fn int_to_bool(input: i32) -> bool {
149 input != 0
150}
151
152#[function("cast(*int) -> varchar")]
155#[function("cast(decimal) -> varchar")]
156#[function("cast(*float) -> varchar")]
157#[function("cast(int256) -> varchar")]
158#[function("cast(time) -> varchar")]
159#[function("cast(date) -> varchar")]
160#[function("cast(interval) -> varchar")]
161#[function("cast(timestamp) -> varchar")]
162#[function("cast(jsonb) -> varchar")]
163#[function("cast(bytea) -> varchar")]
164#[function("cast(anyarray) -> varchar")]
165#[function("cast(vector) -> varchar")]
166pub fn general_to_text(elem: impl ToText, mut writer: &mut impl std::fmt::Write) {
167 elem.write(&mut writer).unwrap();
168}
169
170#[function("pgwire_send(int8) -> bytea")]
172fn pgwire_send(elem: i64, writer: &mut impl std::io::Write) {
173 writer.write_all(&elem.to_be_bytes()).unwrap();
174}
175
176#[function("cast(boolean) -> varchar")]
177pub fn bool_to_varchar(input: bool, writer: &mut impl std::fmt::Write) {
178 writer
179 .write_str(if input { "true" } else { "false" })
180 .unwrap();
181}
182
183#[function("bool_out(boolean) -> varchar")]
186pub fn bool_out(input: bool, writer: &mut impl std::fmt::Write) {
187 writer.write_str(if input { "t" } else { "f" }).unwrap();
188}
189
190#[function("cast(varchar) -> bytea")]
191pub fn str_to_bytea(elem: &str, writer: &mut impl std::io::Write) -> Result<()> {
192 cast::str_to_bytea(elem, writer).map_err(|err| ExprError::Parse(err.into()))
193}
194
195#[function("cast(varchar) -> anyarray", type_infer = "unreachable")]
196fn str_to_list(input: &str, ctx: &Context) -> Result<ListValue> {
197 ListValue::from_str(input, &ctx.return_type).map_err(|err| ExprError::Parse(err.into()))
198}
199
200#[function("cast(varchar) -> vector", type_infer = "unreachable")]
201fn str_to_vector(input: &str, ctx: &Context) -> Result<VectorVal> {
202 let DataType::Vector(size) = &ctx.return_type else {
203 unreachable!()
204 };
205 VectorVal::from_text(input, *size).map_err(|err| ExprError::Parse(err.into()))
206}
207
208#[function("cast(anyarray) -> anyarray", type_infer = "unreachable")]
210fn list_cast(input: ListRef<'_>, ctx: &Context) -> Result<ListValue> {
211 let cast = build_func(
212 PbType::Cast,
213 ctx.return_type.as_list_elem().clone(),
214 vec![InputRefExpression::new(ctx.arg_types[0].as_list_elem().clone(), 0).boxed()],
215 )
216 .unwrap();
217 let items = Arc::new(input.to_owned_scalar().into_array());
218 let len = items.len();
219 let list = cast
220 .eval(&DataChunk::new(vec![items], len))
221 .now_or_never()
222 .unwrap()?;
223 Ok(ListValue::new(Arc::try_unwrap(list).unwrap()))
224}
225
226#[function("cast(struct) -> struct", type_infer = "unreachable")]
228fn struct_cast(input: StructRef<'_>, ctx: &Context) -> Result<StructValue> {
229 let fields = (input.iter_fields_ref())
230 .zip_eq_fast(ctx.arg_types[0].as_struct().types())
231 .zip_eq_fast(ctx.return_type.as_struct().types())
232 .map(|((datum_ref, source_field_type), target_field_type)| {
233 if source_field_type == target_field_type {
234 return Ok(datum_ref.map(|scalar_ref| scalar_ref.into_scalar_impl()));
235 }
236 let cast = build_func(
237 PbType::Cast,
238 target_field_type.clone(),
239 vec![InputRefExpression::new(source_field_type.clone(), 0).boxed()],
240 )
241 .unwrap();
242 let value = match datum_ref {
243 Some(scalar_ref) => cast
244 .eval_row(&OwnedRow::new(vec![Some(scalar_ref.into_scalar_impl())]))
245 .now_or_never()
246 .unwrap()?,
247 None => None,
248 };
249 Ok(value) as Result<_>
250 })
251 .try_collect()?;
252 Ok(StructValue::new(fields))
253}
254
255#[function("cast(anymap) -> anymap", type_infer = "unreachable")]
257fn map_cast(map: MapRef<'_>, ctx: &Context) -> Result<MapValue> {
258 let new_ctx = Context {
259 arg_types: vec![ctx.arg_types[0].clone().as_map().clone().into_list()],
260 return_type: ctx.return_type.as_map().clone().into_list(),
261 variadic: ctx.variadic,
262 };
263 list_cast(map.into_inner(), &new_ctx).map(MapValue::from_entries)
264}
265
#[cfg(test)]
mod tests {
    use chrono::NaiveDateTime;
    use risingwave_common::array::*;
    use risingwave_common::types::*;
    use risingwave_expr::expr::build_from_pretty;

    use super::*;

    /// Any non-zero `int4` casts to `true`; zero casts to `false`.
    #[test]
    fn integer_cast_to_bool() {
        assert!(int_to_bool(32));
        assert!(int_to_bool(-32));
        assert!(!int_to_bool(0));
    }

    /// Checks the text form produced for booleans, integers, floats and decimals.
    #[test]
    fn number_to_string() {
        // Runs a text-producing cast into a fresh `String` and compares the output.
        macro_rules! test {
            ($fn:ident($value:expr), $right:literal) => {
                let mut writer = String::new();
                $fn($value, &mut writer);
                assert_eq!(writer, $right);
            };
        }

        test!(bool_to_varchar(true), "true");
        test!(bool_to_varchar(false), "false");

        test!(general_to_text(32), "32");
        test!(general_to_text(-32), "-32");
        test!(general_to_text(i32::MIN), "-2147483648");
        test!(general_to_text(i32::MAX), "2147483647");

        test!(general_to_text(i16::MIN), "-32768");
        test!(general_to_text(i16::MAX), "32767");

        test!(general_to_text(i64::MIN), "-9223372036854775808");
        test!(general_to_text(i64::MAX), "9223372036854775807");

        test!(general_to_text(F64::from(32.12)), "32.12");
        test!(general_to_text(F64::from(-32.14)), "-32.14");

        test!(general_to_text(F32::from(32.12_f32)), "32.12");
        test!(general_to_text(F32::from(-32.14_f32)), "-32.14");

        test!(general_to_text(Decimal::try_from(1.222).unwrap()), "1.222");

        test!(general_to_text(Decimal::NaN), "NaN");
    }

    /// Parses array literals of increasing nesting depth.
    #[test]
    fn test_str_to_list() {
        // Empty list: the expected value must carry the element type requested by the
        // context (`int[]` -> `Int32`), not the type of the textual input.
        let ctx = Context {
            arg_types: vec![DataType::Varchar],
            return_type: DataType::from_str("int[]").unwrap(),
            variadic: false,
        };
        assert_eq!(
            str_to_list("{}", &ctx).unwrap(),
            ListValue::empty(&DataType::Int32)
        );

        let list123 = ListValue::from_iter([1, 2, 3]);

        // Single-level list.
        let ctx = Context {
            arg_types: vec![DataType::Varchar],
            return_type: DataType::from_str("int[]").unwrap(),
            variadic: false,
        };
        assert_eq!(str_to_list("{1, 2, 3}", &ctx).unwrap(), list123);

        // Nested list.
        let nested_list123 = ListValue::from_iter([list123]);
        let ctx = Context {
            arg_types: vec![DataType::Varchar],
            return_type: DataType::from_str("int[][]").unwrap(),
            variadic: false,
        };
        assert_eq!(str_to_list("{{1, 2, 3}}", &ctx).unwrap(), nested_list123);

        let nested_list445566 = ListValue::from_iter([ListValue::from_iter([44, 55, 66])]);

        let double_nested_list123_445566 =
            ListValue::from_iter([nested_list123.clone(), nested_list445566.clone()]);

        // Doubly nested list.
        let ctx = Context {
            arg_types: vec![DataType::Varchar],
            return_type: DataType::from_str("int[][][]").unwrap(),
            variadic: false,
        };
        assert_eq!(
            str_to_list("{{{1, 2, 3}}, {{44, 55, 66}}}", &ctx).unwrap(),
            double_nested_list123_445566
        );

        // Build the expected varchar lists by casting the int lists element-wise.
        let ctx = Context {
            arg_types: vec![DataType::from_str("int[][]").unwrap()],
            return_type: DataType::from_str("varchar[][]").unwrap(),
            variadic: false,
        };
        let double_nested_varchar_list123_445566 = ListValue::from_iter([
            list_cast(nested_list123.as_scalar_ref(), &ctx).unwrap(),
            list_cast(nested_list445566.as_scalar_ref(), &ctx).unwrap(),
        ]);

        // Doubly nested list parsed directly as varchar.
        let ctx = Context {
            arg_types: vec![DataType::Varchar],
            return_type: DataType::from_str("varchar[][][]").unwrap(),
            variadic: false,
        };
        assert_eq!(
            str_to_list("{{{1, 2, 3}}, {{44, 55, 66}}}", &ctx).unwrap(),
            double_nested_varchar_list123_445566
        );
    }

    /// Array literals with unbalanced braces must be rejected.
    #[test]
    fn test_invalid_str_to_list() {
        let ctx = Context {
            arg_types: vec![DataType::Varchar],
            return_type: DataType::from_str("int[]").unwrap(),
            variadic: false,
        };
        assert!(str_to_list("{{}", &ctx).is_err());
        assert!(str_to_list("{}}", &ctx).is_err());
        assert!(str_to_list("{{1, 2, 3}, {4, 5, 6}", &ctx).is_err());
        assert!(str_to_list("{{1, 2, 3}, 4, 5, 6}}", &ctx).is_err());
    }

    /// Casts `struct<a varchar, b float4>` to `struct<a int4, b int4>` field by field.
    #[test]
    fn test_struct_cast() {
        let ctx = Context {
            arg_types: vec![DataType::Struct(StructType::new(vec![
                ("a", DataType::Varchar),
                ("b", DataType::Float32),
            ]))],
            return_type: DataType::Struct(StructType::new(vec![
                ("a", DataType::Int32),
                ("b", DataType::Int32),
            ])),
            variadic: false,
        };
        assert_eq!(
            struct_cast(
                StructValue::new(vec![
                    Some("1".into()),
                    Some(F32::from(0.0).to_scalar_value()),
                ])
                .as_scalar_ref(),
                &ctx,
            )
            .unwrap(),
            StructValue::new(vec![
                Some(1i32.to_scalar_value()),
                Some(0i32.to_scalar_value()),
            ])
        );
    }

    /// A date casts to the timestamp at midnight of that day.
    #[test]
    fn test_timestamp() {
        assert_eq!(
            try_cast::<_, Timestamp>(Date::from_ymd_uncheck(1994, 1, 1)).unwrap(),
            Timestamp::new(
                NaiveDateTime::parse_from_str("1994-1-1 0:0:0", "%Y-%m-%d %H:%M:%S").unwrap()
            )
        )
    }

    /// Drives the generic unary-expression helpers below.
    #[tokio::test]
    async fn test_unary() {
        test_unary_bool::<BoolArray, _>(|x| !x, PbType::Not).await;
        test_unary_date::<TimestampArray, _>(|x| try_cast(x).unwrap(), PbType::Cast).await;
        let ctx_str_to_int16 = Context {
            arg_types: vec![DataType::Varchar],
            return_type: DataType::Int16,
            variadic: false,
        };
        test_str_to_int16::<I16Array, _>(|x| str_parse(x, &ctx_str_to_int16).unwrap()).await;
    }

    /// Evaluates `cast:int4` over an `int2` column with alternating values and NULLs,
    /// both chunk-wise and row-wise.
    #[tokio::test]
    async fn test_i16_to_i32() {
        let mut input = Vec::<Option<i16>>::new();
        let mut target = Vec::<Option<i32>>::new();
        for i in 0..100i16 {
            if i % 2 == 0 {
                target.push(Some(i as i32));
                input.push(Some(i));
            } else {
                input.push(None);
                target.push(None);
            }
        }
        let col1 = I16Array::from_iter(&input).into_ref();
        let data_chunk = DataChunk::new(vec![col1], 100);
        let expr = build_from_pretty("(cast:int4 $0:int2)");
        let res = expr.eval(&data_chunk).await.unwrap();
        let arr: &I32Array = res.as_ref().into();
        for (idx, item) in arr.iter().enumerate() {
            let x = target[idx].as_ref().map(|x| x.as_scalar_ref());
            assert_eq!(x, item);
        }

        for i in 0..input.len() {
            let row = OwnedRow::new(vec![input[i].map(|int| int.to_scalar_value())]);
            let result = expr.eval_row(&row).await.unwrap();
            let expected = target[i].map(|int| int.to_scalar_value());
            assert_eq!(result, expected);
        }
    }

    /// Evaluates `neg:int4` over a small `int4` column, both chunk-wise and row-wise.
    #[tokio::test]
    async fn test_neg() {
        let input = [Some(1), Some(0), Some(-1)];
        let target = [Some(-1), Some(0), Some(1)];

        let col1 = I32Array::from_iter(&input).into_ref();
        let data_chunk = DataChunk::new(vec![col1], 3);
        let expr = build_from_pretty("(neg:int4 $0:int4)");
        let res = expr.eval(&data_chunk).await.unwrap();
        let arr: &I32Array = res.as_ref().into();
        for (idx, item) in arr.iter().enumerate() {
            let x = target[idx].as_ref().map(|x| x.as_scalar_ref());
            assert_eq!(x, item);
        }

        for i in 0..input.len() {
            let row = OwnedRow::new(vec![input[i].map(|int| int.to_scalar_value())]);
            let result = expr.eval_row(&row).await.unwrap();
            let expected = target[i].map(|int| int.to_scalar_value());
            assert_eq!(result, expected);
        }
    }

    /// Compares `cast:int2` over a varchar column against the reference closure `f`,
    /// both chunk-wise and row-wise.
    async fn test_str_to_int16<A, F>(f: F)
    where
        A: Array,
        for<'a> &'a A: std::convert::From<&'a ArrayImpl>,
        for<'a> <A as Array>::RefItem<'a>: PartialEq,
        F: Fn(&str) -> <A as Array>::OwnedItem,
    {
        let mut input = Vec::<Option<Box<str>>>::new();
        let mut target = Vec::<Option<<A as Array>::OwnedItem>>::new();
        for i in 0..1u32 {
            if i % 2 == 0 {
                let s = i.to_string().into_boxed_str();
                target.push(Some(f(&s)));
                input.push(Some(s));
            } else {
                input.push(None);
                target.push(None);
            }
        }
        let col1_data = &input.iter().map(|x| x.as_ref().map(|x| &**x)).collect_vec();
        let col1 = Utf8Array::from_iter(col1_data).into_ref();
        let data_chunk = DataChunk::new(vec![col1], 1);
        let expr = build_from_pretty("(cast:int2 $0:varchar)");
        let res = expr.eval(&data_chunk).await.unwrap();
        let arr: &A = res.as_ref().into();
        for (idx, item) in arr.iter().enumerate() {
            let x = target[idx].as_ref().map(|x| x.as_scalar_ref());
            assert_eq!(x, item);
        }

        for i in 0..input.len() {
            let row = OwnedRow::new(vec![
                input[i].as_ref().cloned().map(|str| str.to_scalar_value()),
            ]);
            let result = expr.eval_row(&row).await.unwrap();
            let expected = target[i].as_ref().cloned().map(|x| x.to_scalar_value());
            assert_eq!(result, expected);
        }
    }

    /// Compares the unary boolean expression `kind` against the reference closure `f`
    /// over a column mixing `true`, `false` and NULL, both chunk-wise and row-wise.
    async fn test_unary_bool<A, F>(f: F, kind: PbType)
    where
        A: Array,
        for<'a> &'a A: std::convert::From<&'a ArrayImpl>,
        for<'a> <A as Array>::RefItem<'a>: PartialEq,
        F: Fn(bool) -> <A as Array>::OwnedItem,
    {
        let mut input = Vec::<Option<bool>>::new();
        let mut target = Vec::<Option<<A as Array>::OwnedItem>>::new();
        for i in 0..100 {
            if i % 2 == 0 {
                input.push(Some(true));
                target.push(Some(f(true)));
            } else if i % 3 == 0 {
                input.push(Some(false));
                target.push(Some(f(false)));
            } else {
                input.push(None);
                target.push(None);
            }
        }

        let col1 = BoolArray::from_iter(&input).into_ref();
        let data_chunk = DataChunk::new(vec![col1], 100);
        let expr = build_from_pretty(format!("({kind:?}:boolean $0:boolean)"));
        let res = expr.eval(&data_chunk).await.unwrap();
        let arr: &A = res.as_ref().into();
        for (idx, item) in arr.iter().enumerate() {
            let x = target[idx].as_ref().map(|x| x.as_scalar_ref());
            assert_eq!(x, item);
        }

        for i in 0..input.len() {
            let row = OwnedRow::new(vec![input[i].map(|b| b.to_scalar_value())]);
            let result = expr.eval_row(&row).await.unwrap();
            let expected = target[i].as_ref().cloned().map(|x| x.to_scalar_value());
            assert_eq!(result, expected);
        }
    }

    /// Compares the unary date-to-timestamp expression `kind` against the reference
    /// closure `f` over a column alternating dates and NULLs, both chunk-wise and row-wise.
    async fn test_unary_date<A, F>(f: F, kind: PbType)
    where
        A: Array,
        for<'a> &'a A: std::convert::From<&'a ArrayImpl>,
        for<'a> <A as Array>::RefItem<'a>: PartialEq,
        F: Fn(Date) -> <A as Array>::OwnedItem,
    {
        let mut input = Vec::<Option<Date>>::new();
        let mut target = Vec::<Option<<A as Array>::OwnedItem>>::new();
        for i in 0..100 {
            if i % 2 == 0 {
                let date = Date::from_num_days_from_ce_uncheck(i);
                input.push(Some(date));
                target.push(Some(f(date)));
            } else {
                input.push(None);
                target.push(None);
            }
        }

        let col1 = DateArray::from_iter(&input).into_ref();
        let data_chunk = DataChunk::new(vec![col1], 100);
        let expr = build_from_pretty(format!("({kind:?}:timestamp $0:date)"));
        let res = expr.eval(&data_chunk).await.unwrap();
        let arr: &A = res.as_ref().into();
        for (idx, item) in arr.iter().enumerate() {
            let x = target[idx].as_ref().map(|x| x.as_scalar_ref());
            assert_eq!(x, item);
        }

        for i in 0..input.len() {
            let row = OwnedRow::new(vec![input[i].map(|d| d.to_scalar_value())]);
            let result = expr.eval_row(&row).await.unwrap();
            let expected = target[i].as_ref().cloned().map(|x| x.to_scalar_value());
            assert_eq!(result, expected);
        }
    }
}