risingwave_common/types/
jsonb.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{self, Write};
16use std::hash::Hash;
17
18use bytes::{Buf, BufMut, BytesMut};
19use jsonbb::{Value, ValueRef};
20use postgres_types::{FromSql, IsNull, ToSql, Type, accepts, to_sql_checked};
21use risingwave_common_estimate_size::EstimateSize;
22use thiserror_ext::AsReport;
23
24use super::{
25    Datum, F64, IntoOrdered, ListValue, MapType, MapValue, ScalarImpl, StructRef, ToOwnedDatum,
26};
27use crate::types::{
28    DEBEZIUM_UNAVAILABLE_JSON, DEBEZIUM_UNAVAILABLE_VALUE, DataType, ListType, Scalar, ScalarRef,
29    StructType, StructValue,
30};
31use crate::util::iter_util::ZipEqDebug;
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct JsonbVal(pub(crate) Value);
35
36#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
37pub struct JsonbRef<'a>(pub(crate) ValueRef<'a>);
38
39impl EstimateSize for JsonbVal {
40    fn estimated_heap_size(&self) -> usize {
41        self.0.capacity()
42    }
43}
44
45/// The display of `JsonbVal` is pg-compatible format which has slightly different from
46/// `serde_json::Value`.
47impl fmt::Display for JsonbVal {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        crate::types::to_text::ToText::write(&self.as_scalar_ref(), f)
50    }
51}
52
53/// The display of `JsonbRef` is pg-compatible format which has slightly different from
54/// `serde_json::Value`.
55impl fmt::Display for JsonbRef<'_> {
56    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57        crate::types::to_text::ToText::write(self, f)
58    }
59}
60
61impl Scalar for JsonbVal {
62    type ScalarRefType<'a> = JsonbRef<'a>;
63
64    fn as_scalar_ref(&self) -> Self::ScalarRefType<'_> {
65        JsonbRef(self.0.as_ref())
66    }
67}
68
69impl<'a> ScalarRef<'a> for JsonbRef<'a> {
70    type ScalarType = JsonbVal;
71
72    fn to_owned_scalar(&self) -> Self::ScalarType {
73        JsonbVal(self.0.into())
74    }
75
76    fn hash_scalar<H: std::hash::Hasher>(&self, state: &mut H) {
77        self.hash(state)
78    }
79}
80
81impl PartialOrd for JsonbVal {
82    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
83        Some(self.cmp(other))
84    }
85}
86
87impl Ord for JsonbVal {
88    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
89        self.as_scalar_ref().cmp(&other.as_scalar_ref())
90    }
91}
92
93impl PartialOrd for JsonbRef<'_> {
94    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
95        Some(self.cmp(other))
96    }
97}
98
99impl Ord for JsonbRef<'_> {
100    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
101        // We do not intend to support ordering `jsonb` type.
102        // Before #7981 is done, we do not panic but just compare its string representation.
103        // Note that `serde_json` without feature `preserve_order` uses `BTreeMap` for json object.
104        // So its string form always have keys sorted.
105        //
106        // In PostgreSQL, Object > Array > Boolean > Number > String > Null.
107        // But here we have Object > true > Null > false > Array > Number > String.
108        // Because in ascii: `{` > `t` > `n` > `f` > `[` > `9` `-` > `"`.
109        //
110        // This is just to keep consistent with the memcomparable encoding, which uses string form.
111        // If we implemented the same typed comparison as PostgreSQL, we would need a corresponding
112        // memcomparable encoding for it.
113        self.0.to_string().cmp(&other.0.to_string())
114    }
115}
116
117impl crate::types::to_text::ToText for JsonbRef<'_> {
118    fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
119        // Use custom [`ToTextFormatter`] to serialize. If we are okay with the default, this can be
120        // just `write!(f, "{}", self.0)`
121        use serde::Serialize as _;
122        let mut ser =
123            serde_json::ser::Serializer::with_formatter(FmtToIoUnchecked(f), ToTextFormatter);
124        self.0.serialize(&mut ser).map_err(|_| std::fmt::Error)
125    }
126
127    fn write_with_type<W: std::fmt::Write>(
128        &self,
129        _ty: &crate::types::DataType,
130        f: &mut W,
131    ) -> std::fmt::Result {
132        self.write(f)
133    }
134}
135
136impl crate::types::to_binary::ToBinary for JsonbRef<'_> {
137    fn to_binary_with_type(
138        &self,
139        _ty: &crate::types::DataType,
140    ) -> super::to_binary::Result<bytes::Bytes> {
141        Ok(self.value_serialize().into())
142    }
143}
144
145impl std::str::FromStr for JsonbVal {
146    type Err = <Value as std::str::FromStr>::Err;
147
148    fn from_str(s: &str) -> Result<Self, Self::Err> {
149        Ok(Self(s.parse()?))
150    }
151}
152
153impl JsonbVal {
154    /// Create a `JsonbVal` from a string, with special handling for Debezium's unavailable value placeholder.
155    /// Returns a Result to handle parsing errors properly.
156    pub fn from_debezium_unavailable_value(s: &str) -> Result<Self, serde_json::Error> {
157        // Special handling for Debezium's unavailable value placeholder
158        if s.len() == DEBEZIUM_UNAVAILABLE_VALUE.len() && s == DEBEZIUM_UNAVAILABLE_VALUE {
159            return Ok(DEBEZIUM_UNAVAILABLE_JSON.clone());
160        }
161        Ok(Self(s.parse()?))
162    }
163}
164
165impl JsonbVal {
166    /// Returns a jsonb `null`.
167    pub fn null() -> Self {
168        Self(Value::null())
169    }
170
171    /// Returns an empty array `[]`.
172    pub fn empty_array() -> Self {
173        Self(Value::array([]))
174    }
175
176    /// Returns an empty array `{}`.
177    pub fn empty_object() -> Self {
178        Self(Value::object([]))
179    }
180
181    /// Deserialize from a memcomparable encoding.
182    pub fn memcmp_deserialize(
183        deserializer: &mut memcomparable::Deserializer<impl bytes::Buf>,
184    ) -> memcomparable::Result<Self> {
185        let v = <String as serde::Deserialize>::deserialize(deserializer)?
186            .parse()
187            .map_err(|_| memcomparable::Error::Message("invalid json".into()))?;
188        Ok(Self(v))
189    }
190
191    /// Deserialize from a pgwire "BINARY" encoding.
192    pub fn value_deserialize(mut buf: &[u8]) -> Option<Self> {
193        if buf.is_empty() || buf.get_u8() != 1 {
194            return None;
195        }
196        Value::from_text(buf).ok().map(Self)
197    }
198
199    /// Convert the value to a [`serde_json::Value`].
200    pub fn take(self) -> serde_json::Value {
201        self.0.into()
202    }
203}
204
205impl From<serde_json::Value> for JsonbVal {
206    fn from(v: serde_json::Value) -> Self {
207        Self(v.into())
208    }
209}
210
211impl From<Value> for JsonbVal {
212    fn from(v: Value) -> Self {
213        Self(v)
214    }
215}
216
217impl From<JsonbRef<'_>> for JsonbVal {
218    fn from(v: JsonbRef<'_>) -> Self {
219        Self(v.0.to_owned())
220    }
221}
222
223impl From<f64> for JsonbVal {
224    fn from(v: f64) -> Self {
225        Self(v.into())
226    }
227}
228
229impl<'a> From<JsonbRef<'a>> for ValueRef<'a> {
230    fn from(v: JsonbRef<'a>) -> Self {
231        v.0
232    }
233}
234
235impl<'a> JsonbRef<'a> {
236    pub fn memcmp_serialize(
237        &self,
238        serializer: &mut memcomparable::Serializer<impl bytes::BufMut>,
239    ) -> memcomparable::Result<()> {
240        // As mentioned with `cmp`, this implementation is not intended to be used.
241        // But before #7981 is done, we do not want to `panic` here.
242        let s = self.0.to_string();
243        serde::Serialize::serialize(&s, serializer)
244    }
245
246    /// Serialize to a pgwire "BINARY" encoding.
247    pub fn value_serialize(&self) -> Vec<u8> {
248        use std::io::Write;
249        // Reuse the pgwire "BINARY" encoding for jsonb type.
250        // It is not truly binary, but one byte of version `1u8` followed by string form.
251        // This version number helps us maintain compatibility when we switch to more efficient
252        // encoding later.
253        let mut buf = Vec::with_capacity(self.0.capacity());
254        buf.push(1);
255        write!(&mut buf, "{}", self.0).unwrap();
256        buf
257    }
258
259    /// Returns a jsonb `null` value.
260    pub const fn null() -> Self {
261        Self(ValueRef::Null)
262    }
263
264    /// Returns a value for empty string.
265    pub const fn empty_string() -> Self {
266        Self(ValueRef::String(""))
267    }
268
269    /// Returns true if this is a jsonb `null`.
270    pub fn is_jsonb_null(&self) -> bool {
271        self.0.is_null()
272    }
273
274    /// Returns true if this is a jsonb null, boolean, number or string.
275    pub fn is_scalar(&self) -> bool {
276        matches!(
277            self.0,
278            ValueRef::Null | ValueRef::Bool(_) | ValueRef::Number(_) | ValueRef::String(_)
279        )
280    }
281
282    /// Returns true if this is a jsonb array.
283    pub fn is_array(&self) -> bool {
284        self.0.is_array()
285    }
286
287    /// Returns true if this is a jsonb object.
288    pub fn is_object(&self) -> bool {
289        self.0.is_object()
290    }
291
292    /// Returns the type name of this jsonb.
293    ///
294    /// Possible values are: `null`, `boolean`, `number`, `string`, `array`, `object`.
295    pub fn type_name(&self) -> &'static str {
296        match self.0 {
297            ValueRef::Null => "null",
298            ValueRef::Bool(_) => "boolean",
299            ValueRef::Number(_) => "number",
300            ValueRef::String(_) => "string",
301            ValueRef::Array(_) => "array",
302            ValueRef::Object(_) => "object",
303        }
304    }
305
306    /// Returns the length of this json array.
307    pub fn array_len(&self) -> Result<usize, String> {
308        let array = self
309            .0
310            .as_array()
311            .ok_or_else(|| format!("cannot get array length of a jsonb {}", self.type_name()))?;
312        Ok(array.len())
313    }
314
315    /// If the JSON is a boolean, returns the associated bool.
316    pub fn as_bool(&self) -> Result<bool, String> {
317        self.0
318            .as_bool()
319            .ok_or_else(|| format!("cannot cast jsonb {} to type boolean", self.type_name()))
320    }
321
322    /// If the JSON is a string, returns the associated string.
323    pub fn as_string(&self) -> Result<String, String> {
324        self.0
325            .as_str()
326            .map(|s| s.to_owned())
327            .ok_or_else(|| format!("cannot cast jsonb {} to type string", self.type_name()))
328    }
329
330    /// If the JSON is a string, returns the associated &str.
331    pub fn as_str(&self) -> Result<&str, String> {
332        self.0
333            .as_str()
334            .ok_or_else(|| format!("cannot cast jsonb {} to type &str", self.type_name()))
335    }
336
337    /// Attempt to read jsonb as a JSON number.
338    ///
339    /// According to RFC 8259, only number within IEEE 754 binary64 (double precision) has good
340    /// interoperability. We do not support arbitrary precision like PostgreSQL `numeric` right now.
341    pub fn as_number(&self) -> Result<F64, String> {
342        self.0
343            .as_number()
344            .ok_or_else(|| format!("cannot cast jsonb {} to type number", self.type_name()))?
345            .as_f64()
346            .map(|f| f.into_ordered())
347            .ok_or_else(|| "jsonb number out of range".into())
348    }
349
350    /// This is part of the `->>` or `#>>` syntax to access a child as string.
351    ///
352    /// * It is not `as_str`, because there is no runtime error when the jsonb type is not string.
353    /// * It is not same as [`std::fmt::Display`] or [`super::ToText`] (cast to string) in the
354    ///   following 2 cases:
355    ///   * Jsonb null is displayed as 4-letter `null` but treated as sql null here.
356    ///       * This function writes nothing and the caller is responsible for checking
357    ///         [`Self::is_jsonb_null`] to differentiate it from an empty string.
358    ///   * Jsonb string is displayed with quotes but treated as its inner value here.
359    pub fn force_str<W: std::fmt::Write>(&self, writer: &mut W) -> std::fmt::Result {
360        match self.0 {
361            ValueRef::String(v) => writer.write_str(v),
362            ValueRef::Null => Ok(()),
363            ValueRef::Bool(_) | ValueRef::Number(_) | ValueRef::Array(_) | ValueRef::Object(_) => {
364                use crate::types::to_text::ToText as _;
365                self.write_with_type(&crate::types::DataType::Jsonb, writer)
366            }
367        }
368    }
369
370    pub fn force_string(&self) -> String {
371        let mut s = String::new();
372        self.force_str(&mut s).unwrap();
373        s
374    }
375
376    pub fn access_object_field(&self, field: &str) -> Option<Self> {
377        self.0.get(field).map(Self)
378    }
379
380    pub fn access_array_element(&self, idx: usize) -> Option<Self> {
381        self.0.get(idx).map(Self)
382    }
383
384    /// Returns an iterator over the elements if this is an array.
385    pub fn array_elements(self) -> Result<impl Iterator<Item = JsonbRef<'a>>, String> {
386        let array = self
387            .0
388            .as_array()
389            .ok_or_else(|| format!("cannot extract elements from a jsonb {}", self.type_name()))?;
390        Ok(array.iter().map(Self))
391    }
392
393    /// Returns an iterator over the keys if this is an object.
394    pub fn object_keys(self) -> Result<impl Iterator<Item = &'a str>, String> {
395        let object = self.0.as_object().ok_or_else(|| {
396            format!(
397                "cannot call jsonb_object_keys on a jsonb {}",
398                self.type_name()
399            )
400        })?;
401        Ok(object.keys())
402    }
403
404    /// Returns an iterator over the key-value pairs if this is an object.
405    pub fn object_key_values(
406        self,
407    ) -> Result<impl Iterator<Item = (&'a str, JsonbRef<'a>)>, String> {
408        let object = self
409            .0
410            .as_object()
411            .ok_or_else(|| format!("cannot deconstruct a jsonb {}", self.type_name()))?;
412        Ok(object.iter().map(|(k, v)| (k, Self(v))))
413    }
414
415    /// Pretty print the jsonb value to the given writer, with 4 spaces indentation.
416    pub fn pretty(self, f: &mut impl std::fmt::Write) -> std::fmt::Result {
417        use serde::Serialize;
418        use serde_json::ser::{PrettyFormatter, Serializer};
419
420        let mut ser =
421            Serializer::with_formatter(FmtToIoUnchecked(f), PrettyFormatter::with_indent(b"    "));
422        self.0.serialize(&mut ser).map_err(|_| std::fmt::Error)
423    }
424
425    /// Convert the jsonb value to a datum.
426    pub fn to_datum(self, ty: &DataType) -> Result<Datum, String> {
427        if self.0.as_null().is_some() {
428            return Ok(None);
429        }
430        let datum = match ty {
431            DataType::Jsonb => ScalarImpl::Jsonb(self.into()),
432            DataType::List(l) => ScalarImpl::List(self.to_list(l)?),
433            DataType::Struct(s) => ScalarImpl::Struct(self.to_struct(s)?),
434            _ => {
435                let s = self.force_string();
436                ScalarImpl::from_text(&s, ty).map_err(|e| format!("{}", e.as_report()))?
437            }
438        };
439        Ok(Some(datum))
440    }
441
442    /// Convert the jsonb value to a list value.
443    pub fn to_list(self, ty: &ListType) -> Result<ListValue, String> {
444        let elem_type = ty.elem();
445        let array = self
446            .0
447            .as_array()
448            .ok_or_else(|| format!("expected JSON array, but found {self}"))?;
449        let mut builder = elem_type.create_array_builder(array.len());
450        for v in array.iter() {
451            builder.append(Self(v).to_datum(elem_type)?);
452        }
453        Ok(ListValue::new(builder.finish()))
454    }
455
456    /// Convert the jsonb value to a struct value.
457    pub fn to_struct(self, ty: &StructType) -> Result<StructValue, String> {
458        let object = self.0.as_object().ok_or_else(|| {
459            format!(
460                "cannot call populate_composite on a jsonb {}",
461                self.type_name()
462            )
463        })?;
464        let mut fields = Vec::with_capacity(ty.len());
465        for (name, ty) in ty.iter() {
466            let datum = match object.get(name) {
467                Some(v) => Self(v).to_datum(ty)?,
468                None => None,
469            };
470            fields.push(datum);
471        }
472        Ok(StructValue::new(fields))
473    }
474
475    pub fn to_map(self, ty: &MapType) -> Result<MapValue, String> {
476        let object = self
477            .0
478            .as_object()
479            .ok_or_else(|| format!("cannot convert to map from a jsonb {}", self.type_name()))?;
480        if !matches!(ty.key(), DataType::Varchar) {
481            return Err("cannot convert jsonb to a map with non-string keys".to_owned());
482        }
483
484        let mut keys: Vec<Datum> = Vec::with_capacity(object.len());
485        let mut values: Vec<Datum> = Vec::with_capacity(object.len());
486        for (k, v) in object.iter() {
487            let v = Self(v).to_datum(ty.value())?;
488            keys.push(Some(ScalarImpl::Utf8(k.to_owned().into())));
489            values.push(v);
490        }
491        MapValue::try_from_kv(
492            ListValue::from_datum_iter(ty.key(), keys),
493            ListValue::from_datum_iter(ty.value(), values),
494        )
495    }
496
497    /// Expands the top-level JSON object to a row having the struct type of the `base` argument.
498    pub fn populate_struct(
499        self,
500        ty: &StructType,
501        base: Option<StructRef<'_>>,
502    ) -> Result<StructValue, String> {
503        let Some(base) = base else {
504            return self.to_struct(ty);
505        };
506        let object = self.0.as_object().ok_or_else(|| {
507            format!(
508                "cannot call populate_composite on a jsonb {}",
509                self.type_name()
510            )
511        })?;
512        let mut fields = Vec::with_capacity(ty.len());
513        for ((name, ty), base_field) in ty.iter().zip_eq_debug(base.iter_fields_ref()) {
514            let datum = match object.get(name) {
515                Some(v) => match ty {
516                    // recursively populate the nested struct
517                    DataType::Struct(s) => Some(
518                        Self(v)
519                            .populate_struct(s, base_field.map(|s| s.into_struct()))?
520                            .into(),
521                    ),
522                    _ => Self(v).to_datum(ty)?,
523                },
524                None => base_field.to_owned_datum(),
525            };
526            fields.push(datum);
527        }
528        Ok(StructValue::new(fields))
529    }
530
531    /// Returns the capacity of the underlying buffer.
532    pub fn capacity(self) -> usize {
533        self.0.capacity()
534    }
535}
536
537/// A custom implementation for [`serde_json::ser::Formatter`] to match PostgreSQL, which adds extra
538/// space after `,` and `:` in array and object.
539struct ToTextFormatter;
540
541impl serde_json::ser::Formatter for ToTextFormatter {
542    fn begin_array_value<W>(&mut self, writer: &mut W, first: bool) -> std::io::Result<()>
543    where
544        W: ?Sized + std::io::Write,
545    {
546        if first {
547            Ok(())
548        } else {
549            writer.write_all(b", ")
550        }
551    }
552
553    fn begin_object_key<W>(&mut self, writer: &mut W, first: bool) -> std::io::Result<()>
554    where
555        W: ?Sized + std::io::Write,
556    {
557        if first {
558            Ok(())
559        } else {
560            writer.write_all(b", ")
561        }
562    }
563
564    fn begin_object_value<W>(&mut self, writer: &mut W) -> std::io::Result<()>
565    where
566        W: ?Sized + std::io::Write,
567    {
568        writer.write_all(b": ")
569    }
570}
571
/// A wrapper of [`std::fmt::Write`] to implement [`std::io::Write`], so that `serde_json` can
/// serialize directly into a `fmt::Write` sink.
///
/// "Unchecked": the bytes passed to [`std::io::Write::write`] are assumed to be valid UTF-8 and
/// are not validated in release builds. Only feed it from a writer that emits whole UTF-8
/// fragments per call, such as `serde_json`'s serializer.
struct FmtToIoUnchecked<F>(F);

