risingwave_common/types/
jsonb.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{self, Write};
16use std::hash::Hash;
17
18use bytes::{Buf, BufMut, BytesMut};
19use jsonbb::{Value, ValueRef};
20use postgres_types::{FromSql, IsNull, ToSql, Type, accepts, to_sql_checked};
21use risingwave_common_estimate_size::EstimateSize;
22use thiserror_ext::AsReport;
23
24use super::{
25    Datum, F64, IntoOrdered, ListValue, MapType, MapValue, ScalarImpl, StructRef, ToOwnedDatum,
26};
27use crate::types::{
28    DEBEZIUM_UNAVAILABLE_JSON, DEBEZIUM_UNAVAILABLE_VALUE, DataType, Scalar, ScalarRef, StructType,
29    StructValue,
30};
31use crate::util::iter_util::ZipEqDebug;
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct JsonbVal(pub(crate) Value);
35
36#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
37pub struct JsonbRef<'a>(pub(crate) ValueRef<'a>);
38
39impl EstimateSize for JsonbVal {
40    fn estimated_heap_size(&self) -> usize {
41        self.0.capacity()
42    }
43}
44
45/// The display of `JsonbVal` is pg-compatible format which has slightly different from
46/// `serde_json::Value`.
47impl fmt::Display for JsonbVal {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        crate::types::to_text::ToText::write(&self.as_scalar_ref(), f)
50    }
51}
52
53/// The display of `JsonbRef` is pg-compatible format which has slightly different from
54/// `serde_json::Value`.
55impl fmt::Display for JsonbRef<'_> {
56    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57        crate::types::to_text::ToText::write(self, f)
58    }
59}
60
61impl Scalar for JsonbVal {
62    type ScalarRefType<'a> = JsonbRef<'a>;
63
64    fn as_scalar_ref(&self) -> Self::ScalarRefType<'_> {
65        JsonbRef(self.0.as_ref())
66    }
67}
68
69impl<'a> ScalarRef<'a> for JsonbRef<'a> {
70    type ScalarType = JsonbVal;
71
72    fn to_owned_scalar(&self) -> Self::ScalarType {
73        JsonbVal(self.0.into())
74    }
75
76    fn hash_scalar<H: std::hash::Hasher>(&self, state: &mut H) {
77        self.hash(state)
78    }
79}
80
81impl PartialOrd for JsonbVal {
82    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
83        Some(self.cmp(other))
84    }
85}
86
87impl Ord for JsonbVal {
88    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
89        self.as_scalar_ref().cmp(&other.as_scalar_ref())
90    }
91}
92
93impl PartialOrd for JsonbRef<'_> {
94    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
95        Some(self.cmp(other))
96    }
97}
98
99impl Ord for JsonbRef<'_> {
100    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
101        // We do not intend to support ordering `jsonb` type.
102        // Before #7981 is done, we do not panic but just compare its string representation.
103        // Note that `serde_json` without feature `preserve_order` uses `BTreeMap` for json object.
104        // So its string form always have keys sorted.
105        //
106        // In PostgreSQL, Object > Array > Boolean > Number > String > Null.
107        // But here we have Object > true > Null > false > Array > Number > String.
108        // Because in ascii: `{` > `t` > `n` > `f` > `[` > `9` `-` > `"`.
109        //
110        // This is just to keep consistent with the memcomparable encoding, which uses string form.
111        // If we implemented the same typed comparison as PostgreSQL, we would need a corresponding
112        // memcomparable encoding for it.
113        self.0.to_string().cmp(&other.0.to_string())
114    }
115}
116
117impl crate::types::to_text::ToText for JsonbRef<'_> {
118    fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
119        // Use custom [`ToTextFormatter`] to serialize. If we are okay with the default, this can be
120        // just `write!(f, "{}", self.0)`
121        use serde::Serialize as _;
122        let mut ser =
123            serde_json::ser::Serializer::with_formatter(FmtToIoUnchecked(f), ToTextFormatter);
124        self.0.serialize(&mut ser).map_err(|_| std::fmt::Error)
125    }
126
127    fn write_with_type<W: std::fmt::Write>(
128        &self,
129        _ty: &crate::types::DataType,
130        f: &mut W,
131    ) -> std::fmt::Result {
132        self.write(f)
133    }
134}
135
136impl crate::types::to_binary::ToBinary for JsonbRef<'_> {
137    fn to_binary_with_type(
138        &self,
139        _ty: &crate::types::DataType,
140    ) -> super::to_binary::Result<bytes::Bytes> {
141        Ok(self.value_serialize().into())
142    }
143}
144
145impl std::str::FromStr for JsonbVal {
146    type Err = <Value as std::str::FromStr>::Err;
147
148    fn from_str(s: &str) -> Result<Self, Self::Err> {
149        Ok(Self(s.parse()?))
150    }
151}
152
153impl JsonbVal {
154    /// Create a `JsonbVal` from a string, with special handling for Debezium's unavailable value placeholder.
155    /// Returns a Result to handle parsing errors properly.
156    pub fn from_debezium_unavailable_value(s: &str) -> Result<Self, serde_json::Error> {
157        // Special handling for Debezium's unavailable value placeholder
158        if s.len() == DEBEZIUM_UNAVAILABLE_VALUE.len() && s == DEBEZIUM_UNAVAILABLE_VALUE {
159            return Ok(DEBEZIUM_UNAVAILABLE_JSON.clone());
160        }
161        Ok(Self(s.parse()?))
162    }
163}
164
165impl JsonbVal {
166    /// Returns a jsonb `null`.
167    pub fn null() -> Self {
168        Self(Value::null())
169    }
170
171    /// Returns an empty array `[]`.
172    pub fn empty_array() -> Self {
173        Self(Value::array([]))
174    }
175
176    /// Returns an empty array `{}`.
177    pub fn empty_object() -> Self {
178        Self(Value::object([]))
179    }
180
181    /// Deserialize from a memcomparable encoding.
182    pub fn memcmp_deserialize(
183        deserializer: &mut memcomparable::Deserializer<impl bytes::Buf>,
184    ) -> memcomparable::Result<Self> {
185        let v = <String as serde::Deserialize>::deserialize(deserializer)?
186            .parse()
187            .map_err(|_| memcomparable::Error::Message("invalid json".into()))?;
188        Ok(Self(v))
189    }
190
191    /// Deserialize from a pgwire "BINARY" encoding.
192    pub fn value_deserialize(mut buf: &[u8]) -> Option<Self> {
193        if buf.is_empty() || buf.get_u8() != 1 {
194            return None;
195        }
196        Value::from_text(buf).ok().map(Self)
197    }
198
199    /// Convert the value to a [`serde_json::Value`].
200    pub fn take(self) -> serde_json::Value {
201        self.0.into()
202    }
203}
204
205impl From<serde_json::Value> for JsonbVal {
206    fn from(v: serde_json::Value) -> Self {
207        Self(v.into())
208    }
209}
210
211impl From<Value> for JsonbVal {
212    fn from(v: Value) -> Self {
213        Self(v)
214    }
215}
216
217impl From<JsonbRef<'_>> for JsonbVal {
218    fn from(v: JsonbRef<'_>) -> Self {
219        Self(v.0.to_owned())
220    }
221}
222
223impl From<f64> for JsonbVal {
224    fn from(v: f64) -> Self {
225        Self(v.into())
226    }
227}
228
229impl<'a> From<JsonbRef<'a>> for ValueRef<'a> {
230    fn from(v: JsonbRef<'a>) -> Self {
231        v.0
232    }
233}
234
235impl<'a> JsonbRef<'a> {
236    pub fn memcmp_serialize(
237        &self,
238        serializer: &mut memcomparable::Serializer<impl bytes::BufMut>,
239    ) -> memcomparable::Result<()> {
240        // As mentioned with `cmp`, this implementation is not intended to be used.
241        // But before #7981 is done, we do not want to `panic` here.
242        let s = self.0.to_string();
243        serde::Serialize::serialize(&s, serializer)
244    }
245
246    /// Serialize to a pgwire "BINARY" encoding.
247    pub fn value_serialize(&self) -> Vec<u8> {
248        use std::io::Write;
249        // Reuse the pgwire "BINARY" encoding for jsonb type.
250        // It is not truly binary, but one byte of version `1u8` followed by string form.
251        // This version number helps us maintain compatibility when we switch to more efficient
252        // encoding later.
253        let mut buf = Vec::with_capacity(self.0.capacity());
254        buf.push(1);
255        write!(&mut buf, "{}", self.0).unwrap();
256        buf
257    }
258
259    /// Returns a jsonb `null` value.
260    pub const fn null() -> Self {
261        Self(ValueRef::Null)
262    }
263
264    /// Returns a value for empty string.
265    pub const fn empty_string() -> Self {
266        Self(ValueRef::String(""))
267    }
268
269    /// Returns true if this is a jsonb `null`.
270    pub fn is_jsonb_null(&self) -> bool {
271        self.0.is_null()
272    }
273
274    /// Returns true if this is a jsonb null, boolean, number or string.
275    pub fn is_scalar(&self) -> bool {
276        matches!(
277            self.0,
278            ValueRef::Null | ValueRef::Bool(_) | ValueRef::Number(_) | ValueRef::String(_)
279        )
280    }
281
282    /// Returns true if this is a jsonb array.
283    pub fn is_array(&self) -> bool {
284        self.0.is_array()
285    }
286
287    /// Returns true if this is a jsonb object.
288    pub fn is_object(&self) -> bool {
289        self.0.is_object()
290    }
291
292    /// Returns the type name of this jsonb.
293    ///
294    /// Possible values are: `null`, `boolean`, `number`, `string`, `array`, `object`.
295    pub fn type_name(&self) -> &'static str {
296        match self.0 {
297            ValueRef::Null => "null",
298            ValueRef::Bool(_) => "boolean",
299            ValueRef::Number(_) => "number",
300            ValueRef::String(_) => "string",
301            ValueRef::Array(_) => "array",
302            ValueRef::Object(_) => "object",
303        }
304    }
305
306    /// Returns the length of this json array.
307    pub fn array_len(&self) -> Result<usize, String> {
308        let array = self
309            .0
310            .as_array()
311            .ok_or_else(|| format!("cannot get array length of a jsonb {}", self.type_name()))?;
312        Ok(array.len())
313    }
314
315    /// If the JSON is a boolean, returns the associated bool.
316    pub fn as_bool(&self) -> Result<bool, String> {
317        self.0
318            .as_bool()
319            .ok_or_else(|| format!("cannot cast jsonb {} to type boolean", self.type_name()))
320    }
321
322    /// If the JSON is a string, returns the associated string.
323    pub fn as_string(&self) -> Result<String, String> {
324        self.0
325            .as_str()
326            .map(|s| s.to_owned())
327            .ok_or_else(|| format!("cannot cast jsonb {} to type string", self.type_name()))
328    }
329
330    /// If the JSON is a string, returns the associated &str.
331    pub fn as_str(&self) -> Result<&str, String> {
332        self.0
333            .as_str()
334            .ok_or_else(|| format!("cannot cast jsonb {} to type &str", self.type_name()))
335    }
336
337    /// Attempt to read jsonb as a JSON number.
338    ///
339    /// According to RFC 8259, only number within IEEE 754 binary64 (double precision) has good
340    /// interoperability. We do not support arbitrary precision like PostgreSQL `numeric` right now.
341    pub fn as_number(&self) -> Result<F64, String> {
342        self.0
343            .as_number()
344            .ok_or_else(|| format!("cannot cast jsonb {} to type number", self.type_name()))?
345            .as_f64()
346            .map(|f| f.into_ordered())
347            .ok_or_else(|| "jsonb number out of range".into())
348    }
349
350    /// This is part of the `->>` or `#>>` syntax to access a child as string.
351    ///
352    /// * It is not `as_str`, because there is no runtime error when the jsonb type is not string.
353    /// * It is not same as [`std::fmt::Display`] or [`super::ToText`] (cast to string) in the
354    ///   following 2 cases:
355    ///   * Jsonb null is displayed as 4-letter `null` but treated as sql null here.
356    ///       * This function writes nothing and the caller is responsible for checking
357    ///         [`Self::is_jsonb_null`] to differentiate it from an empty string.
358    ///   * Jsonb string is displayed with quotes but treated as its inner value here.
359    pub fn force_str<W: std::fmt::Write>(&self, writer: &mut W) -> std::fmt::Result {
360        match self.0 {
361            ValueRef::String(v) => writer.write_str(v),
362            ValueRef::Null => Ok(()),
363            ValueRef::Bool(_) | ValueRef::Number(_) | ValueRef::Array(_) | ValueRef::Object(_) => {
364                use crate::types::to_text::ToText as _;
365                self.write_with_type(&crate::types::DataType::Jsonb, writer)
366            }
367        }
368    }
369
370    pub fn force_string(&self) -> String {
371        let mut s = String::new();
372        self.force_str(&mut s).unwrap();
373        s
374    }
375
376    pub fn access_object_field(&self, field: &str) -> Option<Self> {
377        self.0.get(field).map(Self)
378    }
379
380    pub fn access_array_element(&self, idx: usize) -> Option<Self> {
381        self.0.get(idx).map(Self)
382    }
383
384    /// Returns an iterator over the elements if this is an array.
385    pub fn array_elements(self) -> Result<impl Iterator<Item = JsonbRef<'a>>, String> {
386        let array = self
387            .0
388            .as_array()
389            .ok_or_else(|| format!("cannot extract elements from a jsonb {}", self.type_name()))?;
390        Ok(array.iter().map(Self))
391    }
392
393    /// Returns an iterator over the keys if this is an object.
394    pub fn object_keys(self) -> Result<impl Iterator<Item = &'a str>, String> {
395        let object = self.0.as_object().ok_or_else(|| {
396            format!(
397                "cannot call jsonb_object_keys on a jsonb {}",
398                self.type_name()
399            )
400        })?;
401        Ok(object.keys())
402    }
403
404    /// Returns an iterator over the key-value pairs if this is an object.
405    pub fn object_key_values(
406        self,
407    ) -> Result<impl Iterator<Item = (&'a str, JsonbRef<'a>)>, String> {
408        let object = self
409            .0
410            .as_object()
411            .ok_or_else(|| format!("cannot deconstruct a jsonb {}", self.type_name()))?;
412        Ok(object.iter().map(|(k, v)| (k, Self(v))))
413    }
414
415    /// Pretty print the jsonb value to the given writer, with 4 spaces indentation.
416    pub fn pretty(self, f: &mut impl std::fmt::Write) -> std::fmt::Result {
417        use serde::Serialize;
418        use serde_json::ser::{PrettyFormatter, Serializer};
419
420        let mut ser =
421            Serializer::with_formatter(FmtToIoUnchecked(f), PrettyFormatter::with_indent(b"    "));
422        self.0.serialize(&mut ser).map_err(|_| std::fmt::Error)
423    }
424
425    /// Convert the jsonb value to a datum.
426    pub fn to_datum(self, ty: &DataType) -> Result<Datum, String> {
427        if self.0.as_null().is_some() {
428            return Ok(None);
429        }
430        let datum = match ty {
431            DataType::Jsonb => ScalarImpl::Jsonb(self.into()),
432            DataType::List(t) => ScalarImpl::List(self.to_list(t)?),
433            DataType::Struct(s) => ScalarImpl::Struct(self.to_struct(s)?),
434            _ => {
435                let s = self.force_string();
436                ScalarImpl::from_text(&s, ty).map_err(|e| format!("{}", e.as_report()))?
437            }
438        };
439        Ok(Some(datum))
440    }
441
442    /// Convert the jsonb value to a list value.
443    pub fn to_list(self, elem_type: &DataType) -> Result<ListValue, String> {
444        let array = self
445            .0
446            .as_array()
447            .ok_or_else(|| format!("expected JSON array, but found {self}"))?;
448        let mut builder = elem_type.create_array_builder(array.len());
449        for v in array.iter() {
450            builder.append(Self(v).to_datum(elem_type)?);
451        }
452        Ok(ListValue::new(builder.finish()))
453    }
454
455    /// Convert the jsonb value to a struct value.
456    pub fn to_struct(self, ty: &StructType) -> Result<StructValue, String> {
457        let object = self.0.as_object().ok_or_else(|| {
458            format!(
459                "cannot call populate_composite on a jsonb {}",
460                self.type_name()
461            )
462        })?;
463        let mut fields = Vec::with_capacity(ty.len());
464        for (name, ty) in ty.iter() {
465            let datum = match object.get(name) {
466                Some(v) => Self(v).to_datum(ty)?,
467                None => None,
468            };
469            fields.push(datum);
470        }
471        Ok(StructValue::new(fields))
472    }
473
474    pub fn to_map(self, ty: &MapType) -> Result<MapValue, String> {
475        let object = self
476            .0
477            .as_object()
478            .ok_or_else(|| format!("cannot convert to map from a jsonb {}", self.type_name()))?;
479        if !matches!(ty.key(), DataType::Varchar) {
480            return Err("cannot convert jsonb to a map with non-string keys".to_owned());
481        }
482
483        let mut keys: Vec<Datum> = Vec::with_capacity(object.len());
484        let mut values: Vec<Datum> = Vec::with_capacity(object.len());
485        for (k, v) in object.iter() {
486            let v = Self(v).to_datum(ty.value())?;
487            keys.push(Some(ScalarImpl::Utf8(k.to_owned().into())));
488            values.push(v);
489        }
490        MapValue::try_from_kv(
491            ListValue::from_datum_iter(ty.key(), keys),
492            ListValue::from_datum_iter(ty.value(), values),
493        )
494    }
495
496    /// Expands the top-level JSON object to a row having the struct type of the `base` argument.
497    pub fn populate_struct(
498        self,
499        ty: &StructType,
500        base: Option<StructRef<'_>>,
501    ) -> Result<StructValue, String> {
502        let Some(base) = base else {
503            return self.to_struct(ty);
504        };
505        let object = self.0.as_object().ok_or_else(|| {
506            format!(
507                "cannot call populate_composite on a jsonb {}",
508                self.type_name()
509            )
510        })?;
511        let mut fields = Vec::with_capacity(ty.len());
512        for ((name, ty), base_field) in ty.iter().zip_eq_debug(base.iter_fields_ref()) {
513            let datum = match object.get(name) {
514                Some(v) => match ty {
515                    // recursively populate the nested struct
516                    DataType::Struct(s) => Some(
517                        Self(v)
518                            .populate_struct(s, base_field.map(|s| s.into_struct()))?
519                            .into(),
520                    ),
521                    _ => Self(v).to_datum(ty)?,
522                },
523                None => base_field.to_owned_datum(),
524            };
525            fields.push(datum);
526        }
527        Ok(StructValue::new(fields))
528    }
529
530    /// Returns the capacity of the underlying buffer.
531    pub fn capacity(self) -> usize {
532        self.0.capacity()
533    }
534}
535
536/// A custom implementation for [`serde_json::ser::Formatter`] to match PostgreSQL, which adds extra
537/// space after `,` and `:` in array and object.
538struct ToTextFormatter;
539
540impl serde_json::ser::Formatter for ToTextFormatter {
541    fn begin_array_value<W>(&mut self, writer: &mut W, first: bool) -> std::io::Result<()>
542    where
543        W: ?Sized + std::io::Write,
544    {
545        if first {
546            Ok(())
547        } else {
548            writer.write_all(b", ")
549        }
550    }
551
552    fn begin_object_key<W>(&mut self, writer: &mut W, first: bool) -> std::io::Result<()>
553    where
554        W: ?Sized + std::io::Write,
555    {
556        if first {
557            Ok(())
558        } else {
559            writer.write_all(b", ")
560        }
561    }
562
563    fn begin_object_value<W>(&mut self, writer: &mut W) -> std::io::Result<()>
564    where
565        W: ?Sized + std::io::Write,
566    {
567        writer.write_all(b": ")
568    }
569}
570
/// Adapts a [`std::fmt::Write`] sink so it can be used where a [`std::io::Write`] is required,
/// e.g. as the output of a `serde_json` serializer.
///
/// Bytes are forwarded without UTF-8 validation (outside of debug assertions), so this adapter
/// must only receive chunks that are valid UTF-8 on their own. In this module it is fed by
/// `serde_json` with `ToTextFormatter` / a `PrettyFormatter` whose extra output is ASCII.
/// NOTE(review): soundness relies on `serde_json` writing string contents only as whole `&str`
/// slices cut at char boundaries; re-check if the serializer setup changes.
struct FmtToIoUnchecked<F>(F);

