risingwave_common/
bitmap.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Licensed to the Apache Software Foundation (ASF) under one
16// or more contributor license agreements.  See the NOTICE file
17// distributed with this work for additional information
18// regarding copyright ownership.  The ASF licenses this file
19// to you under the Apache License, Version 2.0 (the
20// "License"); you may not use this file except in compliance
21// with the License.  You may obtain a copy of the License at
22//
23//   http://www.apache.org/licenses/LICENSE-2.0
24//
25// Unless required by applicable law or agreed to in writing,
26// software distributed under the License is distributed on an
27// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
28// KIND, either express or implied.  See the License for the
29// specific language governing permissions and limitations
30// under the License.
31
32//! Defines a bitmap, which is used to track which values in an Arrow array are null.
33//! This is called a "validity bitmap" in the Arrow documentation.
34//! This file is adapted from [arrow-rs](https://github.com/apache/arrow-rs)
35
36// allow `zip` for performance reasons
37#![allow(clippy::disallowed_methods)]
38
39use std::iter::{self, TrustedLen};
40use std::ops::{BitAnd, BitAndAssign, BitOr, BitOrAssign, BitXor, Not, Range, RangeInclusive};
41
42use risingwave_common_estimate_size::EstimateSize;
43use risingwave_pb::common::PbBuffer;
44use risingwave_pb::common::buffer::CompressionType;
45use rw_iter_util::ZipEqFast;
46
/// A mutable, growable sequence of bits that is turned into an immutable
/// [`Bitmap`] by [`BitmapBuilder::finish`].
#[derive(Default, Debug, Clone, EstimateSize)]
pub struct BitmapBuilder {
    /// Number of bits currently stored.
    len: usize,
    /// Backing words: bit `i` lives in `data[i / BITS]` at position `i % BITS`.
    data: Vec<usize>,
}
52
/// Number of bits in one `usize` storage word.
const BITS: usize = usize::BITS as usize;
54
55impl From<Bitmap> for BitmapBuilder {
56    fn from(bitmap: Bitmap) -> Self {
57        let len = bitmap.len();
58        if let Some(bits) = bitmap.bits {
59            BitmapBuilder {
60                len,
61                data: bits.into_vec(),
62            }
63        } else if bitmap.count_ones == 0 {
64            Self::zeroed(bitmap.len())
65        } else {
66            debug_assert!(bitmap.len() == bitmap.count_ones());
67            Self::filled(bitmap.len())
68        }
69    }
70}
71
72impl BitmapBuilder {
73    /// Creates a new empty bitmap with at least the specified capacity.
74    pub fn with_capacity(capacity: usize) -> BitmapBuilder {
75        BitmapBuilder {
76            len: 0,
77            data: Vec::with_capacity(Bitmap::vec_len(capacity)),
78        }
79    }
80
81    /// Creates a new bitmap with all bits set to 0.
82    pub fn zeroed(len: usize) -> BitmapBuilder {
83        BitmapBuilder {
84            len,
85            data: vec![0; Bitmap::vec_len(len)],
86        }
87    }
88
89    /// Creates a new bitmap with all bits set to 1.
90    pub fn filled(len: usize) -> BitmapBuilder {
91        let vec_len = Bitmap::vec_len(len);
92        let mut data = vec![usize::MAX; vec_len];
93        if vec_len >= 1 && len % BITS != 0 {
94            data[vec_len - 1] = (1 << (len % BITS)) - 1;
95        }
96        BitmapBuilder { len, data }
97    }
98
99    /// Writes a new value into a single bit.
100    pub fn set(&mut self, n: usize, val: bool) {
101        assert!(n < self.len);
102
103        let byte = &mut self.data[n / BITS];
104        let mask = 1 << (n % BITS);
105        if val {
106            *byte |= mask;
107        } else {
108            *byte &= !mask;
109        }
110    }
111
112    /// Tests a single bit.
113    pub fn is_set(&self, n: usize) -> bool {
114        assert!(n < self.len);
115        let byte = &self.data[n / BITS];
116        let mask = 1 << (n % BITS);
117        *byte & mask != 0
118    }
119
120    /// Appends a single bit to the back.
121    pub fn append(&mut self, bit_set: bool) -> &mut Self {
122        if self.len % BITS == 0 {
123            self.data.push(0);
124        }
125        self.data[self.len / BITS] |= (bit_set as usize) << (self.len % BITS);
126        self.len += 1;
127        self
128    }
129
130    /// Appends `n` bits to the back.
131    pub fn append_n(&mut self, mut n: usize, bit_set: bool) -> &mut Self {
132        while n != 0 && self.len % BITS != 0 {
133            self.append(bit_set);
134            n -= 1;
135        }
136        self.len += n;
137        self.data.resize(
138            Bitmap::vec_len(self.len),
139            if bit_set { usize::MAX } else { 0 },
140        );
141        if bit_set && self.len % BITS != 0 {
142            // remove tailing 1s
143            *self.data.last_mut().unwrap() &= (1 << (self.len % BITS)) - 1;
144        }
145        self
146    }
147
148    /// Removes the last bit.
149    pub fn pop(&mut self) -> Option<()> {
150        if self.len == 0 {
151            return None;
152        }
153        self.len -= 1;
154        self.data.truncate(Bitmap::vec_len(self.len));
155        if self.len % BITS != 0 {
156            *self.data.last_mut().unwrap() &= (1 << (self.len % BITS)) - 1;
157        }
158        Some(())
159    }
160
161    /// Appends a bitmap to the back.
162    pub fn append_bitmap(&mut self, other: &Bitmap) -> &mut Self {
163        if self.len % BITS == 0 {
164            // fast path: self is aligned
165            self.len += other.len();
166            if let Some(bits) = &other.bits {
167                // append the bytes
168                self.data.extend_from_slice(bits);
169            } else if other.count_ones == 0 {
170                // append 0s
171                self.data.resize(Bitmap::vec_len(self.len), 0);
172            } else {
173                // append 1s
174                self.data.resize(Bitmap::vec_len(self.len), usize::MAX);
175                if self.len % BITS != 0 {
176                    // remove tailing 1s
177                    *self.data.last_mut().unwrap() = (1 << (self.len % BITS)) - 1;
178                }
179            }
180        } else {
181            // slow path: append bits one by one
182            for bit in other.iter() {
183                self.append(bit);
184            }
185        }
186        self
187    }
188
189    /// Finishes building and returns the bitmap.
190    pub fn finish(self) -> Bitmap {
191        let count_ones = self.data.iter().map(|&x| x.count_ones()).sum::<u32>() as usize;
192        Bitmap {
193            num_bits: self.len(),
194            count_ones,
195            bits: (count_ones != 0 && count_ones != self.len).then(|| self.data.into()),
196        }
197    }
198
199    /// Returns the number of bits in the bitmap.
200    pub fn len(&self) -> usize {
201        self.len
202    }
203
204    /// Returns `true` if the bitmap has a length of 0.
205    pub fn is_empty(&self) -> bool {
206        self.len == 0
207    }
208}
209
/// An immutable bitmap. Use [`BitmapBuilder`] to build it.
///
/// A bitmap whose bits are all 0 or all 1 stores no buffer and is described
/// by `num_bits` and `count_ones` alone.
#[derive(Clone, PartialEq, Eq)]
pub struct Bitmap {
    /// The useful bits in the bitmap. The total number of stored bits will
    /// usually be larger than the useful bits because the buffer is padded
    /// up to a whole number of `usize` words.
    num_bits: usize,

