data_chunk_payload_convert_generator/
data-chunk-payload-convert-generator.rs1use std::env;
15use std::fs::File;
16use std::io::{Read, Write};
17use std::process::exit;
18
19use prost::Message;
20use risingwave_common::array::{Op, StreamChunk};
21use risingwave_common::row::OwnedRow;
22use risingwave_common::types::{DataType, ScalarImpl};
23use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
24use serde::{Deserialize, Serialize};
25
26#[derive(Debug, Deserialize, Serialize)]
27struct Line {
28 id: u32,
29 name: String,
30}
31
32#[derive(Debug, Deserialize, Serialize)]
33struct Operation {
34 op_type: u32,
35 line: Line,
36}
37
38fn convert_to_op(value: u32) -> Option<Op> {
39 match value {
40 1 => Some(Op::Insert),
41 2 => Some(Op::Delete),
42 3 => Some(Op::UpdateInsert),
43 4 => Some(Op::UpdateDelete),
44 _ => None,
45 }
46}
47
48fn main() {
49 let args: Vec<String> = env::args().collect();
50 if args.len() <= 1 {
51 println!("No input file name");
52 exit(0);
53 }
54 let mut file = File::open(&args[1]).expect("Failed to open file");
56 let mut contents = String::new();
57 file.read_to_string(&mut contents)
58 .expect("Failed to read file");
59
60 let data: Vec<Vec<Operation>> = serde_json::from_str(&contents).expect("Failed to parse JSON");
62
63 let data_types: Vec<_> = vec![DataType::Int32, DataType::Varchar];
64
65 let mut row_count = 0;
67 for operations in &data {
68 row_count += operations.len();
69 }
70 let mut ops = Vec::with_capacity(row_count);
71 let mut builder = DataChunkBuilder::new(data_types, row_count * 1024);
72
73 for operations in data {
74 for operation in operations {
75 let mut row_value = Vec::with_capacity(10);
76 row_value.push(Some(ScalarImpl::Int32(operation.line.id as i32)));
77 row_value.push(Some(ScalarImpl::Utf8(operation.line.name.into_boxed_str())));
78 let _ = builder.append_one_row(OwnedRow::new(row_value));
79 if let Some(op) = convert_to_op(operation.op_type) {
81 ops.push(op);
82 } else {
83 println!("Invalid value");
84 }
85 }
86 }
87
88 let data_chunk = builder.consume_all().expect("should not be empty");
89 let stream_chunk = StreamChunk::from_parts(ops, data_chunk);
90 let prost_stream_chunk: risingwave_pb::data::StreamChunk = stream_chunk.to_protobuf();
91
92 let payload = Message::encode_to_vec(&prost_stream_chunk);
93
94 std::io::stdout()
95 .write_all(&payload)
96 .expect("should success");
97}