impl<F: std::fmt::Write> std::io::Write for FmtToIoUnchecked<F> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Catch violations of the UTF-8 invariant in tests; compiled out unless debug
        // assertions are enabled, so release performance is unchanged.
        debug_assert!(
            std::str::from_utf8(buf).is_ok(),
            "FmtToIoUnchecked received a chunk that is not valid UTF-8"
        );
        // SAFETY: every user of this adapter in this module passes bytes emitted by
        // `serde_json`, which are complete UTF-8 sequences (see the type-level docs).
        let s = unsafe { std::str::from_utf8_unchecked(buf) };
        self.0.write_str(s).map_err(|_| std::io::ErrorKind::Other)?;
        // The whole chunk is consumed, so `write_all` never splits a chunk mid-character.
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        // `fmt::Write` has no buffer of its own to flush.
        Ok(())
    }
}
585
586impl ToSql for JsonbVal {
587    accepts!(JSON, JSONB);
588
589    to_sql_checked!();
590
591    fn to_sql(
592        &self,
593        ty: &Type,
594        out: &mut BytesMut,
595    ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
596    where
597        Self: Sized,
598    {
599        if matches!(*ty, Type::JSONB) {
600            out.put_u8(1);
601        }
602        write!(out, "{}", self.0).unwrap();
603        Ok(IsNull::No)
604    }
605}
606
607impl<'a> FromSql<'a> for JsonbVal {
608    accepts!(JSON, JSONB);
609
610    fn from_sql(
611        ty: &Type,
612        mut raw: &'a [u8],
613    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
614        Ok(match *ty {
615            // Here we allow mapping JSON of pg to JSONB of rw. But please note the JSONB and JSON have different behaviors in postgres.
616            // An example of different semantics for duplicated keys in an object:
617            // test=# select jsonb_each('{"foo": 1, "bar": 2, "foo": 3}');
618            //  jsonb_each
619            //  ------------
620            //   (bar,2)
621            //   (foo,3)
622            //  (2 rows)
623            // test=# select json_each('{"foo": 1, "bar": 2, "foo": 3}');
624            //   json_each
625            //  -----------
626            //   (foo,1)
627            //   (bar,2)
628            //   (foo,3)
629            //  (3 rows)
630            Type::JSON => JsonbVal::from(Value::from_text(raw)?),
631            Type::JSONB => {
632                if raw.is_empty() || raw.get_u8() != 1 {
633                    return Err("invalid jsonb encoding".into());
634                }
635                JsonbVal::from(Value::from_text(raw)?)
636            }
637            _ => {
638                bail_not_implemented!("the JsonbVal's postgres decoding for {ty} is unsupported")
639            }
640        })
641    }
642}
643
644impl ToSql for JsonbRef<'_> {
645    accepts!(JSON, JSONB);
646
647    to_sql_checked!();
648
649    fn to_sql(
650        &self,
651        ty: &Type,
652        out: &mut BytesMut,
653    ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
654    where
655        Self: Sized,
656    {
657        if matches!(*ty, Type::JSONB) {
658            out.put_u8(1);
659        }
660        write!(out, "{}", self.0).unwrap();
661        Ok(IsNull::No)
662    }
663}