1use std::fmt::{self, Write};
16use std::hash::Hash;
17
18use bytes::{Buf, BufMut, BytesMut};
19use jsonbb::{Value, ValueRef};
20use postgres_types::{FromSql, IsNull, ToSql, Type, accepts, to_sql_checked};
21use risingwave_common_estimate_size::EstimateSize;
22use thiserror_ext::AsReport;
23
24use super::{
25 Datum, F64, IntoOrdered, ListValue, MapType, MapValue, ScalarImpl, StructRef, ToOwnedDatum,
26};
27use crate::types::{
28 DEBEZIUM_UNAVAILABLE_JSON, DEBEZIUM_UNAVAILABLE_VALUE, DataType, ListType, Scalar, ScalarRef,
29 StructType, StructValue,
30};
31use crate::util::iter_util::ZipEqDebug;
32
/// An owned JSONB value: a newtype over [`jsonbb::Value`].
///
/// Its borrowed counterpart is [`JsonbRef`] (see `Scalar::as_scalar_ref`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct JsonbVal(pub(crate) Value);
35
/// A borrowed JSONB value: a `Copy` newtype over [`jsonbb::ValueRef`].
///
/// Most JSONB accessors and conversions live on this type; [`JsonbVal`] is its owned form.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct JsonbRef<'a>(pub(crate) ValueRef<'a>);
38
39impl EstimateSize for JsonbVal {
40 fn estimated_heap_size(&self) -> usize {
41 self.0.capacity()
42 }
43}
44
45impl fmt::Display for JsonbVal {
48 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49 crate::types::to_text::ToText::write(&self.as_scalar_ref(), f)
50 }
51}
52
53impl fmt::Display for JsonbRef<'_> {
56 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57 crate::types::to_text::ToText::write(self, f)
58 }
59}
60
61impl Scalar for JsonbVal {
62 type ScalarRefType<'a> = JsonbRef<'a>;
63
64 fn as_scalar_ref(&self) -> Self::ScalarRefType<'_> {
65 JsonbRef(self.0.as_ref())
66 }
67}
68
69impl<'a> ScalarRef<'a> for JsonbRef<'a> {
70 type ScalarType = JsonbVal;
71
72 fn to_owned_scalar(&self) -> Self::ScalarType {
73 JsonbVal(self.0.into())
74 }
75
76 fn hash_scalar<H: std::hash::Hasher>(&self, state: &mut H) {
77 self.hash(state)
78 }
79}
80
81impl PartialOrd for JsonbVal {
82 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
83 Some(self.cmp(other))
84 }
85}
86
87impl Ord for JsonbVal {
88 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
89 self.as_scalar_ref().cmp(&other.as_scalar_ref())
90 }
91}
92
93impl PartialOrd for JsonbRef<'_> {
94 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
95 Some(self.cmp(other))
96 }
97}
98
impl Ord for JsonbRef<'_> {
    /// Orders values by byte-wise comparison of their `to_string()` text.
    ///
    /// This is a textual order, not a type-aware JSON order. For example, numbers compare
    /// lexically (`10` sorts before `9`), and values of different JSON types are ordered by the
    /// first character of their text (an object `{...}` sorts after an array `[...]`, which
    /// sorts after a string `"..."`).
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // `memcmp_serialize` encodes this same `to_string()` text, so comparing here keeps the
        // in-memory order consistent with the memcomparable encoding. Changing one requires
        // changing the other.
        // NOTE(review): both `to_string` calls allocate on every comparison.
        self.0.to_string().cmp(&other.0.to_string())
    }
}
116
117impl crate::types::to_text::ToText for JsonbRef<'_> {
118 fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
119 use serde::Serialize as _;
122 let mut ser =
123 serde_json::ser::Serializer::with_formatter(FmtToIoUnchecked(f), ToTextFormatter);
124 self.0.serialize(&mut ser).map_err(|_| std::fmt::Error)
125 }
126
127 fn write_with_type<W: std::fmt::Write>(
128 &self,
129 _ty: &crate::types::DataType,
130 f: &mut W,
131 ) -> std::fmt::Result {
132 self.write(f)
133 }
134}
135
136impl crate::types::to_binary::ToBinary for JsonbRef<'_> {
137 fn to_binary_with_type(
138 &self,
139 _ty: &crate::types::DataType,
140 ) -> super::to_binary::Result<bytes::Bytes> {
141 Ok(self.value_serialize().into())
142 }
143}
144
145impl std::str::FromStr for JsonbVal {
146 type Err = <Value as std::str::FromStr>::Err;
147
148 fn from_str(s: &str) -> Result<Self, Self::Err> {
149 Ok(Self(s.parse()?))
150 }
151}
152
153impl JsonbVal {
154 pub fn from_debezium_unavailable_value(s: &str) -> Result<Self, serde_json::Error> {
157 if s.len() == DEBEZIUM_UNAVAILABLE_VALUE.len() && s == DEBEZIUM_UNAVAILABLE_VALUE {
159 return Ok(DEBEZIUM_UNAVAILABLE_JSON.clone());
160 }
161 Ok(Self(s.parse()?))
162 }
163}
164
165impl JsonbVal {
166 pub fn null() -> Self {
168 Self(Value::null())
169 }
170
171 pub fn empty_array() -> Self {
173 Self(Value::array([]))
174 }
175
176 pub fn empty_object() -> Self {
178 Self(Value::object([]))
179 }
180
181 pub fn memcmp_deserialize(
183 deserializer: &mut memcomparable::Deserializer<impl bytes::Buf>,
184 ) -> memcomparable::Result<Self> {
185 let v = <String as serde::Deserialize>::deserialize(deserializer)?
186 .parse()
187 .map_err(|_| memcomparable::Error::Message("invalid json".into()))?;
188 Ok(Self(v))
189 }
190
191 pub fn value_deserialize(mut buf: &[u8]) -> Option<Self> {
193 if buf.is_empty() || buf.get_u8() != 1 {
194 return None;
195 }
196 Value::from_text(buf).ok().map(Self)
197 }
198
199 pub fn take(self) -> serde_json::Value {
201 self.0.into()
202 }
203}
204
205impl From<serde_json::Value> for JsonbVal {
206 fn from(v: serde_json::Value) -> Self {
207 Self(v.into())
208 }
209}
210
211impl From<Value> for JsonbVal {
212 fn from(v: Value) -> Self {
213 Self(v)
214 }
215}
216
217impl From<JsonbRef<'_>> for JsonbVal {
218 fn from(v: JsonbRef<'_>) -> Self {
219 Self(v.0.to_owned())
220 }
221}
222
223impl From<f64> for JsonbVal {
224 fn from(v: f64) -> Self {
225 Self(v.into())
226 }
227}
228
229impl<'a> From<JsonbRef<'a>> for ValueRef<'a> {
230 fn from(v: JsonbRef<'a>) -> Self {
231 v.0
232 }
233}
234
235impl<'a> JsonbRef<'a> {
236 pub fn memcmp_serialize(
237 &self,
238 serializer: &mut memcomparable::Serializer<impl bytes::BufMut>,
239 ) -> memcomparable::Result<()> {
240 let s = self.0.to_string();
243 serde::Serialize::serialize(&s, serializer)
244 }
245
246 pub fn value_serialize(&self) -> Vec<u8> {
248 use std::io::Write;
249 let mut buf = Vec::with_capacity(self.0.capacity());
254 buf.push(1);
255 write!(&mut buf, "{}", self.0).unwrap();
256 buf
257 }
258
259 pub const fn null() -> Self {
261 Self(ValueRef::Null)
262 }
263
264 pub fn is_jsonb_null(&self) -> bool {
266 self.0.is_null()
267 }
268
269 pub fn is_scalar(&self) -> bool {
271 matches!(
272 self.0,
273 ValueRef::Null | ValueRef::Bool(_) | ValueRef::Number(_) | ValueRef::String(_)
274 )
275 }
276
277 pub fn is_array(&self) -> bool {
279 self.0.is_array()
280 }
281
282 pub fn is_object(&self) -> bool {
284 self.0.is_object()
285 }
286
287 pub fn type_name(&self) -> &'static str {
291 match self.0 {
292 ValueRef::Null => "null",
293 ValueRef::Bool(_) => "boolean",
294 ValueRef::Number(_) => "number",
295 ValueRef::String(_) => "string",
296 ValueRef::Array(_) => "array",
297 ValueRef::Object(_) => "object",
298 }
299 }
300
301 pub fn array_len(&self) -> Result<usize, String> {
303 let array = self
304 .0
305 .as_array()
306 .ok_or_else(|| format!("cannot get array length of a jsonb {}", self.type_name()))?;
307 Ok(array.len())
308 }
309
310 pub fn as_bool(&self) -> Result<bool, String> {
312 self.0
313 .as_bool()
314 .ok_or_else(|| format!("cannot cast jsonb {} to type boolean", self.type_name()))
315 }
316
317 pub fn as_string(&self) -> Result<String, String> {
319 self.0
320 .as_str()
321 .map(|s| s.to_owned())
322 .ok_or_else(|| format!("cannot cast jsonb {} to type string", self.type_name()))
323 }
324
325 pub fn as_str(&self) -> Result<&str, String> {
327 self.0
328 .as_str()
329 .ok_or_else(|| format!("cannot cast jsonb {} to type &str", self.type_name()))
330 }
331
332 pub fn as_number(&self) -> Result<F64, String> {
337 self.0
338 .as_number()
339 .ok_or_else(|| format!("cannot cast jsonb {} to type number", self.type_name()))?
340 .as_f64()
341 .map(|f| f.into_ordered())
342 .ok_or_else(|| "jsonb number out of range".into())
343 }
344
345 pub fn force_str<W: std::fmt::Write>(&self, writer: &mut W) -> std::fmt::Result {
355 match self.0 {
356 ValueRef::String(v) => writer.write_str(v.as_str()),
357 ValueRef::Null => Ok(()),
358 ValueRef::Bool(_) | ValueRef::Number(_) | ValueRef::Array(_) | ValueRef::Object(_) => {
359 use crate::types::to_text::ToText as _;
360 self.write_with_type(&crate::types::DataType::Jsonb, writer)
361 }
362 }
363 }
364
365 pub fn force_string(&self) -> String {
366 let mut s = String::new();
367 self.force_str(&mut s).unwrap();
368 s
369 }
370
371 pub fn access_object_field(&self, field: &str) -> Option<Self> {
372 self.0.get(field).map(Self)
373 }
374
375 pub fn access_array_element(&self, idx: usize) -> Option<Self> {
376 self.0.get(idx).map(Self)
377 }
378
379 pub fn array_elements(self) -> Result<impl Iterator<Item = JsonbRef<'a>>, String> {
381 let array = self
382 .0
383 .as_array()
384 .ok_or_else(|| format!("cannot extract elements from a jsonb {}", self.type_name()))?;
385 Ok(array.iter().map(Self))
386 }
387
388 pub fn object_keys(self) -> Result<impl Iterator<Item = &'a str>, String> {
390 let object = self.0.as_object().ok_or_else(|| {
391 format!(
392 "cannot call jsonb_object_keys on a jsonb {}",
393 self.type_name()
394 )
395 })?;
396 Ok(object.keys())
397 }
398
399 pub fn object_key_values(
401 self,
402 ) -> Result<impl Iterator<Item = (&'a str, JsonbRef<'a>)>, String> {
403 let object = self
404 .0
405 .as_object()
406 .ok_or_else(|| format!("cannot deconstruct a jsonb {}", self.type_name()))?;
407 Ok(object.iter().map(|(k, v)| (k, Self(v))))
408 }
409
410 pub fn pretty(self, f: &mut impl std::fmt::Write) -> std::fmt::Result {
412 use serde::Serialize;
413 use serde_json::ser::{PrettyFormatter, Serializer};
414
415 let mut ser =
416 Serializer::with_formatter(FmtToIoUnchecked(f), PrettyFormatter::with_indent(b" "));
417 self.0.serialize(&mut ser).map_err(|_| std::fmt::Error)
418 }
419
420 pub fn to_datum(self, ty: &DataType) -> Result<Datum, String> {
422 if self.0.as_null().is_some() {
423 return Ok(None);
424 }
425 let datum = match ty {
426 DataType::Jsonb => ScalarImpl::Jsonb(self.into()),
427 DataType::List(l) => ScalarImpl::List(self.to_list(l)?),
428 DataType::Struct(s) => ScalarImpl::Struct(self.to_struct(s)?),
429 _ => {
430 let s = self.force_string();
431 ScalarImpl::from_text(&s, ty).map_err(|e| format!("{}", e.as_report()))?
432 }
433 };
434 Ok(Some(datum))
435 }
436
437 pub fn to_list(self, ty: &ListType) -> Result<ListValue, String> {
439 let elem_type = ty.elem();
440 let array = self
441 .0
442 .as_array()
443 .ok_or_else(|| format!("expected JSON array, but found {self}"))?;
444 let mut builder = elem_type.create_array_builder(array.len());
445 for v in array.iter() {
446 builder.append(Self(v).to_datum(elem_type)?);
447 }
448 Ok(ListValue::new(builder.finish()))
449 }
450
451 pub fn to_struct(self, ty: &StructType) -> Result<StructValue, String> {
453 let object = self.0.as_object().ok_or_else(|| {
454 format!(
455 "cannot call populate_composite on a jsonb {}",
456 self.type_name()
457 )
458 })?;
459 let mut fields = Vec::with_capacity(ty.len());
460 for (name, ty) in ty.iter() {
461 let datum = match object.get(name) {
462 Some(v) => Self(v).to_datum(ty)?,
463 None => None,
464 };
465 fields.push(datum);
466 }
467 Ok(StructValue::new(fields))
468 }
469
470 pub fn to_map(self, ty: &MapType) -> Result<MapValue, String> {
471 let object = self
472 .0
473 .as_object()
474 .ok_or_else(|| format!("cannot convert to map from a jsonb {}", self.type_name()))?;
475 if !matches!(ty.key(), DataType::Varchar) {
476 return Err("cannot convert jsonb to a map with non-string keys".to_owned());
477 }
478
479 let mut keys: Vec<Datum> = Vec::with_capacity(object.len());
480 let mut values: Vec<Datum> = Vec::with_capacity(object.len());
481 for (k, v) in object.iter() {
482 let v = Self(v).to_datum(ty.value())?;
483 keys.push(Some(ScalarImpl::Utf8(k.to_owned().into())));
484 values.push(v);
485 }
486 MapValue::try_from_kv(
487 ListValue::from_datum_iter(ty.key(), keys),
488 ListValue::from_datum_iter(ty.value(), values),
489 )
490 }
491
492 pub fn populate_struct(
494 self,
495 ty: &StructType,
496 base: Option<StructRef<'_>>,
497 ) -> Result<StructValue, String> {
498 let Some(base) = base else {
499 return self.to_struct(ty);
500 };
501 let object = self.0.as_object().ok_or_else(|| {
502 format!(
503 "cannot call populate_composite on a jsonb {}",
504 self.type_name()
505 )
506 })?;
507 let mut fields = Vec::with_capacity(ty.len());
508 for ((name, ty), base_field) in ty.iter().zip_eq_debug(base.iter_fields_ref()) {
509 let datum = match object.get(name) {
510 Some(v) => match ty {
511 DataType::Struct(s) => Some(
513 Self(v)
514 .populate_struct(s, base_field.map(|s| s.into_struct()))?
515 .into(),
516 ),
517 _ => Self(v).to_datum(ty)?,
518 },
519 None => base_field.to_owned_datum(),
520 };
521 fields.push(datum);
522 }
523 Ok(StructValue::new(fields))
524 }
525
526 pub fn capacity(self) -> usize {
528 self.0.capacity()
529 }
530}
531
532struct ToTextFormatter;
535
536impl serde_json::ser::Formatter for ToTextFormatter {
537 fn begin_array_value<W>(&mut self, writer: &mut W, first: bool) -> std::io::Result<()>
538 where
539 W: ?Sized + std::io::Write,
540 {
541 if first {
542 Ok(())
543 } else {
544 writer.write_all(b", ")
545 }
546 }
547
548 fn begin_object_key<W>(&mut self, writer: &mut W, first: bool) -> std::io::Result<()>
549 where
550 W: ?Sized + std::io::Write,
551 {
552 if first {
553 Ok(())
554 } else {
555 writer.write_all(b", ")
556 }
557 }
558
559 fn begin_object_value<W>(&mut self, writer: &mut W) -> std::io::Result<()>
560 where
561 W: ?Sized + std::io::Write,
562 {
563 writer.write_all(b": ")
564 }
565}
566
/// Adapts a [`std::fmt::Write`] sink into a [`std::io::Write`] so `serde_json` serializers can
/// write straight into it.
///
/// "Unchecked": incoming bytes are reinterpreted as UTF-8 without validation.
struct FmtToIoUnchecked<F>(F);

