risingwave_common/array/
jsonb_array.rs1use std::mem::ManuallyDrop;
16
17use risingwave_common_estimate_size::EstimateSize;
18use risingwave_pb::data::{PbArray, PbArrayType};
19
20use super::{Array, ArrayBuilder, ArrayImpl, ArrayResult};
21use crate::bitmap::{Bitmap, BitmapBuilder};
22use crate::types::{DataType, JsonbRef, JsonbVal, Scalar};
23
24#[derive(Debug, Clone, EstimateSize)]
25pub struct JsonbArrayBuilder {
26 bitmap: BitmapBuilder,
27 builder: jsonbb::Builder,
28}
29
30#[derive(Debug, Clone, PartialEq, Eq, EstimateSize)]
31pub struct JsonbArray {
32 bitmap: Bitmap,
33 data: jsonbb::Value,
35}
36
37impl ArrayBuilder for JsonbArrayBuilder {
38 type ArrayType = JsonbArray;
39
40 fn new(capacity: usize) -> Self {
41 let mut builder = jsonbb::Builder::with_capacity(capacity);
42 builder.begin_array();
43 Self {
44 bitmap: BitmapBuilder::with_capacity(capacity),
45 builder,
46 }
47 }
48
49 fn with_type(capacity: usize, ty: DataType) -> Self {
50 assert_eq!(ty, DataType::Jsonb);
51 Self::new(capacity)
52 }
53
54 fn append_n(&mut self, n: usize, value: Option<<Self::ArrayType as Array>::RefItem<'_>>) {
55 match value {
56 Some(x) => {
57 self.bitmap.append_n(n, true);
58 for _ in 0..n {
59 self.builder.add_value(x.0);
60 }
61 }
62 None => {
63 self.bitmap.append_n(n, false);
64 for _ in 0..n {
65 self.builder.add_null();
66 }
67 }
68 }
69 }
70
71 fn append_array(&mut self, other: &Self::ArrayType) {
72 for bit in other.bitmap.iter() {
73 self.bitmap.append(bit);
74 }
75 for value in other.data.as_array().unwrap().iter() {
76 self.builder.add_value(value);
77 }
78 }
79
80 fn pop(&mut self) -> Option<()> {
81 self.bitmap.pop()?;
82 self.builder.pop();
83 Some(())
84 }
85
86 fn len(&self) -> usize {
87 self.bitmap.len()
88 }
89
90 fn finish(mut self) -> Self::ArrayType {
91 self.builder.end_array();
92 Self::ArrayType {
93 bitmap: self.bitmap.finish(),
94 data: self.builder.finish(),
95 }
96 }
97}
98
99impl JsonbArrayBuilder {
100 pub fn writer(&mut self) -> JsonbWriter<'_> {
101 JsonbWriter::new(self)
102 }
103}
104
105impl JsonbArray {
106 pub fn from_protobuf(array: &PbArray) -> ArrayResult<ArrayImpl> {
110 ensure!(
111 array.values.len() == 1,
112 "Must have exactly 1 buffer in a jsonb array"
113 );
114 let arr = JsonbArray {
115 bitmap: array.get_null_bitmap()?.into(),
116 data: jsonbb::Value::from_bytes(&array.values[0].body),
117 };
118 Ok(arr.into())
119 }
120}
121
122impl Array for JsonbArray {
123 type Builder = JsonbArrayBuilder;
124 type OwnedItem = JsonbVal;
125 type RefItem<'a> = JsonbRef<'a>;
126
127 unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> {
128 JsonbRef(self.data.as_array().unwrap().get(idx).unwrap())
129 }
130
131 fn len(&self) -> usize {
132 self.bitmap.len()
133 }
134
135 fn to_protobuf(&self) -> PbArray {
136 use risingwave_pb::common::Buffer;
137 use risingwave_pb::common::buffer::CompressionType;
138
139 PbArray {
140 null_bitmap: Some(self.null_bitmap().to_protobuf()),
141 values: vec![Buffer {
142 compression: CompressionType::None as i32,
143 body: self.data.as_bytes().to_vec(),
144 }],
145 array_type: PbArrayType::Jsonb as i32,
146 struct_array_data: None,
147 list_array_data: None,
148 }
149 }
150
151 fn null_bitmap(&self) -> &Bitmap {
152 &self.bitmap
153 }
154
155 fn into_null_bitmap(self) -> Bitmap {
156 self.bitmap
157 }
158
159 fn set_bitmap(&mut self, bitmap: Bitmap) {
160 self.bitmap = bitmap;
161 }
162
163 fn data_type(&self) -> DataType {
164 DataType::Jsonb
165 }
166}
167
168impl FromIterator<Option<JsonbVal>> for JsonbArray {
169 fn from_iter<I: IntoIterator<Item = Option<JsonbVal>>>(iter: I) -> Self {
170 let iter = iter.into_iter();
171 let mut builder = <Self as Array>::Builder::new(iter.size_hint().0);
172 for i in iter {
173 match i {
174 Some(x) => builder.append(Some(x.as_scalar_ref())),
175 None => builder.append(None),
176 }
177 }
178 builder.finish()
179 }
180}
181
182impl FromIterator<JsonbVal> for JsonbArray {
183 fn from_iter<I: IntoIterator<Item = JsonbVal>>(iter: I) -> Self {
184 iter.into_iter().map(Some).collect()
185 }
186}
187
188pub struct JsonbWriter<'a> {
193 array_builder: &'a mut JsonbArrayBuilder,
194 checkpoint: jsonbb::Checkpoint,
195}
196
197impl JsonbWriter<'_> {
198 pub fn new(array_builder: &mut JsonbArrayBuilder) -> JsonbWriter<'_> {
199 let checkpoint = array_builder.builder.checkpoint();
200 JsonbWriter {
201 array_builder,
202 checkpoint,
203 }
204 }
205
206 pub fn inner(&mut self) -> &mut jsonbb::Builder {
207 &mut self.array_builder.builder
208 }
209
210 pub fn finish(self) {
213 self.array_builder.bitmap.append(true);
214 let _ = ManuallyDrop::new(self); }
216
217 pub fn rollback(self) {
220 self.array_builder.builder.rollback_to(&self.checkpoint);
221 let _ = ManuallyDrop::new(self); }
223}
224
225impl Drop for JsonbWriter<'_> {
226 fn drop(&mut self) {
229 self.array_builder.builder.rollback_to(&self.checkpoint);
230 }
231}