risingwave_common/array/
map_array.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::fmt::{self, Debug, Display};
17
18use bytes::{Buf, BufMut};
19use itertools::Itertools;
20use risingwave_common_estimate_size::EstimateSize;
21use risingwave_error::BoxedError;
22use risingwave_pb::data::{PbArray, PbArrayType};
23use serde::Serializer;
24
25use super::{
26    Array, ArrayBuilder, ArrayImpl, ArrayResult, DatumRef, DefaultOrdered, ListArray,
27    ListArrayBuilder, ListRef, ListValue, MapType, ScalarImpl, ScalarRef, ScalarRefImpl,
28    StructArray, StructRef,
29};
30use crate::bitmap::Bitmap;
31use crate::types::{DataType, Scalar, ToText};
32use crate::util::memcmp_encoding;
33
/// Builder for [`MapArray`].
///
/// It is a thin wrapper around a [`ListArrayBuilder`]: every builder operation is
/// forwarded to the wrapped list builder.
#[derive(Debug, Clone, EstimateSize)]
pub struct MapArrayBuilder {
    /// Builder of the underlying list array that stores the map entries.
    inner: ListArrayBuilder,
}
38
39impl ArrayBuilder for MapArrayBuilder {
40    type ArrayType = MapArray;
41
42    #[cfg(not(test))]
43    fn new(_capacity: usize) -> Self {
44        panic!("please use `MapArrayBuilder::with_type` instead");
45    }
46
47    #[cfg(test)]
48    fn new(capacity: usize) -> Self {
49        Self::with_type(
50            capacity,
51            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Varchar)),
52        )
53    }
54
55    fn with_type(capacity: usize, ty: DataType) -> Self {
56        let inner = ListArrayBuilder::with_type(capacity, ty.into_map().into_list());
57        Self { inner }
58    }
59
60    fn append_n(&mut self, n: usize, value: Option<MapRef<'_>>) {
61        self.inner.append_n(n, value.map(|v| v.into_inner()));
62    }
63
64    fn append_array(&mut self, other: &MapArray) {
65        self.inner.append_array(&other.inner);
66    }
67
68    fn pop(&mut self) -> Option<()> {
69        self.inner.pop()
70    }
71
72    fn len(&self) -> usize {
73        self.inner.len()
74    }
75
76    fn finish(self) -> MapArray {
77        let inner = self.inner.finish();
78        MapArray { inner }
79    }
80}
81
/// `MapArray` is physically just a `List<Struct<key: K, value: V>>` array, but with some additional restrictions.
///
/// Type:
/// - `key`'s datatype can only be string & integral types. (See [`MapType::check_key_type_valid`].)
/// - `value` can be any type.
///
/// Value (for each map value in the array):
/// - `key`s are non-null and unique.
///
/// - `key`s and `value`s must be of the same length.
///   For a `MapArray`, it's sliced by the `ListArray`'s offsets, so it essentially means the
///   `key` and `value` children arrays have the same length.
///
/// - The lists are NOT sorted by `key`.
///
/// - `Eq` / `Hash` / `Ord` for map:
///
///   It's controversial because the physical representation is just an unordered list.
///   In many systems (e.g., `DuckDB` and `ClickHouse`), `{"k1":"v1","k2":"v2"} != {"k2":"v2","k1":"v1"}`.
///   But the reverse definition might be more intuitive, especially when ingesting Avro/Protobuf data.
///
///   To avoid controversy, we wanted to ban all usages and make the implementation `unreachable!()`,
///   but it's hard since these implementations can be used in different places:
///   * Explicit in user-facing functions (e.g., comparison operators). These could be avoided completely.
///   * Implicit in keys (group by / order by / primary key). These could also be banned, but it's harder.
///   * Some internal usages. One example is `_row_id`. See <https://github.com/risingwavelabs/risingwave/issues/7981#issuecomment-2257661749>.
///     It might be solvable, but we are not sure whether it's depended on somewhere else.
///
///   Considering these, it might be better to still choose a _well-defined_ behavior instead
///   of using `unreachable`. We should try to have a consistent definition for these operations to minimize possible surprises.
///   And we could still try our best to ban it to prevent misuse.
///
///   Currently we choose the second behavior, i.e., first sort the map by key, then compare/hash.
///   Note that `Eq` is intuitive, but `Ord` still looks strange. We assume no users really care about
///   which map is larger, but just provide an implementation to prevent undefined behavior.
///
///   See more discussion in <https://github.com/risingwavelabs/risingwave/issues/7981>.
///
///
/// Note that decisions above are not definitive. Just be conservative at the beginning.
#[derive(Debug, Clone, Eq)]
pub struct MapArray {
    /// The underlying list array holding the map entries.
    pub(super) inner: ListArray,
}
126
127impl EstimateSize for MapArray {
128    fn estimated_heap_size(&self) -> usize {
129        self.inner.estimated_heap_size()
130    }
131}
132
133impl Array for MapArray {
134    type Builder = MapArrayBuilder;
135    type OwnedItem = MapValue;
136    type RefItem<'a> = MapRef<'a>;
137
138    unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> {
139        unsafe {
140            let list = self.inner.raw_value_at_unchecked(idx);
141            MapRef::new_unchecked(list)
142        }
143    }
144
145    fn len(&self) -> usize {
146        self.inner.len()
147    }
148
149    fn to_protobuf(&self) -> PbArray {
150        let mut array = self.inner.to_protobuf();
151        array.array_type = PbArrayType::Map as i32;
152        array
153    }
154
155    fn null_bitmap(&self) -> &Bitmap {
156        self.inner.null_bitmap()
157    }
158
159    fn into_null_bitmap(self) -> Bitmap {
160        self.inner.into_null_bitmap()
161    }
162
163    fn set_bitmap(&mut self, bitmap: Bitmap) {
164        self.inner.set_bitmap(bitmap)
165    }
166
167    fn data_type(&self) -> DataType {
168        let list_value_type = self.inner.values().data_type();
169        DataType::Map(MapType::from_entries(list_value_type))
170    }
171}
172
173impl MapArray {
174    pub fn from_protobuf(array: &PbArray) -> ArrayResult<ArrayImpl> {
175        let inner = ListArray::from_protobuf(array)?.into_list();
176        Ok(Self { inner }.into())
177    }
178
179    /// Return the inner struct array of the list array.
180    pub fn as_struct(&self) -> &StructArray {
181        self.inner.values().as_struct()
182    }
183
184    /// Returns the offsets of this map.
185    pub fn offsets(&self) -> &[u32] {
186        self.inner.offsets()
187    }
188}
189
190pub use scalar::{MapRef, MapValue};
191
/// We can enforce the invariants (see [`MapArray`]) in too many places
/// (`MapValue`, `MapRef` and `MapArray`).
///
/// So we define the types and constructors in a separate `mod`
/// to prevent direct construction.
/// We only check the invariants in the constructors.
/// After they are constructed, we assume the invariants hold.
199mod scalar {
200    use std::collections::HashSet;
201
202    use super::*;
203    use crate::array::{Datum, ScalarImpl, StructValue};
204
    /// Owned map scalar: a newtype over the [`ListValue`] that stores the map entries.
    ///
    /// Refer to [`MapArray`] for the invariants of a map value.
    #[derive(Clone, Eq, EstimateSize)]
    pub struct MapValue(ListValue);
208
    /// Borrowed map scalar: a newtype over a [`ListRef`].
    ///
    /// A map is just a slice of the underlying struct array.
    ///
    /// Refer to [`MapArray`] for the invariants of a map value.
    ///
    /// XXX: perhaps we can make it `MapRef<'a, 'b>(ListRef<'a>, ListRef<'b>);`.
    /// Then we can build a map ref from 2 list refs without copying the data.
    /// Currently it's impossible.
    /// <https://github.com/risingwavelabs/risingwave/issues/17863>
    #[derive(Copy, Clone, Eq)]
    pub struct MapRef<'a>(ListRef<'a>);
