1use std::fmt::{self, Write};
16use std::hash::Hash;
17
18use bytes::{Buf, BufMut, BytesMut};
19use jsonbb::{Value, ValueRef};
20use postgres_types::{FromSql, IsNull, ToSql, Type, accepts, to_sql_checked};
21use risingwave_common_estimate_size::EstimateSize;
22use thiserror_ext::AsReport;
23
24use super::{
25 Datum, F64, IntoOrdered, ListValue, MapType, MapValue, ScalarImpl, StructRef, ToOwnedDatum,
26};
27use crate::types::{
28 DEBEZIUM_UNAVAILABLE_JSON, DEBEZIUM_UNAVAILABLE_VALUE, DataType, ListType, Scalar, ScalarRef,
29 StructType, StructValue,
30};
31use crate::util::iter_util::ZipEqDebug;
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct JsonbVal(pub(crate) Value);
35
36#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
37pub struct JsonbRef<'a>(pub(crate) ValueRef<'a>);
38
39impl EstimateSize for JsonbVal {
40 fn estimated_heap_size(&self) -> usize {
41 self.0.capacity()
42 }
43}
44
45impl fmt::Display for JsonbVal {
48 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49 crate::types::to_text::ToText::write(&self.as_scalar_ref(), f)
50 }
51}
52
53impl fmt::Display for JsonbRef<'_> {
56 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57 crate::types::to_text::ToText::write(self, f)
58 }
59}
60
61impl Scalar for JsonbVal {
62 type ScalarRefType<'a> = JsonbRef<'a>;
63
64 fn as_scalar_ref(&self) -> Self::ScalarRefType<'_> {
65 JsonbRef(self.0.as_ref())
66 }
67}
68
69impl<'a> ScalarRef<'a> for JsonbRef<'a> {
70 type ScalarType = JsonbVal;
71
72 fn to_owned_scalar(&self) -> Self::ScalarType {
73 JsonbVal(self.0.into())
74 }
75
76 fn hash_scalar<H: std::hash::Hasher>(&self, state: &mut H) {
77 self.hash(state)
78 }
79}
80
81impl PartialOrd for JsonbVal {
82 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
83 Some(self.cmp(other))
84 }
85}
86
87impl Ord for JsonbVal {
88 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
89 self.as_scalar_ref().cmp(&other.as_scalar_ref())
90 }
91}
92
93impl PartialOrd for JsonbRef<'_> {
94 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
95 Some(self.cmp(other))
96 }
97}
98
99impl Ord for JsonbRef<'_> {
100 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
101 self.0.to_string().cmp(&other.0.to_string())
114 }
115}
116
117impl crate::types::to_text::ToText for JsonbRef<'_> {
118 fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
119 use serde::Serialize as _;
122 let mut ser =
123 serde_json::ser::Serializer::with_formatter(FmtToIoUnchecked(f), ToTextFormatter);
124 self.0.serialize(&mut ser).map_err(|_| std::fmt::Error)
125 }
126
127 fn write_with_type<W: std::fmt::Write>(
128 &self,
129 _ty: &crate::types::DataType,
130 f: &mut W,
131 ) -> std::fmt::Result {
132 self.write(f)
133 }
134}
135
136impl crate::types::to_binary::ToBinary for JsonbRef<'_> {
137 fn to_binary_with_type(
138 &self,
139 _ty: &crate::types::DataType,
140 ) -> super::to_binary::Result<bytes::Bytes> {
141 Ok(self.value_serialize().into())
142 }
143}
144
145impl std::str::FromStr for JsonbVal {
146 type Err = <Value as std::str::FromStr>::Err;
147
148 fn from_str(s: &str) -> Result<Self, Self::Err> {
149 Ok(Self(s.parse()?))
150 }
151}
152
153impl JsonbVal {
154 pub fn from_debezium_unavailable_value(s: &str) -> Result<Self, serde_json::Error> {
157 if s.len() == DEBEZIUM_UNAVAILABLE_VALUE.len() && s == DEBEZIUM_UNAVAILABLE_VALUE {
159 return Ok(DEBEZIUM_UNAVAILABLE_JSON.clone());
160 }
161 Ok(Self(s.parse()?))
162 }
163}
164
165impl JsonbVal {
166 pub fn null() -> Self {
168 Self(Value::null())
169 }
170
171 pub fn empty_array() -> Self {
173 Self(Value::array([]))
174 }
175
176 pub fn empty_object() -> Self {
178 Self(Value::object([]))
179 }
180
181 pub fn memcmp_deserialize(
183 deserializer: &mut memcomparable::Deserializer<impl bytes::Buf>,
184 ) -> memcomparable::Result<Self> {
185 let v = <String as serde::Deserialize>::deserialize(deserializer)?
186 .parse()
187 .map_err(|_| memcomparable::Error::Message("invalid json".into()))?;
188 Ok(Self(v))
189 }
190
191 pub fn value_deserialize(mut buf: &[u8]) -> Option<Self> {
193 if buf.is_empty() || buf.get_u8() != 1 {
194 return None;
195 }
196 Value::from_text(buf).ok().map(Self)
197 }
198
199 pub fn take(self) -> serde_json::Value {
201 self.0.into()
202 }
203}
204
205impl From<serde_json::Value> for JsonbVal {
206 fn from(v: serde_json::Value) -> Self {
207 Self(v.into())
208 }
209}
210
211impl From<Value> for JsonbVal {
212 fn from(v: Value) -> Self {
213 Self(v)
214 }
215}
216
217impl From<JsonbRef<'_>> for JsonbVal {
218 fn from(v: JsonbRef<'_>) -> Self {
219 Self(v.0.to_owned())
220 }
221}
222
223impl From<f64> for JsonbVal {
224 fn from(v: f64) -> Self {
225 Self(v.into())
226 }
227}
228
229impl<'a> From<JsonbRef<'a>> for ValueRef<'a> {
230 fn from(v: JsonbRef<'a>) -> Self {
231 v.0
232 }
233}
234
235impl<'a> JsonbRef<'a> {
236 pub fn memcmp_serialize(
237 &self,
238 serializer: &mut memcomparable::Serializer<impl bytes::BufMut>,
239 ) -> memcomparable::Result<()> {
240 let s = self.0.to_string();
243 serde::Serialize::serialize(&s, serializer)
244 }
245
246 pub fn value_serialize(&self) -> Vec<u8> {
248 use std::io::Write;
249 let mut buf = Vec::with_capacity(self.0.capacity());
254 buf.push(1);
255 write!(&mut buf, "{}", self.0).unwrap();
256 buf
257 }
258
259 pub const fn null() -> Self {
261 Self(ValueRef::Null)
262 }
263
264 pub const fn empty_string() -> Self {
266 Self(ValueRef::String(""))
267 }
268
269 pub fn is_jsonb_null(&self) -> bool {
271 self.0.is_null()
272 }
273
274 pub fn is_scalar(&self) -> bool {
276 matches!(
277 self.0,
278 ValueRef::Null | ValueRef::Bool(_) | ValueRef::Number(_) | ValueRef::String(_)
279 )
280 }
281
282 pub fn is_array(&self) -> bool {
284 self.0.is_array()
285 }
286
287 pub fn is_object(&self) -> bool {
289 self.0.is_object()
290 }
291
292 pub fn type_name(&self) -> &'static str {
296 match self.0 {
297 ValueRef::Null => "null",
298 ValueRef::Bool(_) => "boolean",
299 ValueRef::Number(_) => "number",
300 ValueRef::String(_) => "string",
301 ValueRef::Array(_) => "array",
302 ValueRef::Object(_) => "object",
303 }
304 }
305
306 pub fn array_len(&self) -> Result<usize, String> {
308 let array = self
309 .0
310 .as_array()
311 .ok_or_else(|| format!("cannot get array length of a jsonb {}", self.type_name()))?;
312 Ok(array.len())
313 }
314
315 pub fn as_bool(&self) -> Result<bool, String> {
317 self.0
318 .as_bool()
319 .ok_or_else(|| format!("cannot cast jsonb {} to type boolean", self.type_name()))
320 }
321
322 pub fn as_string(&self) -> Result<String, String> {
324 self.0
325 .as_str()
326 .map(|s| s.to_owned())
327 .ok_or_else(|| format!("cannot cast jsonb {} to type string", self.type_name()))
328 }
329
330 pub fn as_str(&self) -> Result<&str, String> {
332 self.0
333 .as_str()
334 .ok_or_else(|| format!("cannot cast jsonb {} to type &str", self.type_name()))
335 }
336
337 pub fn as_number(&self) -> Result<F64, String> {
342 self.0
343 .as_number()
344 .ok_or_else(|| format!("cannot cast jsonb {} to type number", self.type_name()))?
345 .as_f64()
346 .map(|f| f.into_ordered())
347 .ok_or_else(|| "jsonb number out of range".into())
348 }
349
350 pub fn force_str<W: std::fmt::Write>(&self, writer: &mut W) -> std::fmt::Result {
360 match self.0 {
361 ValueRef::String(v) => writer.write_str(v),
362 ValueRef::Null => Ok(()),
363 ValueRef::Bool(_) | ValueRef::Number(_) | ValueRef::Array(_) | ValueRef::Object(_) => {
364 use crate::types::to_text::ToText as _;
365 self.write_with_type(&crate::types::DataType::Jsonb, writer)
366 }
367 }
368 }
369
370 pub fn force_string(&self) -> String {
371 let mut s = String::new();
372 self.force_str(&mut s).unwrap();
373 s
374 }
375
376 pub fn access_object_field(&self, field: &str) -> Option<Self> {
377 self.0.get(field).map(Self)
378 }
379
380 pub fn access_array_element(&self, idx: usize) -> Option<Self> {
381 self.0.get(idx).map(Self)
382 }
383
384 pub fn array_elements(self) -> Result<impl Iterator<Item = JsonbRef<'a>>, String> {
386 let array = self
387 .0
388 .as_array()
389 .ok_or_else(|| format!("cannot extract elements from a jsonb {}", self.type_name()))?;
390 Ok(array.iter().map(Self))
391 }
392
393 pub fn object_keys(self) -> Result<impl Iterator<Item = &'a str>, String> {
395 let object = self.0.as_object().ok_or_else(|| {
396 format!(
397 "cannot call jsonb_object_keys on a jsonb {}",
398 self.type_name()
399 )
400 })?;
401 Ok(object.keys())
402 }
403
404 pub fn object_key_values(
406 self,
407 ) -> Result<impl Iterator<Item = (&'a str, JsonbRef<'a>)>, String> {
408 let object = self
409 .0
410 .as_object()
411 .ok_or_else(|| format!("cannot deconstruct a jsonb {}", self.type_name()))?;
412 Ok(object.iter().map(|(k, v)| (k, Self(v))))
413 }
414
415 pub fn pretty(self, f: &mut impl std::fmt::Write) -> std::fmt::Result {
417 use serde::Serialize;
418 use serde_json::ser::{PrettyFormatter, Serializer};
419
420 let mut ser =
421 Serializer::with_formatter(FmtToIoUnchecked(f), PrettyFormatter::with_indent(b" "));
422 self.0.serialize(&mut ser).map_err(|_| std::fmt::Error)
423 }
424
425 pub fn to_datum(self, ty: &DataType) -> Result<Datum, String> {
427 if self.0.as_null().is_some() {
428 return Ok(None);
429 }
430 let datum = match ty {
431 DataType::Jsonb => ScalarImpl::Jsonb(self.into()),
432 DataType::List(l) => ScalarImpl::List(self.to_list(l)?),
433 DataType::Struct(s) => ScalarImpl::Struct(self.to_struct(s)?),
434 _ => {
435 let s = self.force_string();
436 ScalarImpl::from_text(&s, ty).map_err(|e| format!("{}", e.as_report()))?
437 }
438 };
439 Ok(Some(datum))
440 }
441
442 pub fn to_list(self, ty: &ListType) -> Result<ListValue, String> {
444 let elem_type = ty.elem();
445 let array = self
446 .0
447 .as_array()
448 .ok_or_else(|| format!("expected JSON array, but found {self}"))?;
449 let mut builder = elem_type.create_array_builder(array.len());
450 for v in array.iter() {
451 builder.append(Self(v).to_datum(elem_type)?);
452 }
453 Ok(ListValue::new(builder.finish()))
454 }
455
456 pub fn to_struct(self, ty: &StructType) -> Result<StructValue, String> {
458 let object = self.0.as_object().ok_or_else(|| {
459 format!(
460 "cannot call populate_composite on a jsonb {}",
461 self.type_name()
462 )
463 })?;
464 let mut fields = Vec::with_capacity(ty.len());
465 for (name, ty) in ty.iter() {
466 let datum = match object.get(name) {
467 Some(v) => Self(v).to_datum(ty)?,
468 None => None,
469 };
470 fields.push(datum);
471 }
472 Ok(StructValue::new(fields))
473 }
474
475 pub fn to_map(self, ty: &MapType) -> Result<MapValue, String> {
476 let object = self
477 .0
478 .as_object()
479 .ok_or_else(|| format!("cannot convert to map from a jsonb {}", self.type_name()))?;
480 if !matches!(ty.key(), DataType::Varchar) {
481 return Err("cannot convert jsonb to a map with non-string keys".to_owned());
482 }
483
484 let mut keys: Vec<Datum> = Vec::with_capacity(object.len());
485 let mut values: Vec<Datum> = Vec::with_capacity(object.len());
486 for (k, v) in object.iter() {
487 let v = Self(v).to_datum(ty.value())?;
488 keys.push(Some(ScalarImpl::Utf8(k.to_owned().into())));
489 values.push(v);
490 }
491 MapValue::try_from_kv(
492 ListValue::from_datum_iter(ty.key(), keys),
493 ListValue::from_datum_iter(ty.value(), values),
494 )
495 }
496
497 pub fn populate_struct(
499 self,
500 ty: &StructType,
501 base: Option<StructRef<'_>>,
502 ) -> Result<StructValue, String> {
503 let Some(base) = base else {
504 return self.to_struct(ty);
505 };
506 let object = self.0.as_object().ok_or_else(|| {
507 format!(
508 "cannot call populate_composite on a jsonb {}",
509 self.type_name()
510 )
511 })?;
512 let mut fields = Vec::with_capacity(ty.len());
513 for ((name, ty), base_field) in ty.iter().zip_eq_debug(base.iter_fields_ref()) {
514 let datum = match object.get(name) {
515 Some(v) => match ty {
516 DataType::Struct(s) => Some(
518 Self(v)
519 .populate_struct(s, base_field.map(|s| s.into_struct()))?
520 .into(),
521 ),
522 _ => Self(v).to_datum(ty)?,
523 },
524 None => base_field.to_owned_datum(),
525 };
526 fields.push(datum);
527 }
528 Ok(StructValue::new(fields))
529 }
530
531 pub fn capacity(self) -> usize {
533 self.0.capacity()
534 }
535}
536
537struct ToTextFormatter;
540
541impl serde_json::ser::Formatter for ToTextFormatter {
542 fn begin_array_value<W>(&mut self, writer: &mut W, first: bool) -> std::io::Result<()>
543 where
544 W: ?Sized + std::io::Write,
545 {
546 if first {
547 Ok(())
548 } else {
549 writer.write_all(b", ")
550 }
551 }
552
553 fn begin_object_key<W>(&mut self, writer: &mut W, first: bool) -> std::io::Result<()>
554 where
555 W: ?Sized + std::io::Write,
556 {
557 if first {
558 Ok(())
559 } else {
560 writer.write_all(b", ")
561 }
562 }
563
564 fn begin_object_value<W>(&mut self, writer: &mut W) -> std::io::Result<()>
565 where
566 W: ?Sized + std::io::Write,
567 {
568 writer.write_all(b": ")
569 }
570}
571
/// Adapts a [`std::fmt::Write`] sink into a [`std::io::Write`] one, so that `serde_json`
/// serializers can write straight into it.
///
/// "Unchecked" because incoming bytes are reinterpreted as UTF-8 without validation. It is
/// only sound when every `write` call receives complete, valid UTF-8.
struct FmtToIoUnchecked<F>(F);

