risingwave_common/test_utils/
test_stream_chunk.rs1use std::sync::Arc;
16
17use itertools::Itertools;
18
19use crate::array::{I32Array, Op, StreamChunk};
20use crate::catalog::{Field, Schema};
21use crate::row::OwnedRow;
22use crate::test_prelude::StreamChunkTestExt;
23use crate::types::{DataType, Datum, ScalarImpl};
24
25pub trait TestStreamChunk {
26 fn stream_chunk(&self) -> StreamChunk;
27
28 fn cardinality(&self) -> usize;
29
30 fn schema(&self) -> Schema;
31
32 fn pk_indices(&self) -> Vec<usize> {
33 unimplemented!()
34 }
35
36 fn data_types(&self) -> Vec<DataType> {
37 self.schema().data_types()
38 }
39
40 fn op_at(&self, idx: usize) -> Op;
41
42 fn row_at(&self, idx: usize) -> OwnedRow;
43
44 fn row_with_op_at(&self, idx: usize) -> (Op, OwnedRow) {
45 (self.op_at(idx), self.row_at(idx))
46 }
47
48 fn value_at(&self, row_idx: usize, col_idx: usize) -> Datum {
49 self.row_at(row_idx)[col_idx].clone()
50 }
51}
52
53pub struct BigStreamChunk(StreamChunk);
54
55impl BigStreamChunk {
56 #[expect(clippy::if_same_then_else)]
57 #[expect(clippy::needless_bool)]
58 pub fn new(capacity: usize) -> Self {
59 let ops = (0..capacity)
60 .map(|i| {
61 if i % 20 == 0 || i % 20 == 1 {
62 Op::UpdateDelete
63 } else if i % 20 == 2 {
64 Op::UpdateInsert
65 } else if i % 2 == 0 {
66 Op::Insert
67 } else {
68 Op::Delete
69 }
70 })
71 .collect_vec();
72
73 let visibility = (0..capacity)
74 .map(|i| {
75 if i % 20 == 1 {
76 false
77 } else if i % 20 == 10 {
78 false
79 } else {
80 true
81 }
82 })
83 .collect_vec()
84 .into_iter()
85 .collect();
86
87 let col = Arc::new(I32Array::from_iter(std::iter::repeat_n(114_514, capacity)).into());
88
89 let chunk = StreamChunk::with_visibility(ops, vec![col], visibility);
90
91 Self(chunk)
92 }
93}
94
95impl TestStreamChunk for BigStreamChunk {
96 fn stream_chunk(&self) -> StreamChunk {
97 self.0.clone()
98 }
99
100 fn cardinality(&self) -> usize {
101 self.0.cardinality()
102 }
103
104 fn schema(&self) -> Schema {
105 Schema::new(vec![Field::with_name(DataType::Int32, "v")])
106 }
107
108 fn op_at(&self, i: usize) -> Op {
109 self.0.ops()[i]
110 }
111
112 fn row_at(&self, _idx: usize) -> OwnedRow {
113 OwnedRow::new(vec![Some(ScalarImpl::Int32(114_514))])
114 }
115}
116
117pub struct WhatEverStreamChunk;
118
119impl TestStreamChunk for WhatEverStreamChunk {
120 fn stream_chunk(&self) -> StreamChunk {
121 StreamChunk::from_pretty(
122 " i f I
123 + 1 4.0 5
124 - 2 . 6
125 + . 3.5 7 D
126 U- 3 2.2 8
127 U+ 4 1.8 9
128 ",
129 )
130 }
131
132 fn cardinality(&self) -> usize {
133 4
134 }
135
136 fn pk_indices(&self) -> Vec<usize> {
137 vec![0]
138 }
139
140 fn schema(&self) -> Schema {
141 let field1 = Field::with_name(DataType::Int32, "pk");
142 let field2 = Field::with_name(DataType::Float32, "v2");
143 let field3 = Field::with_name(DataType::Int64, "v3");
144 let fields = vec![field1, field2, field3];
145 Schema::new(fields)
146 }
147
148 fn op_at(&self, idx: usize) -> Op {
149 match idx {
150 0 => Op::Insert,
151 1 => Op::Delete,
152 2 => Op::UpdateDelete,
153 3 => Op::UpdateInsert,
154 _ => unreachable!(),
155 }
156 }
157
158 fn row_at(&self, idx: usize) -> OwnedRow {
159 match idx {
160 0 => OwnedRow::new(vec![
161 Some(1i32.into()),
162 Some(4.0f32.into()),
163 Some(5i64.into()),
164 ]),
165 1 => OwnedRow::new(vec![Some(2i32.into()), None, Some(6i64.into())]),
166 2 => OwnedRow::new(vec![
167 Some(3i32.into()),
168 Some(2.2f32.into()),
169 Some(8i64.into()),
170 ]),
171 3 => OwnedRow::new(vec![
172 Some(4i32.into()),
173 Some(1.8f32.into()),
174 Some(9i64.into()),
175 ]),
176 _ => unreachable!(),
177 }
178 }
179}