risingwave_common/types/
jsonb.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{self, Write};
16use std::hash::Hash;
17
18use bytes::{Buf, BufMut, BytesMut};
19use jsonbb::{Value, ValueRef};
20use postgres_types::{FromSql, IsNull, ToSql, Type, accepts, to_sql_checked};
21use risingwave_common_estimate_size::EstimateSize;
22use thiserror_ext::AsReport;
23
24use super::{
25    Datum, F64, IntoOrdered, ListValue, MapType, MapValue, ScalarImpl, StructRef, ToOwnedDatum,
26};
27use crate::types::{
28    DEBEZIUM_UNAVAILABLE_JSON, DEBEZIUM_UNAVAILABLE_VALUE, DataType, ListType, Scalar, ScalarRef,
29    StructType, StructValue,
30};
31use crate::util::iter_util::ZipEqDebug;
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct JsonbVal(pub(crate) Value);
35
36#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
37pub struct JsonbRef<'a>(pub(crate) ValueRef<'a>);
38
39impl EstimateSize for JsonbVal {
40    fn estimated_heap_size(&self) -> usize {
41        self.0.capacity()
42    }
43}
44
45/// The display of `JsonbVal` is pg-compatible format which has slightly different from
46/// `serde_json::Value`.
47impl fmt::Display for JsonbVal {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        crate::types::to_text::ToText::write(&self.as_scalar_ref(), f)
50    }
51}
52
53/// The display of `JsonbRef` is pg-compatible format which has slightly different from
54/// `serde_json::Value`.
55impl fmt::Display for JsonbRef<'_> {
56    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57        crate::types::to_text::ToText::write(self, f)
58    }
59}
60
61impl Scalar for JsonbVal {
62    type ScalarRefType<'a> = JsonbRef<'a>;
63
64    fn as_scalar_ref(&self) -> Self::ScalarRefType<'_> {
65        JsonbRef(self.0.as_ref())
66    }
67}
68
69impl<'a> ScalarRef<'a> for JsonbRef<'a> {
70    type ScalarType = JsonbVal;
71
72    fn to_owned_scalar(&self) -> Self::ScalarType {
73        JsonbVal(self.0.into())
74    }
75
76    fn hash_scalar<H: std::hash::Hasher>(&self, state: &mut H) {
77        self.hash(state)
78    }
79}
80
81impl PartialOrd for JsonbVal {
82    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
83        Some(self.cmp(other))
84    }
85}
86
87impl Ord for JsonbVal {
88    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
89        self.as_scalar_ref().cmp(&other.as_scalar_ref())
90    }
91}
92
93impl PartialOrd for JsonbRef<'_> {
94    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
95        Some(self.cmp(other))
96    }
97}
98
99impl Ord for JsonbRef<'_> {
100    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
101        // We do not intend to support ordering `jsonb` type.
102        // Before #7981 is done, we do not panic but just compare its string representation.
103        // Note that `serde_json` without feature `preserve_order` uses `BTreeMap` for json object.
104        // So its string form always have keys sorted.
105        //
106        // In PostgreSQL, Object > Array > Boolean > Number > String > Null.
107        // But here we have Object > true > Null > false > Array > Number > String.
108        // Because in ascii: `{` > `t` > `n` > `f` > `[` > `9` `-` > `"`.
109        //
110        // This is just to keep consistent with the memcomparable encoding, which uses string form.
111        // If we implemented the same typed comparison as PostgreSQL, we would need a corresponding
112        // memcomparable encoding for it.
113        self.0.to_string().cmp(&other.0.to_string())
114    }
115}
116
117impl crate::types::to_text::ToText for JsonbRef<'_> {
118    fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
119        // Use custom [`ToTextFormatter`] to serialize. If we are okay with the default, this can be
120        // just `write!(f, "{}", self.0)`
121        use serde::Serialize as _;
122        let mut ser =
123            serde_json::ser::Serializer::with_formatter(FmtToIoUnchecked(f), ToTextFormatter);
124        self.0.serialize(&mut ser).map_err(|_| std::fmt::Error)
125    }
126
127    fn write_with_type<W: std::fmt::Write>(
128        &self,
129        _ty: &crate::types::DataType,
130        f: &mut W,
131    ) -> std::fmt::Result {
132        self.write(f)
133    }
134}
135
136impl crate::types::to_binary::ToBinary for JsonbRef<'_> {
137    fn to_binary_with_type(
138        &self,
139        _ty: &crate::types::DataType,
140    ) -> super::to_binary::Result<bytes::Bytes> {
141        Ok(self.value_serialize().into())
142    }
143}
144
145impl std::str::FromStr for JsonbVal {
146    type Err = <Value as std::str::FromStr>::Err;
147
148    fn from_str(s: &str) -> Result<Self, Self::Err> {
149        Ok(Self(s.parse()?))
150    }
151}
152
153impl JsonbVal {
154    /// Create a `JsonbVal` from a string, with special handling for Debezium's unavailable value placeholder.
155    /// Returns a Result to handle parsing errors properly.
156    pub fn from_debezium_unavailable_value(s: &str) -> Result<Self, serde_json::Error> {
157        // Special handling for Debezium's unavailable value placeholder
158        if s.len() == DEBEZIUM_UNAVAILABLE_VALUE.len() && s == DEBEZIUM_UNAVAILABLE_VALUE {
159            return Ok(DEBEZIUM_UNAVAILABLE_JSON.clone());
160        }
161        Ok(Self(s.parse()?))
162    }
163}
164
165impl JsonbVal {
166    /// Returns a jsonb `null`.
167    pub fn null() -> Self {
168        Self(Value::null())
169    }
170
171    /// Returns an empty array `[]`.
172    pub fn empty_array() -> Self {
173        Self(Value::array([]))
174    }
175
176    /// Returns an empty array `{}`.
177    pub fn empty_object() -> Self {
178        Self(Value::object([]))
179    }
180
181    /// Deserialize from a memcomparable encoding.
182    pub fn memcmp_deserialize(
183        deserializer: &mut memcomparable::Deserializer<impl bytes::Buf>,
184    ) -> memcomparable::Result<Self> {
185        let v = <String as serde::Deserialize>::deserialize(deserializer)?
186            .parse()
187            .map_err(|_| memcomparable::Error::Message("invalid json".into()))?;
188        Ok(Self(v))
189    }
190
191    /// Deserialize from a pgwire "BINARY" encoding.
192    pub fn value_deserialize(mut buf: &[u8]) -> Option<Self> {
193        if buf.is_empty() || buf.get_u8() != 1 {
194            return None;
195        }
196        Value::from_text(buf).ok().map(Self)
197    }
198
199    /// Convert the value to a [`serde_json::Value`].
200    pub fn take(self) -> serde_json::Value {
201        self.0.into()
202    }
203}
204
205impl From<serde_json::Value> for JsonbVal {
206    fn from(v: serde_json::Value) -> Self {
207        Self(v.into())
208    }
209}
210
211impl From<Value> for JsonbVal {
212    fn from(v: Value) -> Self {
213        Self(v)
214    }
215}
216
217impl From<JsonbRef<'_>> for JsonbVal {
218    fn from(v: JsonbRef<'_>) -> Self {
219        Self(v.0.to_owned())
220    }
221}
222
223impl From<f64> for JsonbVal {
224    fn from(v: f64) -> Self {
225        Self(v.into())
226    }
227}
228
229impl<'a> From<JsonbRef<'a>> for ValueRef<'a> {
230    fn from(v: JsonbRef<'a>) -> Self {
231        v.0
232    }
233}
234
235impl<'a> JsonbRef<'a> {
236    pub fn memcmp_serialize(
237        &self,
238        serializer: &mut memcomparable::Serializer<impl bytes::BufMut>,
239    ) -> memcomparable::Result<()> {
240        // As mentioned with `cmp`, this implementation is not intended to be used.
241        // But before #7981 is done, we do not want to `panic` here.
242        let s = self.0.to_string();
243        serde::Serialize::serialize(&s, serializer)
244    }
245
246    /// Serialize to a pgwire "BINARY" encoding.
247    pub fn value_serialize(&self) -> Vec<u8> {
248        use std::io::Write;
249        // Reuse the pgwire "BINARY" encoding for jsonb type.
250        // It is not truly binary, but one byte of version `1u8` followed by string form.
251        // This version number helps us maintain compatibility when we switch to more efficient
252        // encoding later.
253        let mut buf = Vec::with_capacity(self.0.capacity());
254        buf.push(1);
255        write!(&mut buf, "{}", self.0).unwrap();
256        buf
257    }
258
259    /// Returns a jsonb `null` value.
260    pub const fn null() -> Self {
261        Self(ValueRef::Null)
262    }
263
264    /// Returns true if this is a jsonb `null`.
265    pub fn is_jsonb_null(&self) -> bool {
266        self.0.is_null()
267    }
268
269    /// Returns true if this is a jsonb null, boolean, number or string.
270    pub fn is_scalar(&self) -> bool {
271        matches!(
272            self.0,
273            ValueRef::Null | ValueRef::Bool(_) | ValueRef::Number(_) | ValueRef::String(_)
274        )
275    }
276
277    /// Returns true if this is a jsonb array.
278    pub fn is_array(&self) -> bool {
279        self.0.is_array()
280    }
281
282    /// Returns true if this is a jsonb object.
283    pub fn is_object(&self) -> bool {
284        self.0.is_object()
285    }
286
287    /// Returns the type name of this jsonb.
288    ///
289    /// Possible values are: `null`, `boolean`, `number`, `string`, `array`, `object`.
290    pub fn type_name(&self) -> &'static str {
291        match self.0 {
292            ValueRef::Null => "null",
293            ValueRef::Bool(_) => "boolean",
294            ValueRef::Number(_) => "number",
295            ValueRef::String(_) => "string",
296            ValueRef::Array(_) => "array",
297            ValueRef::Object(_) => "object",
298        }
299    }
300
301    /// Returns the length of this json array.
302    pub fn array_len(&self) -> Result<usize, String> {
303        let array = self
304            .0
305            .as_array()
306            .ok_or_else(|| format!("cannot get array length of a jsonb {}", self.type_name()))?;
307        Ok(array.len())
308    }
309
310    /// If the JSON is a boolean, returns the associated bool.
311    pub fn as_bool(&self) -> Result<bool, String> {
312        self.0
313            .as_bool()
314            .ok_or_else(|| format!("cannot cast jsonb {} to type boolean", self.type_name()))
315    }
316
317    /// If the JSON is a string, returns the associated string.
318    pub fn as_string(&self) -> Result<String, String> {
319        self.0
320            .as_str()
321            .map(|s| s.to_owned())
322            .ok_or_else(|| format!("cannot cast jsonb {} to type string", self.type_name()))
323    }
324
325    /// If the JSON is a string, returns the associated &str.
326    pub fn as_str(&self) -> Result<&str, String> {
327        self.0
328            .as_str()
329            .ok_or_else(|| format!("cannot cast jsonb {} to type &str", self.type_name()))
330    }
331
332    /// Attempt to read jsonb as a JSON number.
333    ///
334    /// According to RFC 8259, only number within IEEE 754 binary64 (double precision) has good
335    /// interoperability. We do not support arbitrary precision like PostgreSQL `numeric` right now.
336    pub fn as_number(&self) -> Result<F64, String> {
337        self.0
338            .as_number()
339            .ok_or_else(|| format!("cannot cast jsonb {} to type number", self.type_name()))?
340            .as_f64()
341            .map(|f| f.into_ordered())
342            .ok_or_else(|| "jsonb number out of range".into())
343    }
344
345    /// This is part of the `->>` or `#>>` syntax to access a child as string.
346    ///
347    /// * It is not `as_str`, because there is no runtime error when the jsonb type is not string.
348    /// * It is not same as [`std::fmt::Display`] or [`super::ToText`] (cast to string) in the
349    ///   following 2 cases:
350    ///   * Jsonb null is displayed as 4-letter `null` but treated as sql null here.
351    ///       * This function writes nothing and the caller is responsible for checking
352    ///         [`Self::is_jsonb_null`] to differentiate it from an empty string.
353    ///   * Jsonb string is displayed with quotes but treated as its inner value here.
354    pub fn force_str<W: std::fmt::Write>(&self, writer: &mut W) -> std::fmt::Result {
355        match self.0 {
356            ValueRef::String(v) => writer.write_str(v.as_str()),
357            ValueRef::Null => Ok(()),
358            ValueRef::Bool(_) | ValueRef::Number(_) | ValueRef::Array(_) | ValueRef::Object(_) => {
359                use crate::types::to_text::ToText as _;
360                self.write_with_type(&crate::types::DataType::Jsonb, writer)
361            }
362        }
363    }
364
365    pub fn force_string(&self) -> String {
366        let mut s = String::new();
367        self.force_str(&mut s).unwrap();
368        s
369    }
370
371    pub fn access_object_field(&self, field: &str) -> Option<Self> {
372        self.0.get(field).map(Self)
373    }
374
375    pub fn access_array_element(&self, idx: usize) -> Option<Self> {
376        self.0.get(idx).map(Self)
377    }
378
379    /// Returns an iterator over the elements if this is an array.
380    pub fn array_elements(self) -> Result<impl Iterator<Item = JsonbRef<'a>>, String> {
381        let array = self
382            .0
383            .as_array()
384            .ok_or_else(|| format!("cannot extract elements from a jsonb {}", self.type_name()))?;
385        Ok(array.iter().map(Self))
386    }
387
388    /// Returns an iterator over the keys if this is an object.
389    pub fn object_keys(self) -> Result<impl Iterator<Item = &'a str>, String> {
390        let object = self.0.as_object().ok_or_else(|| {
391            format!(
392                "cannot call jsonb_object_keys on a jsonb {}",
393                self.type_name()
394            )
395        })?;
396        Ok(object.keys())
397    }
398
399    /// Returns an iterator over the key-value pairs if this is an object.
400    pub fn object_key_values(
401        self,
402    ) -> Result<impl Iterator<Item = (&'a str, JsonbRef<'a>)>, String> {
403        let object = self
404            .0
405            .as_object()
406            .ok_or_else(|| format!("cannot deconstruct a jsonb {}", self.type_name()))?;
407        Ok(object.iter().map(|(k, v)| (k, Self(v))))
408    }
409
410    /// Pretty print the jsonb value to the given writer, with 4 spaces indentation.
411    pub fn pretty(self, f: &mut impl std::fmt::Write) -> std::fmt::Result {
412        use serde::Serialize;
413        use serde_json::ser::{PrettyFormatter, Serializer};
414
415        let mut ser =
416            Serializer::with_formatter(FmtToIoUnchecked(f), PrettyFormatter::with_indent(b"    "));
417        self.0.serialize(&mut ser).map_err(|_| std::fmt::Error)
418    }
419
420    /// Convert the jsonb value to a datum.
421    pub fn to_datum(self, ty: &DataType) -> Result<Datum, String> {
422        if self.0.as_null().is_some() {
423            return Ok(None);
424        }
425        let datum = match ty {
426            DataType::Jsonb => ScalarImpl::Jsonb(self.into()),
427            DataType::List(l) => ScalarImpl::List(self.to_list(l)?),
428            DataType::Struct(s) => ScalarImpl::Struct(self.to_struct(s)?),
429            _ => {
430                let s = self.force_string();
431                ScalarImpl::from_text(&s, ty).map_err(|e| format!("{}", e.as_report()))?
432            }
433        };
434        Ok(Some(datum))
435    }
436
437    /// Convert the jsonb value to a list value.
438    pub fn to_list(self, ty: &ListType) -> Result<ListValue, String> {
439        let elem_type = ty.elem();
440        let array = self
441            .0
442            .as_array()
443            .ok_or_else(|| format!("expected JSON array, but found {self}"))?;
444        let mut builder = elem_type.create_array_builder(array.len());
445        for v in array.iter() {
446            builder.append(Self(v).to_datum(elem_type)?);
447        }
448        Ok(ListValue::new(builder.finish()))
449    }
450
451    /// Convert the jsonb value to a struct value.
452    pub fn to_struct(self, ty: &StructType) -> Result<StructValue, String> {
453        let object = self.0.as_object().ok_or_else(|| {
454            format!(
455                "cannot call populate_composite on a jsonb {}",
456                self.type_name()
457            )
458        })?;
459        let mut fields = Vec::with_capacity(ty.len());
460        for (name, ty) in ty.iter() {
461            let datum = match object.get(name) {
462                Some(v) => Self(v).to_datum(ty)?,
463                None => None,
464            };
465            fields.push(datum);
466        }
467        Ok(StructValue::new(fields))
468    }
469
470    pub fn to_map(self, ty: &MapType) -> Result<MapValue, String> {
471        let object = self
472            .0
473            .as_object()
474            .ok_or_else(|| format!("cannot convert to map from a jsonb {}", self.type_name()))?;
475        if !matches!(ty.key(), DataType::Varchar) {
476            return Err("cannot convert jsonb to a map with non-string keys".to_owned());
477        }
478
479        let mut keys: Vec<Datum> = Vec::with_capacity(object.len());
480        let mut values: Vec<Datum> = Vec::with_capacity(object.len());
481        for (k, v) in object.iter() {
482            let v = Self(v).to_datum(ty.value())?;
483            keys.push(Some(ScalarImpl::Utf8(k.to_owned().into())));
484            values.push(v);
485        }
486        MapValue::try_from_kv(
487            ListValue::from_datum_iter(ty.key(), keys),
488            ListValue::from_datum_iter(ty.value(), values),
489        )
490    }
491
492    /// Expands the top-level JSON object to a row having the struct type of the `base` argument.
493    pub fn populate_struct(
494        self,
495        ty: &StructType,
496        base: Option<StructRef<'_>>,
497    ) -> Result<StructValue, String> {
498        let Some(base) = base else {
499            return self.to_struct(ty);
500        };
501        let object = self.0.as_object().ok_or_else(|| {
502            format!(
503                "cannot call populate_composite on a jsonb {}",
504                self.type_name()
505            )
506        })?;
507        let mut fields = Vec::with_capacity(ty.len());
508        for ((name, ty), base_field) in ty.iter().zip_eq_debug(base.iter_fields_ref()) {
509            let datum = match object.get(name) {
510                Some(v) => match ty {
511                    // recursively populate the nested struct
512                    DataType::Struct(s) => Some(
513                        Self(v)
514                            .populate_struct(s, base_field.map(|s| s.into_struct()))?
515                            .into(),
516                    ),
517                    _ => Self(v).to_datum(ty)?,
518                },
519                None => base_field.to_owned_datum(),
520            };
521            fields.push(datum);
522        }
523        Ok(StructValue::new(fields))
524    }
525
526    /// Returns the capacity of the underlying buffer.
527    pub fn capacity(self) -> usize {
528        self.0.capacity()
529    }
530}
531
532/// A custom implementation for [`serde_json::ser::Formatter`] to match PostgreSQL, which adds extra
533/// space after `,` and `:` in array and object.
534struct ToTextFormatter;
535
536impl serde_json::ser::Formatter for ToTextFormatter {
537    fn begin_array_value<W>(&mut self, writer: &mut W, first: bool) -> std::io::Result<()>
538    where
539        W: ?Sized + std::io::Write,
540    {
541        if first {
542            Ok(())
543        } else {
544            writer.write_all(b", ")
545        }
546    }
547
548    fn begin_object_key<W>(&mut self, writer: &mut W, first: bool) -> std::io::Result<()>
549    where
550        W: ?Sized + std::io::Write,
551    {
552        if first {
553            Ok(())
554        } else {
555            writer.write_all(b", ")
556        }
557    }
558
559    fn begin_object_value<W>(&mut self, writer: &mut W) -> std::io::Result<()>
560    where
561        W: ?Sized + std::io::Write,
562    {
563        writer.write_all(b": ")
564    }
565}
566
/// Adapts a [`std::fmt::Write`] sink so it can be used as a [`std::io::Write`] target.
///
/// Meant only for feeding `serde_json` serializer output into a `fmt::Write`. `write` assumes
/// every buffer it receives is complete, valid UTF-8; see the `SAFETY` note below.
struct FmtToIoUnchecked<F>(F);

