1use std::cmp::Ordering;
16use std::fmt::{self, Debug, Display};
17
18use bytes::{Buf, BufMut};
19use itertools::Itertools;
20use risingwave_common_estimate_size::EstimateSize;
21use risingwave_error::BoxedError;
22use risingwave_pb::data::{PbArray, PbArrayType};
23use serde::Serializer;
24
25use super::{
26 Array, ArrayBuilder, ArrayImpl, ArrayResult, DatumRef, DefaultOrdered, ListArray,
27 ListArrayBuilder, ListRef, ListValue, MapType, ScalarImpl, ScalarRef, ScalarRefImpl,
28 StructArray, StructRef,
29};
30use crate::bitmap::Bitmap;
31use crate::types::{DataType, Scalar, ToText};
32use crate::util::memcmp_encoding;
33
34#[derive(Debug, Clone, EstimateSize)]
35pub struct MapArrayBuilder {
36 inner: ListArrayBuilder,
37}
38
39impl ArrayBuilder for MapArrayBuilder {
40 type ArrayType = MapArray;
41
42 #[cfg(not(test))]
43 fn new(_capacity: usize) -> Self {
44 panic!("please use `MapArrayBuilder::with_type` instead");
45 }
46
47 #[cfg(test)]
48 fn new(capacity: usize) -> Self {
49 Self::with_type(
50 capacity,
51 DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Varchar)),
52 )
53 }
54
55 fn with_type(capacity: usize, ty: DataType) -> Self {
56 let inner = ListArrayBuilder::with_type(capacity, ty.into_map().into_list());
57 Self { inner }
58 }
59
60 fn append_n(&mut self, n: usize, value: Option<MapRef<'_>>) {
61 self.inner.append_n(n, value.map(|v| v.into_inner()));
62 }
63
64 fn append_array(&mut self, other: &MapArray) {
65 self.inner.append_array(&other.inner);
66 }
67
68 fn pop(&mut self) -> Option<()> {
69 self.inner.pop()
70 }
71
72 fn len(&self) -> usize {
73 self.inner.len()
74 }
75
76 fn finish(self) -> MapArray {
77 let inner = self.inner.finish();
78 MapArray { inner }
79 }
80}
81
82#[derive(Debug, Clone, Eq)]
123pub struct MapArray {
124 pub(super) inner: ListArray,
125}
126
127impl EstimateSize for MapArray {
128 fn estimated_heap_size(&self) -> usize {
129 self.inner.estimated_heap_size()
130 }
131}
132
133impl Array for MapArray {
134 type Builder = MapArrayBuilder;
135 type OwnedItem = MapValue;
136 type RefItem<'a> = MapRef<'a>;
137
138 unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> {
139 unsafe {
140 let list = self.inner.raw_value_at_unchecked(idx);
141 MapRef::new_unchecked(list)
142 }
143 }
144
145 fn len(&self) -> usize {
146 self.inner.len()
147 }
148
149 fn to_protobuf(&self) -> PbArray {
150 let mut array = self.inner.to_protobuf();
151 array.array_type = PbArrayType::Map as i32;
152 array
153 }
154
155 fn null_bitmap(&self) -> &Bitmap {
156 self.inner.null_bitmap()
157 }
158
159 fn into_null_bitmap(self) -> Bitmap {
160 self.inner.into_null_bitmap()
161 }
162
163 fn set_bitmap(&mut self, bitmap: Bitmap) {
164 self.inner.set_bitmap(bitmap)
165 }
166
167 fn data_type(&self) -> DataType {
168 let list_value_type = self.inner.values().data_type();
169 DataType::Map(MapType::from_entries(list_value_type))
170 }
171}
172
173impl MapArray {
174 pub fn from_protobuf(array: &PbArray) -> ArrayResult<ArrayImpl> {
175 let inner = ListArray::from_protobuf(array)?.into_list();
176 Ok(Self { inner }.into())
177 }
178
179 pub fn as_struct(&self) -> &StructArray {
181 self.inner.values().as_struct()
182 }
183
184 pub fn offsets(&self) -> &[u32] {
186 self.inner.offsets()
187 }
188}
189
190pub use scalar::{MapRef, MapValue};
191
192mod scalar {
200 use std::collections::HashSet;
201
202 use super::*;
203 use crate::array::{Datum, ScalarImpl, StructValue};
204
205 #[derive(Clone, Eq, EstimateSize)]
207 pub struct MapValue(ListValue);
208
209 #[derive(Copy, Clone, Eq)]
218 pub struct MapRef<'a>(ListRef<'a>);
219
220 impl MapValue {
221 pub fn inner(&self) -> &ListValue {
222 &self.0
223 }
224
225 pub fn into_inner(self) -> ListValue {
226 self.0
227 }
228
229 pub fn from_entries(entries: ListValue) -> Self {
232 Self::try_from_entries(entries).unwrap()
233 }
234
235 pub fn try_from_entries(entries: ListValue) -> Result<Self, String> {
237 let _ = MapType::try_from_entries(entries.data_type())?;
239 let mut keys = HashSet::with_capacity(entries.len());
240 let struct_array = entries.into_array();
241 for key in struct_array.as_struct().field_at(0).iter() {
242 let Some(key) = key else {
243 return Err("map keys must not be NULL".to_owned());
244 };
245 if !keys.insert(key) {
246 return Err("map keys must be unique".to_owned());
247 }
248 }
249 Ok(MapValue(ListValue::new(struct_array)))
250 }
251
252 pub fn try_from_kv(key: ListValue, value: ListValue) -> Result<Self, String> {
254 if key.len() != value.len() {
255 return Err("map keys and values have different length".to_owned());
256 }
257 let unique_keys: HashSet<_> = key.iter().unique().collect();
258 if unique_keys.len() != key.len() {
259 return Err("map keys must be unique".to_owned());
260 }
261 if unique_keys.contains(&None) {
262 return Err("map keys must not be NULL".to_owned());
263 }
264
265 let len = key.len();
266 let key_type = key.data_type();
267 let value_type = value.data_type();
268 let struct_array = StructArray::new(
269 MapType::struct_type_for_map(key_type, value_type),
270 vec![key.into_array().into_ref(), value.into_array().into_ref()],
271 Bitmap::ones(len),
272 );
273 Ok(MapValue(ListValue::new(struct_array.into())))
274 }
275
276 pub fn concat(m1: MapRef<'_>, m2: MapRef<'_>) -> Self {
279 debug_assert_eq!(m1.inner().data_type(), m2.inner().data_type());
280 let m2_keys = m2.keys();
281 let l = ListValue::from_datum_iter(
282 &m1.inner().data_type(),
283 m1.iter_struct()
284 .filter(|s| !m2_keys.contains(&s.field_at(0).expect("map key is not null")))
285 .chain(m2.iter_struct())
286 .map(|s| Some(ScalarRefImpl::Struct(s))),
287 );
288 Self::from_entries(l)
289 }
290
291 pub fn insert(m: MapRef<'_>, key: ScalarImpl, value: Datum) -> Self {
292 let l = ListValue::from_datum_iter(
293 &m.inner().data_type(),
294 m.iter_struct()
295 .filter(|s| {
296 key.as_scalar_ref_impl() != s.field_at(0).expect("map key is not null")
297 })
298 .chain(std::iter::once(
299 StructValue::new(vec![Some(key.clone()), value]).as_scalar_ref(),
300 ))
301 .map(|s| Some(ScalarRefImpl::Struct(s))),
302 );
303 Self::from_entries(l)
304 }
305
306 pub fn delete(m: MapRef<'_>, key: ScalarRefImpl<'_>) -> Self {
307 let l = ListValue::from_datum_iter(
308 &m.inner().data_type(),
309 m.iter_struct()
310 .filter(|s| key != s.field_at(0).expect("map key is not null"))
311 .map(|s| Some(ScalarRefImpl::Struct(s))),
312 );
313 Self::from_entries(l)
314 }
315 }
316
317 impl<'a> MapRef<'a> {
318 pub unsafe fn new_unchecked(list: ListRef<'a>) -> Self {
321 MapRef(list)
322 }
323
324 pub fn inner(&self) -> &ListRef<'a> {
325 &self.0
326 }
327
328 pub fn into_inner(self) -> ListRef<'a> {
329 self.0
330 }
331
332 pub fn into_kv(self) -> (ListRef<'a>, ListRef<'a>) {
333 self.0.as_map_kv()
334 }
335
336 pub fn keys(&self) -> HashSet<ScalarRefImpl<'_>> {
337 self.iter().map(|(k, _v)| k).collect()
338 }
339
340 pub fn to_owned(self) -> MapValue {
341 MapValue(self.0.to_owned())
342 }
343
344 pub fn len(&self) -> usize {
345 self.0.len()
346 }
347
348 pub fn is_empty(&self) -> bool {
349 self.0.is_empty()
350 }
351 }
352
353 impl Scalar for MapValue {
354 type ScalarRefType<'a> = MapRef<'a>;
355
356 fn as_scalar_ref(&self) -> MapRef<'_> {
357 MapRef(self.0.as_scalar_ref())
359 }
360 }
361
362 impl<'a> ScalarRef<'a> for MapRef<'a> {
363 type ScalarType = MapValue;
364
365 fn to_owned_scalar(&self) -> MapValue {
366 MapValue(self.0.to_owned_scalar())
368 }
369
370 fn hash_scalar<H: std::hash::Hasher>(&self, state: &mut H) {
371 for (k, v) in self.iter_sorted() {
372 super::super::hash_datum(Some(k), state);
373 super::super::hash_datum(v, state);
374 }
375 }
376 }
377}
378
379mod cmp {
381 use super::*;
382 use crate::array::DefaultOrd;
383 impl PartialEq for MapArray {
384 fn eq(&self, other: &Self) -> bool {
385 self.iter().eq(other.iter())
386 }
387 }
388
389 impl PartialEq for MapValue {
390 fn eq(&self, other: &Self) -> bool {
391 self.as_scalar_ref().eq(&other.as_scalar_ref())
392 }
393 }
394
395 impl PartialOrd for MapValue {
396 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
397 Some(self.cmp(other))
398 }
399 }
400
401 impl Ord for MapValue {
402 fn cmp(&self, other: &Self) -> Ordering {
403 self.as_scalar_ref().cmp(&other.as_scalar_ref())
404 }
405 }
406
407 impl PartialEq for MapRef<'_> {
408 fn eq(&self, other: &Self) -> bool {
409 self.iter_sorted().eq(other.iter_sorted())
410 }
411 }
412
413 impl Ord for MapRef<'_> {
414 fn cmp(&self, other: &Self) -> Ordering {
415 self.iter_sorted()
416 .cmp_by(other.iter_sorted(), |(k1, v1), (k2, v2)| {
417 k1.default_cmp(&k2).then_with(|| v1.default_cmp(&v2))
418 })
419 }
420 }
421
422 impl PartialOrd for MapRef<'_> {
423 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
424 Some(self.cmp(other))
425 }
426 }
427}
428
429impl Debug for MapValue {
430 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
431 self.as_scalar_ref().fmt(f)
432 }
433}
434
435impl Display for MapValue {
436 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
437 self.as_scalar_ref().write(f)
438 }
439}
440
441impl<'a> MapRef<'a> {
442 pub fn iter(
444 self,
445 ) -> impl DoubleEndedIterator + ExactSizeIterator<Item = (ScalarRefImpl<'a>, DatumRef<'a>)> + 'a
446 {
447 self.inner().iter().map(|list_elem| {
448 let list_elem = list_elem.expect("the list element in map should not be null");
449 let struct_ = list_elem.into_struct();
450 let (k, v) = struct_
451 .iter_fields_ref()
452 .next_tuple()
453 .expect("the struct in map should have exactly 2 fields");
454 (k.expect("map key should not be null"), v)
455 })
456 }
457
458 pub fn iter_struct(
459 self,
460 ) -> impl DoubleEndedIterator + ExactSizeIterator<Item = StructRef<'a>> + 'a {
461 self.inner().iter().map(|list_elem| {
462 let list_elem = list_elem.expect("the list element in map should not be null");
463 list_elem.into_struct()
464 })
465 }
466
467 pub fn iter_sorted(
468 self,
469 ) -> impl DoubleEndedIterator + ExactSizeIterator<Item = (ScalarRefImpl<'a>, DatumRef<'a>)> + 'a
470 {
471 self.iter().sorted_by_key(|(k, _v)| DefaultOrdered(*k))
472 }
473
474 pub fn memcmp_serialize(
478 self,
479 serializer: &mut memcomparable::Serializer<impl BufMut>,
480 ) -> memcomparable::Result<()> {
481 let mut inner_serializer = memcomparable::Serializer::new(vec![]);
482 for (k, v) in self.iter_sorted() {
483 memcmp_encoding::serialize_datum_in_composite(Some(k), &mut inner_serializer)?;
484 memcmp_encoding::serialize_datum_in_composite(v, &mut inner_serializer)?;
485 }
486 serializer.serialize_bytes(&inner_serializer.into_inner())
487 }
488}
489
490impl MapValue {
491 pub fn memcmp_deserialize(
495 datatype: &MapType,
496 deserializer: &mut memcomparable::Deserializer<impl Buf>,
497 ) -> memcomparable::Result<Self> {
498 let list = ListValue::memcmp_deserialize(&datatype.clone().into_struct(), deserializer)?;
499 Ok(Self::from_entries(list))
500 }
501}
502
503impl Debug for MapRef<'_> {
504 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
505 f.debug_list().entries(self.inner().iter()).finish()
506 }
507}
508
509impl ToText for MapRef<'_> {
510 fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
511 write!(
513 f,
514 "{{{}}}",
515 self.iter().format_with(",", |(key, value), f| {
516 let key = key.to_text();
517 let value = value.to_text();
518 f(&format_args!("{}:{}", key, value))
520 })
521 )
522 }
523
524 fn write_with_type<W: std::fmt::Write>(&self, ty: &DataType, f: &mut W) -> std::fmt::Result {
525 match ty {
526 DataType::Map { .. } => self.write(f),
527 _ => unreachable!(),
528 }
529 }
530}
531
532impl MapValue {
533 pub fn from_str_for_test(s: &str, data_type: &MapType) -> Result<Self, BoxedError> {
534 if !s.starts_with('{') {
539 return Err(format!("Missing left parenthesis: {}", s).into());
540 }
541 if !s.ends_with('}') {
542 return Err(format!("Missing right parenthesis: {}", s).into());
543 }
544 let mut key_builder = data_type.key().create_array_builder(100);
545 let mut value_builder = data_type.value().create_array_builder(100);
546 for kv in s[1..s.len() - 1].split(',') {
547 let (k, v) = kv.split_once(':').ok_or("Invalid map format")?;
548 key_builder.append(Some(ScalarImpl::from_text(k, data_type.key())?));
549 if v == "NULL" {
550 value_builder.append_null();
551 } else {
552 value_builder.append(Some(ScalarImpl::from_text(v, data_type.value())?));
553 }
554 }
555 let key_array = key_builder.finish();
556 let value_array = value_builder.finish();
557
558 Ok(MapValue::try_from_kv(
559 ListValue::new(key_array),
560 ListValue::new(value_array),
561 )?)
562 }
563}