impl<F: std::fmt::Write> std::io::Write for FmtToIoUnchecked<F> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Catch violations of the UTF-8 precondition in debug builds.
        debug_assert!(
            std::str::from_utf8(buf).is_ok(),
            "FmtToIoUnchecked received a non-UTF-8 fragment"
        );
        // SAFETY: callers in this module only write `serde_json` output. `serde_json`'s
        // `Formatter` API passes string contents as `&str` fragments, and everything else it
        // writes (numbers, escapes, punctuation and indentation from the formatters used here)
        // is ASCII, so each `buf` is valid UTF-8.
        // NOTE(review): this relies on serde_json internals; re-check when upgrading it.
        let s = unsafe { std::str::from_utf8_unchecked(buf) };
        self.0.write_str(s).map_err(|_| std::io::ErrorKind::Other)?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        // `fmt::Write` has no flush; every write has already been handed to the sink.
        Ok(())
    }
}
586
587impl ToSql for JsonbVal {
588    accepts!(JSON, JSONB);
589
590    to_sql_checked!();
591
592    fn to_sql(
593        &self,
594        ty: &Type,
595        out: &mut BytesMut,
596    ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
597    where
598        Self: Sized,
599    {
600        if matches!(*ty, Type::JSONB) {
601            out.put_u8(1);
602        }
603        write!(out, "{}", self.0).unwrap();
604        Ok(IsNull::No)
605    }
606}
607
608impl<'a> FromSql<'a> for JsonbVal {
609    accepts!(JSON, JSONB);
610
611    fn from_sql(
612        ty: &Type,
613        mut raw: &'a [u8],
614    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
615        Ok(match *ty {
616            // Here we allow mapping JSON of pg to JSONB of rw. But please note the JSONB and JSON have different behaviors in postgres.
617            // An example of different semantics for duplicated keys in an object:
618            // test=# select jsonb_each('{"foo": 1, "bar": 2, "foo": 3}');
619            //  jsonb_each
620            //  ------------
621            //   (bar,2)
622            //   (foo,3)
623            //  (2 rows)
624            // test=# select json_each('{"foo": 1, "bar": 2, "foo": 3}');
625            //   json_each
626            //  -----------
627            //   (foo,1)
628            //   (bar,2)
629            //   (foo,3)
630            //  (3 rows)
631            Type::JSON => JsonbVal::from(Value::from_text(raw)?),
632            Type::JSONB => {
633                if raw.is_empty() || raw.get_u8() != 1 {
634                    return Err("invalid jsonb encoding".into());
635                }
636                JsonbVal::from(Value::from_text(raw)?)
637            }
638            _ => {
639                bail_not_implemented!("the JsonbVal's postgres decoding for {ty} is unsupported")
640            }
641        })
642    }
643}
644
645impl ToSql for JsonbRef<'_> {
646    accepts!(JSON, JSONB);
647
648    to_sql_checked!();
649
650    fn to_sql(
651        &self,
652        ty: &Type,
653        out: &mut BytesMut,
654    ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
655    where
656        Self: Sized,
657    {
658        if matches!(*ty, Type::JSONB) {
659            out.put_u8(1);
660        }
661        write!(out, "{}", self.0).unwrap();
662        Ok(IsNull::No)
663    }
664}