1use std::fmt::Write;
16use std::str::FromStr;
17use std::sync::Arc;
18
19use futures_util::FutureExt;
20use itertools::Itertools;
21use risingwave_common::array::{ArrayImpl, DataChunk, ListRef, ListValue, StructRef, StructValue};
22use risingwave_common::cast;
23use risingwave_common::row::OwnedRow;
24use risingwave_common::types::{F64, Int256, JsonbRef, MapRef, MapValue, ToText};
25use risingwave_common::util::iter_util::ZipEqFast;
26use risingwave_expr::expr::{Context, ExpressionBoxExt, InputRefExpression, build_func};
27use risingwave_expr::{ExprError, Result, function};
28use risingwave_pb::expr::expr_node::PbType;
29use thiserror_ext::AsReport;
30
31#[function("cast(varchar) -> *int")]
32#[function("cast(varchar) -> decimal")]
33#[function("cast(varchar) -> *float")]
34#[function("cast(varchar) -> int256")]
35#[function("cast(varchar) -> date")]
36#[function("cast(varchar) -> time")]
37#[function("cast(varchar) -> timestamp")]
38#[function("cast(varchar) -> interval")]
39#[function("cast(varchar) -> jsonb")]
40pub fn str_parse<T>(elem: &str, ctx: &Context) -> Result<T>
41where
42 T: FromStr,
43 <T as FromStr>::Err: std::fmt::Display,
44{
45 elem.trim().parse().map_err(|err: <T as FromStr>::Err| {
46 ExprError::Parse(format!("{} {}", ctx.return_type, err).into())
47 })
48}
49
50#[function("pgwire_recv(bytea) -> int8")]
52pub fn pgwire_recv(elem: &[u8]) -> Result<i64> {
53 let fixed_length =
54 <[u8; 8]>::try_from(elem).map_err(|e| ExprError::Parse(e.to_report_string().into()))?;
55 Ok(i64::from_be_bytes(fixed_length))
56}
57
58#[function("cast(int2) -> int256")]
59#[function("cast(int4) -> int256")]
60#[function("cast(int8) -> int256")]
61pub fn to_int256<T: TryInto<Int256>>(elem: T) -> Result<Int256> {
62 elem.try_into()
63 .map_err(|_| ExprError::CastOutOfRange("int256"))
64}
65
66#[function("cast(jsonb) -> boolean")]
67pub fn jsonb_to_bool(v: JsonbRef<'_>) -> Result<bool> {
68 v.as_bool().map_err(|e| ExprError::Parse(e.into()))
69}
70
71#[function("cast(jsonb) -> int2")]
74#[function("cast(jsonb) -> int4")]
75#[function("cast(jsonb) -> int8")]
76#[function("cast(jsonb) -> decimal")]
77#[function("cast(jsonb) -> float4")]
78#[function("cast(jsonb) -> float8")]
79pub fn jsonb_to_number<T: TryFrom<F64>>(v: JsonbRef<'_>) -> Result<T> {
80 v.as_number()
81 .map_err(|e| ExprError::Parse(e.into()))?
82 .try_into()
83 .map_err(|_| ExprError::NumericOutOfRange)
84}
85
86#[function("cast(int4) -> int2")]
87#[function("cast(int8) -> int2")]
88#[function("cast(int8) -> int4")]
89#[function("cast(int8) -> serial")]
90#[function("cast(serial) -> int8")]
91#[function("cast(float4) -> int2")]
92#[function("cast(float8) -> int2")]
93#[function("cast(float4) -> int4")]
94#[function("cast(float8) -> int4")]
95#[function("cast(float4) -> int8")]
96#[function("cast(float8) -> int8")]
97#[function("cast(float8) -> float4")]
98#[function("cast(decimal) -> int2")]
99#[function("cast(decimal) -> int4")]
100#[function("cast(decimal) -> int8")]
101#[function("cast(decimal) -> float4")]
102#[function("cast(decimal) -> float8")]
103#[function("cast(float4) -> decimal")]
104#[function("cast(float8) -> decimal")]
105pub fn try_cast<T1, T2>(elem: T1) -> Result<T2>
106where
107 T1: TryInto<T2> + std::fmt::Debug + Copy,
108{
109 elem.try_into()
110 .map_err(|_| ExprError::CastOutOfRange(std::any::type_name::<T2>()))
111}
112
113#[function("cast(boolean) -> int4")]
114#[function("cast(int2) -> int4")]
115#[function("cast(int2) -> int8")]
116#[function("cast(int2) -> float4")]
117#[function("cast(int2) -> float8")]
118#[function("cast(int2) -> decimal")]
119#[function("cast(int4) -> int8")]
120#[function("cast(int4) -> float4")]
121#[function("cast(int4) -> float8")]
122#[function("cast(int4) -> decimal")]
123#[function("cast(int8) -> float4")]
124#[function("cast(int8) -> float8")]
125#[function("cast(int8) -> decimal")]
126#[function("cast(float4) -> float8")]
127#[function("cast(date) -> timestamp")]
128#[function("cast(time) -> interval")]
129#[function("cast(timestamp) -> date")]
130#[function("cast(timestamp) -> time")]
131#[function("cast(interval) -> time")]
132#[function("cast(varchar) -> varchar")]
133#[function("cast(int256) -> float8")]
134pub fn cast<T1, T2>(elem: T1) -> T2
135where
136 T1: Into<T2>,
137{
138 elem.into()
139}
140
141#[function("cast(varchar) -> boolean")]
142pub fn str_to_bool(input: &str) -> Result<bool> {
143 cast::str_to_bool(input).map_err(|err| ExprError::Parse(err.into()))
144}
145
146#[function("cast(int4) -> boolean")]
147pub fn int_to_bool(input: i32) -> bool {
148 input != 0
149}
150
151#[function("cast(*int) -> varchar")]
154#[function("cast(decimal) -> varchar")]
155#[function("cast(*float) -> varchar")]
156#[function("cast(int256) -> varchar")]
157#[function("cast(time) -> varchar")]
158#[function("cast(date) -> varchar")]
159#[function("cast(interval) -> varchar")]
160#[function("cast(timestamp) -> varchar")]
161#[function("cast(jsonb) -> varchar")]
162#[function("cast(bytea) -> varchar")]
163#[function("cast(anyarray) -> varchar")]
164pub fn general_to_text(elem: impl ToText, mut writer: &mut impl Write) {
165 elem.write(&mut writer).unwrap();
166}
167
168#[function("pgwire_send(int8) -> bytea")]
170fn pgwire_send(elem: i64) -> Box<[u8]> {
171 elem.to_be_bytes().into()
172}
173
174#[function("cast(boolean) -> varchar")]
175pub fn bool_to_varchar(input: bool, writer: &mut impl Write) {
176 writer
177 .write_str(if input { "true" } else { "false" })
178 .unwrap();
179}
180
181#[function("bool_out(boolean) -> varchar")]
184pub fn bool_out(input: bool, writer: &mut impl Write) {
185 writer.write_str(if input { "t" } else { "f" }).unwrap();
186}
187
188#[function("cast(varchar) -> bytea")]
189pub fn str_to_bytea(elem: &str) -> Result<Box<[u8]>> {
190 cast::str_to_bytea(elem).map_err(|err| ExprError::Parse(err.into()))
191}
192
193#[function("cast(varchar) -> anyarray", type_infer = "unreachable")]
194fn str_to_list(input: &str, ctx: &Context) -> Result<ListValue> {
195 ListValue::from_str(input, &ctx.return_type).map_err(|err| ExprError::Parse(err.into()))
196}
197
198#[function("cast(anyarray) -> anyarray", type_infer = "unreachable")]
200fn list_cast(input: ListRef<'_>, ctx: &Context) -> Result<ListValue> {
201 let cast = build_func(
202 PbType::Cast,
203 ctx.return_type.as_list().clone(),
204 vec![InputRefExpression::new(ctx.arg_types[0].as_list().clone(), 0).boxed()],
205 )
206 .unwrap();
207 let items = Arc::new(ArrayImpl::from(input.to_owned()));
208 let len = items.len();
209 let list = cast
210 .eval(&DataChunk::new(vec![items], len))
211 .now_or_never()
212 .unwrap()?;
213 Ok(ListValue::new(Arc::try_unwrap(list).unwrap()))
214}
215
216#[function("cast(struct) -> struct", type_infer = "unreachable")]
218fn struct_cast(input: StructRef<'_>, ctx: &Context) -> Result<StructValue> {
219 let fields = (input.iter_fields_ref())
220 .zip_eq_fast(ctx.arg_types[0].as_struct().types())
221 .zip_eq_fast(ctx.return_type.as_struct().types())
222 .map(|((datum_ref, source_field_type), target_field_type)| {
223 if source_field_type == target_field_type {
224 return Ok(datum_ref.map(|scalar_ref| scalar_ref.into_scalar_impl()));
225 }
226 let cast = build_func(
227 PbType::Cast,
228 target_field_type.clone(),
229 vec![InputRefExpression::new(source_field_type.clone(), 0).boxed()],
230 )
231 .unwrap();
232 let value = match datum_ref {
233 Some(scalar_ref) => cast
234 .eval_row(&OwnedRow::new(vec![Some(scalar_ref.into_scalar_impl())]))
235 .now_or_never()
236 .unwrap()?,
237 None => None,
238 };
239 Ok(value) as Result<_>
240 })
241 .try_collect()?;
242 Ok(StructValue::new(fields))
243}
244
245#[function("cast(anymap) -> anymap", type_infer = "unreachable")]
247fn map_cast(map: MapRef<'_>, ctx: &Context) -> Result<MapValue> {
248 let new_ctx = Context {
249 arg_types: vec![ctx.arg_types[0].clone().as_map().clone().into_list()],
250 return_type: ctx.return_type.as_map().clone().into_list(),
251 variadic: ctx.variadic,
252 };
253 list_cast(map.into_inner(), &new_ctx).map(MapValue::from_entries)
254}
255
256#[cfg(test)]
257mod tests {
258 use chrono::NaiveDateTime;
259 use risingwave_common::array::*;
260 use risingwave_common::types::*;
261 use risingwave_expr::expr::build_from_pretty;
262
263 use super::*;
264
265 #[test]
266 fn integer_cast_to_bool() {
267 assert!(int_to_bool(32));
268 assert!(int_to_bool(-32));
269 assert!(!int_to_bool(0));
270 }
271
272 #[test]
273 fn number_to_string() {
274 macro_rules! test {
275 ($fn:ident($value:expr), $right:literal) => {
276 let mut writer = String::new();
277 $fn($value, &mut writer);
278 assert_eq!(writer, $right);
279 };
280 }
281
282 test!(bool_to_varchar(true), "true");
283 test!(bool_to_varchar(true), "true");
284 test!(bool_to_varchar(false), "false");
285
286 test!(general_to_text(32), "32");
287 test!(general_to_text(-32), "-32");
288 test!(general_to_text(i32::MIN), "-2147483648");
289 test!(general_to_text(i32::MAX), "2147483647");
290
291 test!(general_to_text(i16::MIN), "-32768");
292 test!(general_to_text(i16::MAX), "32767");
293
294 test!(general_to_text(i64::MIN), "-9223372036854775808");
295 test!(general_to_text(i64::MAX), "9223372036854775807");
296
297 test!(general_to_text(F64::from(32.12)), "32.12");
298 test!(general_to_text(F64::from(-32.14)), "-32.14");
299
300 test!(general_to_text(F32::from(32.12_f32)), "32.12");
301 test!(general_to_text(F32::from(-32.14_f32)), "-32.14");
302
303 test!(general_to_text(Decimal::try_from(1.222).unwrap()), "1.222");
304
305 test!(general_to_text(Decimal::NaN), "NaN");
306 }
307
308 #[test]
309 fn test_str_to_list() {
310 let ctx = Context {
312 arg_types: vec![DataType::Varchar],
313 return_type: DataType::from_str("int[]").unwrap(),
314 variadic: false,
315 };
316 assert_eq!(
317 str_to_list("{}", &ctx).unwrap(),
318 ListValue::empty(&DataType::Varchar)
319 );
320
321 let list123 = ListValue::from_iter([1, 2, 3]);
322
323 let ctx = Context {
325 arg_types: vec![DataType::Varchar],
326 return_type: DataType::from_str("int[]").unwrap(),
327 variadic: false,
328 };
329 assert_eq!(str_to_list("{1, 2, 3}", &ctx).unwrap(), list123);
330
331 let nested_list123 = ListValue::from_iter([list123]);
333 let ctx = Context {
334 arg_types: vec![DataType::Varchar],
335 return_type: DataType::from_str("int[][]").unwrap(),
336 variadic: false,
337 };
338 assert_eq!(str_to_list("{{1, 2, 3}}", &ctx).unwrap(), nested_list123);
339
340 let nested_list445566 = ListValue::from_iter([ListValue::from_iter([44, 55, 66])]);
341
342 let double_nested_list123_445566 =
343 ListValue::from_iter([nested_list123.clone(), nested_list445566.clone()]);
344
345 let ctx = Context {
347 arg_types: vec![DataType::Varchar],
348 return_type: DataType::from_str("int[][][]").unwrap(),
349 variadic: false,
350 };
351 assert_eq!(
352 str_to_list("{{{1, 2, 3}}, {{44, 55, 66}}}", &ctx).unwrap(),
353 double_nested_list123_445566
354 );
355
356 let ctx = Context {
358 arg_types: vec![DataType::from_str("int[][]").unwrap()],
359 return_type: DataType::from_str("varchar[][]").unwrap(),
360 variadic: false,
361 };
362 let double_nested_varchar_list123_445566 = ListValue::from_iter([
363 list_cast(nested_list123.as_scalar_ref(), &ctx).unwrap(),
364 list_cast(nested_list445566.as_scalar_ref(), &ctx).unwrap(),
365 ]);
366
367 let ctx = Context {
369 arg_types: vec![DataType::Varchar],
370 return_type: DataType::from_str("varchar[][][]").unwrap(),
371 variadic: false,
372 };
373 assert_eq!(
374 str_to_list("{{{1, 2, 3}}, {{44, 55, 66}}}", &ctx).unwrap(),
375 double_nested_varchar_list123_445566
376 );
377 }
378
379 #[test]
380 fn test_invalid_str_to_list() {
381 let ctx = Context {
383 arg_types: vec![DataType::Varchar],
384 return_type: DataType::from_str("int[]").unwrap(),
385 variadic: false,
386 };
387 assert!(str_to_list("{{}", &ctx).is_err());
388 assert!(str_to_list("{}}", &ctx).is_err());
389 assert!(str_to_list("{{1, 2, 3}, {4, 5, 6}", &ctx).is_err());
390 assert!(str_to_list("{{1, 2, 3}, 4, 5, 6}}", &ctx).is_err());
391 }
392
393 #[test]
394 fn test_struct_cast() {
395 let ctx = Context {
396 arg_types: vec![DataType::Struct(StructType::new(vec![
397 ("a", DataType::Varchar),
398 ("b", DataType::Float32),
399 ]))],
400 return_type: DataType::Struct(StructType::new(vec![
401 ("a", DataType::Int32),
402 ("b", DataType::Int32),
403 ])),
404 variadic: false,
405 };
406 assert_eq!(
407 struct_cast(
408 StructValue::new(vec![
409 Some("1".into()),
410 Some(F32::from(0.0).to_scalar_value()),
411 ])
412 .as_scalar_ref(),
413 &ctx,
414 )
415 .unwrap(),
416 StructValue::new(vec![
417 Some(1i32.to_scalar_value()),
418 Some(0i32.to_scalar_value()),
419 ])
420 );
421 }
422
423 #[test]
424 fn test_timestamp() {
425 assert_eq!(
426 try_cast::<_, Timestamp>(Date::from_ymd_uncheck(1994, 1, 1)).unwrap(),
427 Timestamp::new(
428 NaiveDateTime::parse_from_str("1994-1-1 0:0:0", "%Y-%m-%d %H:%M:%S").unwrap()
429 )
430 )
431 }
432
433 #[tokio::test]
434 async fn test_unary() {
435 test_unary_bool::<BoolArray, _>(|x| !x, PbType::Not).await;
436 test_unary_date::<TimestampArray, _>(|x| try_cast(x).unwrap(), PbType::Cast).await;
437 let ctx_str_to_int16 = Context {
438 arg_types: vec![DataType::Varchar],
439 return_type: DataType::Int16,
440 variadic: false,
441 };
442 test_str_to_int16::<I16Array, _>(|x| str_parse(x, &ctx_str_to_int16).unwrap()).await;
443 }
444
445 #[tokio::test]
446 async fn test_i16_to_i32() {
447 let mut input = Vec::<Option<i16>>::new();
448 let mut target = Vec::<Option<i32>>::new();
449 for i in 0..100i16 {
450 if i % 2 == 0 {
451 target.push(Some(i as i32));
452 input.push(Some(i));
453 } else {
454 input.push(None);
455 target.push(None);
456 }
457 }
458 let col1 = I16Array::from_iter(&input).into_ref();
459 let data_chunk = DataChunk::new(vec![col1], 100);
460 let expr = build_from_pretty("(cast:int4 $0:int2)");
461 let res = expr.eval(&data_chunk).await.unwrap();
462 let arr: &I32Array = res.as_ref().into();
463 for (idx, item) in arr.iter().enumerate() {
464 let x = target[idx].as_ref().map(|x| x.as_scalar_ref());
465 assert_eq!(x, item);
466 }
467
468 for i in 0..input.len() {
469 let row = OwnedRow::new(vec![input[i].map(|int| int.to_scalar_value())]);
470 let result = expr.eval_row(&row).await.unwrap();
471 let expected = target[i].map(|int| int.to_scalar_value());
472 assert_eq!(result, expected);
473 }
474 }
475
476 #[tokio::test]
477 async fn test_neg() {
478 let input = [Some(1), Some(0), Some(-1)];
479 let target = [Some(-1), Some(0), Some(1)];
480
481 let col1 = I32Array::from_iter(&input).into_ref();
482 let data_chunk = DataChunk::new(vec![col1], 3);
483 let expr = build_from_pretty("(neg:int4 $0:int4)");
484 let res = expr.eval(&data_chunk).await.unwrap();
485 let arr: &I32Array = res.as_ref().into();
486 for (idx, item) in arr.iter().enumerate() {
487 let x = target[idx].as_ref().map(|x| x.as_scalar_ref());
488 assert_eq!(x, item);
489 }
490
491 for i in 0..input.len() {
492 let row = OwnedRow::new(vec![input[i].map(|int| int.to_scalar_value())]);
493 let result = expr.eval_row(&row).await.unwrap();
494 let expected = target[i].map(|int| int.to_scalar_value());
495 assert_eq!(result, expected);
496 }
497 }
498
499 async fn test_str_to_int16<A, F>(f: F)
500 where
501 A: Array,
502 for<'a> &'a A: std::convert::From<&'a ArrayImpl>,
503 for<'a> <A as Array>::RefItem<'a>: PartialEq,
504 F: Fn(&str) -> <A as Array>::OwnedItem,
505 {
506 let mut input = Vec::<Option<Box<str>>>::new();
507 let mut target = Vec::<Option<<A as Array>::OwnedItem>>::new();
508 for i in 0..1u32 {
509 if i % 2 == 0 {
510 let s = i.to_string().into_boxed_str();
511 target.push(Some(f(&s)));
512 input.push(Some(s));
513 } else {
514 input.push(None);
515 target.push(None);
516 }
517 }
518 let col1_data = &input.iter().map(|x| x.as_ref().map(|x| &**x)).collect_vec();
519 let col1 = Utf8Array::from_iter(col1_data).into_ref();
520 let data_chunk = DataChunk::new(vec![col1], 1);
521 let expr = build_from_pretty("(cast:int2 $0:varchar)");
522 let res = expr.eval(&data_chunk).await.unwrap();
523 let arr: &A = res.as_ref().into();
524 for (idx, item) in arr.iter().enumerate() {
525 let x = target[idx].as_ref().map(|x| x.as_scalar_ref());
526 assert_eq!(x, item);
527 }
528
529 for i in 0..input.len() {
530 let row = OwnedRow::new(vec![
531 input[i].as_ref().cloned().map(|str| str.to_scalar_value()),
532 ]);
533 let result = expr.eval_row(&row).await.unwrap();
534 let expected = target[i].as_ref().cloned().map(|x| x.to_scalar_value());
535 assert_eq!(result, expected);
536 }
537 }
538
539 async fn test_unary_bool<A, F>(f: F, kind: PbType)
540 where
541 A: Array,
542 for<'a> &'a A: std::convert::From<&'a ArrayImpl>,
543 for<'a> <A as Array>::RefItem<'a>: PartialEq,
544 F: Fn(bool) -> <A as Array>::OwnedItem,
545 {
546 let mut input = Vec::<Option<bool>>::new();
547 let mut target = Vec::<Option<<A as Array>::OwnedItem>>::new();
548 for i in 0..100 {
549 if i % 2 == 0 {
550 input.push(Some(true));
551 target.push(Some(f(true)));
552 } else if i % 3 == 0 {
553 input.push(Some(false));
554 target.push(Some(f(false)));
555 } else {
556 input.push(None);
557 target.push(None);
558 }
559 }
560
561 let col1 = BoolArray::from_iter(&input).into_ref();
562 let data_chunk = DataChunk::new(vec![col1], 100);
563 let expr = build_from_pretty(format!("({kind:?}:boolean $0:boolean)"));
564 let res = expr.eval(&data_chunk).await.unwrap();
565 let arr: &A = res.as_ref().into();
566 for (idx, item) in arr.iter().enumerate() {
567 let x = target[idx].as_ref().map(|x| x.as_scalar_ref());
568 assert_eq!(x, item);
569 }
570
571 for i in 0..input.len() {
572 let row = OwnedRow::new(vec![input[i].map(|b| b.to_scalar_value())]);
573 let result = expr.eval_row(&row).await.unwrap();
574 let expected = target[i].as_ref().cloned().map(|x| x.to_scalar_value());
575 assert_eq!(result, expected);
576 }
577 }
578
579 async fn test_unary_date<A, F>(f: F, kind: PbType)
580 where
581 A: Array,
582 for<'a> &'a A: std::convert::From<&'a ArrayImpl>,
583 for<'a> <A as Array>::RefItem<'a>: PartialEq,
584 F: Fn(Date) -> <A as Array>::OwnedItem,
585 {
586 let mut input = Vec::<Option<Date>>::new();
587 let mut target = Vec::<Option<<A as Array>::OwnedItem>>::new();
588 for i in 0..100 {
589 if i % 2 == 0 {
590 let date = Date::from_num_days_from_ce_uncheck(i);
591 input.push(Some(date));
592 target.push(Some(f(date)));
593 } else {
594 input.push(None);
595 target.push(None);
596 }
597 }
598
599 let col1 = DateArray::from_iter(&input).into_ref();
600 let data_chunk = DataChunk::new(vec![col1], 100);
601 let expr = build_from_pretty(format!("({kind:?}:timestamp $0:date)"));
602 let res = expr.eval(&data_chunk).await.unwrap();
603 let arr: &A = res.as_ref().into();
604 for (idx, item) in arr.iter().enumerate() {
605 let x = target[idx].as_ref().map(|x| x.as_scalar_ref());
606 assert_eq!(x, item);
607 }
608
609 for i in 0..input.len() {
610 let row = OwnedRow::new(vec![input[i].map(|d| d.to_scalar_value())]);
611 let result = expr.eval_row(&row).await.unwrap();
612 let expected = target[i].as_ref().cloned().map(|x| x.to_scalar_value());
613 assert_eq!(result, expected);
614 }
615 }
616}