risingwave_common/row/
chain.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use bytes::BufMut;
16
17use super::Row;
18use crate::types::DatumRef;
19
/// Row type backing the [`chain`](super::RowExt::chain) method.
///
/// It holds two values side by side; when both implement `Row`, the pair is
/// exposed as a single row whose datums are those of `r1` followed by those of `r2`.
#[derive(Clone, Copy, Debug)]
pub struct Chain<R1, R2> {
    /// Leading part: serves the indices `0..r1.len()` of the chained row.
    r1: R1,
    /// Trailing part: serves the indices from `r1.len()` onwards.
    r2: R2,
}
26
27impl<R1: Row, R2: Row> PartialEq for Chain<R1, R2> {
28    fn eq(&self, other: &Self) -> bool {
29        self.iter().eq(other.iter())
30    }
31}
32impl<R1: Row, R2: Row> Eq for Chain<R1, R2> {}
33
34impl<R1: Row, R2: Row> Row for Chain<R1, R2> {
35    #[inline]
36    fn datum_at(&self, index: usize) -> DatumRef<'_> {
37        if index < self.r1.len() {
38            // SAFETY: `index < self.r1.len()` implies the index is valid.
39            unsafe { self.r1.datum_at_unchecked(index) }
40        } else {
41            self.r2.datum_at(index - self.r1.len())
42        }
43    }
44
45    #[inline]
46    unsafe fn datum_at_unchecked(&self, index: usize) -> DatumRef<'_> {
47        unsafe {
48            if index < self.r1.len() {
49                self.r1.datum_at_unchecked(index)
50            } else {
51                self.r2.datum_at_unchecked(index - self.r1.len())
52            }
53        }
54    }
55
56    #[inline]
57    fn len(&self) -> usize {
58        self.r1.len() + self.r2.len()
59    }
60
61    #[inline]
62    fn is_empty(&self) -> bool {
63        self.r1.is_empty() && self.r2.is_empty()
64    }
65
66    #[inline]
67    fn iter(&self) -> impl Iterator<Item = DatumRef<'_>> {
68        self.r1.iter().chain(self.r2.iter())
69    }
70
71    // Manually implemented in case `R1` or `R2` has a more efficient implementation.
72    #[inline]
73    fn value_serialize_into(&self, mut buf: impl BufMut) {
74        self.r1.value_serialize_into(&mut buf);
75        self.r2.value_serialize_into(buf);
76    }
77}
78
79impl<R1, R2> Chain<R1, R2> {
80    pub(super) fn new(r1: R1, r2: R2) -> Self {
81        Self { r1, r2 }
82    }
83}
84
#[cfg(test)]
mod tests {
    use super::*;
    use crate::row::OwnedRow;
    use crate::types::{ScalarImpl, ScalarRefImpl};

    /// Builds an `OwnedRow` holding one non-null `Int64` datum per value in `values`.
    fn int64_row(values: std::ops::RangeInclusive<i64>) -> OwnedRow {
        OwnedRow::new(values.map(|v| Some(ScalarImpl::Int64(v))).collect())
    }

    /// Checks that `row` exposes exactly the `Int64` datums 1 through 9, via
    /// `len`, `iter` and positional `datum_at` access.
    fn assert_one_through_nine(row: impl Row) {
        let expected = int64_row(1..=9);

        assert_eq!(row.len(), 9);
        assert!(row.iter().eq(expected.iter()));

        for (position, value) in (1..=9i64).enumerate() {
            assert_eq!(row.datum_at(position), Some(ScalarRefImpl::Int64(value)));
        }
    }

    #[test]
    fn test_chain_row() {
        // Right-nested: r1 ++ (r2 ++ r3).
        assert_one_through_nine(Chain::new(
            int64_row(1..=3),
            Chain::new(int64_row(4..=6), int64_row(7..=9)),
        ));

        // Left-nested: (r1 ++ r2) ++ r3.
        assert_one_through_nine(Chain::new(
            Chain::new(int64_row(1..=3), int64_row(4..=6)),
            int64_row(7..=9),
        ));
    }
}