impl<F: std::fmt::Write> std::io::Write for FmtToIoUnchecked<F> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // SAFETY: in this file the adapter is only handed to `serde_json` serializers. Those
        // emit JSON text built from `&str` data and ASCII punctuation/escapes, so each chunk is
        // expected to be valid UTF-8. NOTE(review): this relies on serde_json never splitting a
        // multi-byte character across `write` calls; re-check when upgrading serde_json.
        let text = unsafe { std::str::from_utf8_unchecked(buf) };
        match self.0.write_str(text) {
            Ok(()) => Ok(buf.len()),
            Err(_) => Err(std::io::Error::from(std::io::ErrorKind::Other)),
        }
    }

    fn flush(&mut self) -> std::io::Result<()> {
        // Nothing is buffered by this adapter.
        Ok(())
    }
}
586
587impl ToSql for JsonbVal {
588 accepts!(JSON, JSONB);
589
590 to_sql_checked!();
591
592 fn to_sql(
593 &self,
594 ty: &Type,
595 out: &mut BytesMut,
596 ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
597 where
598 Self: Sized,
599 {
600 if matches!(*ty, Type::JSONB) {
601 out.put_u8(1);
602 }
603 write!(out, "{}", self.0).unwrap();
604 Ok(IsNull::No)
605 }
606}
607
608impl<'a> FromSql<'a> for JsonbVal {
609 accepts!(JSON, JSONB);
610
611 fn from_sql(
612 ty: &Type,
613 mut raw: &'a [u8],
614 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
615 Ok(match *ty {
616 Type::JSON => JsonbVal::from(Value::from_text(raw)?),
632 Type::JSONB => {
633 if raw.is_empty() || raw.get_u8() != 1 {
634 return Err("invalid jsonb encoding".into());
635 }
636 JsonbVal::from(Value::from_text(raw)?)
637 }
638 _ => {
639 bail_not_implemented!("the JsonbVal's postgres decoding for {ty} is unsupported")
640 }
641 })
642 }
643}
644
645impl ToSql for JsonbRef<'_> {
646 accepts!(JSON, JSONB);
647
648 to_sql_checked!();
649
650 fn to_sql(
651 &self,
652 ty: &Type,
653 out: &mut BytesMut,
654 ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
655 where
656 Self: Sized,
657 {
658 if matches!(*ty, Type::JSONB) {
659 out.put_u8(1);
660 }
661 write!(out, "{}", self.0).unwrap();
662 Ok(IsNull::No)
663 }
664}