data_chunk_payload_convert_generator/
data-chunk-payload-convert-generator.rs1use std::env;
16use std::fs::File;
17use std::io::{Read, Write};
18use std::process::exit;
19
20use prost::Message;
21use risingwave_common::array::{Op, StreamChunk};
22use risingwave_common::row::OwnedRow;
23use risingwave_common::types::{DataType, ScalarImpl};
24use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
25use serde::{Deserialize, Serialize};
26
27#[derive(Debug, Deserialize, Serialize)]
28struct Line {
29 id: u32,
30 name: String,
31}
32
33#[derive(Debug, Deserialize, Serialize)]
34struct Operation {
35 op_type: u32,
36 line: Line,
37}
38
39fn convert_to_op(value: u32) -> Option<Op> {
40 match value {
41 1 => Some(Op::Insert),
42 2 => Some(Op::Delete),
43 3 => Some(Op::UpdateInsert),
44 4 => Some(Op::UpdateDelete),
45 _ => None,
46 }
47}
48
49fn main() {
50 let args: Vec<String> = env::args().collect();
51 if args.len() <= 1 {
52 println!("No input file name");
53 exit(0);
54 }
55 let mut file = File::open(&args[1]).expect("Failed to open file");
57 let mut contents = String::new();
58 file.read_to_string(&mut contents)
59 .expect("Failed to read file");
60
61 let data: Vec<Vec<Operation>> = serde_json::from_str(&contents).expect("Failed to parse JSON");
63
64 let data_types: Vec<_> = vec![DataType::Int32, DataType::Varchar];
65
66 let mut row_count = 0;
68 for operations in &data {
69 row_count += operations.len();
70 }
71 let mut ops = Vec::with_capacity(row_count);
72 let mut builder = DataChunkBuilder::new(data_types, row_count * 1024);
73
74 for operations in data {
75 for operation in operations {
76 let mut row_value = Vec::with_capacity(10);
77 row_value.push(Some(ScalarImpl::Int32(operation.line.id as i32)));
78 row_value.push(Some(ScalarImpl::Utf8(operation.line.name.into_boxed_str())));
79 let _ = builder.append_one_row(OwnedRow::new(row_value));
80 if let Some(op) = convert_to_op(operation.op_type) {
82 ops.push(op);
83 } else {
84 println!("Invalid value");
85 }
86 }
87 }
88
89 let data_chunk = builder.consume_all().expect("should not be empty");
90 let stream_chunk = StreamChunk::from_parts(ops, data_chunk);
91 let prost_stream_chunk: risingwave_pb::data::StreamChunk = stream_chunk.to_protobuf();
92
93 let payload = Message::encode_to_vec(&prost_stream_chunk);
94
95 std::io::stdout()
96 .write_all(&payload)
97 .expect("should success");
98}