1use std::fmt::{self, Write};
16use std::hash::Hash;
17
18use bytes::{Buf, BufMut, BytesMut};
19use jsonbb::{Value, ValueRef};
20use postgres_types::{FromSql, IsNull, ToSql, Type, accepts, to_sql_checked};
21use risingwave_common_estimate_size::EstimateSize;
22use thiserror_ext::AsReport;
23
24use super::{
25 Datum, F64, IntoOrdered, ListValue, MapType, MapValue, ScalarImpl, StructRef, ToOwnedDatum,
26};
27use crate::types::{
28 DEBEZIUM_UNAVAILABLE_JSON, DEBEZIUM_UNAVAILABLE_VALUE, DataType, Scalar, ScalarRef, StructType,
29 StructValue,
30};
31use crate::util::iter_util::ZipEqDebug;
32
/// An owned jsonb value, backed by a `jsonbb::Value`.
///
/// The derived `PartialEq`/`Eq`/`Hash` operate on the underlying `Value`;
/// ordering is implemented separately (see the `Ord` impl in this file).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct JsonbVal(pub(crate) Value);
35
/// A borrowed, `Copy` view of a jsonb value, backed by a `jsonbb::ValueRef`.
///
/// This is the `ScalarRef` counterpart of [`JsonbVal`].
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct JsonbRef<'a>(pub(crate) ValueRef<'a>);
38
39impl EstimateSize for JsonbVal {
40 fn estimated_heap_size(&self) -> usize {
41 self.0.capacity()
42 }
43}
44
45impl fmt::Display for JsonbVal {
48 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49 crate::types::to_text::ToText::write(&self.as_scalar_ref(), f)
50 }
51}
52
53impl fmt::Display for JsonbRef<'_> {
56 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57 crate::types::to_text::ToText::write(self, f)
58 }
59}
60
61impl Scalar for JsonbVal {
62 type ScalarRefType<'a> = JsonbRef<'a>;
63
64 fn as_scalar_ref(&self) -> Self::ScalarRefType<'_> {
65 JsonbRef(self.0.as_ref())
66 }
67}
68
69impl<'a> ScalarRef<'a> for JsonbRef<'a> {
70 type ScalarType = JsonbVal;
71
72 fn to_owned_scalar(&self) -> Self::ScalarType {
73 JsonbVal(self.0.into())
74 }
75
76 fn hash_scalar<H: std::hash::Hasher>(&self, state: &mut H) {
77 self.hash(state)
78 }
79}
80
81impl PartialOrd for JsonbVal {
82 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
83 Some(self.cmp(other))
84 }
85}
86
87impl Ord for JsonbVal {
88 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
89 self.as_scalar_ref().cmp(&other.as_scalar_ref())
90 }
91}
92
93impl PartialOrd for JsonbRef<'_> {
94 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
95 Some(self.cmp(other))
96 }
97}
98
99impl Ord for JsonbRef<'_> {
100 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
101 self.0.to_string().cmp(&other.0.to_string())
114 }
115}
116
117impl crate::types::to_text::ToText for JsonbRef<'_> {
118 fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
119 use serde::Serialize as _;
122 let mut ser =
123 serde_json::ser::Serializer::with_formatter(FmtToIoUnchecked(f), ToTextFormatter);
124 self.0.serialize(&mut ser).map_err(|_| std::fmt::Error)
125 }
126
127 fn write_with_type<W: std::fmt::Write>(
128 &self,
129 _ty: &crate::types::DataType,
130 f: &mut W,
131 ) -> std::fmt::Result {
132 self.write(f)
133 }
134}
135
136impl crate::types::to_binary::ToBinary for JsonbRef<'_> {
137 fn to_binary_with_type(
138 &self,
139 _ty: &crate::types::DataType,
140 ) -> super::to_binary::Result<bytes::Bytes> {
141 Ok(self.value_serialize().into())
142 }
143}
144
145impl std::str::FromStr for JsonbVal {
146 type Err = <Value as std::str::FromStr>::Err;
147
148 fn from_str(s: &str) -> Result<Self, Self::Err> {
149 Ok(Self(s.parse()?))
150 }
151}
152
153impl JsonbVal {
154 pub fn from_debezium_unavailable_value(s: &str) -> Result<Self, serde_json::Error> {
157 if s.len() == DEBEZIUM_UNAVAILABLE_VALUE.len() && s == DEBEZIUM_UNAVAILABLE_VALUE {
159 return Ok(DEBEZIUM_UNAVAILABLE_JSON.clone());
160 }
161 Ok(Self(s.parse()?))
162 }
163}
164
165impl JsonbVal {
166 pub fn null() -> Self {
168 Self(Value::null())
169 }
170
171 pub fn empty_array() -> Self {
173 Self(Value::array([]))
174 }
175
176 pub fn empty_object() -> Self {
178 Self(Value::object([]))
179 }
180
181 pub fn memcmp_deserialize(
183 deserializer: &mut memcomparable::Deserializer<impl bytes::Buf>,
184 ) -> memcomparable::Result<Self> {
185 let v = <String as serde::Deserialize>::deserialize(deserializer)?
186 .parse()
187 .map_err(|_| memcomparable::Error::Message("invalid json".into()))?;
188 Ok(Self(v))
189 }
190
191 pub fn value_deserialize(mut buf: &[u8]) -> Option<Self> {
193 if buf.is_empty() || buf.get_u8() != 1 {
194 return None;
195 }
196 Value::from_text(buf).ok().map(Self)
197 }
198
199 pub fn take(self) -> serde_json::Value {
201 self.0.into()
202 }
203}
204
205impl From<serde_json::Value> for JsonbVal {
206 fn from(v: serde_json::Value) -> Self {
207 Self(v.into())
208 }
209}
210
211impl From<Value> for JsonbVal {
212 fn from(v: Value) -> Self {
213 Self(v)
214 }
215}
216
217impl From<JsonbRef<'_>> for JsonbVal {
218 fn from(v: JsonbRef<'_>) -> Self {
219 Self(v.0.to_owned())
220 }
221}
222
223impl From<f64> for JsonbVal {
224 fn from(v: f64) -> Self {
225 Self(v.into())
226 }
227}
228
229impl<'a> From<JsonbRef<'a>> for ValueRef<'a> {
230 fn from(v: JsonbRef<'a>) -> Self {
231 v.0
232 }
233}
234
235impl<'a> JsonbRef<'a> {
236 pub fn memcmp_serialize(
237 &self,
238 serializer: &mut memcomparable::Serializer<impl bytes::BufMut>,
239 ) -> memcomparable::Result<()> {
240 let s = self.0.to_string();
243 serde::Serialize::serialize(&s, serializer)
244 }
245
246 pub fn value_serialize(&self) -> Vec<u8> {
248 use std::io::Write;
249 let mut buf = Vec::with_capacity(self.0.capacity());
254 buf.push(1);
255 write!(&mut buf, "{}", self.0).unwrap();
256 buf
257 }
258
259 pub const fn null() -> Self {
261 Self(ValueRef::Null)
262 }
263
264 pub const fn empty_string() -> Self {
266 Self(ValueRef::String(""))
267 }
268
269 pub fn is_jsonb_null(&self) -> bool {
271 self.0.is_null()
272 }
273
274 pub fn is_scalar(&self) -> bool {
276 matches!(
277 self.0,
278 ValueRef::Null | ValueRef::Bool(_) | ValueRef::Number(_) | ValueRef::String(_)
279 )
280 }
281
282 pub fn is_array(&self) -> bool {
284 self.0.is_array()
285 }
286
287 pub fn is_object(&self) -> bool {
289 self.0.is_object()
290 }
291
292 pub fn type_name(&self) -> &'static str {
296 match self.0 {
297 ValueRef::Null => "null",
298 ValueRef::Bool(_) => "boolean",
299 ValueRef::Number(_) => "number",
300 ValueRef::String(_) => "string",
301 ValueRef::Array(_) => "array",
302 ValueRef::Object(_) => "object",
303 }
304 }
305
306 pub fn array_len(&self) -> Result<usize, String> {
308 let array = self
309 .0
310 .as_array()
311 .ok_or_else(|| format!("cannot get array length of a jsonb {}", self.type_name()))?;
312 Ok(array.len())
313 }
314
315 pub fn as_bool(&self) -> Result<bool, String> {
317 self.0
318 .as_bool()
319 .ok_or_else(|| format!("cannot cast jsonb {} to type boolean", self.type_name()))
320 }
321
322 pub fn as_string(&self) -> Result<String, String> {
324 self.0
325 .as_str()
326 .map(|s| s.to_owned())
327 .ok_or_else(|| format!("cannot cast jsonb {} to type string", self.type_name()))
328 }
329
330 pub fn as_str(&self) -> Result<&str, String> {
332 self.0
333 .as_str()
334 .ok_or_else(|| format!("cannot cast jsonb {} to type &str", self.type_name()))
335 }
336
337 pub fn as_number(&self) -> Result<F64, String> {
342 self.0
343 .as_number()
344 .ok_or_else(|| format!("cannot cast jsonb {} to type number", self.type_name()))?
345 .as_f64()
346 .map(|f| f.into_ordered())
347 .ok_or_else(|| "jsonb number out of range".into())
348 }
349
350 pub fn force_str<W: std::fmt::Write>(&self, writer: &mut W) -> std::fmt::Result {
360 match self.0 {
361 ValueRef::String(v) => writer.write_str(v),
362 ValueRef::Null => Ok(()),
363 ValueRef::Bool(_) | ValueRef::Number(_) | ValueRef::Array(_) | ValueRef::Object(_) => {
364 use crate::types::to_text::ToText as _;
365 self.write_with_type(&crate::types::DataType::Jsonb, writer)
366 }
367 }
368 }
369
370 pub fn force_string(&self) -> String {
371 let mut s = String::new();
372 self.force_str(&mut s).unwrap();
373 s
374 }
375
376 pub fn access_object_field(&self, field: &str) -> Option<Self> {
377 self.0.get(field).map(Self)
378 }
379
380 pub fn access_array_element(&self, idx: usize) -> Option<Self> {
381 self.0.get(idx).map(Self)
382 }
383
384 pub fn array_elements(self) -> Result<impl Iterator<Item = JsonbRef<'a>>, String> {
386 let array = self
387 .0
388 .as_array()
389 .ok_or_else(|| format!("cannot extract elements from a jsonb {}", self.type_name()))?;
390 Ok(array.iter().map(Self))
391 }
392
393 pub fn object_keys(self) -> Result<impl Iterator<Item = &'a str>, String> {
395 let object = self.0.as_object().ok_or_else(|| {
396 format!(
397 "cannot call jsonb_object_keys on a jsonb {}",
398 self.type_name()
399 )
400 })?;
401 Ok(object.keys())
402 }
403
404 pub fn object_key_values(
406 self,
407 ) -> Result<impl Iterator<Item = (&'a str, JsonbRef<'a>)>, String> {
408 let object = self
409 .0
410 .as_object()
411 .ok_or_else(|| format!("cannot deconstruct a jsonb {}", self.type_name()))?;
412 Ok(object.iter().map(|(k, v)| (k, Self(v))))
413 }
414
415 pub fn pretty(self, f: &mut impl std::fmt::Write) -> std::fmt::Result {
417 use serde::Serialize;
418 use serde_json::ser::{PrettyFormatter, Serializer};
419
420 let mut ser =
421 Serializer::with_formatter(FmtToIoUnchecked(f), PrettyFormatter::with_indent(b" "));
422 self.0.serialize(&mut ser).map_err(|_| std::fmt::Error)
423 }
424
425 pub fn to_datum(self, ty: &DataType) -> Result<Datum, String> {
427 if self.0.as_null().is_some() {
428 return Ok(None);
429 }
430 let datum = match ty {
431 DataType::Jsonb => ScalarImpl::Jsonb(self.into()),
432 DataType::List(t) => ScalarImpl::List(self.to_list(t)?),
433 DataType::Struct(s) => ScalarImpl::Struct(self.to_struct(s)?),
434 _ => {
435 let s = self.force_string();
436 ScalarImpl::from_text(&s, ty).map_err(|e| format!("{}", e.as_report()))?
437 }
438 };
439 Ok(Some(datum))
440 }
441
442 pub fn to_list(self, elem_type: &DataType) -> Result<ListValue, String> {
444 let array = self
445 .0
446 .as_array()
447 .ok_or_else(|| format!("expected JSON array, but found {self}"))?;
448 let mut builder = elem_type.create_array_builder(array.len());
449 for v in array.iter() {
450 builder.append(Self(v).to_datum(elem_type)?);
451 }
452 Ok(ListValue::new(builder.finish()))
453 }
454
455 pub fn to_struct(self, ty: &StructType) -> Result<StructValue, String> {
457 let object = self.0.as_object().ok_or_else(|| {
458 format!(
459 "cannot call populate_composite on a jsonb {}",
460 self.type_name()
461 )
462 })?;
463 let mut fields = Vec::with_capacity(ty.len());
464 for (name, ty) in ty.iter() {
465 let datum = match object.get(name) {
466 Some(v) => Self(v).to_datum(ty)?,
467 None => None,
468 };
469 fields.push(datum);
470 }
471 Ok(StructValue::new(fields))
472 }
473
474 pub fn to_map(self, ty: &MapType) -> Result<MapValue, String> {
475 let object = self
476 .0
477 .as_object()
478 .ok_or_else(|| format!("cannot convert to map from a jsonb {}", self.type_name()))?;
479 if !matches!(ty.key(), DataType::Varchar) {
480 return Err("cannot convert jsonb to a map with non-string keys".to_owned());
481 }
482
483 let mut keys: Vec<Datum> = Vec::with_capacity(object.len());
484 let mut values: Vec<Datum> = Vec::with_capacity(object.len());
485 for (k, v) in object.iter() {
486 let v = Self(v).to_datum(ty.value())?;
487 keys.push(Some(ScalarImpl::Utf8(k.to_owned().into())));
488 values.push(v);
489 }
490 MapValue::try_from_kv(
491 ListValue::from_datum_iter(ty.key(), keys),
492 ListValue::from_datum_iter(ty.value(), values),
493 )
494 }
495
496 pub fn populate_struct(
498 self,
499 ty: &StructType,
500 base: Option<StructRef<'_>>,
501 ) -> Result<StructValue, String> {
502 let Some(base) = base else {
503 return self.to_struct(ty);
504 };
505 let object = self.0.as_object().ok_or_else(|| {
506 format!(
507 "cannot call populate_composite on a jsonb {}",
508 self.type_name()
509 )
510 })?;
511 let mut fields = Vec::with_capacity(ty.len());
512 for ((name, ty), base_field) in ty.iter().zip_eq_debug(base.iter_fields_ref()) {
513 let datum = match object.get(name) {
514 Some(v) => match ty {
515 DataType::Struct(s) => Some(
517 Self(v)
518 .populate_struct(s, base_field.map(|s| s.into_struct()))?
519 .into(),
520 ),
521 _ => Self(v).to_datum(ty)?,
522 },
523 None => base_field.to_owned_datum(),
524 };
525 fields.push(datum);
526 }
527 Ok(StructValue::new(fields))
528 }
529
530 pub fn capacity(self) -> usize {
532 self.0.capacity()
533 }
534}
535
536struct ToTextFormatter;
539
540impl serde_json::ser::Formatter for ToTextFormatter {
541 fn begin_array_value<W>(&mut self, writer: &mut W, first: bool) -> std::io::Result<()>
542 where
543 W: ?Sized + std::io::Write,
544 {
545 if first {
546 Ok(())
547 } else {
548 writer.write_all(b", ")
549 }
550 }
551
552 fn begin_object_key<W>(&mut self, writer: &mut W, first: bool) -> std::io::Result<()>
553 where
554 W: ?Sized + std::io::Write,
555 {
556 if first {
557 Ok(())
558 } else {
559 writer.write_all(b", ")
560 }
561 }
562
563 fn begin_object_value<W>(&mut self, writer: &mut W) -> std::io::Result<()>
564 where
565 W: ?Sized + std::io::Write,
566 {
567 writer.write_all(b": ")
568 }
569}
570
/// Adapts a [`std::fmt::Write`] sink for use where a [`std::io::Write`] is
/// required, such as the output of a `serde_json` serializer.
///
/// "Unchecked" because incoming bytes are not validated as UTF-8 in release
/// builds. Every `write` call must receive a complete, valid UTF-8 chunk.
struct FmtToIoUnchecked<F>(F);