impl<F: std::fmt::Write> std::io::Write for FmtToIoUnchecked<F> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // SAFETY: NOTE(review) — sound only if every `buf` passed in is complete, valid UTF-8.
        // In this file the only writers are `serde_json` serializers. This relies on them never
        // splitting a multi-byte character across `write` calls — confirm against serde_json.
        let s = unsafe { std::str::from_utf8_unchecked(buf) };
        // Any `fmt::Error` from the sink is surfaced as a generic `io::Error`.
        self.0.write_str(s).map_err(|_| std::io::ErrorKind::Other)?;
        // The whole buffer is always consumed, so `write_all` finishes after a single call.
        Ok(buf.len())
    }

    // Nothing is buffered here, so flushing is a no-op.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
581
582impl ToSql for JsonbVal {
583 accepts!(JSON, JSONB);
584
585 to_sql_checked!();
586
587 fn to_sql(
588 &self,
589 ty: &Type,
590 out: &mut BytesMut,
591 ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
592 where
593 Self: Sized,
594 {
595 if matches!(*ty, Type::JSONB) {
596 out.put_u8(1);
597 }
598 write!(out, "{}", self.0).unwrap();
599 Ok(IsNull::No)
600 }
601}
602
603impl<'a> FromSql<'a> for JsonbVal {
604 accepts!(JSON, JSONB);
605
606 fn from_sql(
607 ty: &Type,
608 mut raw: &'a [u8],
609 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
610 Ok(match *ty {
611 Type::JSON => JsonbVal::from(Value::from_text(raw)?),
627 Type::JSONB => {
628 if raw.is_empty() || raw.get_u8() != 1 {
629 return Err("invalid jsonb encoding".into());
630 }
631 JsonbVal::from(Value::from_text(raw)?)
632 }
633 _ => {
634 bail_not_implemented!("the JsonbVal's postgres decoding for {ty} is unsupported")
635 }
636 })
637 }
638}
639
640impl ToSql for JsonbRef<'_> {
641 accepts!(JSON, JSONB);
642
643 to_sql_checked!();
644
645 fn to_sql(
646 &self,
647 ty: &Type,
648 out: &mut BytesMut,
649 ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
650 where
651 Self: Sized,
652 {
653 if matches!(*ty, Type::JSONB) {
654 out.put_u8(1);
655 }
656 write!(out, "{}", self.0).unwrap();
657 Ok(IsNull::No)
658 }
659}