risingwave_common/
bitmap.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Licensed to the Apache Software Foundation (ASF) under one
16// or more contributor license agreements.  See the NOTICE file
17// distributed with this work for additional information
18// regarding copyright ownership.  The ASF licenses this file
19// to you under the Apache License, Version 2.0 (the
20// "License"); you may not use this file except in compliance
21// with the License.  You may obtain a copy of the License at
22//
23//   http://www.apache.org/licenses/LICENSE-2.0
24//
25// Unless required by applicable law or agreed to in writing,
26// software distributed under the License is distributed on an
27// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
28// KIND, either express or implied.  See the License for the
29// specific language governing permissions and limitations
30// under the License.
31
32//! Defines a bitmap, which is used to track which values in an Arrow array are null.
33//! This is called a "validity bitmap" in the Arrow documentation.
34//! This file is adapted from [arrow-rs](https://github.com/apache/arrow-rs)
35
36// allow `zip` for performance reasons
37#![allow(clippy::disallowed_methods)]
38
39use std::iter::{self, TrustedLen};
40use std::ops::{BitAnd, BitAndAssign, BitOr, BitOrAssign, BitXor, Not, Range, RangeInclusive};
41
42use risingwave_common_estimate_size::EstimateSize;
43use risingwave_pb::common::PbBuffer;
44use risingwave_pb::common::buffer::CompressionType;
45use rw_iter_util::ZipEqFast;
46
47#[derive(Default, Debug, Clone, EstimateSize)]
48pub struct BitmapBuilder {
49    len: usize,
50    data: Vec<usize>,
51}
52
53const BITS: usize = usize::BITS as usize;
54
55impl From<Bitmap> for BitmapBuilder {
56    fn from(bitmap: Bitmap) -> Self {
57        let len = bitmap.len();
58        if let Some(bits) = bitmap.bits {
59            BitmapBuilder {
60                len,
61                data: bits.into_vec(),
62            }
63        } else if bitmap.count_ones == 0 {
64            Self::zeroed(bitmap.len())
65        } else {
66            debug_assert!(bitmap.len() == bitmap.count_ones());
67            Self::filled(bitmap.len())
68        }
69    }
70}
71
72impl BitmapBuilder {
73    /// Creates a new empty bitmap with at least the specified capacity.
74    pub fn with_capacity(capacity: usize) -> BitmapBuilder {
75        BitmapBuilder {
76            len: 0,
77            data: Vec::with_capacity(Bitmap::vec_len(capacity)),
78        }
79    }
80
81    /// Creates a new bitmap with all bits set to 0.
82    pub fn zeroed(len: usize) -> BitmapBuilder {
83        BitmapBuilder {
84            len,
85            data: vec![0; Bitmap::vec_len(len)],
86        }
87    }
88
89    /// Creates a new bitmap with all bits set to 1.
90    pub fn filled(len: usize) -> BitmapBuilder {
91        let vec_len = Bitmap::vec_len(len);
92        let mut data = vec![usize::MAX; vec_len];
93        if vec_len >= 1 && !len.is_multiple_of(BITS) {
94            data[vec_len - 1] = (1 << (len % BITS)) - 1;
95        }
96        BitmapBuilder { len, data }
97    }
98
99    /// Writes a new value into a single bit.
100    pub fn set(&mut self, n: usize, val: bool) {
101        assert!(n < self.len);
102
103        let byte = &mut self.data[n / BITS];
104        let mask = 1 << (n % BITS);
105        if val {
106            *byte |= mask;
107        } else {
108            *byte &= !mask;
109        }
110    }
111
112    /// Tests a single bit.
113    pub fn is_set(&self, n: usize) -> bool {
114        assert!(n < self.len);
115        let byte = &self.data[n / BITS];
116        let mask = 1 << (n % BITS);
117        *byte & mask != 0
118    }
119
120    /// Appends a single bit to the back.
121    pub fn append(&mut self, bit_set: bool) -> &mut Self {
122        if self.len.is_multiple_of(BITS) {
123            self.data.push(0);
124        }
125        self.data[self.len / BITS] |= (bit_set as usize) << (self.len % BITS);
126        self.len += 1;
127        self
128    }
129
130    /// Appends `n` bits to the back.
131    pub fn append_n(&mut self, mut n: usize, bit_set: bool) -> &mut Self {
132        while n != 0 && !self.len.is_multiple_of(BITS) {
133            self.append(bit_set);
134            n -= 1;
135        }
136        self.len += n;
137        self.data.resize(
138            Bitmap::vec_len(self.len),
139            if bit_set { usize::MAX } else { 0 },
140        );
141        if bit_set && !self.len.is_multiple_of(BITS) {
142            // remove tailing 1s
143            *self.data.last_mut().unwrap() &= (1 << (self.len % BITS)) - 1;
144        }
145        self
146    }
147
148    /// Removes the last bit.
149    pub fn pop(&mut self) -> Option<()> {
150        if self.len == 0 {
151            return None;
152        }
153        self.len -= 1;
154        self.data.truncate(Bitmap::vec_len(self.len));
155        if !self.len.is_multiple_of(BITS) {
156            *self.data.last_mut().unwrap() &= (1 << (self.len % BITS)) - 1;
157        }
158        Some(())
159    }
160
161    /// Appends a bitmap to the back.
162    pub fn append_bitmap(&mut self, other: &Bitmap) -> &mut Self {
163        if self.len.is_multiple_of(BITS) {
164            // fast path: self is aligned
165            self.len += other.len();
166            if let Some(bits) = &other.bits {
167                // append the bytes
168                self.data.extend_from_slice(bits);
169            } else if other.count_ones == 0 {
170                // append 0s
171                self.data.resize(Bitmap::vec_len(self.len), 0);
172            } else {
173                // append 1s
174                self.data.resize(Bitmap::vec_len(self.len), usize::MAX);
175                if !self.len.is_multiple_of(BITS) {
176                    // remove tailing 1s
177                    *self.data.last_mut().unwrap() = (1 << (self.len % BITS)) - 1;
178                }
179            }
180        } else {
181            // slow path: append bits one by one
182            for bit in other.iter() {
183                self.append(bit);
184            }
185        }
186        self
187    }
188
189    /// Finishes building and returns the bitmap.
190    pub fn finish(self) -> Bitmap {
191        let count_ones = self.data.iter().map(|&x| x.count_ones()).sum::<u32>() as usize;
192        Bitmap {
193            num_bits: self.len(),
194            count_ones,
195            bits: (count_ones != 0 && count_ones != self.len).then(|| self.data.into()),
196        }
197    }
198
199    /// Returns the number of bits in the bitmap.
200    pub fn len(&self) -> usize {
201        self.len
202    }
203
204    /// Returns `true` if the bitmap has a length of 0.
205    pub fn is_empty(&self) -> bool {
206        self.len == 0
207    }
208}
209
210/// An immutable bitmap. Use [`BitmapBuilder`] to build it.
211#[derive(Clone, PartialEq, Eq)]
212pub struct Bitmap {
213    /// The useful bits in the bitmap. The total number of bits will usually
214    /// be larger than the useful bits due to byte-padding.
215    num_bits: usize,
216
217    /// The number of high bits in the bitmap.
218    count_ones: usize,
219
220    /// Bits are stored in a compact form.
221    /// They are packed into `usize`s.
222    ///
223    /// Optimization: If all bits are set to 0 or 1, this field MUST be `None`.
224    bits: Option<Box<[usize]>>,
225}
226
227impl EstimateSize for Bitmap {
228    fn estimated_heap_size(&self) -> usize {
229        match &self.bits {
230            Some(bits) => std::mem::size_of_val(bits.as_ref()),
231            None => 0,
232        }
233    }
234}
235
236impl std::fmt::Debug for Bitmap {
237    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
238        for data in self.iter() {
239            write!(f, "{}", data as u8)?;
240        }
241        Ok(())
242    }
243}
244
245impl Bitmap {
246    /// Creates a new bitmap with all bits set to 0.
247    pub fn zeros(num_bits: usize) -> Self {
248        Self {
249            bits: None,
250            num_bits,
251            count_ones: 0,
252        }
253    }
254
255    /// Creates a new bitmap with all bits set to 1.
256    pub fn ones(num_bits: usize) -> Self {
257        Self {
258            bits: None,
259            num_bits,
260            count_ones: num_bits,
261        }
262    }
263
264    /// Creates a new bitmap from vector.
265    fn from_vec_with_len(buf: Vec<usize>, num_bits: usize) -> Self {
266        debug_assert_eq!(buf.len(), Self::vec_len(num_bits));
267        let count_ones = buf.iter().map(|&x| x.count_ones()).sum::<u32>() as usize;
268        debug_assert!(count_ones <= num_bits);
269        Self {
270            bits: (count_ones != 0 && count_ones != num_bits).then(|| buf.into()),
271            num_bits,
272            count_ones,
273        }
274    }
275
276    /// Creates a new bitmap from bytes.
277    pub fn from_bytes(buf: &[u8]) -> Self {
278        Self::from_bytes_with_len(buf, buf.len() * 8)
279    }
280
281    /// Creates a new bitmap from the indices of bits set to 1.
282    pub fn from_indices(num_bits: usize, ones: &[usize]) -> Self {
283        let mut builder = BitmapBuilder::zeroed(num_bits);
284
285        for &idx in ones {
286            debug_assert!(idx < num_bits);
287            builder.set(idx, true);
288        }
289
290        builder.finish()
291    }
292
293    /// Creates a new bitmap from bytes and length.
294    fn from_bytes_with_len(buf: &[u8], num_bits: usize) -> Self {
295        let mut bits = Vec::with_capacity(Self::vec_len(num_bits));
296        let slice = unsafe {
297            bits.set_len(bits.capacity());
298            std::slice::from_raw_parts_mut(bits.as_ptr() as *mut u8, bits.len() * (BITS / 8))
299        };
300        slice[..buf.len()].copy_from_slice(buf);
301        slice[buf.len()..].fill(0);
302        Self::from_vec_with_len(bits, num_bits)
303    }
304
305    /// Creates a new bitmap from a slice of `bool`.
306    pub fn from_bool_slice(bools: &[bool]) -> Self {
307        // use SIMD to speed up
308        let mut iter = bools.iter().copied().array_chunks::<BITS>();
309        let mut bits = Vec::with_capacity(Self::vec_len(bools.len()));
310        for chunk in iter.by_ref() {
311            let bitmask = std::simd::Mask::<i8, BITS>::from_array(chunk).to_bitmask() as usize;
312            bits.push(bitmask);
313        }
314        if let Some(remainder_iter) = iter.into_remainder()
315            && !remainder_iter.is_empty()
316        {
317            let mut bitmask = 0;
318            for (i, b) in remainder_iter.enumerate() {
319                bitmask |= (b as usize) << i;
320            }
321            bits.push(bitmask);
322        }
323        Self::from_vec_with_len(bits, bools.len())
324    }
325
326    /// Return the next set bit index on or after `bit_idx`.
327    pub fn next_set_bit(&self, bit_idx: usize) -> Option<usize> {
328        (bit_idx..self.len()).find(|&idx| unsafe { self.is_set_unchecked(idx) })
329    }
330
331    /// Counts the number of bits set to 1.
332    pub fn count_ones(&self) -> usize {
333        self.count_ones
334    }
335
336    /// Returns true if any bit is set to 1.
337    pub fn any(&self) -> bool {
338        self.count_ones != 0
339    }
340
341    /// Returns the length of vector to store `num_bits` bits.
342    fn vec_len(num_bits: usize) -> usize {
343        num_bits.div_ceil(BITS)
344    }
345
346    /// Returns the number of valid bits in the bitmap,
347    /// also referred to its 'length'.
348    #[inline]
349    pub fn len(&self) -> usize {
350        self.num_bits
351    }
352
353    /// Returns true if the `Bitmap` has a length of 0.
354    pub fn is_empty(&self) -> bool {
355        self.num_bits == 0
356    }
357
358    /// Returns true if the bit at `idx` is set, without doing bounds checking.
359    ///
360    /// # Safety
361    ///
362    /// Index must be in range.
363    pub unsafe fn is_set_unchecked(&self, idx: usize) -> bool {
364        unsafe {
365            match &self.bits {
366                None => self.count_ones != 0,
367                Some(bits) => bits.get_unchecked(idx / BITS) & (1 << (idx % BITS)) != 0,
368            }
369        }
370    }
371
372    /// Returns true if the bit at `idx` is set.
373    pub fn is_set(&self, idx: usize) -> bool {
374        assert!(idx < self.len());
375        unsafe { self.is_set_unchecked(idx) }
376    }
377
378    /// Tests if every bit is set to 1.
379    pub fn all(&self) -> bool {
380        self.count_ones == self.len()
381    }
382
383    /// Produces an iterator over each bit.
384    pub fn iter(&self) -> BitmapIter<'_> {
385        BitmapIter {
386            bits: self.bits.as_ref().map(|s| &s[..]),
387            idx: 0,
388            num_bits: self.num_bits,
389            current_usize: 0,
390            all_ones: self.count_ones == self.num_bits,
391        }
392    }
393
394    /// Enumerates the index of each bit set to 1.
395    pub fn iter_ones(&self) -> BitmapOnesIter<'_> {
396        if let Some(bits) = &self.bits {
397            BitmapOnesIter::Buffer {
398                bits: &bits[..],
399                cur_idx: 0,
400                cur_bits: bits.first().cloned(),
401            }
402        } else {
403            // all zeros or all ones
404            BitmapOnesIter::Range {
405                start: 0,
406                end: self.count_ones,
407            }
408        }
409    }
410
411    /// Returns an iterator which yields the position ranges of continuous high bits.
412    pub fn high_ranges(&self) -> impl Iterator<Item = RangeInclusive<usize>> + '_ {
413        let mut start = None;
414
415        self.iter()
416            .chain(iter::once(false))
417            .enumerate()
418            .filter_map(move |(i, bit)| match (bit, start) {
419                // A new high range starts.
420                (true, None) => {
421                    start = Some(i);
422                    None
423                }
424                // The current high range ends.
425                (false, Some(s)) => {
426                    start = None;
427                    Some(s..=(i - 1))
428                }
429                _ => None,
430            })
431    }
432
433    #[cfg(test)]
434    fn assert_valid(&self) {
435        assert_eq!(
436            self.iter().map(|x| x as usize).sum::<usize>(),
437            self.count_ones
438        )
439    }
440
441    /// Creates a new bitmap with all bits in range set to 1.
442    ///
443    /// # Example
444    /// ```
445    /// use risingwave_common::bitmap::Bitmap;
446    /// let bitmap = Bitmap::from_range(200, 100..180);
447    /// assert_eq!(bitmap.count_ones(), 80);
448    /// for i in 0..200 {
449    ///     assert_eq!(bitmap.is_set(i), i >= 100 && i < 180);
450    /// }
451    /// ```
452    pub fn from_range(num_bits: usize, range: Range<usize>) -> Self {
453        assert!(range.start <= range.end);
454        assert!(range.end <= num_bits);
455        if range.start == range.end {
456            return Self::zeros(num_bits);
457        } else if range == (0..num_bits) {
458            return Self::ones(num_bits);
459        }
460        let mut bits = vec![0; Self::vec_len(num_bits)];
461        let start = range.start / BITS;
462        let end = range.end / BITS;
463        let start_offset = range.start % BITS;
464        let end_offset = range.end % BITS;
465        if start == end {
466            bits[start] = ((1 << (end_offset - start_offset)) - 1) << start_offset;
467        } else {
468            bits[start] = !0 << start_offset;
469            bits[start + 1..end].fill(!0);
470            if end_offset != 0 {
471                bits[end] = (1 << end_offset) - 1;
472            }
473        }
474        Self {
475            bits: Some(bits.into()),
476            num_bits,
477            count_ones: range.len(),
478        }
479    }
480}
481
482impl From<usize> for Bitmap {
483    fn from(val: usize) -> Self {
484        Self::ones(val)
485    }
486}
487
488impl<'b> BitAnd<&'b Bitmap> for &Bitmap {
489    type Output = Bitmap;
490
491    fn bitand(self, rhs: &'b Bitmap) -> Bitmap {
492        assert_eq!(self.num_bits, rhs.num_bits);
493        let (lbits, rbits) = match (&self.bits, &rhs.bits) {
494            _ if self.count_ones == 0 || rhs.count_ones == 0 => {
495                return Bitmap::zeros(self.num_bits);
496            }
497            (_, None) => return self.clone(),
498            (None, _) => return rhs.clone(),
499            (Some(lbits), Some(rbits)) => (lbits, rbits),
500        };
501        let bits = (lbits.iter().zip(rbits.iter()))
502            .map(|(&a, &b)| a & b)
503            .collect();
504        Bitmap::from_vec_with_len(bits, self.num_bits)
505    }
506}
507
508impl BitAnd<Bitmap> for &Bitmap {
509    type Output = Bitmap;
510
511    fn bitand(self, rhs: Bitmap) -> Self::Output {
512        self.bitand(&rhs)
513    }
514}
515
516impl<'b> BitAnd<&'b Bitmap> for Bitmap {
517    type Output = Bitmap;
518
519    fn bitand(self, rhs: &'b Bitmap) -> Self::Output {
520        rhs.bitand(self)
521    }
522}
523
524impl BitAnd for Bitmap {
525    type Output = Bitmap;
526
527    fn bitand(self, rhs: Bitmap) -> Self::Output {
528        (&self).bitand(&rhs)
529    }
530}
531
532impl BitAndAssign<&Bitmap> for Bitmap {
533    fn bitand_assign(&mut self, rhs: &Bitmap) {
534        *self = &*self & rhs;
535    }
536}
537
538impl BitAndAssign<Bitmap> for Bitmap {
539    fn bitand_assign(&mut self, rhs: Bitmap) {
540        *self = &*self & rhs;
541    }
542}
543
544impl<'b> BitOr<&'b Bitmap> for &Bitmap {
545    type Output = Bitmap;
546
547    fn bitor(self, rhs: &'b Bitmap) -> Bitmap {
548        assert_eq!(self.num_bits, rhs.num_bits);
549        let (lbits, rbits) = match (&self.bits, &rhs.bits) {
550            _ if self.count_ones == self.num_bits || rhs.count_ones == self.num_bits => {
551                return Bitmap::ones(self.num_bits);
552            }
553            (_, None) => return self.clone(),
554            (None, _) => return rhs.clone(),
555            (Some(lbits), Some(rbits)) => (lbits, rbits),
556        };
557        let bits = (lbits.iter().zip(rbits.iter()))
558            .map(|(&a, &b)| a | b)
559            .collect();
560        Bitmap::from_vec_with_len(bits, self.num_bits)
561    }
562}
563
564impl BitOr<Bitmap> for &Bitmap {
565    type Output = Bitmap;
566
567    fn bitor(self, rhs: Bitmap) -> Self::Output {
568        self.bitor(&rhs)
569    }
570}
571
572impl<'b> BitOr<&'b Bitmap> for Bitmap {
573    type Output = Bitmap;
574
575    fn bitor(self, rhs: &'b Bitmap) -> Self::Output {
576        rhs.bitor(self)
577    }
578}
579
580impl BitOr for Bitmap {
581    type Output = Bitmap;
582
583    fn bitor(self, rhs: Bitmap) -> Self::Output {
584        (&self).bitor(&rhs)
585    }
586}
587
588impl BitOrAssign<&Bitmap> for Bitmap {
589    fn bitor_assign(&mut self, rhs: &Bitmap) {
590        *self = &*self | rhs;
591    }
592}
593
594impl BitOrAssign<Bitmap> for Bitmap {
595    fn bitor_assign(&mut self, rhs: Bitmap) {
596        *self |= &rhs;
597    }
598}
599
600impl BitXor for &Bitmap {
601    type Output = Bitmap;
602
603    fn bitxor(self, rhs: &Bitmap) -> Self::Output {
604        assert_eq!(self.num_bits, rhs.num_bits);
605        let (lbits, rbits) = match (&self.bits, &rhs.bits) {
606            (_, None) if rhs.count_ones == 0 => return self.clone(),
607            (_, None) => return !self,
608            (None, _) if self.count_ones == 0 => return rhs.clone(),
609            (None, _) => return !rhs,
610            (Some(lbits), Some(rbits)) => (lbits, rbits),
611        };
612        let bits = (lbits.iter().zip(rbits.iter()))
613            .map(|(&a, &b)| a ^ b)
614            .collect();
615        Bitmap::from_vec_with_len(bits, self.num_bits)
616    }
617}
618
619impl Not for &Bitmap {
620    type Output = Bitmap;
621
622    fn not(self) -> Self::Output {
623        let bits = match &self.bits {
624            None if self.count_ones == 0 => return Bitmap::ones(self.num_bits),
625            None => return Bitmap::zeros(self.num_bits),
626            Some(bits) => bits,
627        };
628        let mut bits: Box<[usize]> = bits.iter().map(|b| !b).collect();
629        if !self.num_bits.is_multiple_of(BITS) {
630            bits[self.num_bits / BITS] &= (1 << (self.num_bits % BITS)) - 1;
631        }
632        Bitmap {
633            num_bits: self.num_bits,
634            count_ones: self.num_bits - self.count_ones,
635            bits: Some(bits),
636        }
637    }
638}
639
640impl Not for Bitmap {
641    type Output = Bitmap;
642
643    fn not(mut self) -> Self::Output {
644        let bits = match &mut self.bits {
645            None if self.count_ones == 0 => return Bitmap::ones(self.num_bits),
646            None => return Bitmap::zeros(self.num_bits),
647            Some(bits) => bits,
648        };
649        bits.iter_mut().for_each(|x| *x = !*x);
650        if !self.num_bits.is_multiple_of(BITS) {
651            bits[self.num_bits / BITS] &= (1 << (self.num_bits % BITS)) - 1;
652        }
653        self.count_ones = self.num_bits - self.count_ones;
654        self
655    }
656}
657
658impl FromIterator<bool> for Bitmap {
659    fn from_iter<T: IntoIterator<Item = bool>>(iter: T) -> Self {
660        let vec = iter.into_iter().collect::<Vec<_>>();
661        Self::from_bool_slice(&vec)
662    }
663}
664
665impl FromIterator<Option<bool>> for Bitmap {
666    fn from_iter<T: IntoIterator<Item = Option<bool>>>(iter: T) -> Self {
667        iter.into_iter().map(|b| b.unwrap_or(false)).collect()
668    }
669}
670
671impl Bitmap {
672    pub fn to_protobuf(&self) -> PbBuffer {
673        let body_len = self.num_bits.div_ceil(8) + 1;
674        let mut body = Vec::with_capacity(body_len);
675        body.push((self.num_bits % 8) as u8);
676        match &self.bits {
677            None if self.count_ones == 0 => body.resize(body_len, 0),
678            None => {
679                body.resize(body_len, u8::MAX);
680                if !self.num_bits.is_multiple_of(8) {
681                    body[body_len - 1] = (1 << (self.num_bits % 8)) - 1;
682                }
683            }
684            Some(bits) => {
685                body.extend_from_slice(unsafe {
686                    std::slice::from_raw_parts(
687                        bits.as_ptr() as *const u8,
688                        self.num_bits.div_ceil(8),
689                    )
690                });
691            }
692        }
693        PbBuffer {
694            body,
695            compression: CompressionType::None as i32,
696        }
697    }
698}
699
700impl From<&PbBuffer> for Bitmap {
701    fn from(buf: &PbBuffer) -> Self {
702        let last_byte_num_bits = buf.body[0];
703        let num_bits = ((buf.body.len() - 1) * 8) - ((8 - last_byte_num_bits) % 8) as usize;
704        Self::from_bytes_with_len(&buf.body[1..], num_bits)
705    }
706}
707
708impl From<PbBuffer> for Bitmap {
709    fn from(buf: PbBuffer) -> Self {
710        Self::from(&buf)
711    }
712}
713
714/// Bitmap iterator.
715pub struct BitmapIter<'a> {
716    bits: Option<&'a [usize]>,
717    idx: usize,
718    num_bits: usize,
719    current_usize: usize,
720    all_ones: bool,
721}
722
723impl BitmapIter<'_> {
724    fn next_always_load_usize(&mut self) -> Option<bool> {
725        if self.idx >= self.num_bits {
726            return None;
727        }
728        let Some(bits) = &self.bits else {
729            self.idx += 1;
730            return Some(self.all_ones);
731        };
732
733        // Offset of the bit within the usize.
734        let usize_offset = self.idx % BITS;
735
736        // Get the index of usize which the bit is located in
737        let usize_index = self.idx / BITS;
738        self.current_usize = unsafe { *bits.get_unchecked(usize_index) };
739
740        let bit_mask = 1 << usize_offset;
741        let bit_flag = self.current_usize & bit_mask != 0;
742        self.idx += 1;
743        Some(bit_flag)
744    }
745}
746
747impl iter::Iterator for BitmapIter<'_> {
748    type Item = bool;
749
750    fn next(&mut self) -> Option<Self::Item> {
751        if self.idx >= self.num_bits {
752            return None;
753        }
754        let Some(bits) = &self.bits else {
755            self.idx += 1;
756            return Some(self.all_ones);
757        };
758
759        // Offset of the bit within the usize.
760        let usize_offset = self.idx % BITS;
761
762        if usize_offset == 0 {
763            // Get the index of usize which the bit is located in
764            let usize_index = self.idx / BITS;
765            self.current_usize = unsafe { *bits.get_unchecked(usize_index) };
766        }
767
768        let bit_mask = 1 << usize_offset;
769
770        let bit_flag = self.current_usize & bit_mask != 0;
771        self.idx += 1;
772        Some(bit_flag)
773    }
774
775    fn size_hint(&self) -> (usize, Option<usize>) {
776        let remaining = self.num_bits - self.idx;
777        (remaining, Some(remaining))
778    }
779
780    fn nth(&mut self, n: usize) -> Option<Self::Item> {
781        self.idx += n;
782        self.next_always_load_usize()
783    }
784}
785
786impl ExactSizeIterator for BitmapIter<'_> {}
787unsafe impl TrustedLen for BitmapIter<'_> {}
788
789pub enum BitmapOnesIter<'a> {
790    Range {
791        start: usize,
792        end: usize,
793    },
794    Buffer {
795        bits: &'a [usize],
796        cur_idx: usize,
797        cur_bits: Option<usize>,
798    },
799}
800
801impl iter::Iterator for BitmapOnesIter<'_> {
802    type Item = usize;
803
804    fn next(&mut self) -> Option<Self::Item> {
805        match self {
806            BitmapOnesIter::Range { start, end } => {
807                let i = *start;
808                if i >= *end {
809                    None
810                } else {
811                    *start += 1;
812                    Some(i)
813                }
814            }
815            BitmapOnesIter::Buffer {
816                bits,
817                cur_idx,
818                cur_bits,
819            } => {
820                while *cur_bits == Some(0) {
821                    *cur_idx += 1;
822                    *cur_bits = if *cur_idx >= bits.len() {
823                        None
824                    } else {
825                        Some(bits[*cur_idx])
826                    };
827                }
828                cur_bits.map(|bits| {
829                    let low_bit = bits & bits.wrapping_neg();
830                    let low_bit_idx = bits.trailing_zeros();
831                    *cur_bits = Some(bits ^ low_bit);
832                    *cur_idx * BITS + low_bit_idx as usize
833                })
834            }
835        }
836    }
837
838    fn size_hint(&self) -> (usize, Option<usize>) {
839        match self {
840            BitmapOnesIter::Range { start, end } => {
841                let remaining = end - start;
842                (remaining, Some(remaining))
843            }
844            BitmapOnesIter::Buffer { bits, .. } => (0, Some(bits.len() * usize::BITS as usize)),
845        }
846    }
847}
848
849pub trait FilterByBitmap: ExactSizeIterator + Sized {
850    fn filter_by_bitmap(self, bitmap: &Bitmap) -> impl Iterator<Item = Self::Item> {
851        self.zip_eq_fast(bitmap.iter())
852            .filter_map(|(item, bit)| bit.then_some(item))
853    }
854}
855
856impl<T> FilterByBitmap for T where T: ExactSizeIterator {}
857
858#[cfg(test)]
859mod tests {
860    use super::*;
861
862    #[test]
863    fn test_bitmap_builder() {
864        let bitmap1 = {
865            let mut builder = BitmapBuilder::default();
866            let bits = [
867                false, true, true, false, true, false, true, false, true, false, true, true, false,
868                true, false, true,
869            ];
870            for bit in bits {
871                builder.append(bit);
872            }
873            for (idx, bit) in bits.iter().enumerate() {
874                assert_eq!(builder.is_set(idx), *bit);
875            }
876            builder.finish()
877        };
878        let byte1 = 0b0101_0110_u8;
879        let byte2 = 0b1010_1101_u8;
880        let expected = Bitmap::from_bytes(&[byte1, byte2]);
881        assert_eq!(bitmap1, expected);
882        assert_eq!(
883            bitmap1.count_ones(),
884            (byte1.count_ones() + byte2.count_ones()) as usize
885        );
886
887        let bitmap2 = {
888            let mut builder = BitmapBuilder::default();
889            let bits = [false, true, true, false, true, false, true, false];
890            for bit in bits {
891                builder.append(bit);
892            }
893            for (idx, bit) in bits.iter().enumerate() {
894                assert_eq!(builder.is_set(idx), *bit);
895            }
896            builder.finish()
897        };
898        let byte1 = 0b0101_0110_u8;
899        let expected = Bitmap::from_bytes(&[byte1]);
900        assert_eq!(bitmap2, expected);
901    }
902
903    #[test]
904    fn test_builder_append_bitmap() {
905        let mut builder = BitmapBuilder::default();
906        builder.append_bitmap(&Bitmap::zeros(64));
907        builder.append_bitmap(&Bitmap::ones(64));
908        builder.append_bitmap(&Bitmap::from_bytes(&[0b1010_1010]));
909        builder.append_bitmap(&Bitmap::zeros(8));
910        builder.append_bitmap(&Bitmap::ones(8));
911        let bitmap = builder.finish();
912        assert_eq!(
913            bitmap,
914            Bitmap::from_vec_with_len(
915                vec![0, usize::MAX, 0b1111_1111_0000_0000_1010_1010],
916                64 + 64 + 8 + 8 + 8
917            ),
918        );
919    }
920
921    #[test]
922    fn test_bitmap_get_size() {
923        let bitmap1 = {
924            let mut builder = BitmapBuilder::default();
925            let bits = [
926                false, true, true, false, true, false, true, false, true, false, true, true, false,
927                true, false, true,
928            ];
929            for bit in bits {
930                builder.append(bit);
931            }
932            for (idx, bit) in bits.iter().enumerate() {
933                assert_eq!(builder.is_set(idx), *bit);
934            }
935            builder.finish()
936        };
937        assert_eq!(8, bitmap1.estimated_heap_size());
938        assert_eq!(40, bitmap1.estimated_size());
939    }
940
941    #[test]
942    fn test_bitmap_all_high() {
943        let num_bits = 3;
944        let bitmap = Bitmap::ones(num_bits);
945        assert_eq!(bitmap.len(), num_bits);
946        assert!(bitmap.all());
947        for i in 0..num_bits {
948            assert!(bitmap.is_set(i));
949        }
950        // Test to and from protobuf is OK.
951        assert_eq!(bitmap, Bitmap::from(&bitmap.to_protobuf()));
952    }
953
954    #[test]
955    fn test_bitwise_and() {
956        #[rustfmt::skip]
957        let cases = [(
958            vec![],
959            vec![],
960            vec![],
961        ), (
962            vec![0, 1, 1, 0, 1, 0, 1, 0],
963            vec![0, 1, 0, 0, 1, 1, 1, 0],
964            vec![0, 1, 0, 0, 1, 0, 1, 0],
965        ), (
966            vec![0, 1, 0, 1, 0],
967            vec![1, 1, 1, 1, 0],
968            vec![0, 1, 0, 1, 0],
969        )];
970
971        for (input1, input2, expected) in cases {
972            let bitmap1: Bitmap = input1.into_iter().map(|x| x != 0).collect();
973            let bitmap2: Bitmap = input2.into_iter().map(|x| x != 0).collect();
974            let res = bitmap1 & bitmap2;
975            res.assert_valid();
976            assert_eq!(res.iter().map(|x| x as i32).collect::<Vec<_>>(), expected,);
977        }
978    }
979
980    #[test]
981    fn test_bitwise_not() {
982        #[rustfmt::skip]
983        let cases = [(
984            vec![1, 0, 1, 0, 1],
985            vec![0, 1, 0, 1, 0],
986        ), (
987            vec![],
988            vec![],
989        ), (
990            vec![1, 0, 1, 1, 0, 0, 1, 1],
991            vec![0, 1, 0, 0, 1, 1, 0, 0],
992        ), (
993            vec![1, 0, 0, 1, 1, 1, 0, 0, 1, 0],
994            vec![0, 1, 1, 0, 0, 0, 1, 1, 0, 1],
995        )];
996
997        for (input, expected) in cases {
998            let bitmap: Bitmap = input.into_iter().map(|x| x != 0).collect();
999            let res = !bitmap;
1000            res.assert_valid();
1001            assert_eq!(res.iter().map(|x| x as i32).collect::<Vec<_>>(), expected);
1002        }
1003    }
1004
1005    #[test]
1006    fn test_bitwise_or() {
1007        let bitmap1 = Bitmap::from_bytes(&[0b01101010]);
1008        let bitmap2 = Bitmap::from_bytes(&[0b01001110]);
1009        assert_eq!(Bitmap::from_bytes(&[0b01101110]), (bitmap1 | bitmap2));
1010    }
1011
1012    #[test]
1013    fn test_bitmap_is_set() {
1014        let bitmap = Bitmap::from_bytes(&[0b01001010]);
1015        assert!(!bitmap.is_set(0));
1016        assert!(bitmap.is_set(1));
1017        assert!(!bitmap.is_set(2));
1018        assert!(bitmap.is_set(3));
1019        assert!(!bitmap.is_set(4));
1020        assert!(!bitmap.is_set(5));
1021        assert!(bitmap.is_set(6));
1022        assert!(!bitmap.is_set(7));
1023    }
1024
1025    #[test]
1026    fn test_bitmap_iter() {
1027        {
1028            let bitmap = Bitmap::from_bytes(&[0b01001010]);
1029            let mut booleans = vec![];
1030            for b in bitmap.iter() {
1031                booleans.push(b as u8);
1032            }
1033            assert_eq!(booleans, vec![0u8, 1, 0, 1, 0, 0, 1, 0]);
1034        }
1035        {
1036            let bitmap: Bitmap = vec![true; 5].into_iter().collect();
1037            for b in bitmap.iter() {
1038                assert!(b);
1039            }
1040        }
1041    }
1042
1043    #[test]
1044    fn test_bitmap_high_ranges_iter() {
1045        fn test(bits: impl IntoIterator<Item = bool>, expected: Vec<RangeInclusive<usize>>) {
1046            let bitmap = Bitmap::from_iter(bits);
1047            let high_ranges = bitmap.high_ranges().collect::<Vec<_>>();
1048            assert_eq!(high_ranges, expected);
1049        }
1050
1051        test(
1052            vec![
1053                true, true, true, false, false, true, true, true, false, true,
1054            ],
1055            vec![0..=2, 5..=7, 9..=9],
1056        );
1057        test(vec![true, true, true], vec![0..=2]);
1058        test(vec![false, false, false], vec![]);
1059    }
1060
1061    #[test]
1062    fn test_bitmap_from_protobuf() {
1063        let bitmap_bytes = vec![3u8 /* len % BITS */, 0b0101_0010, 0b110];
1064        let buf = PbBuffer {
1065            body: bitmap_bytes,
1066            compression: CompressionType::None as _,
1067        };
1068        let bitmap: Bitmap = (&buf).into();
1069        let actual_bytes: Vec<u8> = bitmap.iter().map(|b| b as u8).collect();
1070
1071        assert_eq!(actual_bytes, vec![0, 1, 0, 0, 1, 0, 1, 0, /*  */ 0, 1, 1]); // in reverse order
1072        assert_eq!(bitmap.count_ones(), 5);
1073    }
1074
1075    #[test]
1076    fn test_bitmap_from_buffer() {
1077        let byte1 = 0b0110_1010_u8;
1078        let byte2 = 0b1011_0101_u8;
1079        let bitmap = Bitmap::from_bytes(&[0b0110_1010, 0b1011_0101]);
1080        let expected = Bitmap::from_iter(vec![
1081            false, true, false, true, false, true, true, false, true, false, true, false, true,
1082            true, false, true,
1083        ]);
1084        let count_ones = (byte1.count_ones() + byte2.count_ones()) as usize;
1085        assert_eq!(expected, bitmap);
1086        assert_eq!(bitmap.count_ones(), count_ones);
1087        assert_eq!(expected.count_ones(), count_ones);
1088    }
1089
1090    #[test]
1091    fn test_bitmap_eq() {
1092        let b1: Bitmap = Bitmap::zeros(3);
1093        let b2: Bitmap = Bitmap::zeros(5);
1094        assert_ne!(b1, b2);
1095
1096        let b1: Bitmap = [true, false].iter().cycle().cloned().take(10000).collect();
1097        let b2: Bitmap = [true, false].iter().cycle().cloned().take(10000).collect();
1098        assert_eq!(b1, b2);
1099    }
1100
1101    #[test]
1102    fn test_bitmap_set() {
1103        let mut b = BitmapBuilder::zeroed(10);
1104
1105        b.set(0, true);
1106        b.set(7, true);
1107        b.set(8, true);
1108        b.set(9, true);
1109
1110        b.set(7, false);
1111        b.set(8, false);
1112
1113        b.append(true);
1114        assert_eq!(b.len(), 11);
1115
1116        let b = b.finish();
1117        assert_eq!(b.count_ones(), 3);
1118        assert_eq!(&b.bits.unwrap()[..], &[0b0110_0000_0001]);
1119    }
1120
1121    #[test]
1122    fn test_bitmap_pop() {
1123        let mut b = BitmapBuilder::zeroed(7);
1124
1125        {
1126            b.append(true);
1127            assert!(b.is_set(b.len() - 1));
1128            b.pop();
1129            assert!(!b.is_set(b.len() - 1));
1130        }
1131
1132        {
1133            b.append(false);
1134            assert!(!b.is_set(b.len() - 1));
1135            b.pop();
1136            assert!(!b.is_set(b.len() - 1));
1137        }
1138
1139        {
1140            b.append(true);
1141            b.append(false);
1142            assert!(!b.is_set(b.len() - 1));
1143            b.pop();
1144            assert!(b.is_set(b.len() - 1));
1145            b.pop();
1146            assert!(!b.is_set(b.len() - 1));
1147        }
1148    }
1149
1150    #[test]
1151    fn test_bitmap_iter_ones() {
1152        let mut builder = BitmapBuilder::zeroed(1000);
1153        builder.append_n(1000, true);
1154        let bitmap = builder.finish();
1155        let mut iter = bitmap.iter_ones();
1156        for i in 0..1000 {
1157            let item = iter.next();
1158            assert!(item == Some(i + 1000));
1159        }
1160        for _ in 0..5 {
1161            let item = iter.next();
1162            assert!(item.is_none());
1163        }
1164    }
1165
1166    #[test]
1167    fn from_indices_creates_correct_bitmap() {
1168        let bitmap = Bitmap::from_indices(10, &[1, 3, 5, 7, 9]);
1169        assert_eq!(bitmap.len(), 10);
1170        for i in [1, 3, 5, 7, 9] {
1171            assert!(bitmap.is_set(i));
1172        }
1173
1174        for i in [0, 2, 4, 6, 8] {
1175            assert!(!bitmap.is_set(i));
1176        }
1177    }
1178
1179    #[test]
1180    fn from_indices_empty_indices_creates_zeroed_bitmap() {
1181        let bitmap = Bitmap::from_indices(10, &[]);
1182        assert_eq!(bitmap.len(), 10);
1183        assert!(!bitmap.any());
1184    }
1185
1186    #[test]
1187    fn from_indices_out_of_bounds_panics() {
1188        let result = std::panic::catch_unwind(|| {
1189            Bitmap::from_indices(5, &[0, 6]);
1190        });
1191        assert!(result.is_err());
1192    }
1193
1194    #[test]
1195    fn from_indices_all_indices_set_creates_filled_bitmap() {
1196        let bitmap = Bitmap::from_indices(5, &[0, 1, 2, 3, 4]);
1197        assert_eq!(bitmap.len(), 5);
1198        assert!(bitmap.all());
1199    }
1200}