impl<F: std::fmt::Write> std::io::Write for FmtToIoUnchecked<F> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Catch contract violations in debug builds instead of creating an invalid `&str`.
        debug_assert!(
            std::str::from_utf8(buf).is_ok(),
            "FmtToIoUnchecked received bytes that are not valid UTF-8"
        );
        // SAFETY: this adapter is only used as the sink of `serde_json`
        // serializers in this file. Those serializers emit JSON text as UTF-8
        // chunks split only at character boundaries. NOTE(review): this is a
        // serde_json implementation property; re-confirm on upgrades (the
        // debug_assert above guards it in tests). Because we always consume
        // the whole buffer, `write_all` never re-splits a chunk mid-character.
        let s = unsafe { std::str::from_utf8_unchecked(buf) };
        self.0.write_str(s).map_err(|_| std::io::ErrorKind::Other)?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        // `fmt::Write` sinks have no buffer of their own to flush.
        Ok(())
    }
}
585
586impl ToSql for JsonbVal {
587 accepts!(JSON, JSONB);
588
589 to_sql_checked!();
590
591 fn to_sql(
592 &self,
593 ty: &Type,
594 out: &mut BytesMut,
595 ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
596 where
597 Self: Sized,
598 {
599 if matches!(*ty, Type::JSONB) {
600 out.put_u8(1);
601 }
602 write!(out, "{}", self.0).unwrap();
603 Ok(IsNull::No)
604 }
605}
606
607impl<'a> FromSql<'a> for JsonbVal {
608 accepts!(JSON, JSONB);
609
610 fn from_sql(
611 ty: &Type,
612 mut raw: &'a [u8],
613 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
614 Ok(match *ty {
615 Type::JSON => JsonbVal::from(Value::from_text(raw)?),
631 Type::JSONB => {
632 if raw.is_empty() || raw.get_u8() != 1 {
633 return Err("invalid jsonb encoding".into());
634 }
635 JsonbVal::from(Value::from_text(raw)?)
636 }
637 _ => {
638 bail_not_implemented!("the JsonbVal's postgres decoding for {ty} is unsupported")
639 }
640 })
641 }
642}
643
644impl ToSql for JsonbRef<'_> {
645 accepts!(JSON, JSONB);
646
647 to_sql_checked!();
648
649 fn to_sql(
650 &self,
651 ty: &Type,
652 out: &mut BytesMut,
653 ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
654 where
655 Self: Sized,
656 {
657 if matches!(*ty, Type::JSONB) {
658 out.put_u8(1);
659 }
660 write!(out, "{}", self.0).unwrap();
661 Ok(IsNull::No)
662 }
663}