risingwave_common/types/
jsonb.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{self, Write};
16use std::hash::Hash;
17
18use bytes::{Buf, BufMut, BytesMut};
19use jsonbb::{Value, ValueRef};
20use postgres_types::{FromSql, IsNull, ToSql, Type, accepts, to_sql_checked};
21use risingwave_common_estimate_size::EstimateSize;
22use thiserror_ext::AsReport;
23
24use super::{
25    Datum, F64, IntoOrdered, ListValue, MapType, MapValue, ScalarImpl, StructRef, ToOwnedDatum,
26};
27use crate::types::{DataType, Scalar, ScalarRef, StructType, StructValue};
28use crate::util::iter_util::ZipEqDebug;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct JsonbVal(pub(crate) Value);
32
33#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
34pub struct JsonbRef<'a>(pub(crate) ValueRef<'a>);
35
36impl EstimateSize for JsonbVal {
37    fn estimated_heap_size(&self) -> usize {
38        self.0.capacity()
39    }
40}
41
42/// The display of `JsonbVal` is pg-compatible format which has slightly different from
43/// `serde_json::Value`.
44impl fmt::Display for JsonbVal {
45    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46        crate::types::to_text::ToText::write(&self.as_scalar_ref(), f)
47    }
48}
49
50/// The display of `JsonbRef` is pg-compatible format which has slightly different from
51/// `serde_json::Value`.
52impl fmt::Display for JsonbRef<'_> {
53    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54        crate::types::to_text::ToText::write(self, f)
55    }
56}
57
58impl Scalar for JsonbVal {
59    type ScalarRefType<'a> = JsonbRef<'a>;
60
61    fn as_scalar_ref(&self) -> Self::ScalarRefType<'_> {
62        JsonbRef(self.0.as_ref())
63    }
64}
65
66impl<'a> ScalarRef<'a> for JsonbRef<'a> {
67    type ScalarType = JsonbVal;
68
69    fn to_owned_scalar(&self) -> Self::ScalarType {
70        JsonbVal(self.0.into())
71    }
72
73    fn hash_scalar<H: std::hash::Hasher>(&self, state: &mut H) {
74        self.hash(state)
75    }
76}
77
78impl PartialOrd for JsonbVal {
79    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
80        Some(self.cmp(other))
81    }
82}
83
84impl Ord for JsonbVal {
85    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
86        self.as_scalar_ref().cmp(&other.as_scalar_ref())
87    }
88}
89
90impl PartialOrd for JsonbRef<'_> {
91    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
92        Some(self.cmp(other))
93    }
94}
95
96impl Ord for JsonbRef<'_> {
97    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
98        // We do not intend to support ordering `jsonb` type.
99        // Before #7981 is done, we do not panic but just compare its string representation.
100        // Note that `serde_json` without feature `preserve_order` uses `BTreeMap` for json object.
101        // So its string form always have keys sorted.
102        //
103        // In PostgreSQL, Object > Array > Boolean > Number > String > Null.
104        // But here we have Object > true > Null > false > Array > Number > String.
105        // Because in ascii: `{` > `t` > `n` > `f` > `[` > `9` `-` > `"`.
106        //
107        // This is just to keep consistent with the memcomparable encoding, which uses string form.
108        // If we implemented the same typed comparison as PostgreSQL, we would need a corresponding
109        // memcomparable encoding for it.
110        self.0.to_string().cmp(&other.0.to_string())
111    }
112}
113
114impl crate::types::to_text::ToText for JsonbRef<'_> {
115    fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
116        // Use custom [`ToTextFormatter`] to serialize. If we are okay with the default, this can be
117        // just `write!(f, "{}", self.0)`
118        use serde::Serialize as _;
119        let mut ser =
120            serde_json::ser::Serializer::with_formatter(FmtToIoUnchecked(f), ToTextFormatter);
121        self.0.serialize(&mut ser).map_err(|_| std::fmt::Error)
122    }
123
124    fn write_with_type<W: std::fmt::Write>(
125        &self,
126        _ty: &crate::types::DataType,
127        f: &mut W,
128    ) -> std::fmt::Result {
129        self.write(f)
130    }
131}
132
133impl crate::types::to_binary::ToBinary for JsonbRef<'_> {
134    fn to_binary_with_type(
135        &self,
136        _ty: &crate::types::DataType,
137    ) -> super::to_binary::Result<bytes::Bytes> {
138        Ok(self.value_serialize().into())
139    }
140}
141
142impl std::str::FromStr for JsonbVal {
143    type Err = <Value as std::str::FromStr>::Err;
144
145    fn from_str(s: &str) -> Result<Self, Self::Err> {
146        Ok(Self(s.parse()?))
147    }
148}
149
150impl JsonbVal {
151    /// Returns a jsonb `null`.
152    pub fn null() -> Self {
153        Self(Value::null())
154    }
155
156    /// Returns an empty array `[]`.
157    pub fn empty_array() -> Self {
158        Self(Value::array([]))
159    }
160
161    /// Returns an empty array `{}`.
162    pub fn empty_object() -> Self {
163        Self(Value::object([]))
164    }
165
166    /// Deserialize from a memcomparable encoding.
167    pub fn memcmp_deserialize(
168        deserializer: &mut memcomparable::Deserializer<impl bytes::Buf>,
169    ) -> memcomparable::Result<Self> {
170        let v = <String as serde::Deserialize>::deserialize(deserializer)?
171            .parse()
172            .map_err(|_| memcomparable::Error::Message("invalid json".into()))?;
173        Ok(Self(v))
174    }
175
176    /// Deserialize from a pgwire "BINARY" encoding.
177    pub fn value_deserialize(mut buf: &[u8]) -> Option<Self> {
178        if buf.is_empty() || buf.get_u8() != 1 {
179            return None;
180        }
181        Value::from_text(buf).ok().map(Self)
182    }
183
184    /// Convert the value to a [`serde_json::Value`].
185    pub fn take(self) -> serde_json::Value {
186        self.0.into()
187    }
188}
189
190impl From<serde_json::Value> for JsonbVal {
191    fn from(v: serde_json::Value) -> Self {
192        Self(v.into())
193    }
194}
195
196impl From<Value> for JsonbVal {
197    fn from(v: Value) -> Self {
198        Self(v)
199    }
200}
201
202impl From<JsonbRef<'_>> for JsonbVal {
203    fn from(v: JsonbRef<'_>) -> Self {
204        Self(v.0.to_owned())
205    }
206}
207
208impl From<f64> for JsonbVal {
209    fn from(v: f64) -> Self {
210        Self(v.into())
211    }
212}
213
214impl<'a> From<JsonbRef<'a>> for ValueRef<'a> {
215    fn from(v: JsonbRef<'a>) -> Self {
216        v.0
217    }
218}
219
220impl<'a> JsonbRef<'a> {
221    pub fn memcmp_serialize(
222        &self,
223        serializer: &mut memcomparable::Serializer<impl bytes::BufMut>,
224    ) -> memcomparable::Result<()> {
225        // As mentioned with `cmp`, this implementation is not intended to be used.
226        // But before #7981 is done, we do not want to `panic` here.
227        let s = self.0.to_string();
228        serde::Serialize::serialize(&s, serializer)
229    }
230
231    /// Serialize to a pgwire "BINARY" encoding.
232    pub fn value_serialize(&self) -> Vec<u8> {
233        use std::io::Write;
234        // Reuse the pgwire "BINARY" encoding for jsonb type.
235        // It is not truly binary, but one byte of version `1u8` followed by string form.
236        // This version number helps us maintain compatibility when we switch to more efficient
237        // encoding later.
238        let mut buf = Vec::with_capacity(self.0.capacity());
239        buf.push(1);
240        write!(&mut buf, "{}", self.0).unwrap();
241        buf
242    }
243
244    /// Returns a jsonb `null` value.
245    pub const fn null() -> Self {
246        Self(ValueRef::Null)
247    }
248
249    /// Returns a value for empty string.
250    pub const fn empty_string() -> Self {
251        Self(ValueRef::String(""))
252    }
253
254    /// Returns true if this is a jsonb `null`.
255    pub fn is_jsonb_null(&self) -> bool {
256        self.0.is_null()
257    }
258
259    /// Returns true if this is a jsonb null, boolean, number or string.
260    pub fn is_scalar(&self) -> bool {
261        matches!(
262            self.0,
263            ValueRef::Null | ValueRef::Bool(_) | ValueRef::Number(_) | ValueRef::String(_)
264        )
265    }
266
267    /// Returns true if this is a jsonb array.
268    pub fn is_array(&self) -> bool {
269        self.0.is_array()
270    }
271
272    /// Returns true if this is a jsonb object.
273    pub fn is_object(&self) -> bool {
274        self.0.is_object()
275    }
276
277    /// Returns the type name of this jsonb.
278    ///
279    /// Possible values are: `null`, `boolean`, `number`, `string`, `array`, `object`.
280    pub fn type_name(&self) -> &'static str {
281        match self.0 {
282            ValueRef::Null => "null",
283            ValueRef::Bool(_) => "boolean",
284            ValueRef::Number(_) => "number",
285            ValueRef::String(_) => "string",
286            ValueRef::Array(_) => "array",
287            ValueRef::Object(_) => "object",
288        }
289    }
290
291    /// Returns the length of this json array.
292    pub fn array_len(&self) -> Result<usize, String> {
293        let array = self
294            .0
295            .as_array()
296            .ok_or_else(|| format!("cannot get array length of a jsonb {}", self.type_name()))?;
297        Ok(array.len())
298    }
299
300    /// If the JSON is a boolean, returns the associated bool.
301    pub fn as_bool(&self) -> Result<bool, String> {
302        self.0
303            .as_bool()
304            .ok_or_else(|| format!("cannot cast jsonb {} to type boolean", self.type_name()))
305    }
306
307    /// If the JSON is a string, returns the associated string.
308    pub fn as_string(&self) -> Result<String, String> {
309        self.0
310            .as_str()
311            .map(|s| s.to_owned())
312            .ok_or_else(|| format!("cannot cast jsonb {} to type string", self.type_name()))
313    }
314
315    /// If the JSON is a string, returns the associated &str.
316    pub fn as_str(&self) -> Result<&str, String> {
317        self.0
318            .as_str()
319            .ok_or_else(|| format!("cannot cast jsonb {} to type &str", self.type_name()))
320    }
321
322    /// Attempt to read jsonb as a JSON number.
323    ///
324    /// According to RFC 8259, only number within IEEE 754 binary64 (double precision) has good
325    /// interoperability. We do not support arbitrary precision like PostgreSQL `numeric` right now.
326    pub fn as_number(&self) -> Result<F64, String> {
327        self.0
328            .as_number()
329            .ok_or_else(|| format!("cannot cast jsonb {} to type number", self.type_name()))?
330            .as_f64()
331            .map(|f| f.into_ordered())
332            .ok_or_else(|| "jsonb number out of range".into())
333    }
334
335    /// This is part of the `->>` or `#>>` syntax to access a child as string.
336    ///
337    /// * It is not `as_str`, because there is no runtime error when the jsonb type is not string.
338    /// * It is not same as [`std::fmt::Display`] or [`super::ToText`] (cast to string) in the
339    ///   following 2 cases:
340    ///   * Jsonb null is displayed as 4-letter `null` but treated as sql null here.
341    ///       * This function writes nothing and the caller is responsible for checking
342    ///         [`Self::is_jsonb_null`] to differentiate it from an empty string.
343    ///   * Jsonb string is displayed with quotes but treated as its inner value here.
344    pub fn force_str<W: std::fmt::Write>(&self, writer: &mut W) -> std::fmt::Result {
345        match self.0 {
346            ValueRef::String(v) => writer.write_str(v),
347            ValueRef::Null => Ok(()),
348            ValueRef::Bool(_) | ValueRef::Number(_) | ValueRef::Array(_) | ValueRef::Object(_) => {
349                use crate::types::to_text::ToText as _;
350                self.write_with_type(&crate::types::DataType::Jsonb, writer)
351            }
352        }
353    }
354
355    pub fn force_string(&self) -> String {
356        let mut s = String::new();
357        self.force_str(&mut s).unwrap();
358        s
359    }
360
361    pub fn access_object_field(&self, field: &str) -> Option<Self> {
362        self.0.get(field).map(Self)
363    }
364
365    pub fn access_array_element(&self, idx: usize) -> Option<Self> {
366        self.0.get(idx).map(Self)
367    }
368
369    /// Returns an iterator over the elements if this is an array.
370    pub fn array_elements(self) -> Result<impl Iterator<Item = JsonbRef<'a>>, String> {
371        let array = self
372            .0
373            .as_array()
374            .ok_or_else(|| format!("cannot extract elements from a jsonb {}", self.type_name()))?;
375        Ok(array.iter().map(Self))
376    }
377
378    /// Returns an iterator over the keys if this is an object.
379    pub fn object_keys(self) -> Result<impl Iterator<Item = &'a str>, String> {
380        let object = self.0.as_object().ok_or_else(|| {
381            format!(
382                "cannot call jsonb_object_keys on a jsonb {}",
383                self.type_name()
384            )
385        })?;
386        Ok(object.keys())
387    }
388
389    /// Returns an iterator over the key-value pairs if this is an object.
390    pub fn object_key_values(
391        self,
392    ) -> Result<impl Iterator<Item = (&'a str, JsonbRef<'a>)>, String> {
393        let object = self
394            .0
395            .as_object()
396            .ok_or_else(|| format!("cannot deconstruct a jsonb {}", self.type_name()))?;
397        Ok(object.iter().map(|(k, v)| (k, Self(v))))
398    }
399
400    /// Pretty print the jsonb value to the given writer, with 4 spaces indentation.
401    pub fn pretty(self, f: &mut impl std::fmt::Write) -> std::fmt::Result {
402        use serde::Serialize;
403        use serde_json::ser::{PrettyFormatter, Serializer};
404
405        let mut ser =
406            Serializer::with_formatter(FmtToIoUnchecked(f), PrettyFormatter::with_indent(b"    "));
407        self.0.serialize(&mut ser).map_err(|_| std::fmt::Error)
408    }
409
410    /// Convert the jsonb value to a datum.
411    pub fn to_datum(self, ty: &DataType) -> Result<Datum, String> {
412        if self.0.as_null().is_some() {
413            return Ok(None);
414        }
415        let datum = match ty {
416            DataType::Jsonb => ScalarImpl::Jsonb(self.into()),
417            DataType::List(t) => ScalarImpl::List(self.to_list(t)?),
418            DataType::Struct(s) => ScalarImpl::Struct(self.to_struct(s)?),
419            _ => {
420                let s = self.force_string();
421                ScalarImpl::from_text(&s, ty).map_err(|e| format!("{}", e.as_report()))?
422            }
423        };
424        Ok(Some(datum))
425    }
426
427    /// Convert the jsonb value to a list value.
428    pub fn to_list(self, elem_type: &DataType) -> Result<ListValue, String> {
429        let array = self
430            .0
431            .as_array()
432            .ok_or_else(|| format!("expected JSON array, but found {self}"))?;
433        let mut builder = elem_type.create_array_builder(array.len());
434        for v in array.iter() {
435            builder.append(Self(v).to_datum(elem_type)?);
436        }
437        Ok(ListValue::new(builder.finish()))
438    }
439
440    /// Convert the jsonb value to a struct value.
441    pub fn to_struct(self, ty: &StructType) -> Result<StructValue, String> {
442        let object = self.0.as_object().ok_or_else(|| {
443            format!(
444                "cannot call populate_composite on a jsonb {}",
445                self.type_name()
446            )
447        })?;
448        let mut fields = Vec::with_capacity(ty.len());
449        for (name, ty) in ty.iter() {
450            let datum = match object.get(name) {
451                Some(v) => Self(v).to_datum(ty)?,
452                None => None,
453            };
454            fields.push(datum);
455        }
456        Ok(StructValue::new(fields))
457    }
458
459    pub fn to_map(self, ty: &MapType) -> Result<MapValue, String> {
460        let object = self
461            .0
462            .as_object()
463            .ok_or_else(|| format!("cannot convert to map from a jsonb {}", self.type_name()))?;
464        if !matches!(ty.key(), DataType::Varchar) {
465            return Err("cannot convert jsonb to a map with non-string keys".to_owned());
466        }
467
468        let mut keys: Vec<Datum> = Vec::with_capacity(object.len());
469        let mut values: Vec<Datum> = Vec::with_capacity(object.len());
470        for (k, v) in object.iter() {
471            let v = Self(v).to_datum(ty.value())?;
472            keys.push(Some(ScalarImpl::Utf8(k.to_owned().into())));
473            values.push(v);
474        }
475        MapValue::try_from_kv(
476            ListValue::from_datum_iter(ty.key(), keys),
477            ListValue::from_datum_iter(ty.value(), values),
478        )
479    }
480
481    /// Expands the top-level JSON object to a row having the struct type of the `base` argument.
482    pub fn populate_struct(
483        self,
484        ty: &StructType,
485        base: Option<StructRef<'_>>,
486    ) -> Result<StructValue, String> {
487        let Some(base) = base else {
488            return self.to_struct(ty);
489        };
490        let object = self.0.as_object().ok_or_else(|| {
491            format!(
492                "cannot call populate_composite on a jsonb {}",
493                self.type_name()
494            )
495        })?;
496        let mut fields = Vec::with_capacity(ty.len());
497        for ((name, ty), base_field) in ty.iter().zip_eq_debug(base.iter_fields_ref()) {
498            let datum = match object.get(name) {
499                Some(v) => match ty {
500                    // recursively populate the nested struct
501                    DataType::Struct(s) => Some(
502                        Self(v)
503                            .populate_struct(s, base_field.map(|s| s.into_struct()))?
504                            .into(),
505                    ),
506                    _ => Self(v).to_datum(ty)?,
507                },
508                None => base_field.to_owned_datum(),
509            };
510            fields.push(datum);
511        }
512        Ok(StructValue::new(fields))
513    }
514
515    /// Returns the capacity of the underlying buffer.
516    pub fn capacity(self) -> usize {
517        self.0.capacity()
518    }
519}
520
521/// A custom implementation for [`serde_json::ser::Formatter`] to match PostgreSQL, which adds extra
522/// space after `,` and `:` in array and object.
523struct ToTextFormatter;
524
525impl serde_json::ser::Formatter for ToTextFormatter {
526    fn begin_array_value<W>(&mut self, writer: &mut W, first: bool) -> std::io::Result<()>
527    where
528        W: ?Sized + std::io::Write,
529    {
530        if first {
531            Ok(())
532        } else {
533            writer.write_all(b", ")
534        }
535    }
536
537    fn begin_object_key<W>(&mut self, writer: &mut W, first: bool) -> std::io::Result<()>
538    where
539        W: ?Sized + std::io::Write,
540    {
541        if first {
542            Ok(())
543        } else {
544            writer.write_all(b", ")
545        }
546    }
547
548    fn begin_object_value<W>(&mut self, writer: &mut W) -> std::io::Result<()>
549    where
550        W: ?Sized + std::io::Write,
551    {
552        writer.write_all(b": ")
553    }
554}
555
/// Adapter that exposes a [`std::fmt::Write`] sink through the [`std::io::Write`] interface.
///
/// "Unchecked" refers to the fact that written bytes are reinterpreted as UTF-8 without any
/// validation.
struct FmtToIoUnchecked<F>(F);

