data_chunk_payload_convert_generator/
data-chunk-payload-convert-generator.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::env;
16use std::fs::File;
17use std::io::{Read, Write};
18use std::process::exit;
19
20use prost::Message;
21use risingwave_common::array::{Op, StreamChunk};
22use risingwave_common::row::OwnedRow;
23use risingwave_common::types::{DataType, ScalarImpl};
24use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
25use serde::{Deserialize, Serialize};
26
27#[derive(Debug, Deserialize, Serialize)]
28struct Line {
29    id: u32,
30    name: String,
31}
32
33#[derive(Debug, Deserialize, Serialize)]
34struct Operation {
35    op_type: u32,
36    line: Line,
37}
38
39fn convert_to_op(value: u32) -> Option<Op> {
40    match value {
41        1 => Some(Op::Insert),
42        2 => Some(Op::Delete),
43        3 => Some(Op::UpdateInsert),
44        4 => Some(Op::UpdateDelete),
45        _ => None,
46    }
47}
48
49fn main() {
50    let args: Vec<String> = env::args().collect();
51    if args.len() <= 1 {
52        println!("No input file name");
53        exit(0);
54    }
55    // Read the JSON file
56    let mut file = File::open(&args[1]).expect("Failed to open file");
57    let mut contents = String::new();
58    file.read_to_string(&mut contents)
59        .expect("Failed to read file");
60
61    // Parse the JSON data
62    let data: Vec<Vec<Operation>> = serde_json::from_str(&contents).expect("Failed to parse JSON");
63
64    let data_types: Vec<_> = vec![DataType::Int32, DataType::Varchar];
65
66    // Access the data
67    let mut row_count = 0;
68    for operations in &data {
69        row_count += operations.len();
70    }
71    let mut ops = Vec::with_capacity(row_count);
72    let mut builder = DataChunkBuilder::new(data_types, row_count * 1024);
73
74    for operations in data {
75        for operation in operations {
76            let mut row_value = Vec::with_capacity(10);
77            row_value.push(Some(ScalarImpl::Int32(operation.line.id as i32)));
78            row_value.push(Some(ScalarImpl::Utf8(operation.line.name.into_boxed_str())));
79            let _ = builder.append_one_row(OwnedRow::new(row_value));
80            // let op: Op = unsafe { ::std::mem::transmute(operation.op_type as u8) };
81            if let Some(op) = convert_to_op(operation.op_type) {
82                ops.push(op);
83            } else {
84                println!("Invalid value");
85            }
86        }
87    }
88
89    let data_chunk = builder.consume_all().expect("should not be empty");
90    let stream_chunk = StreamChunk::from_parts(ops, data_chunk);
91    let prost_stream_chunk: risingwave_pb::data::StreamChunk = stream_chunk.to_protobuf();
92
93    let payload = Message::encode_to_vec(&prost_stream_chunk);
94
95    std::io::stdout()
96        .write_all(&payload)
97        .expect("should success");
98}