1use std::fmt::{self, Write};
16use std::hash::Hash;
17
18use bytes::{Buf, BufMut, BytesMut};
19use jsonbb::{Value, ValueRef};
20use postgres_types::{FromSql, IsNull, ToSql, Type, accepts, to_sql_checked};
21use risingwave_common_estimate_size::EstimateSize;
22use thiserror_ext::AsReport;
23
24use super::{
25 Datum, F64, IntoOrdered, ListValue, MapType, MapValue, ScalarImpl, StructRef, ToOwnedDatum,
26};
27use crate::types::{DataType, Scalar, ScalarRef, StructType, StructValue};
28use crate::util::iter_util::ZipEqDebug;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct JsonbVal(pub(crate) Value);
32
33#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
34pub struct JsonbRef<'a>(pub(crate) ValueRef<'a>);
35
36impl EstimateSize for JsonbVal {
37 fn estimated_heap_size(&self) -> usize {
38 self.0.capacity()
39 }
40}
41
42impl fmt::Display for JsonbVal {
45 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46 crate::types::to_text::ToText::write(&self.as_scalar_ref(), f)
47 }
48}
49
50impl fmt::Display for JsonbRef<'_> {
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 crate::types::to_text::ToText::write(self, f)
55 }
56}
57
58impl Scalar for JsonbVal {
59 type ScalarRefType<'a> = JsonbRef<'a>;
60
61 fn as_scalar_ref(&self) -> Self::ScalarRefType<'_> {
62 JsonbRef(self.0.as_ref())
63 }
64}
65
66impl<'a> ScalarRef<'a> for JsonbRef<'a> {
67 type ScalarType = JsonbVal;
68
69 fn to_owned_scalar(&self) -> Self::ScalarType {
70 JsonbVal(self.0.into())
71 }
72
73 fn hash_scalar<H: std::hash::Hasher>(&self, state: &mut H) {
74 self.hash(state)
75 }
76}
77
78impl PartialOrd for JsonbVal {
79 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
80 Some(self.cmp(other))
81 }
82}
83
84impl Ord for JsonbVal {
85 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
86 self.as_scalar_ref().cmp(&other.as_scalar_ref())
87 }
88}
89
90impl PartialOrd for JsonbRef<'_> {
91 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
92 Some(self.cmp(other))
93 }
94}
95
96impl Ord for JsonbRef<'_> {
97 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
98 self.0.to_string().cmp(&other.0.to_string())
111 }
112}
113
114impl crate::types::to_text::ToText for JsonbRef<'_> {
115 fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
116 use serde::Serialize as _;
119 let mut ser =
120 serde_json::ser::Serializer::with_formatter(FmtToIoUnchecked(f), ToTextFormatter);
121 self.0.serialize(&mut ser).map_err(|_| std::fmt::Error)
122 }
123
124 fn write_with_type<W: std::fmt::Write>(
125 &self,
126 _ty: &crate::types::DataType,
127 f: &mut W,
128 ) -> std::fmt::Result {
129 self.write(f)
130 }
131}
132
133impl crate::types::to_binary::ToBinary for JsonbRef<'_> {
134 fn to_binary_with_type(
135 &self,
136 _ty: &crate::types::DataType,
137 ) -> super::to_binary::Result<bytes::Bytes> {
138 Ok(self.value_serialize().into())
139 }
140}
141
142impl std::str::FromStr for JsonbVal {
143 type Err = <Value as std::str::FromStr>::Err;
144
145 fn from_str(s: &str) -> Result<Self, Self::Err> {
146 Ok(Self(s.parse()?))
147 }
148}
149
150impl JsonbVal {
151 pub fn null() -> Self {
153 Self(Value::null())
154 }
155
156 pub fn empty_array() -> Self {
158 Self(Value::array([]))
159 }
160
161 pub fn empty_object() -> Self {
163 Self(Value::object([]))
164 }
165
166 pub fn memcmp_deserialize(
168 deserializer: &mut memcomparable::Deserializer<impl bytes::Buf>,
169 ) -> memcomparable::Result<Self> {
170 let v = <String as serde::Deserialize>::deserialize(deserializer)?
171 .parse()
172 .map_err(|_| memcomparable::Error::Message("invalid json".into()))?;
173 Ok(Self(v))
174 }
175
176 pub fn value_deserialize(mut buf: &[u8]) -> Option<Self> {
178 if buf.is_empty() || buf.get_u8() != 1 {
179 return None;
180 }
181 Value::from_text(buf).ok().map(Self)
182 }
183
184 pub fn take(self) -> serde_json::Value {
186 self.0.into()
187 }
188}
189
190impl From<serde_json::Value> for JsonbVal {
191 fn from(v: serde_json::Value) -> Self {
192 Self(v.into())
193 }
194}
195
196impl From<Value> for JsonbVal {
197 fn from(v: Value) -> Self {
198 Self(v)
199 }
200}
201
202impl From<JsonbRef<'_>> for JsonbVal {
203 fn from(v: JsonbRef<'_>) -> Self {
204 Self(v.0.to_owned())
205 }
206}
207
208impl From<f64> for JsonbVal {
209 fn from(v: f64) -> Self {
210 Self(v.into())
211 }
212}
213
214impl<'a> From<JsonbRef<'a>> for ValueRef<'a> {
215 fn from(v: JsonbRef<'a>) -> Self {
216 v.0
217 }
218}
219
220impl<'a> JsonbRef<'a> {
221 pub fn memcmp_serialize(
222 &self,
223 serializer: &mut memcomparable::Serializer<impl bytes::BufMut>,
224 ) -> memcomparable::Result<()> {
225 let s = self.0.to_string();
228 serde::Serialize::serialize(&s, serializer)
229 }
230
231 pub fn value_serialize(&self) -> Vec<u8> {
233 use std::io::Write;
234 let mut buf = Vec::with_capacity(self.0.capacity());
239 buf.push(1);
240 write!(&mut buf, "{}", self.0).unwrap();
241 buf
242 }
243
244 pub const fn null() -> Self {
246 Self(ValueRef::Null)
247 }
248
249 pub const fn empty_string() -> Self {
251 Self(ValueRef::String(""))
252 }
253
254 pub fn is_jsonb_null(&self) -> bool {
256 self.0.is_null()
257 }
258
259 pub fn is_scalar(&self) -> bool {
261 matches!(
262 self.0,
263 ValueRef::Null | ValueRef::Bool(_) | ValueRef::Number(_) | ValueRef::String(_)
264 )
265 }
266
267 pub fn is_array(&self) -> bool {
269 self.0.is_array()
270 }
271
272 pub fn is_object(&self) -> bool {
274 self.0.is_object()
275 }
276
277 pub fn type_name(&self) -> &'static str {
281 match self.0 {
282 ValueRef::Null => "null",
283 ValueRef::Bool(_) => "boolean",
284 ValueRef::Number(_) => "number",
285 ValueRef::String(_) => "string",
286 ValueRef::Array(_) => "array",
287 ValueRef::Object(_) => "object",
288 }
289 }
290
291 pub fn array_len(&self) -> Result<usize, String> {
293 let array = self
294 .0
295 .as_array()
296 .ok_or_else(|| format!("cannot get array length of a jsonb {}", self.type_name()))?;
297 Ok(array.len())
298 }
299
300 pub fn as_bool(&self) -> Result<bool, String> {
302 self.0
303 .as_bool()
304 .ok_or_else(|| format!("cannot cast jsonb {} to type boolean", self.type_name()))
305 }
306
307 pub fn as_string(&self) -> Result<String, String> {
309 self.0
310 .as_str()
311 .map(|s| s.to_owned())
312 .ok_or_else(|| format!("cannot cast jsonb {} to type string", self.type_name()))
313 }
314
315 pub fn as_str(&self) -> Result<&str, String> {
317 self.0
318 .as_str()
319 .ok_or_else(|| format!("cannot cast jsonb {} to type &str", self.type_name()))
320 }
321
322 pub fn as_number(&self) -> Result<F64, String> {
327 self.0
328 .as_number()
329 .ok_or_else(|| format!("cannot cast jsonb {} to type number", self.type_name()))?
330 .as_f64()
331 .map(|f| f.into_ordered())
332 .ok_or_else(|| "jsonb number out of range".into())
333 }
334
335 pub fn force_str<W: std::fmt::Write>(&self, writer: &mut W) -> std::fmt::Result {
345 match self.0 {
346 ValueRef::String(v) => writer.write_str(v),
347 ValueRef::Null => Ok(()),
348 ValueRef::Bool(_) | ValueRef::Number(_) | ValueRef::Array(_) | ValueRef::Object(_) => {
349 use crate::types::to_text::ToText as _;
350 self.write_with_type(&crate::types::DataType::Jsonb, writer)
351 }
352 }
353 }
354
355 pub fn force_string(&self) -> String {
356 let mut s = String::new();
357 self.force_str(&mut s).unwrap();
358 s
359 }
360
361 pub fn access_object_field(&self, field: &str) -> Option<Self> {
362 self.0.get(field).map(Self)
363 }
364
365 pub fn access_array_element(&self, idx: usize) -> Option<Self> {
366 self.0.get(idx).map(Self)
367 }
368
369 pub fn array_elements(self) -> Result<impl Iterator<Item = JsonbRef<'a>>, String> {
371 let array = self
372 .0
373 .as_array()
374 .ok_or_else(|| format!("cannot extract elements from a jsonb {}", self.type_name()))?;
375 Ok(array.iter().map(Self))
376 }
377
378 pub fn object_keys(self) -> Result<impl Iterator<Item = &'a str>, String> {
380 let object = self.0.as_object().ok_or_else(|| {
381 format!(
382 "cannot call jsonb_object_keys on a jsonb {}",
383 self.type_name()
384 )
385 })?;
386 Ok(object.keys())
387 }
388
389 pub fn object_key_values(
391 self,
392 ) -> Result<impl Iterator<Item = (&'a str, JsonbRef<'a>)>, String> {
393 let object = self
394 .0
395 .as_object()
396 .ok_or_else(|| format!("cannot deconstruct a jsonb {}", self.type_name()))?;
397 Ok(object.iter().map(|(k, v)| (k, Self(v))))
398 }
399
400 pub fn pretty(self, f: &mut impl std::fmt::Write) -> std::fmt::Result {
402 use serde::Serialize;
403 use serde_json::ser::{PrettyFormatter, Serializer};
404
405 let mut ser =
406 Serializer::with_formatter(FmtToIoUnchecked(f), PrettyFormatter::with_indent(b" "));
407 self.0.serialize(&mut ser).map_err(|_| std::fmt::Error)
408 }
409
410 pub fn to_datum(self, ty: &DataType) -> Result<Datum, String> {
412 if self.0.as_null().is_some() {
413 return Ok(None);
414 }
415 let datum = match ty {
416 DataType::Jsonb => ScalarImpl::Jsonb(self.into()),
417 DataType::List(t) => ScalarImpl::List(self.to_list(t)?),
418 DataType::Struct(s) => ScalarImpl::Struct(self.to_struct(s)?),
419 _ => {
420 let s = self.force_string();
421 ScalarImpl::from_text(&s, ty).map_err(|e| format!("{}", e.as_report()))?
422 }
423 };
424 Ok(Some(datum))
425 }
426
427 pub fn to_list(self, elem_type: &DataType) -> Result<ListValue, String> {
429 let array = self
430 .0
431 .as_array()
432 .ok_or_else(|| format!("expected JSON array, but found {self}"))?;
433 let mut builder = elem_type.create_array_builder(array.len());
434 for v in array.iter() {
435 builder.append(Self(v).to_datum(elem_type)?);
436 }
437 Ok(ListValue::new(builder.finish()))
438 }
439
440 pub fn to_struct(self, ty: &StructType) -> Result<StructValue, String> {
442 let object = self.0.as_object().ok_or_else(|| {
443 format!(
444 "cannot call populate_composite on a jsonb {}",
445 self.type_name()
446 )
447 })?;
448 let mut fields = Vec::with_capacity(ty.len());
449 for (name, ty) in ty.iter() {
450 let datum = match object.get(name) {
451 Some(v) => Self(v).to_datum(ty)?,
452 None => None,
453 };
454 fields.push(datum);
455 }
456 Ok(StructValue::new(fields))
457 }
458
459 pub fn to_map(self, ty: &MapType) -> Result<MapValue, String> {
460 let object = self
461 .0
462 .as_object()
463 .ok_or_else(|| format!("cannot convert to map from a jsonb {}", self.type_name()))?;
464 if !matches!(ty.key(), DataType::Varchar) {
465 return Err("cannot convert jsonb to a map with non-string keys".to_owned());
466 }
467
468 let mut keys: Vec<Datum> = Vec::with_capacity(object.len());
469 let mut values: Vec<Datum> = Vec::with_capacity(object.len());
470 for (k, v) in object.iter() {
471 let v = Self(v).to_datum(ty.value())?;
472 keys.push(Some(ScalarImpl::Utf8(k.to_owned().into())));
473 values.push(v);
474 }
475 MapValue::try_from_kv(
476 ListValue::from_datum_iter(ty.key(), keys),
477 ListValue::from_datum_iter(ty.value(), values),
478 )
479 }
480
481 pub fn populate_struct(
483 self,
484 ty: &StructType,
485 base: Option<StructRef<'_>>,
486 ) -> Result<StructValue, String> {
487 let Some(base) = base else {
488 return self.to_struct(ty);
489 };
490 let object = self.0.as_object().ok_or_else(|| {
491 format!(
492 "cannot call populate_composite on a jsonb {}",
493 self.type_name()
494 )
495 })?;
496 let mut fields = Vec::with_capacity(ty.len());
497 for ((name, ty), base_field) in ty.iter().zip_eq_debug(base.iter_fields_ref()) {
498 let datum = match object.get(name) {
499 Some(v) => match ty {
500 DataType::Struct(s) => Some(
502 Self(v)
503 .populate_struct(s, base_field.map(|s| s.into_struct()))?
504 .into(),
505 ),
506 _ => Self(v).to_datum(ty)?,
507 },
508 None => base_field.to_owned_datum(),
509 };
510 fields.push(datum);
511 }
512 Ok(StructValue::new(fields))
513 }
514
515 pub fn capacity(self) -> usize {
517 self.0.capacity()
518 }
519}
520
521struct ToTextFormatter;
524
525impl serde_json::ser::Formatter for ToTextFormatter {
526 fn begin_array_value<W>(&mut self, writer: &mut W, first: bool) -> std::io::Result<()>
527 where
528 W: ?Sized + std::io::Write,
529 {
530 if first {
531 Ok(())
532 } else {
533 writer.write_all(b", ")
534 }
535 }
536
537 fn begin_object_key<W>(&mut self, writer: &mut W, first: bool) -> std::io::Result<()>
538 where
539 W: ?Sized + std::io::Write,
540 {
541 if first {
542 Ok(())
543 } else {
544 writer.write_all(b", ")
545 }
546 }
547
548 fn begin_object_value<W>(&mut self, writer: &mut W) -> std::io::Result<()>
549 where
550 W: ?Sized + std::io::Write,
551 {
552 writer.write_all(b": ")
553 }
554}
555
556struct FmtToIoUnchecked<F>(F);
558
559impl<F: std::fmt::Write> std::io::Write for FmtToIoUnchecked<F> {
560 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
561 let s = unsafe { std::str::from_utf8_unchecked(buf) };
562 self.0.write_str(s).map_err(|_| std::io::ErrorKind::Other)?;
563 Ok(buf.len())
564 }
565
566 fn flush(&mut self) -> std::io::Result<()> {
567 Ok(())
568 }
569}
570
571impl ToSql for JsonbVal {
572 accepts!(JSON, JSONB);
573
574 to_sql_checked!();
575
576 fn to_sql(
577 &self,
578 ty: &Type,
579 out: &mut BytesMut,
580 ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
581 where
582 Self: Sized,
583 {
584 if matches!(*ty, Type::JSONB) {
585 out.put_u8(1);
586 }
587 write!(out, "{}", self.0).unwrap();
588 Ok(IsNull::No)
589 }
590}
591
592impl<'a> FromSql<'a> for JsonbVal {
593 accepts!(JSON, JSONB);
594
595 fn from_sql(
596 ty: &Type,
597 mut raw: &'a [u8],
598 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
599 Ok(match *ty {
600 Type::JSON => JsonbVal::from(Value::from_text(raw)?),
616 Type::JSONB => {
617 if raw.is_empty() || raw.get_u8() != 1 {
618 return Err("invalid jsonb encoding".into());
619 }
620 JsonbVal::from(Value::from_text(raw)?)
621 }
622 _ => {
623 bail_not_implemented!("the JsonbVal's postgres decoding for {ty} is unsupported")
624 }
625 })
626 }
627}
628
629impl ToSql for JsonbRef<'_> {
630 accepts!(JSON, JSONB);
631
632 to_sql_checked!();
633
634 fn to_sql(
635 &self,
636 ty: &Type,
637 out: &mut BytesMut,
638 ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
639 where
640 Self: Sized,
641 {
642 if matches!(*ty, Type::JSONB) {
643 out.put_u8(1);
644 }
645 write!(out, "{}", self.0).unwrap();
646 Ok(IsNull::No)
647 }
648}