data_chunk_payload_generator/
data-chunk-payload-generator.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::env;
16use std::io::Write;
17
18use prost::Message;
19use risingwave_common::array::{Op, StreamChunk};
20use risingwave_common::row::OwnedRow;
21use risingwave_common::types::{DataType, F32, F64, ScalarImpl, Timestamp};
22use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
23
24fn build_row(index: usize) -> OwnedRow {
25    let mut row_value = Vec::with_capacity(10);
26    row_value.push(Some(ScalarImpl::Int16(index as i16)));
27    row_value.push(Some(ScalarImpl::Int32(index as i32)));
28    row_value.push(Some(ScalarImpl::Int64(index as i64)));
29    row_value.push(Some(ScalarImpl::Float32(F32::from(index as f32))));
30    row_value.push(Some(ScalarImpl::Float64(F64::from(index as f64))));
31    row_value.push(Some(ScalarImpl::Bool(index % 3 == 0)));
32    row_value.push(Some(ScalarImpl::Utf8(
33        format!("{}", index).repeat((index % 10) + 1).into(),
34    )));
35    row_value.push(Some(ScalarImpl::Timestamp(
36        Timestamp::from_timestamp_uncheck(index as _, 0),
37    )));
38    row_value.push(Some(ScalarImpl::Decimal(index.into())));
39    row_value.push(if index % 5 == 0 {
40        None
41    } else {
42        Some(ScalarImpl::Int64(index as i64))
43    });
44
45    OwnedRow::new(row_value)
46}
47
48fn main() {
49    let args: Vec<String> = env::args().collect();
50    let mut flag = false;
51    let mut row_count: usize = 30000;
52    if args.len() > 1 {
53        flag = true;
54        row_count = args[1].parse().unwrap();
55    }
56    let data_types = vec![
57        DataType::Int16,
58        DataType::Int32,
59        DataType::Int64,
60        DataType::Float32,
61        DataType::Float64,
62        DataType::Boolean,
63        DataType::Varchar,
64        DataType::Timestamp,
65        DataType::Decimal,
66        DataType::Int64,
67    ];
68    let mut ops = Vec::with_capacity(row_count);
69    let mut builder = DataChunkBuilder::new(data_types, row_count * 1024);
70    for i in 0..row_count {
71        assert!(
72            builder.append_one_row(build_row(i)).is_none(),
73            "should not finish"
74        );
75        // In unit test, it does not support delete operation
76        if flag || i % 2 == 0 {
77            ops.push(Op::Insert);
78        } else {
79            ops.push(Op::Delete);
80        }
81    }
82
83    let data_chunk = builder.consume_all().expect("should not be empty");
84    let stream_chunk = StreamChunk::from_parts(ops, data_chunk);
85    let prost_stream_chunk = stream_chunk.to_protobuf();
86
87    let payload = Message::encode_to_vec(&prost_stream_chunk);
88
89    std::io::stdout()
90        .write_all(&payload)
91        .expect("should success");
92}