data_chunk_payload_generator/
data-chunk-payload-generator.rs1use std::env;
16use std::io::Write;
17
18use prost::Message;
19use risingwave_common::array::{Op, StreamChunk};
20use risingwave_common::row::OwnedRow;
21use risingwave_common::types::{DataType, F32, F64, ScalarImpl, Timestamp};
22use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
23
24fn build_row(index: usize) -> OwnedRow {
25 let mut row_value = Vec::with_capacity(10);
26 row_value.push(Some(ScalarImpl::Int16(index as i16)));
27 row_value.push(Some(ScalarImpl::Int32(index as i32)));
28 row_value.push(Some(ScalarImpl::Int64(index as i64)));
29 row_value.push(Some(ScalarImpl::Float32(F32::from(index as f32))));
30 row_value.push(Some(ScalarImpl::Float64(F64::from(index as f64))));
31 row_value.push(Some(ScalarImpl::Bool(index % 3 == 0)));
32 row_value.push(Some(ScalarImpl::Utf8(
33 format!("{}", index).repeat((index % 10) + 1).into(),
34 )));
35 row_value.push(Some(ScalarImpl::Timestamp(
36 Timestamp::from_timestamp_uncheck(index as _, 0),
37 )));
38 row_value.push(Some(ScalarImpl::Decimal(index.into())));
39 row_value.push(if index % 5 == 0 {
40 None
41 } else {
42 Some(ScalarImpl::Int64(index as i64))
43 });
44
45 OwnedRow::new(row_value)
46}
47
48fn main() {
49 let args: Vec<String> = env::args().collect();
50 let mut flag = false;
51 let mut row_count: usize = 30000;
52 if args.len() > 1 {
53 flag = true;
54 row_count = args[1].parse().unwrap();
55 }
56 let data_types = vec![
57 DataType::Int16,
58 DataType::Int32,
59 DataType::Int64,
60 DataType::Float32,
61 DataType::Float64,
62 DataType::Boolean,
63 DataType::Varchar,
64 DataType::Timestamp,
65 DataType::Decimal,
66 DataType::Int64,
67 ];
68 let mut ops = Vec::with_capacity(row_count);
69 let mut builder = DataChunkBuilder::new(data_types, row_count * 1024);
70 for i in 0..row_count {
71 assert!(
72 builder.append_one_row(build_row(i)).is_none(),
73 "should not finish"
74 );
75 if flag || i % 2 == 0 {
77 ops.push(Op::Insert);
78 } else {
79 ops.push(Op::Delete);
80 }
81 }
82
83 let data_chunk = builder.consume_all().expect("should not be empty");
84 let stream_chunk = StreamChunk::from_parts(ops, data_chunk);
85 let prost_stream_chunk = stream_chunk.to_protobuf();
86
87 let payload = Message::encode_to_vec(&prost_stream_chunk);
88
89 std::io::stdout()
90 .write_all(&payload)
91 .expect("should success");
92}