risingwave_common/test_utils/
test_stream_chunk.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use itertools::Itertools;
18
19use crate::array::{I32Array, Op, StreamChunk};
20use crate::catalog::{Field, Schema};
21use crate::row::OwnedRow;
22use crate::test_prelude::StreamChunkTestExt;
23use crate::types::{DataType, Datum, ScalarImpl};
24
25pub trait TestStreamChunk {
26    fn stream_chunk(&self) -> StreamChunk;
27
28    fn cardinality(&self) -> usize;
29
30    fn schema(&self) -> Schema;
31
32    fn pk_indices(&self) -> Vec<usize> {
33        unimplemented!()
34    }
35
36    fn data_types(&self) -> Vec<DataType> {
37        self.schema().data_types()
38    }
39
40    fn op_at(&self, idx: usize) -> Op;
41
42    fn row_at(&self, idx: usize) -> OwnedRow;
43
44    fn row_with_op_at(&self, idx: usize) -> (Op, OwnedRow) {
45        (self.op_at(idx), self.row_at(idx))
46    }
47
48    fn value_at(&self, row_idx: usize, col_idx: usize) -> Datum {
49        self.row_at(row_idx)[col_idx].clone()
50    }
51}
52
53pub struct BigStreamChunk(StreamChunk);
54
55impl BigStreamChunk {
56    #[expect(clippy::if_same_then_else)]
57    #[expect(clippy::needless_bool)]
58    pub fn new(capacity: usize) -> Self {
59        let ops = (0..capacity)
60            .map(|i| {
61                if i % 20 == 0 || i % 20 == 1 {
62                    Op::UpdateDelete
63                } else if i % 20 == 2 {
64                    Op::UpdateInsert
65                } else if i % 2 == 0 {
66                    Op::Insert
67                } else {
68                    Op::Delete
69                }
70            })
71            .collect_vec();
72
73        let visibility = (0..capacity)
74            .map(|i| {
75                if i % 20 == 1 {
76                    false
77                } else if i % 20 == 10 {
78                    false
79                } else {
80                    true
81                }
82            })
83            .collect_vec()
84            .into_iter()
85            .collect();
86
87        let col = Arc::new(I32Array::from_iter(std::iter::repeat_n(114_514, capacity)).into());
88
89        let chunk = StreamChunk::with_visibility(ops, vec![col], visibility);
90
91        Self(chunk)
92    }
93}
94
95impl TestStreamChunk for BigStreamChunk {
96    fn stream_chunk(&self) -> StreamChunk {
97        self.0.clone()
98    }
99
100    fn cardinality(&self) -> usize {
101        self.0.cardinality()
102    }
103
104    fn schema(&self) -> Schema {
105        Schema::new(vec![Field::with_name(DataType::Int32, "v")])
106    }
107
108    fn op_at(&self, i: usize) -> Op {
109        self.0.ops()[i]
110    }
111
112    fn row_at(&self, _idx: usize) -> OwnedRow {
113        OwnedRow::new(vec![Some(ScalarImpl::Int32(114_514))])
114    }
115}
116
117pub struct WhatEverStreamChunk;
118
119impl TestStreamChunk for WhatEverStreamChunk {
120    fn stream_chunk(&self) -> StreamChunk {
121        StreamChunk::from_pretty(
122            "   i    f       I
123            +   1    4.0     5
124            -   2    .       6
125            +   .    3.5     7 D
126            U-  3    2.2     8
127            U+  4    1.8     9
128        ",
129        )
130    }
131
132    fn cardinality(&self) -> usize {
133        4
134    }
135
136    fn pk_indices(&self) -> Vec<usize> {
137        vec![0]
138    }
139
140    fn schema(&self) -> Schema {
141        let field1 = Field::with_name(DataType::Int32, "pk");
142        let field2 = Field::with_name(DataType::Float32, "v2");
143        let field3 = Field::with_name(DataType::Int64, "v3");
144        let fields = vec![field1, field2, field3];
145        Schema::new(fields)
146    }
147
148    fn op_at(&self, idx: usize) -> Op {
149        match idx {
150            0 => Op::Insert,
151            1 => Op::Delete,
152            2 => Op::UpdateDelete,
153            3 => Op::UpdateInsert,
154            _ => unreachable!(),
155        }
156    }
157
158    fn row_at(&self, idx: usize) -> OwnedRow {
159        match idx {
160            0 => OwnedRow::new(vec![
161                Some(1i32.into()),
162                Some(4.0f32.into()),
163                Some(5i64.into()),
164            ]),
165            1 => OwnedRow::new(vec![Some(2i32.into()), None, Some(6i64.into())]),
166            2 => OwnedRow::new(vec![
167                Some(3i32.into()),
168                Some(2.2f32.into()),
169                Some(8i64.into()),
170            ]),
171            3 => OwnedRow::new(vec![
172                Some(4i32.into()),
173                Some(1.8f32.into()),
174                Some(9i64.into()),
175            ]),
176            _ => unreachable!(),
177        }
178    }
179}