impl<F: std::fmt::Write> std::io::Write for FmtToIoUnchecked<F> {
    /// Forwards the whole buffer to the inner writer and reports it as fully written.
    ///
    /// A failure of the inner writer is mapped to an [`std::io::ErrorKind::Other`] error.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // SAFETY: NOTE(review) — nothing here validates `buf`; soundness relies on every caller
        // passing valid UTF-8 on each call. The callers in this file are `serde_json`
        // serializers, which presumably only emit valid UTF-8 — confirm before adding others.
        let text = unsafe { std::str::from_utf8_unchecked(buf) };
        match self.0.write_str(text) {
            Ok(()) => Ok(buf.len()),
            Err(_) => Err(std::io::Error::from(std::io::ErrorKind::Other)),
        }
    }

    /// Nothing is buffered by this adapter, so flushing is a no-op.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
570
571impl ToSql for JsonbVal {
572    accepts!(JSON, JSONB);
573
574    to_sql_checked!();
575
576    fn to_sql(
577        &self,
578        ty: &Type,
579        out: &mut BytesMut,
580    ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
581    where
582        Self: Sized,
583    {
584        if matches!(*ty, Type::JSONB) {
585            out.put_u8(1);
586        }
587        write!(out, "{}", self.0).unwrap();
588        Ok(IsNull::No)
589    }
590}
591
592impl<'a> FromSql<'a> for JsonbVal {
593    accepts!(JSON, JSONB);
594
595    fn from_sql(
596        ty: &Type,
597        mut raw: &'a [u8],
598    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
599        Ok(match *ty {
600            // Here we allow mapping JSON of pg to JSONB of rw. But please note the JSONB and JSON have different behaviors in postgres.
601            // An example of different semantics for duplicated keys in an object:
602            // test=# select jsonb_each('{"foo": 1, "bar": 2, "foo": 3}');
603            //  jsonb_each
604            //  ------------
605            //   (bar,2)
606            //   (foo,3)
607            //  (2 rows)
608            // test=# select json_each('{"foo": 1, "bar": 2, "foo": 3}');
609            //   json_each
610            //  -----------
611            //   (foo,1)
612            //   (bar,2)
613            //   (foo,3)
614            //  (3 rows)
615            Type::JSON => JsonbVal::from(Value::from_text(raw)?),
616            Type::JSONB => {
617                if raw.is_empty() || raw.get_u8() != 1 {
618                    return Err("invalid jsonb encoding".into());
619                }
620                JsonbVal::from(Value::from_text(raw)?)
621            }
622            _ => {
623                bail_not_implemented!("the JsonbVal's postgres decoding for {ty} is unsupported")
624            }
625        })
626    }
627}
628
629impl ToSql for JsonbRef<'_> {
630    accepts!(JSON, JSONB);
631
632    to_sql_checked!();
633
634    fn to_sql(
635        &self,
636        ty: &Type,
637        out: &mut BytesMut,
638    ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
639    where
640        Self: Sized,
641    {
642        if matches!(*ty, Type::JSONB) {
643            out.put_u8(1);
644        }
645        write!(out, "{}", self.0).unwrap();
646        Ok(IsNull::No)
647    }
648}