    /// The number of high bits in the bitmap.
    count_ones: usize,

    /// Bits are stored in a compact form.
    /// They are packed into `usize`s.
    ///
    /// Optimization: If all bits are set to 0 or 1, this field MUST be `None`.
    bits: Option<Box<[usize]>>,
}
226
227impl EstimateSize for Bitmap {
228    fn estimated_heap_size(&self) -> usize {
229        match &self.bits {
230            Some(bits) => std::mem::size_of_val(bits.as_ref()),
231            None => 0,
232        }
233    }
234}
235
236impl std::fmt::Debug for Bitmap {
237    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
238        for data in self.iter() {
239            write!(f, "{}", data as u8)?;
240        }
241        Ok(())
242    }
243}
244
245impl Bitmap {
246    /// Creates a new bitmap with all bits set to 0.
247    pub fn zeros(num_bits: usize) -> Self {
248        Self {
249            bits: None,
250            num_bits,
251            count_ones: 0,
252        }
253    }
254
255    /// Creates a new bitmap with all bits set to 1.
256    pub fn ones(num_bits: usize) -> Self {
257        Self {
258            bits: None,
259            num_bits,
260            count_ones: num_bits,
261        }
262    }
263
264    /// Creates a new bitmap from vector.
265    fn from_vec_with_len(buf: Vec<usize>, num_bits: usize) -> Self {
266        debug_assert_eq!(buf.len(), Self::vec_len(num_bits));
267        let count_ones = buf.iter().map(|&x| x.count_ones()).sum::<u32>() as usize;
268        debug_assert!(count_ones <= num_bits);
269        Self {
270            bits: (count_ones != 0 && count_ones != num_bits).then(|| buf.into()),
271            num_bits,
272            count_ones,
273        }
274    }
275
276    /// Creates a new bitmap from bytes.
277    pub fn from_bytes(buf: &[u8]) -> Self {
278        Self::from_bytes_with_len(buf, buf.len() * 8)
279    }
280
281    /// Creates a new bitmap from bytes and length.
282    fn from_bytes_with_len(buf: &[u8], num_bits: usize) -> Self {
283        let mut bits = Vec::with_capacity(Self::vec_len(num_bits));
284        let slice = unsafe {
285            bits.set_len(bits.capacity());
286            std::slice::from_raw_parts_mut(bits.as_ptr() as *mut u8, bits.len() * (BITS / 8))
287        };
288        slice[..buf.len()].copy_from_slice(buf);
289        slice[buf.len()..].fill(0);
290        Self::from_vec_with_len(bits, num_bits)
291    }
292
293    /// Creates a new bitmap from a slice of `bool`.
294    pub fn from_bool_slice(bools: &[bool]) -> Self {
295        // use SIMD to speed up
296        let mut iter = bools.array_chunks::<BITS>();
297        let mut bits = Vec::with_capacity(Self::vec_len(bools.len()));
298        for chunk in iter.by_ref() {
299            let bitmask = std::simd::Mask::<i8, BITS>::from_array(*chunk).to_bitmask() as usize;
300            bits.push(bitmask);
301        }
302        if !iter.remainder().is_empty() {
303            let mut bitmask = 0;
304            for (i, b) in iter.remainder().iter().enumerate() {
305                bitmask |= (*b as usize) << i;
306            }
307            bits.push(bitmask);
308        }
309        Self::from_vec_with_len(bits, bools.len())
310    }
311
312    /// Return the next set bit index on or after `bit_idx`.
313    pub fn next_set_bit(&self, bit_idx: usize) -> Option<usize> {
314        (bit_idx..self.len()).find(|&idx| unsafe { self.is_set_unchecked(idx) })
315    }
316
317    /// Counts the number of bits set to 1.
318    pub fn count_ones(&self) -> usize {
319        self.count_ones
320    }
321
322    /// Returns true if any bit is set to 1.
323    pub fn any(&self) -> bool {
324        self.count_ones != 0
325    }
326
327    /// Returns the length of vector to store `num_bits` bits.
328    fn vec_len(num_bits: usize) -> usize {
329        num_bits.div_ceil(BITS)
330    }
331
332    /// Returns the number of valid bits in the bitmap,
333    /// also referred to its 'length'.
334    #[inline]
335    pub fn len(&self) -> usize {
336        self.num_bits
337    }
338
339    /// Returns true if the `Bitmap` has a length of 0.
340    pub fn is_empty(&self) -> bool {
341        self.num_bits == 0
342    }
343
344    /// Returns true if the bit at `idx` is set, without doing bounds checking.
345    ///
346    /// # Safety
347    ///
348    /// Index must be in range.
349    pub unsafe fn is_set_unchecked(&self, idx: usize) -> bool {
350        unsafe {
351            match &self.bits {
352                None => self.count_ones != 0,
353                Some(bits) => bits.get_unchecked(idx / BITS) & (1 << (idx % BITS)) != 0,
354            }
355        }
356    }
357
358    /// Returns true if the bit at `idx` is set.
359    pub fn is_set(&self, idx: usize) -> bool {
360        assert!(idx < self.len());
361        unsafe { self.is_set_unchecked(idx) }
362    }
363
364    /// Tests if every bit is set to 1.
365    pub fn all(&self) -> bool {
366        self.count_ones == self.len()
367    }
368
369    /// Produces an iterator over each bit.
370    pub fn iter(&self) -> BitmapIter<'_> {
371        BitmapIter {
372            bits: self.bits.as_ref().map(|s| &s[..]),
373            idx: 0,
374            num_bits: self.num_bits,
375            current_usize: 0,
376            all_ones: self.count_ones == self.num_bits,
377        }
378    }
379
380    /// Enumerates the index of each bit set to 1.
381    pub fn iter_ones(&self) -> BitmapOnesIter<'_> {
382        if let Some(bits) = &self.bits {
383            BitmapOnesIter::Buffer {
384                bits: &bits[..],
385                cur_idx: 0,
386                cur_bits: bits.first().cloned(),
387            }
388        } else {
389            // all zeros or all ones
390            BitmapOnesIter::Range {
391                start: 0,
392                end: self.count_ones,
393            }
394        }
395    }
396
397    /// Returns an iterator which yields the position ranges of continuous high bits.
398    pub fn high_ranges(&self) -> impl Iterator<Item = RangeInclusive<usize>> + '_ {
399        let mut start = None;
400
401        self.iter()
402            .chain(iter::once(false))
403            .enumerate()
404            .filter_map(move |(i, bit)| match (bit, start) {
405                // A new high range starts.
406                (true, None) => {
407                    start = Some(i);
408                    None
409                }
410                // The current high range ends.
411                (false, Some(s)) => {
412                    start = None;
413                    Some(s..=(i - 1))
414                }
415                _ => None,
416            })
417    }
418
419    #[cfg(test)]
420    fn assert_valid(&self) {
421        assert_eq!(
422            self.iter().map(|x| x as usize).sum::<usize>(),
423            self.count_ones
424        )
425    }
426
427    /// Creates a new bitmap with all bits in range set to 1.
428    ///
429    /// # Example
430    /// ```
431    /// use risingwave_common::bitmap::Bitmap;
432    /// let bitmap = Bitmap::from_range(200, 100..180);
433    /// assert_eq!(bitmap.count_ones(), 80);
434    /// for i in 0..200 {
435    ///     assert_eq!(bitmap.is_set(i), i >= 100 && i < 180);
436    /// }
437    /// ```
438    pub fn from_range(num_bits: usize, range: Range<usize>) -> Self {
439        assert!(range.start <= range.end);
440        assert!(range.end <= num_bits);
441        if range.start == range.end {
442            return Self::zeros(num_bits);
443        } else if range == (0..num_bits) {
444            return Self::ones(num_bits);
445        }
446        let mut bits = vec![0; Self::vec_len(num_bits)];
447        let start = range.start / BITS;
448        let end = range.end / BITS;
449        let start_offset = range.start % BITS;
450        let end_offset = range.end % BITS;
451        if start == end {
452            bits[start] = ((1 << (end_offset - start_offset)) - 1) << start_offset;
453        } else {
454            bits[start] = !0 << start_offset;
455            bits[start + 1..end].fill(!0);
456            if end_offset != 0 {
457                bits[end] = (1 << end_offset) - 1;
458            }
459        }
460        Self {
461            bits: Some(bits.into()),
462            num_bits,
463            count_ones: range.len(),
464        }
465    }
466}
467
468impl From<usize> for Bitmap {
469    fn from(val: usize) -> Self {
470        Self::ones(val)
471    }
472}
473
474impl<'b> BitAnd<&'b Bitmap> for &Bitmap {
475    type Output = Bitmap;
476
477    fn bitand(self, rhs: &'b Bitmap) -> Bitmap {
478        assert_eq!(self.num_bits, rhs.num_bits);
479        let (lbits, rbits) = match (&self.bits, &rhs.bits) {
480            _ if self.count_ones == 0 || rhs.count_ones == 0 => {
481                return Bitmap::zeros(self.num_bits);
482            }
483            (_, None) => return self.clone(),
484            (None, _) => return rhs.clone(),
485            (Some(lbits), Some(rbits)) => (lbits, rbits),
486        };
487        let bits = (lbits.iter().zip(rbits.iter()))
488            .map(|(&a, &b)| a & b)
489            .collect();
490        Bitmap::from_vec_with_len(bits, self.num_bits)
491    }
492}
493
494impl BitAnd<Bitmap> for &Bitmap {
495    type Output = Bitmap;
496
497    fn bitand(self, rhs: Bitmap) -> Self::Output {
498        self.bitand(&rhs)
499    }
500}
501
502impl<'b> BitAnd<&'b Bitmap> for Bitmap {
503    type Output = Bitmap;
504
505    fn bitand(self, rhs: &'b Bitmap) -> Self::Output {
506        rhs.bitand(self)
507    }
508}
509
510impl BitAnd for Bitmap {
511    type Output = Bitmap;
512
513    fn bitand(self, rhs: Bitmap) -> Self::Output {
514        (&self).bitand(&rhs)
515    }
516}
517
518impl BitAndAssign<&Bitmap> for Bitmap {
519    fn bitand_assign(&mut self, rhs: &Bitmap) {
520        *self = &*self & rhs;
521    }
522}
523
524impl BitAndAssign<Bitmap> for Bitmap {
525    fn bitand_assign(&mut self, rhs: Bitmap) {
526        *self = &*self & rhs;
527    }
528}
529
530impl<'b> BitOr<&'b Bitmap> for &Bitmap {
531    type Output = Bitmap;
532
533    fn bitor(self, rhs: &'b Bitmap) -> Bitmap {
534        assert_eq!(self.num_bits, rhs.num_bits);
535        let (lbits, rbits) = match (&self.bits, &rhs.bits) {
536            _ if self.count_ones == self.num_bits || rhs.count_ones == self.num_bits => {
537                return Bitmap::ones(self.num_bits);
538            }
539            (_, None) => return self.clone(),
540            (None, _) => return rhs.clone(),
541            (Some(lbits), Some(rbits)) => (lbits, rbits),
542        };
543        let bits = (lbits.iter().zip(rbits.iter()))
544            .map(|(&a, &b)| a | b)
545            .collect();
546        Bitmap::from_vec_with_len(bits, self.num_bits)
547    }
548}
549
550impl BitOr<Bitmap> for &Bitmap {
551    type Output = Bitmap;
552
553    fn bitor(self, rhs: Bitmap) -> Self::Output {
554        self.bitor(&rhs)
555    }
556}
557
558impl<'b> BitOr<&'b Bitmap> for Bitmap {
559    type Output = Bitmap;
560
561    fn bitor(self, rhs: &'b Bitmap) -> Self::Output {
562        rhs.bitor(self)
563    }
564}
565
566impl BitOr for Bitmap {
567    type Output = Bitmap;
568
569    fn bitor(self, rhs: Bitmap) -> Self::Output {
570        (&self).bitor(&rhs)
571    }
572}
573
574impl BitOrAssign<&Bitmap> for Bitmap {
575    fn bitor_assign(&mut self, rhs: &Bitmap) {
576        *self = &*self | rhs;
577    }
578}
579
580impl BitOrAssign<Bitmap> for Bitmap {
581    fn bitor_assign(&mut self, rhs: Bitmap) {
582        *self |= &rhs;
583    }
584}
585
586impl BitXor for &Bitmap {
587    type Output = Bitmap;
588
589    fn bitxor(self, rhs: &Bitmap) -> Self::Output {
590        assert_eq!(self.num_bits, rhs.num_bits);
591        let (lbits, rbits) = match (&self.bits, &rhs.bits) {
592            (_, None) if rhs.count_ones == 0 => return self.clone(),
593            (_, None) => return !self,
594            (None, _) if self.count_ones == 0 => return rhs.clone(),
595            (None, _) => return !rhs,
596            (Some(lbits), Some(rbits)) => (lbits, rbits),
597        };
598        let bits = (lbits.iter().zip(rbits.iter()))
599            .map(|(&a, &b)| a ^ b)
600            .collect();
601        Bitmap::from_vec_with_len(bits, self.num_bits)
602    }
603}
604
605impl Not for &Bitmap {
606    type Output = Bitmap;
607
608    fn not(self) -> Self::Output {
609        let bits = match &self.bits {
610            None if self.count_ones == 0 => return Bitmap::ones(self.num_bits),
611            None => return Bitmap::zeros(self.num_bits),
612            Some(bits) => bits,
613        };
614        let mut bits: Box<[usize]> = bits.iter().map(|b| !b).collect();
615        if self.num_bits % BITS != 0 {
616            bits[self.num_bits / BITS] &= (1 << (self.num_bits % BITS)) - 1;
617        }
618        Bitmap {
619            num_bits: self.num_bits,
620            count_ones: self.num_bits - self.count_ones,
621            bits: Some(bits),
622        }
623    }
624}
625
626impl Not for Bitmap {
627    type Output = Bitmap;
628
629    fn not(mut self) -> Self::Output {
630        let bits = match &mut self.bits {
631            None if self.count_ones == 0 => return Bitmap::ones(self.num_bits),
632            None => return Bitmap::zeros(self.num_bits),
633            Some(bits) => bits,
634        };
635        bits.iter_mut().for_each(|x| *x = !*x);
636        if self.num_bits % BITS != 0 {
637            bits[self.num_bits / BITS] &= (1 << (self.num_bits % BITS)) - 1;
638        }
639        self.count_ones = self.num_bits - self.count_ones;
640        self
641    }
642}
643
644impl FromIterator<bool> for Bitmap {
645    fn from_iter<T: IntoIterator<Item = bool>>(iter: T) -> Self {
646        let vec = iter.into_iter().collect::<Vec<_>>();
647        Self::from_bool_slice(&vec)
648    }
649}
650
651impl FromIterator<Option<bool>> for Bitmap {
652    fn from_iter<T: IntoIterator<Item = Option<bool>>>(iter: T) -> Self {
653        iter.into_iter().map(|b| b.unwrap_or(false)).collect()
654    }
655}
656
657impl Bitmap {
658    pub fn to_protobuf(&self) -> PbBuffer {
659        let body_len = self.num_bits.div_ceil(8) + 1;
660        let mut body = Vec::with_capacity(body_len);
661        body.push((self.num_bits % 8) as u8);
662        match &self.bits {
663            None if self.count_ones == 0 => body.resize(body_len, 0),
664            None => {
665                body.resize(body_len, u8::MAX);
666                if self.num_bits % 8 != 0 {
667                    body[body_len - 1] = (1 << (self.num_bits % 8)) - 1;
668                }
669            }
670            Some(bits) => {
671                body.extend_from_slice(unsafe {
672                    std::slice::from_raw_parts(
673                        bits.as_ptr() as *const u8,
674                        self.num_bits.div_ceil(8),
675                    )
676                });
677            }
678        }
679        PbBuffer {
680            body,
681            compression: CompressionType::None as i32,
682        }
683    }
684}
685
686impl From<&PbBuffer> for Bitmap {
687    fn from(buf: &PbBuffer) -> Self {
688        let last_byte_num_bits = buf.body[0];
689        let num_bits = ((buf.body.len() - 1) * 8) - ((8 - last_byte_num_bits) % 8) as usize;
690        Self::from_bytes_with_len(&buf.body[1..], num_bits)
691    }
692}
693
694impl From<PbBuffer> for Bitmap {
695    fn from(buf: PbBuffer) -> Self {
696        Self::from(&buf)
697    }
698}
699
/// Bitmap iterator.
///
/// Yields every bit of a bitmap as a `bool`, from index 0 upwards.
pub struct BitmapIter<'a> {
    /// Backing words, or `None` when every bit has the same value.
    bits: Option<&'a [usize]>,
    /// Index of the next bit to yield.
    idx: usize,
    /// Total number of bits to iterate over.
    num_bits: usize,
    /// The word most recently loaded from `bits`.
    current_usize: usize,
    /// Value yielded for every bit when `bits` is `None`.
    all_ones: bool,
}
708
709impl BitmapIter<'_> {
710    fn next_always_load_usize(&mut self) -> Option<bool> {
711        if self.idx >= self.num_bits {
712            return None;
713        }
714        let Some(bits) = &self.bits else {
715            self.idx += 1;
716            return Some(self.all_ones);
717        };
718
719        // Offset of the bit within the usize.
720        let usize_offset = self.idx % BITS;
721
722        // Get the index of usize which the bit is located in
723        let usize_index = self.idx / BITS;
724        self.current_usize = unsafe { *bits.get_unchecked(usize_index) };
725
726        let bit_mask = 1 << usize_offset;
727        let bit_flag = self.current_usize & bit_mask != 0;
728        self.idx += 1;
729        Some(bit_flag)
730    }
731}
732
733impl iter::Iterator for BitmapIter<'_> {
734    type Item = bool;
735
736    fn next(&mut self) -> Option<Self::Item> {
737        if self.idx >= self.num_bits {
738            return None;
739        }
740        let Some(bits) = &self.bits else {
741            self.idx += 1;
742            return Some(self.all_ones);
743        };
744
745        // Offset of the bit within the usize.
746        let usize_offset = self.idx % BITS;
747
748        if usize_offset == 0 {
749            // Get the index of usize which the bit is located in
750            let usize_index = self.idx / BITS;
751            self.current_usize = unsafe { *bits.get_unchecked(usize_index) };
752        }
753
754        let bit_mask = 1 << usize_offset;
755
756        let bit_flag = self.current_usize & bit_mask != 0;
757        self.idx += 1;
758        Some(bit_flag)
759    }
760
761    fn size_hint(&self) -> (usize, Option<usize>) {
762        let remaining = self.num_bits - self.idx;
763        (remaining, Some(remaining))
764    }
765
766    fn nth(&mut self, n: usize) -> Option<Self::Item> {
767        self.idx += n;
768        self.next_always_load_usize()
769    }
770}
771
// Both marker impls rely on `size_hint` returning identical lower and upper
// bounds, computed from `num_bits` and `idx`.
impl ExactSizeIterator for BitmapIter<'_> {}
unsafe impl TrustedLen for BitmapIter<'_> {}
774
/// Iterator over the indices of the bits that are set to 1.
pub enum BitmapOnesIter<'a> {
    /// Yields every index in `start..end`.
    Range {
        /// Next index to yield.
        start: usize,
        /// Exclusive upper bound.
        end: usize,
    },
    /// Scans a word buffer for set bits.
    Buffer {
        /// The words being scanned.
        bits: &'a [usize],
        /// Index of the word currently being scanned.
        cur_idx: usize,
        /// Bits of the current word that were not yielded yet, or `None`
        /// once the buffer is exhausted.
        cur_bits: Option<usize>,
    },
}
786
787impl iter::Iterator for BitmapOnesIter<'_> {
788    type Item = usize;
789
790    fn next(&mut self) -> Option<Self::Item> {
791        match self {
792            BitmapOnesIter::Range { start, end } => {
793                let i = *start;
794                if i >= *end {
795                    None
796                } else {
797                    *start += 1;
798                    Some(i)
799                }
800            }
801            BitmapOnesIter::Buffer {
802                bits,
803                cur_idx,
804                cur_bits,
805            } => {
806                while *cur_bits == Some(0) {
807                    *cur_idx += 1;
808                    *cur_bits = if *cur_idx >= bits.len() {
809                        None
810                    } else {
811                        Some(bits[*cur_idx])
812                    };
813                }
814                cur_bits.map(|bits| {
815                    let low_bit = bits & bits.wrapping_neg();
816                    let low_bit_idx = bits.trailing_zeros();
817                    *cur_bits = Some(bits ^ low_bit);
818                    *cur_idx * BITS + low_bit_idx as usize
819                })
820            }
821        }
822    }
823
824    fn size_hint(&self) -> (usize, Option<usize>) {
825        match self {
826            BitmapOnesIter::Range { start, end } => {
827                let remaining = end - start;
828                (remaining, Some(remaining))
829            }
830            BitmapOnesIter::Buffer { bits, .. } => (0, Some(bits.len() * usize::BITS as usize)),
831        }
832    }
833}
834
835pub trait FilterByBitmap: ExactSizeIterator + Sized {
836    fn filter_by_bitmap(self, bitmap: &Bitmap) -> impl Iterator<Item = Self::Item> {
837        self.zip_eq_fast(bitmap.iter())
838            .filter_map(|(item, bit)| bit.then_some(item))
839    }
840}
841
// Blanket impl: every `ExactSizeIterator` gets `filter_by_bitmap`.
impl<T> FilterByBitmap for T where T: ExactSizeIterator {}
843
844#[cfg(test)]
845mod tests {
846    use super::*;
847
848    #[test]
849    fn test_bitmap_builder() {
850        let bitmap1 = {
851            let mut builder = BitmapBuilder::default();
852            let bits = [
853                false, true, true, false, true, false, true, false, true, false, true, true, false,
854                true, false, true,
855            ];
856            for bit in bits {
857                builder.append(bit);
858            }
859            for (idx, bit) in bits.iter().enumerate() {
860                assert_eq!(builder.is_set(idx), *bit);
861            }
862            builder.finish()
863        };
864        let byte1 = 0b0101_0110_u8;
865        let byte2 = 0b1010_1101_u8;
866        let expected = Bitmap::from_bytes(&[byte1, byte2]);
867        assert_eq!(bitmap1, expected);
868        assert_eq!(
869            bitmap1.count_ones(),
870            (byte1.count_ones() + byte2.count_ones()) as usize
871        );
872
873        let bitmap2 = {
874            let mut builder = BitmapBuilder::default();
875            let bits = [false, true, true, false, true, false, true, false];
876            for bit in bits {
877                builder.append(bit);
878            }
879            for (idx, bit) in bits.iter().enumerate() {
880                assert_eq!(builder.is_set(idx), *bit);
881            }
882            builder.finish()
883        };
884        let byte1 = 0b0101_0110_u8;
885        let expected = Bitmap::from_bytes(&[byte1]);
886        assert_eq!(bitmap2, expected);
887    }
888
889    #[test]
890    fn test_builder_append_bitmap() {
891        let mut builder = BitmapBuilder::default();
892        builder.append_bitmap(&Bitmap::zeros(64));
893        builder.append_bitmap(&Bitmap::ones(64));
894        builder.append_bitmap(&Bitmap::from_bytes(&[0b1010_1010]));
895        builder.append_bitmap(&Bitmap::zeros(8));
896        builder.append_bitmap(&Bitmap::ones(8));
897        let bitmap = builder.finish();
898        assert_eq!(
899            bitmap,
900            Bitmap::from_vec_with_len(
901                vec![0, usize::MAX, 0b1111_1111_0000_0000_1010_1010],
902                64 + 64 + 8 + 8 + 8
903            ),
904        );
905    }
906
907    #[test]
908    fn test_bitmap_get_size() {
909        let bitmap1 = {
910            let mut builder = BitmapBuilder::default();
911            let bits = [
912                false, true, true, false, true, false, true, false, true, false, true, true, false,
913                true, false, true,
914            ];
915            for bit in bits {
916                builder.append(bit);
917            }
918            for (idx, bit) in bits.iter().enumerate() {
919                assert_eq!(builder.is_set(idx), *bit);
920            }
921            builder.finish()
922        };
923        assert_eq!(8, bitmap1.estimated_heap_size());
924        assert_eq!(40, bitmap1.estimated_size());
925    }
926
927    #[test]
928    fn test_bitmap_all_high() {
929        let num_bits = 3;
930        let bitmap = Bitmap::ones(num_bits);
931        assert_eq!(bitmap.len(), num_bits);
932        assert!(bitmap.all());
933        for i in 0..num_bits {
934            assert!(bitmap.is_set(i));
935        }
936        // Test to and from protobuf is OK.
937        assert_eq!(bitmap, Bitmap::from(&bitmap.to_protobuf()));
938    }
939
940    #[test]
941    fn test_bitwise_and() {
942        #[rustfmt::skip]
943        let cases = [(
944            vec![],
945            vec![],
946            vec![],
947        ), (
948            vec![0, 1, 1, 0, 1, 0, 1, 0],
949            vec![0, 1, 0, 0, 1, 1, 1, 0],
950            vec![0, 1, 0, 0, 1, 0, 1, 0],
951        ), (
952            vec![0, 1, 0, 1, 0],
953            vec![1, 1, 1, 1, 0],
954            vec![0, 1, 0, 1, 0],
955        )];
956
957        for (input1, input2, expected) in cases {
958            let bitmap1: Bitmap = input1.into_iter().map(|x| x != 0).collect();
959            let bitmap2: Bitmap = input2.into_iter().map(|x| x != 0).collect();
960            let res = bitmap1 & bitmap2;
961            res.assert_valid();
962            assert_eq!(res.iter().map(|x| x as i32).collect::<Vec<_>>(), expected,);
963        }
964    }
965
966    #[test]
967    fn test_bitwise_not() {
968        #[rustfmt::skip]
969        let cases = [(
970            vec![1, 0, 1, 0, 1],
971            vec![0, 1, 0, 1, 0],
972        ), (
973            vec![],
974            vec![],
975        ), (
976            vec![1, 0, 1, 1, 0, 0, 1, 1],
977            vec![0, 1, 0, 0, 1, 1, 0, 0],
978        ), (
979            vec![1, 0, 0, 1, 1, 1, 0, 0, 1, 0],
980            vec![0, 1, 1, 0, 0, 0, 1, 1, 0, 1],
981        )];
982
983        for (input, expected) in cases {
984            let bitmap: Bitmap = input.into_iter().map(|x| x != 0).collect();
985            let res = !bitmap;
986            res.assert_valid();
987            assert_eq!(res.iter().map(|x| x as i32).collect::<Vec<_>>(), expected);
988        }
989    }
990
991    #[test]
992    fn test_bitwise_or() {
993        let bitmap1 = Bitmap::from_bytes(&[0b01101010]);
994        let bitmap2 = Bitmap::from_bytes(&[0b01001110]);
995        assert_eq!(Bitmap::from_bytes(&[0b01101110]), (bitmap1 | bitmap2));
996    }
997
998    #[test]
999    fn test_bitmap_is_set() {
1000        let bitmap = Bitmap::from_bytes(&[0b01001010]);
1001        assert!(!bitmap.is_set(0));
1002        assert!(bitmap.is_set(1));
1003        assert!(!bitmap.is_set(2));
1004        assert!(bitmap.is_set(3));
1005        assert!(!bitmap.is_set(4));
1006        assert!(!bitmap.is_set(5));
1007        assert!(bitmap.is_set(6));
1008        assert!(!bitmap.is_set(7));
1009    }
1010
1011    #[test]
1012    fn test_bitmap_iter() {
1013        {
1014            let bitmap = Bitmap::from_bytes(&[0b01001010]);
1015            let mut booleans = vec![];
1016            for b in bitmap.iter() {
1017                booleans.push(b as u8);
1018            }
1019            assert_eq!(booleans, vec![0u8, 1, 0, 1, 0, 0, 1, 0]);
1020        }
1021        {
1022            let bitmap: Bitmap = vec![true; 5].into_iter().collect();
1023            for b in bitmap.iter() {
1024                assert!(b);
1025            }
1026        }
1027    }
1028
1029    #[test]
1030    fn test_bitmap_high_ranges_iter() {
1031        fn test(bits: impl IntoIterator<Item = bool>, expected: Vec<RangeInclusive<usize>>) {
1032            let bitmap = Bitmap::from_iter(bits);
1033            let high_ranges = bitmap.high_ranges().collect::<Vec<_>>();
1034            assert_eq!(high_ranges, expected);
1035        }
1036
1037        test(
1038            vec![
1039                true, true, true, false, false, true, true, true, false, true,
1040            ],
1041            vec![0..=2, 5..=7, 9..=9],
1042        );
1043        test(vec![true, true, true], vec![0..=2]);
1044        test(vec![false, false, false], vec![]);
1045    }
1046
1047    #[test]
1048    fn test_bitmap_from_protobuf() {
1049        let bitmap_bytes = vec![3u8 /* len % BITS */, 0b0101_0010, 0b110];
1050        let buf = PbBuffer {
1051            body: bitmap_bytes,
1052            compression: CompressionType::None as _,
1053        };
1054        let bitmap: Bitmap = (&buf).into();
1055        let actual_bytes: Vec<u8> = bitmap.iter().map(|b| b as u8).collect();
1056
1057        assert_eq!(actual_bytes, vec![0, 1, 0, 0, 1, 0, 1, 0, /*  */ 0, 1, 1]); // in reverse order
1058        assert_eq!(bitmap.count_ones(), 5);
1059    }
1060
1061    #[test]
1062    fn test_bitmap_from_buffer() {
1063        let byte1 = 0b0110_1010_u8;
1064        let byte2 = 0b1011_0101_u8;
1065        let bitmap = Bitmap::from_bytes(&[0b0110_1010, 0b1011_0101]);
1066        let expected = Bitmap::from_iter(vec![
1067            false, true, false, true, false, true, true, false, true, false, true, false, true,
1068            true, false, true,
1069        ]);
1070        let count_ones = (byte1.count_ones() + byte2.count_ones()) as usize;
1071        assert_eq!(expected, bitmap);
1072        assert_eq!(bitmap.count_ones(), count_ones);
1073        assert_eq!(expected.count_ones(), count_ones);
1074    }
1075
1076    #[test]
1077    fn test_bitmap_eq() {
1078        let b1: Bitmap = Bitmap::zeros(3);
1079        let b2: Bitmap = Bitmap::zeros(5);
1080        assert_ne!(b1, b2);
1081
1082        let b1: Bitmap = [true, false].iter().cycle().cloned().take(10000).collect();
1083        let b2: Bitmap = [true, false].iter().cycle().cloned().take(10000).collect();
1084        assert_eq!(b1, b2);
1085    }
1086
1087    #[test]
1088    fn test_bitmap_set() {
1089        let mut b = BitmapBuilder::zeroed(10);
1090
1091        b.set(0, true);
1092        b.set(7, true);
1093        b.set(8, true);
1094        b.set(9, true);
1095
1096        b.set(7, false);
1097        b.set(8, false);
1098
1099        b.append(true);
1100        assert_eq!(b.len(), 11);
1101
1102        let b = b.finish();
1103        assert_eq!(b.count_ones(), 3);
1104        assert_eq!(&b.bits.unwrap()[..], &[0b0110_0000_0001]);
1105    }
1106
1107    #[test]
1108    fn test_bitmap_pop() {
1109        let mut b = BitmapBuilder::zeroed(7);
1110
1111        {
1112            b.append(true);
1113            assert!(b.is_set(b.len() - 1));
1114            b.pop();
1115            assert!(!b.is_set(b.len() - 1));
1116        }
1117
1118        {
1119            b.append(false);
1120            assert!(!b.is_set(b.len() - 1));
1121            b.pop();
1122            assert!(!b.is_set(b.len() - 1));
1123        }
1124
1125        {
1126            b.append(true);
1127            b.append(false);
1128            assert!(!b.is_set(b.len() - 1));
1129            b.pop();
1130            assert!(b.is_set(b.len() - 1));
1131            b.pop();
1132            assert!(!b.is_set(b.len() - 1));
1133        }
1134    }
1135
1136    #[test]
1137    fn test_bitmap_iter_ones() {
1138        let mut builder = BitmapBuilder::zeroed(1000);
1139        builder.append_n(1000, true);
1140        let bitmap = builder.finish();
1141        let mut iter = bitmap.iter_ones();
1142        for i in 0..1000 {
1143            let item = iter.next();
1144            assert!(item == Some(i + 1000));
1145        }
1146        for _ in 0..5 {
1147            let item = iter.next();
1148            assert!(item.is_none());
1149        }
1150    }
1151}