219
220    impl MapValue {
221        pub fn inner(&self) -> &ListValue {
222            &self.0
223        }
224
225        pub fn into_inner(self) -> ListValue {
226            self.0
227        }
228
229        /// # Panics
230        /// Panics if [map invariants](`super::MapArray`) are violated.
231        pub fn from_entries(entries: ListValue) -> Self {
232            Self::try_from_entries(entries).unwrap()
233        }
234
235        /// Returns error if [map invariants](`super::MapArray`) are violated.
236        pub fn try_from_entries(entries: ListValue) -> Result<Self, String> {
237            // validates list type is valid
238            let _ = MapType::try_from_entries(entries.data_type())?;
239            let mut keys = HashSet::with_capacity(entries.len());
240            let struct_array = entries.into_array();
241            for key in struct_array.as_struct().field_at(0).iter() {
242                let Some(key) = key else {
243                    return Err("map keys must not be NULL".to_owned());
244                };
245                if !keys.insert(key) {
246                    return Err("map keys must be unique".to_owned());
247                }
248            }
249            Ok(MapValue(ListValue::new(struct_array)))
250        }
251
252        /// Returns error if [map invariants](`super::MapArray`) are violated.
253        pub fn try_from_kv(key: ListValue, value: ListValue) -> Result<Self, String> {
254            if key.len() != value.len() {
255                return Err("map keys and values have different length".to_owned());
256            }
257            let unique_keys: HashSet<_> = key.iter().unique().collect();
258            if unique_keys.len() != key.len() {
259                return Err("map keys must be unique".to_owned());
260            }
261            if unique_keys.contains(&None) {
262                return Err("map keys must not be NULL".to_owned());
263            }
264
265            let len = key.len();
266            let key_type = key.data_type();
267            let value_type = value.data_type();
268            let struct_array = StructArray::new(
269                MapType::struct_type_for_map(key_type, value_type),
270                vec![key.into_array().into_ref(), value.into_array().into_ref()],
271                Bitmap::ones(len),
272            );
273            Ok(MapValue(ListValue::new(struct_array.into())))
274        }
275
276        /// # Panics
277        /// Panics if `m1` and `m2` have different types.
278        pub fn concat(m1: MapRef<'_>, m2: MapRef<'_>) -> Self {
279            debug_assert_eq!(m1.inner().data_type(), m2.inner().data_type());
280            let m2_keys = m2.keys();
281            let l = ListValue::from_datum_iter(
282                &m1.inner().data_type(),
283                m1.iter_struct()
284                    .filter(|s| !m2_keys.contains(&s.field_at(0).expect("map key is not null")))
285                    .chain(m2.iter_struct())
286                    .map(|s| Some(ScalarRefImpl::Struct(s))),
287            );
288            Self::from_entries(l)
289        }
290
291        pub fn insert(m: MapRef<'_>, key: ScalarImpl, value: Datum) -> Self {
292            let l = ListValue::from_datum_iter(
293                &m.inner().data_type(),
294                m.iter_struct()
295                    .filter(|s| {
296                        key.as_scalar_ref_impl() != s.field_at(0).expect("map key is not null")
297                    })
298                    .chain(std::iter::once(
299                        StructValue::new(vec![Some(key.clone()), value]).as_scalar_ref(),
300                    ))
301                    .map(|s| Some(ScalarRefImpl::Struct(s))),
302            );
303            Self::from_entries(l)
304        }
305
306        pub fn delete(m: MapRef<'_>, key: ScalarRefImpl<'_>) -> Self {
307            let l = ListValue::from_datum_iter(
308                &m.inner().data_type(),
309                m.iter_struct()
310                    .filter(|s| key != s.field_at(0).expect("map key is not null"))
311                    .map(|s| Some(ScalarRefImpl::Struct(s))),
312            );
313            Self::from_entries(l)
314        }
315    }
316
317    impl<'a> MapRef<'a> {
318        /// # Safety
319        /// The caller must ensure the invariants of a map value.
320        pub unsafe fn new_unchecked(list: ListRef<'a>) -> Self {
321            MapRef(list)
322        }
323
324        pub fn inner(&self) -> &ListRef<'a> {
325            &self.0
326        }
327
328        pub fn into_inner(self) -> ListRef<'a> {
329            self.0
330        }
331
332        pub fn into_kv(self) -> (ListRef<'a>, ListRef<'a>) {
333            self.0.as_map_kv()
334        }
335
336        pub fn keys(&self) -> HashSet<ScalarRefImpl<'_>> {
337            self.iter().map(|(k, _v)| k).collect()
338        }
339
340        pub fn to_owned(self) -> MapValue {
341            MapValue(self.0.to_owned())
342        }
343
344        pub fn len(&self) -> usize {
345            self.0.len()
346        }
347
348        pub fn is_empty(&self) -> bool {
349            self.0.is_empty()
350        }
351    }
352
353    impl Scalar for MapValue {
354        type ScalarRefType<'a> = MapRef<'a>;
355
356        fn as_scalar_ref(&self) -> MapRef<'_> {
357            // MapValue is assumed to be valid, so we just construct directly without check invariants.
358            MapRef(self.0.as_scalar_ref())
359        }
360    }
361
362    impl<'a> ScalarRef<'a> for MapRef<'a> {
363        type ScalarType = MapValue;
364
365        fn to_owned_scalar(&self) -> MapValue {
366            // MapRef is assumed to be valid, so we just construct directly without check invariants.
367            MapValue(self.0.to_owned_scalar())
368        }
369
370        fn hash_scalar<H: std::hash::Hasher>(&self, state: &mut H) {
371            for (k, v) in self.iter_sorted() {
372                super::super::hash_datum(Some(k), state);
373                super::super::hash_datum(v, state);
374            }
375        }
376    }
377}
378
379/// Refer to [`MapArray`] for the semantics of the comparison.
380mod cmp {
381    use super::*;
382    use crate::array::DefaultOrd;
383    impl PartialEq for MapArray {
384        fn eq(&self, other: &Self) -> bool {
385            self.iter().eq(other.iter())
386        }
387    }
388
389    impl PartialEq for MapValue {
390        fn eq(&self, other: &Self) -> bool {
391            self.as_scalar_ref().eq(&other.as_scalar_ref())
392        }
393    }
394
395    impl PartialOrd for MapValue {
396        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
397            Some(self.cmp(other))
398        }
399    }
400
401    impl Ord for MapValue {
402        fn cmp(&self, other: &Self) -> Ordering {
403            self.as_scalar_ref().cmp(&other.as_scalar_ref())
404        }
405    }
406
407    impl PartialEq for MapRef<'_> {
408        fn eq(&self, other: &Self) -> bool {
409            self.iter_sorted().eq(other.iter_sorted())
410        }
411    }
412
413    impl Ord for MapRef<'_> {
414        fn cmp(&self, other: &Self) -> Ordering {
415            self.iter_sorted()
416                .cmp_by(other.iter_sorted(), |(k1, v1), (k2, v2)| {
417                    k1.default_cmp(&k2).then_with(|| v1.default_cmp(&v2))
418                })
419        }
420    }
421
422    impl PartialOrd for MapRef<'_> {
423        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
424            Some(self.cmp(other))
425        }
426    }
427}
428
429impl Debug for MapValue {
430    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
431        self.as_scalar_ref().fmt(f)
432    }
433}
434
435impl Display for MapValue {
436    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
437        self.as_scalar_ref().write(f)
438    }
439}
440
441impl<'a> MapRef<'a> {
442    /// Iterates over the elements of the map.
443    pub fn iter(
444        self,
445    ) -> impl DoubleEndedIterator + ExactSizeIterator<Item = (ScalarRefImpl<'a>, DatumRef<'a>)> + 'a
446    {
447        self.inner().iter().map(|list_elem| {
448            let list_elem = list_elem.expect("the list element in map should not be null");
449            let struct_ = list_elem.into_struct();
450            let (k, v) = struct_
451                .iter_fields_ref()
452                .next_tuple()
453                .expect("the struct in map should have exactly 2 fields");
454            (k.expect("map key should not be null"), v)
455        })
456    }
457
458    pub fn iter_struct(
459        self,
460    ) -> impl DoubleEndedIterator + ExactSizeIterator<Item = StructRef<'a>> + 'a {
461        self.inner().iter().map(|list_elem| {
462            let list_elem = list_elem.expect("the list element in map should not be null");
463            list_elem.into_struct()
464        })
465    }
466
467    pub fn iter_sorted(
468        self,
469    ) -> impl DoubleEndedIterator + ExactSizeIterator<Item = (ScalarRefImpl<'a>, DatumRef<'a>)> + 'a
470    {
471        self.iter().sorted_by_key(|(k, _v)| DefaultOrdered(*k))
472    }
473
474    /// Note: Map should not be used as key. But we don't want to panic.
475    /// See [`MapArray`] for the semantics. See also the `Ord` implementation.
476    /// TODO: ban it in fe <https://github.com/risingwavelabs/risingwave/issues/7981>
477    pub fn memcmp_serialize(
478        self,
479        serializer: &mut memcomparable::Serializer<impl BufMut>,
480    ) -> memcomparable::Result<()> {
481        let mut inner_serializer = memcomparable::Serializer::new(vec![]);
482        for (k, v) in self.iter_sorted() {
483            memcmp_encoding::serialize_datum_in_composite(Some(k), &mut inner_serializer)?;
484            memcmp_encoding::serialize_datum_in_composite(v, &mut inner_serializer)?;
485        }
486        serializer.serialize_bytes(&inner_serializer.into_inner())
487    }
488}
489
490impl MapValue {
491    /// Note: Map should not be used as key. But we don't want to panic.
492    /// See [`MapArray`] for the semantics. See also the `Ord` implementation.
493    /// TODO: ban it in fe <https://github.com/risingwavelabs/risingwave/issues/7981>
494    pub fn memcmp_deserialize(
495        datatype: &MapType,
496        deserializer: &mut memcomparable::Deserializer<impl Buf>,
497    ) -> memcomparable::Result<Self> {
498        let list = ListValue::memcmp_deserialize(&datatype.clone().into_struct(), deserializer)?;
499        Ok(Self::from_entries(list))
500    }
501}
502
503impl Debug for MapRef<'_> {
504    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
505        f.debug_list().entries(self.inner().iter()).finish()
506    }
507}
508
509impl ToText for MapRef<'_> {
510    fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
511        // Note: This is arbitrarily decided...
512        write!(
513            f,
514            "{{{}}}",
515            self.iter().format_with(",", |(key, value), f| {
516                let key = key.to_text();
517                let value = value.to_text();
518                // TODO: consider quote like list and struct
519                f(&format_args!("{}:{}", key, value))
520            })
521        )
522    }
523
524    fn write_with_type<W: std::fmt::Write>(&self, ty: &DataType, f: &mut W) -> std::fmt::Result {
525        match ty {
526            DataType::Map { .. } => self.write(f),
527            _ => unreachable!(),
528        }
529    }
530}
531
532impl MapValue {
533    pub fn from_str_for_test(s: &str, data_type: &MapType) -> Result<Self, BoxedError> {
534        // TODO: this is a quick trivial implementation. Implement the full version later.
535
536        // example: {1:1,2:NULL,3:3}
537
538        if !s.starts_with('{') {
539            return Err(format!("Missing left parenthesis: {}", s).into());
540        }
541        if !s.ends_with('}') {
542            return Err(format!("Missing right parenthesis: {}", s).into());
543        }
544        let mut key_builder = data_type.key().create_array_builder(100);
545        let mut value_builder = data_type.value().create_array_builder(100);
546        for kv in s[1..s.len() - 1].split(',') {
547            let (k, v) = kv.split_once(':').ok_or("Invalid map format")?;
548            key_builder.append(Some(ScalarImpl::from_text(k, data_type.key())?));
549            if v == "NULL" {
550                value_builder.append_null();
551            } else {
552                value_builder.append(Some(ScalarImpl::from_text(v, data_type.value())?));
553            }
554        }
555        let key_array = key_builder.finish();
556        let value_array = value_builder.finish();
557
558        Ok(MapValue::try_from_kv(
559            ListValue::new(key_array),
560            ListValue::new(value_array),
561        )?)
562    }
563}