1use std::fmt::{self, Write};
16use std::hash::Hash;
17
18use bytes::{Buf, BufMut, BytesMut};
19use jsonbb::{Value, ValueRef};
20use postgres_types::{FromSql, IsNull, ToSql, Type, accepts, to_sql_checked};
21use risingwave_common_estimate_size::EstimateSize;
22use thiserror_ext::AsReport;
23
24use super::{
25    Datum, F64, IntoOrdered, ListValue, MapType, MapValue, ScalarImpl, StructRef, ToOwnedDatum,
26};
27use crate::types::{
28    DEBEZIUM_UNAVAILABLE_JSON, DEBEZIUM_UNAVAILABLE_VALUE, DataType, ListType, Scalar, ScalarRef,
29    StructType, StructValue,
30};
31use crate::util::iter_util::ZipEqDebug;
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct JsonbVal(pub(crate) Value);
35
36#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
37pub struct JsonbRef<'a>(pub(crate) ValueRef<'a>);
38
39impl EstimateSize for JsonbVal {
40    fn estimated_heap_size(&self) -> usize {
41        self.0.capacity()
42    }
43}
44
45impl fmt::Display for JsonbVal {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        crate::types::to_text::ToText::write(&self.as_scalar_ref(), f)
50    }
51}
52
53impl fmt::Display for JsonbRef<'_> {
56    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57        crate::types::to_text::ToText::write(self, f)
58    }
59}
60
61impl Scalar for JsonbVal {
62    type ScalarRefType<'a> = JsonbRef<'a>;
63
64    fn as_scalar_ref(&self) -> Self::ScalarRefType<'_> {
65        JsonbRef(self.0.as_ref())
66    }
67}
68
69impl<'a> ScalarRef<'a> for JsonbRef<'a> {
70    type ScalarType = JsonbVal;
71
72    fn to_owned_scalar(&self) -> Self::ScalarType {
73        JsonbVal(self.0.into())
74    }
75
76    fn hash_scalar<H: std::hash::Hasher>(&self, state: &mut H) {
77        self.hash(state)
78    }
79}
80
81impl PartialOrd for JsonbVal {
82    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
83        Some(self.cmp(other))
84    }
85}
86
87impl Ord for JsonbVal {
88    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
89        self.as_scalar_ref().cmp(&other.as_scalar_ref())
90    }
91}
92
93impl PartialOrd for JsonbRef<'_> {
94    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
95        Some(self.cmp(other))
96    }
97}
98
99impl Ord for JsonbRef<'_> {
100    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
101        self.0.to_string().cmp(&other.0.to_string())
114    }
115}
116
117impl crate::types::to_text::ToText for JsonbRef<'_> {
118    fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
119        use serde::Serialize as _;
122        let mut ser =
123            serde_json::ser::Serializer::with_formatter(FmtToIoUnchecked(f), ToTextFormatter);
124        self.0.serialize(&mut ser).map_err(|_| std::fmt::Error)
125    }
126
127    fn write_with_type<W: std::fmt::Write>(
128        &self,
129        _ty: &crate::types::DataType,
130        f: &mut W,
131    ) -> std::fmt::Result {
132        self.write(f)
133    }
134}
135
136impl crate::types::to_binary::ToBinary for JsonbRef<'_> {
137    fn to_binary_with_type(
138        &self,
139        _ty: &crate::types::DataType,
140    ) -> super::to_binary::Result<bytes::Bytes> {
141        Ok(self.value_serialize().into())
142    }
143}
144
145impl std::str::FromStr for JsonbVal {
146    type Err = <Value as std::str::FromStr>::Err;
147
148    fn from_str(s: &str) -> Result<Self, Self::Err> {
149        Ok(Self(s.parse()?))
150    }
151}
152
153impl JsonbVal {
154    pub fn from_debezium_unavailable_value(s: &str) -> Result<Self, serde_json::Error> {
157        if s.len() == DEBEZIUM_UNAVAILABLE_VALUE.len() && s == DEBEZIUM_UNAVAILABLE_VALUE {
159            return Ok(DEBEZIUM_UNAVAILABLE_JSON.clone());
160        }
161        Ok(Self(s.parse()?))
162    }
163}
164
165impl JsonbVal {
166    pub fn null() -> Self {
168        Self(Value::null())
169    }
170
171    pub fn empty_array() -> Self {
173        Self(Value::array([]))
174    }
175
176    pub fn empty_object() -> Self {
178        Self(Value::object([]))
179    }
180
181    pub fn memcmp_deserialize(
183        deserializer: &mut memcomparable::Deserializer<impl bytes::Buf>,
184    ) -> memcomparable::Result<Self> {
185        let v = <String as serde::Deserialize>::deserialize(deserializer)?
186            .parse()
187            .map_err(|_| memcomparable::Error::Message("invalid json".into()))?;
188        Ok(Self(v))
189    }
190
191    pub fn value_deserialize(mut buf: &[u8]) -> Option<Self> {
193        if buf.is_empty() || buf.get_u8() != 1 {
194            return None;
195        }
196        Value::from_text(buf).ok().map(Self)
197    }
198
199    pub fn take(self) -> serde_json::Value {
201        self.0.into()
202    }
203}
204
205impl From<serde_json::Value> for JsonbVal {
206    fn from(v: serde_json::Value) -> Self {
207        Self(v.into())
208    }
209}
210
211impl From<Value> for JsonbVal {
212    fn from(v: Value) -> Self {
213        Self(v)
214    }
215}
216
217impl From<JsonbRef<'_>> for JsonbVal {
218    fn from(v: JsonbRef<'_>) -> Self {
219        Self(v.0.to_owned())
220    }
221}
222
223impl From<f64> for JsonbVal {
224    fn from(v: f64) -> Self {
225        Self(v.into())
226    }
227}
228
229impl<'a> From<JsonbRef<'a>> for ValueRef<'a> {
230    fn from(v: JsonbRef<'a>) -> Self {
231        v.0
232    }
233}
234
235impl<'a> JsonbRef<'a> {
236    pub fn memcmp_serialize(
237        &self,
238        serializer: &mut memcomparable::Serializer<impl bytes::BufMut>,
239    ) -> memcomparable::Result<()> {
240        let s = self.0.to_string();
243        serde::Serialize::serialize(&s, serializer)
244    }
245
246    pub fn value_serialize(&self) -> Vec<u8> {
248        use std::io::Write;
249        let mut buf = Vec::with_capacity(self.0.capacity());
254        buf.push(1);
255        write!(&mut buf, "{}", self.0).unwrap();
256        buf
257    }
258
259    pub const fn null() -> Self {
261        Self(ValueRef::Null)
262    }
263
264    pub fn is_jsonb_null(&self) -> bool {
266        self.0.is_null()
267    }
268
269    pub fn is_scalar(&self) -> bool {
271        matches!(
272            self.0,
273            ValueRef::Null | ValueRef::Bool(_) | ValueRef::Number(_) | ValueRef::String(_)
274        )
275    }
276
277    pub fn is_array(&self) -> bool {
279        self.0.is_array()
280    }
281
282    pub fn is_object(&self) -> bool {
284        self.0.is_object()
285    }
286
287    pub fn type_name(&self) -> &'static str {
291        match self.0 {
292            ValueRef::Null => "null",
293            ValueRef::Bool(_) => "boolean",
294            ValueRef::Number(_) => "number",
295            ValueRef::String(_) => "string",
296            ValueRef::Array(_) => "array",
297            ValueRef::Object(_) => "object",
298        }
299    }
300
301    pub fn array_len(&self) -> Result<usize, String> {
303        let array = self
304            .0
305            .as_array()
306            .ok_or_else(|| format!("cannot get array length of a jsonb {}", self.type_name()))?;
307        Ok(array.len())
308    }
309
310    pub fn as_bool(&self) -> Result<bool, String> {
312        self.0
313            .as_bool()
314            .ok_or_else(|| format!("cannot cast jsonb {} to type boolean", self.type_name()))
315    }
316
317    pub fn as_string(&self) -> Result<String, String> {
319        self.0
320            .as_str()
321            .map(|s| s.to_owned())
322            .ok_or_else(|| format!("cannot cast jsonb {} to type string", self.type_name()))
323    }
324
325    pub fn as_str(&self) -> Result<&str, String> {
327        self.0
328            .as_str()
329            .ok_or_else(|| format!("cannot cast jsonb {} to type &str", self.type_name()))
330    }
331
332    pub fn as_number(&self) -> Result<F64, String> {
337        self.0
338            .as_number()
339            .ok_or_else(|| format!("cannot cast jsonb {} to type number", self.type_name()))?
340            .as_f64()
341            .map(|f| f.into_ordered())
342            .ok_or_else(|| "jsonb number out of range".into())
343    }
344
345    pub fn force_str<W: std::fmt::Write>(&self, writer: &mut W) -> std::fmt::Result {
355        match self.0 {
356            ValueRef::String(v) => writer.write_str(v.as_str()),
357            ValueRef::Null => Ok(()),
358            ValueRef::Bool(_) | ValueRef::Number(_) | ValueRef::Array(_) | ValueRef::Object(_) => {
359                use crate::types::to_text::ToText as _;
360                self.write_with_type(&crate::types::DataType::Jsonb, writer)
361            }
362        }
363    }
364
365    pub fn force_string(&self) -> String {
366        let mut s = String::new();
367        self.force_str(&mut s).unwrap();
368        s
369    }
370
371    pub fn access_object_field(&self, field: &str) -> Option<Self> {
372        self.0.get(field).map(Self)
373    }
374
375    pub fn access_array_element(&self, idx: usize) -> Option<Self> {
376        self.0.get(idx).map(Self)
377    }
378
379    pub fn array_elements(self) -> Result<impl Iterator<Item = JsonbRef<'a>>, String> {
381        let array = self
382            .0
383            .as_array()
384            .ok_or_else(|| format!("cannot extract elements from a jsonb {}", self.type_name()))?;
385        Ok(array.iter().map(Self))
386    }
387
388    pub fn object_keys(self) -> Result<impl Iterator<Item = &'a str>, String> {
390        let object = self.0.as_object().ok_or_else(|| {
391            format!(
392                "cannot call jsonb_object_keys on a jsonb {}",
393                self.type_name()
394            )
395        })?;
396        Ok(object.keys())
397    }
398
399    pub fn object_key_values(
401        self,
402    ) -> Result<impl Iterator<Item = (&'a str, JsonbRef<'a>)>, String> {
403        let object = self
404            .0
405            .as_object()
406            .ok_or_else(|| format!("cannot deconstruct a jsonb {}", self.type_name()))?;
407        Ok(object.iter().map(|(k, v)| (k, Self(v))))
408    }
409
410    pub fn pretty(self, f: &mut impl std::fmt::Write) -> std::fmt::Result {
412        use serde::Serialize;
413        use serde_json::ser::{PrettyFormatter, Serializer};
414
415        let mut ser =
416            Serializer::with_formatter(FmtToIoUnchecked(f), PrettyFormatter::with_indent(b"    "));
417        self.0.serialize(&mut ser).map_err(|_| std::fmt::Error)
418    }
419
420    pub fn to_datum(self, ty: &DataType) -> Result<Datum, String> {
422        if self.0.as_null().is_some() {
423            return Ok(None);
424        }
425        let datum = match ty {
426            DataType::Jsonb => ScalarImpl::Jsonb(self.into()),
427            DataType::List(l) => ScalarImpl::List(self.to_list(l)?),
428            DataType::Struct(s) => ScalarImpl::Struct(self.to_struct(s)?),
429            _ => {
430                let s = self.force_string();
431                ScalarImpl::from_text(&s, ty).map_err(|e| format!("{}", e.as_report()))?
432            }
433        };
434        Ok(Some(datum))
435    }
436
437    pub fn to_list(self, ty: &ListType) -> Result<ListValue, String> {
439        let elem_type = ty.elem();
440        let array = self
441            .0
442            .as_array()
443            .ok_or_else(|| format!("expected JSON array, but found {self}"))?;
444        let mut builder = elem_type.create_array_builder(array.len());
445        for v in array.iter() {
446            builder.append(Self(v).to_datum(elem_type)?);
447        }
448        Ok(ListValue::new(builder.finish()))
449    }
450
451    pub fn to_struct(self, ty: &StructType) -> Result<StructValue, String> {
453        let object = self.0.as_object().ok_or_else(|| {
454            format!(
455                "cannot call populate_composite on a jsonb {}",
456                self.type_name()
457            )
458        })?;
459        let mut fields = Vec::with_capacity(ty.len());
460        for (name, ty) in ty.iter() {
461            let datum = match object.get(name) {
462                Some(v) => Self(v).to_datum(ty)?,
463                None => None,
464            };
465            fields.push(datum);
466        }
467        Ok(StructValue::new(fields))
468    }
469
470    pub fn to_map(self, ty: &MapType) -> Result<MapValue, String> {
471        let object = self
472            .0
473            .as_object()
474            .ok_or_else(|| format!("cannot convert to map from a jsonb {}", self.type_name()))?;
475        if !matches!(ty.key(), DataType::Varchar) {
476            return Err("cannot convert jsonb to a map with non-string keys".to_owned());
477        }
478
479        let mut keys: Vec<Datum> = Vec::with_capacity(object.len());
480        let mut values: Vec<Datum> = Vec::with_capacity(object.len());
481        for (k, v) in object.iter() {
482            let v = Self(v).to_datum(ty.value())?;
483            keys.push(Some(ScalarImpl::Utf8(k.to_owned().into())));
484            values.push(v);
485        }
486        MapValue::try_from_kv(
487            ListValue::from_datum_iter(ty.key(), keys),
488            ListValue::from_datum_iter(ty.value(), values),
489        )
490    }
491
492    pub fn populate_struct(
494        self,
495        ty: &StructType,
496        base: Option<StructRef<'_>>,
497    ) -> Result<StructValue, String> {
498        let Some(base) = base else {
499            return self.to_struct(ty);
500        };
501        let object = self.0.as_object().ok_or_else(|| {
502            format!(
503                "cannot call populate_composite on a jsonb {}",
504                self.type_name()
505            )
506        })?;
507        let mut fields = Vec::with_capacity(ty.len());
508        for ((name, ty), base_field) in ty.iter().zip_eq_debug(base.iter_fields_ref()) {
509            let datum = match object.get(name) {
510                Some(v) => match ty {
511                    DataType::Struct(s) => Some(
513                        Self(v)
514                            .populate_struct(s, base_field.map(|s| s.into_struct()))?
515                            .into(),
516                    ),
517                    _ => Self(v).to_datum(ty)?,
518                },
519                None => base_field.to_owned_datum(),
520            };
521            fields.push(datum);
522        }
523        Ok(StructValue::new(fields))
524    }
525
526    pub fn capacity(self) -> usize {
528        self.0.capacity()
529    }
530}
531
532struct ToTextFormatter;
535
536impl serde_json::ser::Formatter for ToTextFormatter {
537    fn begin_array_value<W>(&mut self, writer: &mut W, first: bool) -> std::io::Result<()>
538    where
539        W: ?Sized + std::io::Write,
540    {
541        if first {
542            Ok(())
543        } else {
544            writer.write_all(b", ")
545        }
546    }
547
548    fn begin_object_key<W>(&mut self, writer: &mut W, first: bool) -> std::io::Result<()>
549    where
550        W: ?Sized + std::io::Write,
551    {
552        if first {
553            Ok(())
554        } else {
555            writer.write_all(b", ")
556        }
557    }
558
559    fn begin_object_value<W>(&mut self, writer: &mut W) -> std::io::Result<()>
560    where
561        W: ?Sized + std::io::Write,
562    {
563        writer.write_all(b": ")
564    }
565}
566
/// Adapter exposing a [`std::fmt::Write`] sink as a [`std::io::Write`] one.
///
/// Incoming bytes are reinterpreted as UTF-8 *without validation*, so this
/// must only be fed by producers that emit valid UTF-8.
struct FmtToIoUnchecked<F>(F);

