risingwave_common/array/
jsonb_array.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::mem::ManuallyDrop;
16
17use risingwave_common_estimate_size::EstimateSize;
18use risingwave_pb::data::{PbArray, PbArrayType};
19
20use super::{Array, ArrayBuilder, ArrayImpl, ArrayResult};
21use crate::bitmap::{Bitmap, BitmapBuilder};
22use crate::types::{DataType, JsonbRef, JsonbVal, Scalar};
23
24#[derive(Debug, Clone, EstimateSize)]
25pub struct JsonbArrayBuilder {
26    bitmap: BitmapBuilder,
27    builder: jsonbb::Builder,
28}
29
30#[derive(Debug, Clone, PartialEq, Eq, EstimateSize)]
31pub struct JsonbArray {
32    bitmap: Bitmap,
33    /// Elements are stored as a single JSONB array value.
34    data: jsonbb::Value,
35}
36
37impl ArrayBuilder for JsonbArrayBuilder {
38    type ArrayType = JsonbArray;
39
40    fn new(capacity: usize) -> Self {
41        let mut builder = jsonbb::Builder::with_capacity(capacity);
42        builder.begin_array();
43        Self {
44            bitmap: BitmapBuilder::with_capacity(capacity),
45            builder,
46        }
47    }
48
49    fn with_type(capacity: usize, ty: DataType) -> Self {
50        assert_eq!(ty, DataType::Jsonb);
51        Self::new(capacity)
52    }
53
54    fn append_n(&mut self, n: usize, value: Option<<Self::ArrayType as Array>::RefItem<'_>>) {
55        match value {
56            Some(x) => {
57                self.bitmap.append_n(n, true);
58                for _ in 0..n {
59                    self.builder.add_value(x.0);
60                }
61            }
62            None => {
63                self.bitmap.append_n(n, false);
64                for _ in 0..n {
65                    self.builder.add_null();
66                }
67            }
68        }
69    }
70
71    fn append_array(&mut self, other: &Self::ArrayType) {
72        for bit in other.bitmap.iter() {
73            self.bitmap.append(bit);
74        }
75        for value in other.data.as_array().unwrap().iter() {
76            self.builder.add_value(value);
77        }
78    }
79
80    fn pop(&mut self) -> Option<()> {
81        self.bitmap.pop()?;
82        self.builder.pop();
83        Some(())
84    }
85
86    fn len(&self) -> usize {
87        self.bitmap.len()
88    }
89
90    fn finish(mut self) -> Self::ArrayType {
91        self.builder.end_array();
92        Self::ArrayType {
93            bitmap: self.bitmap.finish(),
94            data: self.builder.finish(),
95        }
96    }
97}
98
99impl JsonbArrayBuilder {
100    pub fn writer(&mut self) -> JsonbWriter<'_> {
101        JsonbWriter::new(self)
102    }
103}
104
105impl JsonbArray {
106    /// Loads a `JsonbArray` from a protobuf array.
107    ///
108    /// See also `JsonbArray::to_protobuf`.
109    pub fn from_protobuf(array: &PbArray) -> ArrayResult<ArrayImpl> {
110        ensure!(
111            array.values.len() == 1,
112            "Must have exactly 1 buffer in a jsonb array"
113        );
114        let arr = JsonbArray {
115            bitmap: array.get_null_bitmap()?.into(),
116            data: jsonbb::Value::from_bytes(&array.values[0].body),
117        };
118        Ok(arr.into())
119    }
120}
121
122impl Array for JsonbArray {
123    type Builder = JsonbArrayBuilder;
124    type OwnedItem = JsonbVal;
125    type RefItem<'a> = JsonbRef<'a>;
126
127    unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> {
128        JsonbRef(self.data.as_array().unwrap().get(idx).unwrap())
129    }
130
131    fn len(&self) -> usize {
132        self.bitmap.len()
133    }
134
135    fn to_protobuf(&self) -> PbArray {
136        use risingwave_pb::common::Buffer;
137        use risingwave_pb::common::buffer::CompressionType;
138
139        PbArray {
140            null_bitmap: Some(self.null_bitmap().to_protobuf()),
141            values: vec![Buffer {
142                compression: CompressionType::None as i32,
143                body: self.data.as_bytes().to_vec(),
144            }],
145            array_type: PbArrayType::Jsonb as i32,
146            struct_array_data: None,
147            list_array_data: None,
148        }
149    }
150
151    fn null_bitmap(&self) -> &Bitmap {
152        &self.bitmap
153    }
154
155    fn into_null_bitmap(self) -> Bitmap {
156        self.bitmap
157    }
158
159    fn set_bitmap(&mut self, bitmap: Bitmap) {
160        self.bitmap = bitmap;
161    }
162
163    fn data_type(&self) -> DataType {
164        DataType::Jsonb
165    }
166}
167
168impl FromIterator<Option<JsonbVal>> for JsonbArray {
169    fn from_iter<I: IntoIterator<Item = Option<JsonbVal>>>(iter: I) -> Self {
170        let iter = iter.into_iter();
171        let mut builder = <Self as Array>::Builder::new(iter.size_hint().0);
172        for i in iter {
173            match i {
174                Some(x) => builder.append(Some(x.as_scalar_ref())),
175                None => builder.append(None),
176            }
177        }
178        builder.finish()
179    }
180}
181
182impl FromIterator<JsonbVal> for JsonbArray {
183    fn from_iter<I: IntoIterator<Item = JsonbVal>>(iter: I) -> Self {
184        iter.into_iter().map(Some).collect()
185    }
186}
187
188/// Note: Dropping an unfinished `JsonbWriter` will roll back any partially
189/// written elements to the saved checkpoint. Callers must not pop entries
190/// beyond the checkpoint captured when the writer was created; doing so may
191/// leave the array in an inconsistent state.
192pub struct JsonbWriter<'a> {
193    array_builder: &'a mut JsonbArrayBuilder,
194    checkpoint: jsonbb::Checkpoint,
195}
196
197impl JsonbWriter<'_> {
198    pub fn new(array_builder: &mut JsonbArrayBuilder) -> JsonbWriter<'_> {
199        let checkpoint = array_builder.builder.checkpoint();
200        JsonbWriter {
201            array_builder,
202            checkpoint,
203        }
204    }
205
206    pub fn inner(&mut self) -> &mut jsonbb::Builder {
207        &mut self.array_builder.builder
208    }
209
210    /// `finish` will be called when the entire record is successfully written.
211    /// The partial data was committed and the `builder` can no longer be used.
212    pub fn finish(self) {
213        self.array_builder.bitmap.append(true);
214        let _ = ManuallyDrop::new(self); // Prevent drop
215    }
216
217    /// `rollback` will be called while the entire record is abandoned.
218    /// The partial data was cleaned and the `builder` can be safely used.
219    pub fn rollback(self) {
220        self.array_builder.builder.rollback_to(&self.checkpoint);
221        let _ = ManuallyDrop::new(self); // Prevent drop
222    }
223}
224
225impl Drop for JsonbWriter<'_> {
226    /// If the writer is dropped without calling `finish` or `rollback`,
227    /// we rollback the partial data by default.
228    fn drop(&mut self) {
229        self.array_builder.builder.rollback_to(&self.checkpoint);
230    }
231}