impl<F: std::fmt::Write> std::io::Write for FmtToIoUnchecked<F> {
    /// Forwards `buf` to the wrapped sink as a `&str` and reports it as fully written.
    ///
    /// A failure from the sink is reported as an `std::io::ErrorKind::Other` error.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Catches any caller that breaks the UTF-8 precondition in debug builds.
        debug_assert!(
            std::str::from_utf8(buf).is_ok(),
            "FmtToIoUnchecked received bytes that are not valid UTF-8"
        );
        // SAFETY: this adapter is only handed output from `serde_json::Serializer` in this module.
        // NOTE(review): we rely on serde_json writing only whole UTF-8 string fragments and ASCII
        // tokens/escapes in each call. Confirm this still holds when upgrading serde_json.
        let s = unsafe { std::str::from_utf8_unchecked(buf) };
        self.0.write_str(s).map_err(|_| std::io::ErrorKind::Other)?;
        Ok(buf.len())
    }

    /// Nothing is buffered here, so flushing is a no-op.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
581
582impl ToSql for JsonbVal {
583    accepts!(JSON, JSONB);
584
585    to_sql_checked!();
586
587    fn to_sql(
588        &self,
589        ty: &Type,
590        out: &mut BytesMut,
591    ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
592    where
593        Self: Sized,
594    {
595        if matches!(*ty, Type::JSONB) {
596            out.put_u8(1);
597        }
598        write!(out, "{}", self.0).unwrap();
599        Ok(IsNull::No)
600    }
601}
602
603impl<'a> FromSql<'a> for JsonbVal {
604    accepts!(JSON, JSONB);
605
606    fn from_sql(
607        ty: &Type,
608        mut raw: &'a [u8],
609    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
610        Ok(match *ty {
611            // Here we allow mapping JSON of pg to JSONB of rw. But please note the JSONB and JSON have different behaviors in postgres.
612            // An example of different semantics for duplicated keys in an object:
613            // test=# select jsonb_each('{"foo": 1, "bar": 2, "foo": 3}');
614            //  jsonb_each
615            //  ------------
616            //   (bar,2)
617            //   (foo,3)
618            //  (2 rows)
619            // test=# select json_each('{"foo": 1, "bar": 2, "foo": 3}');
620            //   json_each
621            //  -----------
622            //   (foo,1)
623            //   (bar,2)
624            //   (foo,3)
625            //  (3 rows)
626            Type::JSON => JsonbVal::from(Value::from_text(raw)?),
627            Type::JSONB => {
628                if raw.is_empty() || raw.get_u8() != 1 {
629                    return Err("invalid jsonb encoding".into());
630                }
631                JsonbVal::from(Value::from_text(raw)?)
632            }
633            _ => {
634                bail_not_implemented!("the JsonbVal's postgres decoding for {ty} is unsupported")
635            }
636        })
637    }
638}
639
640impl ToSql for JsonbRef<'_> {
641    accepts!(JSON, JSONB);
642
643    to_sql_checked!();
644
645    fn to_sql(
646        &self,
647        ty: &Type,
648        out: &mut BytesMut,
649    ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
650    where
651        Self: Sized,
652    {
653        if matches!(*ty, Type::JSONB) {
654            out.put_u8(1);
655        }
656        write!(out, "{}", self.0).unwrap();
657        Ok(IsNull::No)
658    }
659}