impl<F: std::fmt::Write> std::io::Write for FmtToIoUnchecked<F> {
    /// Forwards `buf` to the wrapped formatter as a `&str` and reports the
    /// whole buffer as written. A formatter failure is mapped to an I/O error
    /// of kind [`std::io::ErrorKind::Other`].
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // SAFETY: not checked here. The users in this file hand the adapter to
        // `serde_json` serializers, which are expected to pass only complete,
        // valid UTF-8 chunks. NOTE(review): confirm for any new caller.
        let text = unsafe { std::str::from_utf8_unchecked(buf) };
        match self.0.write_str(text) {
            Ok(()) => Ok(buf.len()),
            Err(_) => Err(std::io::ErrorKind::Other.into()),
        }
    }

    /// Nothing is buffered by the adapter, so flushing always succeeds.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
581
582impl ToSql for JsonbVal {
583    accepts!(JSON, JSONB);
584
585    to_sql_checked!();
586
587    fn to_sql(
588        &self,
589        ty: &Type,
590        out: &mut BytesMut,
591    ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
592    where
593        Self: Sized,
594    {
595        if matches!(*ty, Type::JSONB) {
596            out.put_u8(1);
597        }
598        write!(out, "{}", self.0).unwrap();
599        Ok(IsNull::No)
600    }
601}
602
603impl<'a> FromSql<'a> for JsonbVal {
604    accepts!(JSON, JSONB);
605
606    fn from_sql(
607        ty: &Type,
608        mut raw: &'a [u8],
609    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
610        Ok(match *ty {
611            Type::JSON => JsonbVal::from(Value::from_text(raw)?),
627            Type::JSONB => {
628                if raw.is_empty() || raw.get_u8() != 1 {
629                    return Err("invalid jsonb encoding".into());
630                }
631                JsonbVal::from(Value::from_text(raw)?)
632            }
633            _ => {
634                bail_not_implemented!("the JsonbVal's postgres decoding for {ty} is unsupported")
635            }
636        })
637    }
638}
639
640impl ToSql for JsonbRef<'_> {
641    accepts!(JSON, JSONB);
642
643    to_sql_checked!();
644
645    fn to_sql(
646        &self,
647        ty: &Type,
648        out: &mut BytesMut,
649    ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
650    where
651        Self: Sized,
652    {
653        if matches!(*ty, Type::JSONB) {
654            out.put_u8(1);
655        }
656        write!(out, "{}", self.0).unwrap();
657        Ok(IsNull::No)
658    }
659}