risingwave_common/array/
map_array.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::fmt::{self, Debug, Display};
17
18use bytes::{Buf, BufMut};
19use itertools::Itertools;
20use risingwave_common_estimate_size::EstimateSize;
21use risingwave_error::BoxedError;
22use risingwave_pb::data::{PbArray, PbArrayType};
23use serde::Serializer;
24
25use super::{
26    Array, ArrayBuilder, ArrayImpl, ArrayResult, DatumRef, DefaultOrdered, ListArray,
27    ListArrayBuilder, ListRef, ListValue, MapType, ScalarImpl, ScalarRef, ScalarRefImpl,
28    StructArray, StructRef,
29};
30use crate::bitmap::Bitmap;
31use crate::types::{DataType, Scalar, ToText};
32use crate::util::memcmp_encoding;
33
/// Builder for [`MapArray`].
///
/// This is a thin wrapper around a [`ListArrayBuilder`]: a map array is stored as a list
/// array, and the builder forwards its operations to the wrapped list builder.
#[derive(Debug, Clone, EstimateSize)]
pub struct MapArrayBuilder {
    /// The underlying list builder that stores the appended map values.
    inner: ListArrayBuilder,
}
38
39impl ArrayBuilder for MapArrayBuilder {
40    type ArrayType = MapArray;
41
42    #[cfg(not(test))]
43    fn new(_capacity: usize) -> Self {
44        panic!("please use `MapArrayBuilder::with_type` instead");
45    }
46
47    #[cfg(test)]
48    fn new(capacity: usize) -> Self {
49        Self::with_type(
50            capacity,
51            DataType::Map(MapType::from_kv(DataType::Varchar, DataType::Varchar)),
52        )
53    }
54
55    fn with_type(capacity: usize, ty: DataType) -> Self {
56        let inner = ListArrayBuilder::with_type(capacity, ty.into_map().into_list());
57        Self { inner }
58    }
59
60    fn append_n(&mut self, n: usize, value: Option<MapRef<'_>>) {
61        self.inner.append_n(n, value.map(|v| v.into_inner()));
62    }
63
64    fn append_array(&mut self, other: &MapArray) {
65        self.inner.append_array(&other.inner);
66    }
67
68    fn pop(&mut self) -> Option<()> {
69        self.inner.pop()
70    }
71
72    fn len(&self) -> usize {
73        self.inner.len()
74    }
75
76    fn finish(self) -> MapArray {
77        let inner = self.inner.finish();
78        MapArray { inner }
79    }
80}
81
/// `MapArray` is physically just a `List<Struct<key: K, value: V>>` array, but with some additional restrictions.
///
/// Type:
/// - `key`'s datatype can only be string & integral types. (See [`MapType::check_key_type_valid`].)
/// - `value` can be any type.
///
/// Value (for each map value in the array):
/// - `key`s are non-null and unique.
///
/// - `key`s and `value`s must be of the same length.
///   For a `MapArray`, it's sliced by the `ListArray`'s offsets, so it essentially means the
///   `key` and `value` children arrays have the same length.
///
/// - The lists are NOT sorted by `key`.
///
/// - `Eq` / `Hash` / `Ord` for map:
///
///   It's controversial because the physical representation is just an unordered list.
///   In many systems (e.g., `DuckDB` and `ClickHouse`), `{"k1":"v1","k2":"v2"} != {"k2":"v2","k1":"v1"}`.
///   But the reverse definition might be more intuitive, especially when ingesting Avro/Protobuf data.
///
///   To avoid controversy, we wanted to ban all usages and make the implementation `unreachable!()`,
///   but it's hard since these implementations can be used in different places:
///   * Explicit in user-facing functions (e.g., comparison operators). These could be avoided completely.
///   * Implicit in keys (group by / order by / primary key). These could also be banned, but it's harder.
///   * Some internal usages. One example is `_row_id`. See <https://github.com/risingwavelabs/risingwave/issues/7981#issuecomment-2257661749>.
///     It might be solvable, but we are not sure whether it's depended on somewhere else.
///
///   Considering these, it might be better to still choose a _well-defined_ behavior instead
///   of using `unreachable`. We should try to have a consistent definition for these operations to minimize possible surprises.
///   And we could still try our best to ban it to prevent misuse.
///
///   Currently we choose the second behavior, i.e., first sort the map by key, then compare/hash.
///   Note that `Eq` is intuitive, but `Ord` still looks strange. We assume no users really care about
///   which map is larger, but just provide an implementation to prevent undefined behavior.
///
///   See more discussion in <https://github.com/risingwavelabs/risingwave/issues/7981>.
///
///
/// Note that decisions above are not definitive. Just be conservative at the beginning.
#[derive(Debug, Clone, Eq)]
pub struct MapArray {
    /// The underlying list array that stores the map entries.
    pub(super) inner: ListArray,
}
126
127impl EstimateSize for MapArray {
128    fn estimated_heap_size(&self) -> usize {
129        self.inner.estimated_heap_size()
130    }
131}
132
133impl Array for MapArray {
134    type Builder = MapArrayBuilder;
135    type OwnedItem = MapValue;
136    type RefItem<'a> = MapRef<'a>;
137
138    unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> {
139        unsafe {
140            let list = self.inner.raw_value_at_unchecked(idx);
141            MapRef::new_unchecked(list)
142        }
143    }
144
145    fn len(&self) -> usize {
146        self.inner.len()
147    }
148
149    fn to_protobuf(&self) -> PbArray {
150        let mut array = self.inner.to_protobuf();
151        array.array_type = PbArrayType::Map as i32;
152        array
153    }
154
155    fn null_bitmap(&self) -> &Bitmap {
156        self.inner.null_bitmap()
157    }
158
159    fn into_null_bitmap(self) -> Bitmap {
160        self.inner.into_null_bitmap()
161    }
162
163    fn set_bitmap(&mut self, bitmap: Bitmap) {
164        self.inner.set_bitmap(bitmap)
165    }
166
167    fn data_type(&self) -> DataType {
168        let list_value_type = self.inner.values().data_type();
169        DataType::Map(MapType::from_entries(list_value_type))
170    }
171}
172
173impl MapArray {
174    pub fn from_protobuf(array: &PbArray) -> ArrayResult<ArrayImpl> {
175        let inner = ListArray::from_protobuf(array)?.into_list();
176        Ok(Self { inner }.into())
177    }
178
179    /// Return the inner struct array of the list array.
180    pub fn as_struct(&self) -> &StructArray {
181        self.inner.values().as_struct()
182    }
183
184    /// Returns the offsets of this map.
185    pub fn offsets(&self) -> &[u32] {
186        self.inner.offsets()
187    }
188}
189
190pub use scalar::{MapRef, MapValue};
191
192/// We can enforce the invariants (see [`MapArray`]) in too many places
193/// (both `MapValue`, `MapRef` and `MapArray`).
194///
195/// So we define the types and constructors in a separated `mod`
196/// to prevent direct construction.
197/// We only check the invariants in the constructors.
198/// After they are constructed, we assume the invariants holds.
199mod scalar {
200    use std::collections::HashSet;
201
202    use super::*;
203    use crate::array::{Datum, ScalarImpl, StructValue};
204
205    /// Refer to [`MapArray`] for the invariants of a map value.
206    #[derive(Clone, Eq, EstimateSize)]
207    pub struct MapValue(ListValue);
208
209    /// A map is just a slice of the underlying struct array.
210    ///
211    /// Refer to [`MapArray`] for the invariants of a map value.
212    ///
213    /// XXX: perhaps we can make it `MapRef<'a, 'b>(ListRef<'a>, ListRef<'b>);`.
214    /// Then we can build a map ref from 2 list refs without copying the data.
215    /// Currently it's impossible.
216    /// <https://github.com/risingwavelabs/risingwave/issues/17863>
217    #[derive(Copy, Clone, Eq)]
218    pub struct MapRef<'a>(ListRef<'a>);
219
220    impl MapValue {
221        pub fn inner(&self) -> &ListValue {
222            &self.0
223        }
224
225        pub fn into_inner(self) -> ListValue {
226            self.0
227        }
228
229        /// # Panics
230        /// Panics if [map invariants](`super::MapArray`) are violated.
231        pub fn from_entries(entries: ListValue) -> Self {
232            Self::try_from_entries(entries).unwrap()
233        }
234
235        /// Returns error if [map invariants](`super::MapArray`) are violated.
236        pub fn try_from_entries(entries: ListValue) -> Result<Self, String> {
237            // validates list type is valid
238            let _ = MapType::try_from_entries(entries.elem_type())?;
239            let mut keys = HashSet::with_capacity(entries.len());
240            let struct_array = entries.into_array();
241            for key in struct_array.as_struct().field_at(0).iter() {
242                let Some(key) = key else {
243                    return Err("map keys must not be NULL".to_owned());
244                };
245                if !keys.insert(key) {
246                    return Err("map keys must be unique".to_owned());
247                }
248            }
249            Ok(MapValue(ListValue::new(struct_array)))
250        }
251
252        /// Returns error if [map invariants](`super::MapArray`) are violated.
253        pub fn try_from_kv(keys: ListValue, values: ListValue) -> Result<Self, String> {
254            if keys.len() != values.len() {
255                return Err("map keys and values have different length".to_owned());
256            }
257            let unique_keys: HashSet<_> = keys.iter().unique().collect();
258            if unique_keys.len() != keys.len() {
259                return Err("map keys must be unique".to_owned());
260            }
261            if unique_keys.contains(&None) {
262                return Err("map keys must not be NULL".to_owned());
263            }
264
265            let len = keys.len();
266            let key_type = keys.elem_type();
267            let value_type = values.elem_type();
268            let struct_array = StructArray::new(
269                MapType::struct_type_for_map(key_type, value_type),
270                vec![keys.into_array().into_ref(), values.into_array().into_ref()],
271                Bitmap::ones(len),
272            );
273            Ok(MapValue(ListValue::new(struct_array.into())))
274        }
275
276        /// # Panics
277        /// Panics if `m1` and `m2` have different types.
278        pub fn concat(m1: MapRef<'_>, m2: MapRef<'_>) -> Self {
279            debug_assert_eq!(m1.inner().elem_type(), m2.inner().elem_type());
280            let m2_keys = m2.keys();
281            let l = ListValue::from_datum_iter(
282                &m1.inner().elem_type(),
283                m1.iter_struct()
284                    .filter(|s| !m2_keys.contains(&s.field_at(0).expect("map key is not null")))
285                    .chain(m2.iter_struct())
286                    .map(|s| Some(ScalarRefImpl::Struct(s))),
287            );
288            Self::from_entries(l)
289        }
290
291        pub fn insert(m: MapRef<'_>, key: ScalarImpl, value: Datum) -> Self {
292            let l = ListValue::from_datum_iter(
293                &m.inner().elem_type(),
294                m.iter_struct()
295                    .filter(|s| {
296                        key.as_scalar_ref_impl() != s.field_at(0).expect("map key is not null")
297                    })
298                    .chain(std::iter::once(
299                        StructValue::new(vec![Some(key.clone()), value]).as_scalar_ref(),
300                    ))
301                    .map(|s| Some(ScalarRefImpl::Struct(s))),
302            );
303            Self::from_entries(l)
304        }
305
306        pub fn delete(m: MapRef<'_>, key: ScalarRefImpl<'_>) -> Self {
307            let l = ListValue::from_datum_iter(
308                &m.inner().elem_type(),
309                m.iter_struct()
310                    .filter(|s| key != s.field_at(0).expect("map key is not null"))
311                    .map(|s| Some(ScalarRefImpl::Struct(s))),
312            );
313            Self::from_entries(l)
314        }
315    }
316
317    impl<'a> MapRef<'a> {
318        /// # Safety
319        /// The caller must ensure the invariants of a map value.
320        pub unsafe fn new_unchecked(list: ListRef<'a>) -> Self {
321            MapRef(list)
322        }
323
324        pub fn inner(&self) -> &ListRef<'a> {
325            &self.0
326        }
327
328        pub fn into_inner(self) -> ListRef<'a> {
329            self.0
330        }
331
332        pub fn into_kv(self) -> (ListRef<'a>, ListRef<'a>) {
333            self.0.as_map_kv()
334        }
335
336        pub fn keys(&self) -> HashSet<ScalarRefImpl<'_>> {
337            self.iter().map(|(k, _v)| k).collect()
338        }
339
340        pub fn len(&self) -> usize {
341            self.0.len()
342        }
343
344        pub fn is_empty(&self) -> bool {
345            self.0.is_empty()
346        }
347    }
348
349    impl Scalar for MapValue {
350        type ScalarRefType<'a> = MapRef<'a>;
351
352        fn as_scalar_ref(&self) -> MapRef<'_> {
353            // MapValue is assumed to be valid, so we just construct directly without check invariants.
354            MapRef(self.0.as_scalar_ref())
355        }
356    }
357
358    impl<'a> ScalarRef<'a> for MapRef<'a> {
359        type ScalarType = MapValue;
360
361        fn to_owned_scalar(&self) -> MapValue {
362            // MapRef is assumed to be valid, so we just construct directly without check invariants.
363            MapValue(self.0.to_owned_scalar())
364        }
365
366        fn hash_scalar<H: std::hash::Hasher>(&self, state: &mut H) {
367            for (k, v) in self.iter_sorted() {
368                super::super::hash_datum(Some(k), state);
369                super::super::hash_datum(v, state);
370            }
371        }
372    }
373}
374
375/// Refer to [`MapArray`] for the semantics of the comparison.
376mod cmp {
377    use super::*;
378    use crate::array::DefaultOrd;
379    impl PartialEq for MapArray {
380        fn eq(&self, other: &Self) -> bool {
381            self.iter().eq(other.iter())
382        }
383    }
384
385    impl PartialEq for MapValue {
386        fn eq(&self, other: &Self) -> bool {
387            self.as_scalar_ref().eq(&other.as_scalar_ref())
388        }
389    }
390
391    impl PartialOrd for MapValue {
392        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
393            Some(self.cmp(other))
394        }
395    }
396
397    impl Ord for MapValue {
398        fn cmp(&self, other: &Self) -> Ordering {
399            self.as_scalar_ref().cmp(&other.as_scalar_ref())
400        }
401    }
402
403    impl PartialEq for MapRef<'_> {
404        fn eq(&self, other: &Self) -> bool {
405            self.iter_sorted().eq(other.iter_sorted())
406        }
407    }
408
409    impl Ord for MapRef<'_> {
410        fn cmp(&self, other: &Self) -> Ordering {
411            self.iter_sorted()
412                .cmp_by(other.iter_sorted(), |(k1, v1), (k2, v2)| {
413                    k1.default_cmp(&k2).then_with(|| v1.default_cmp(&v2))
414                })
415        }
416    }
417
418    impl PartialOrd for MapRef<'_> {
419        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
420            Some(self.cmp(other))
421        }
422    }
423}
424
425impl Debug for MapValue {
426    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
427        self.as_scalar_ref().fmt(f)
428    }
429}
430
431impl Display for MapValue {
432    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
433        self.as_scalar_ref().write(f)
434    }
435}
436
437impl<'a> MapRef<'a> {
438    /// Iterates over the elements of the map.
439    pub fn iter(
440        self,
441    ) -> impl DoubleEndedIterator + ExactSizeIterator<Item = (ScalarRefImpl<'a>, DatumRef<'a>)> + 'a
442    {
443        self.inner().iter().map(|list_elem| {
444            let list_elem = list_elem.expect("the list element in map should not be null");
445            let struct_ = list_elem.into_struct();
446            let (k, v) = struct_
447                .iter_fields_ref()
448                .next_tuple()
449                .expect("the struct in map should have exactly 2 fields");
450            (k.expect("map key should not be null"), v)
451        })
452    }
453
454    pub fn iter_struct(
455        self,
456    ) -> impl DoubleEndedIterator + ExactSizeIterator<Item = StructRef<'a>> + 'a {
457        self.inner().iter().map(|list_elem| {
458            let list_elem = list_elem.expect("the list element in map should not be null");
459            list_elem.into_struct()
460        })
461    }
462
463    pub fn iter_sorted(
464        self,
465    ) -> impl DoubleEndedIterator + ExactSizeIterator<Item = (ScalarRefImpl<'a>, DatumRef<'a>)> + 'a
466    {
467        self.iter().sorted_by_key(|(k, _v)| DefaultOrdered(*k))
468    }
469
470    /// Note: Map should not be used as key. But we don't want to panic.
471    /// See [`MapArray`] for the semantics. See also the `Ord` implementation.
472    /// TODO: ban it in fe <https://github.com/risingwavelabs/risingwave/issues/7981>
473    pub fn memcmp_serialize(
474        self,
475        serializer: &mut memcomparable::Serializer<impl BufMut>,
476    ) -> memcomparable::Result<()> {
477        let mut inner_serializer = memcomparable::Serializer::new(vec![]);
478        for (k, v) in self.iter_sorted() {
479            memcmp_encoding::serialize_datum_in_composite(Some(k), &mut inner_serializer)?;
480            memcmp_encoding::serialize_datum_in_composite(v, &mut inner_serializer)?;
481        }
482        serializer.serialize_bytes(&inner_serializer.into_inner())
483    }
484}
485
486impl MapValue {
487    /// Note: Map should not be used as key. But we don't want to panic.
488    /// See [`MapArray`] for the semantics. See also the `Ord` implementation.
489    /// TODO: ban it in fe <https://github.com/risingwavelabs/risingwave/issues/7981>
490    pub fn memcmp_deserialize(
491        datatype: &MapType,
492        deserializer: &mut memcomparable::Deserializer<impl Buf>,
493    ) -> memcomparable::Result<Self> {
494        let list = ListValue::memcmp_deserialize(&datatype.clone().into_list_type(), deserializer)?;
495        Ok(Self::from_entries(list))
496    }
497}
498
499impl Debug for MapRef<'_> {
500    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
501        f.debug_list().entries(self.inner().iter()).finish()
502    }
503}
504
505impl ToText for MapRef<'_> {
506    fn write<W: std::fmt::Write>(&self, f: &mut W) -> std::fmt::Result {
507        // Note: This is arbitrarily decided...
508        write!(
509            f,
510            "{{{}}}",
511            self.iter().format_with(",", |(key, value), f| {
512                let key = key.to_text();
513                let value = value.to_text();
514                // TODO: consider quote like list and struct
515                f(&format_args!("{}:{}", key, value))
516            })
517        )
518    }
519
520    fn write_with_type<W: std::fmt::Write>(&self, ty: &DataType, f: &mut W) -> std::fmt::Result {
521        match ty {
522            DataType::Map { .. } => self.write(f),
523            _ => unreachable!(),
524        }
525    }
526}
527
528impl MapValue {
529    pub fn from_str_for_test(s: &str, data_type: &MapType) -> Result<Self, BoxedError> {
530        // TODO: this is a quick trivial implementation. Implement the full version later.
531
532        // example: {1:1,2:NULL,3:3}
533
534        if !s.starts_with('{') {
535            return Err(format!("Missing left parenthesis: {}", s).into());
536        }
537        if !s.ends_with('}') {
538            return Err(format!("Missing right parenthesis: {}", s).into());
539        }
540        let mut key_builder = data_type.key().create_array_builder(100);
541        let mut value_builder = data_type.value().create_array_builder(100);
542        for kv in s[1..s.len() - 1].split(',') {
543            let (k, v) = kv.split_once(':').ok_or("Invalid map format")?;
544            key_builder.append(Some(ScalarImpl::from_text(k, data_type.key())?));
545            if v == "NULL" {
546                value_builder.append_null();
547            } else {
548                value_builder.append(Some(ScalarImpl::from_text(v, data_type.value())?));
549            }
550        }
551        let key_array = key_builder.finish();
552        let value_array = value_builder.finish();
553
554        Ok(MapValue::try_from_kv(
555            ListValue::new(key_array),
556            ListValue::new(value_array),
557        )?)
558    }
559}