data_chunk_payload_convert_generator/
data-chunk-payload-convert-generator.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use std::env;
15use std::fs::File;
16use std::io::{Read, Write};
17use std::process::exit;
18
19use prost::Message;
20use risingwave_common::array::{Op, StreamChunk};
21use risingwave_common::row::OwnedRow;
22use risingwave_common::types::{DataType, ScalarImpl};
23use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
24use serde::{Deserialize, Serialize};
25
/// Payload of a single operation as it appears in the JSON input.
///
/// `main` turns each `Line` into one row of the output chunk.
#[derive(Debug, Deserialize, Serialize)]
struct Line {
    /// Row identifier; written to the `Int32` column of the output chunk.
    id: u32,
    /// Row name; written to the `Varchar` column of the output chunk.
    name: String,
}
31
/// One entry of the JSON input: a change kind plus the row it applies to.
#[derive(Debug, Deserialize, Serialize)]
struct Operation {
    /// Numeric code of the change kind; `convert_to_op` accepts `1` through `4`.
    op_type: u32,
    /// The row affected by this operation.
    line: Line,
}
37
38fn convert_to_op(value: u32) -> Option<Op> {
39    match value {
40        1 => Some(Op::Insert),
41        2 => Some(Op::Delete),
42        3 => Some(Op::UpdateInsert),
43        4 => Some(Op::UpdateDelete),
44        _ => None,
45    }
46}
47
48fn main() {
49    let args: Vec<String> = env::args().collect();
50    if args.len() <= 1 {
51        println!("No input file name");
52        exit(0);
53    }
54    // Read the JSON file
55    let mut file = File::open(&args[1]).expect("Failed to open file");
56    let mut contents = String::new();
57    file.read_to_string(&mut contents)
58        .expect("Failed to read file");
59
60    // Parse the JSON data
61    let data: Vec<Vec<Operation>> = serde_json::from_str(&contents).expect("Failed to parse JSON");
62
63    let data_types: Vec<_> = vec![DataType::Int32, DataType::Varchar];
64
65    // Access the data
66    let mut row_count = 0;
67    for operations in &data {
68        row_count += operations.len();
69    }
70    let mut ops = Vec::with_capacity(row_count);
71    let mut builder = DataChunkBuilder::new(data_types, row_count * 1024);
72
73    for operations in data {
74        for operation in operations {
75            let mut row_value = Vec::with_capacity(10);
76            row_value.push(Some(ScalarImpl::Int32(operation.line.id as i32)));
77            row_value.push(Some(ScalarImpl::Utf8(operation.line.name.into_boxed_str())));
78            let _ = builder.append_one_row(OwnedRow::new(row_value));
79            // let op: Op = unsafe { ::std::mem::transmute(operation.op_type as u8) };
80            if let Some(op) = convert_to_op(operation.op_type) {
81                ops.push(op);
82            } else {
83                println!("Invalid value");
84            }
85        }
86    }
87
88    let data_chunk = builder.consume_all().expect("should not be empty");
89    let stream_chunk = StreamChunk::from_parts(ops, data_chunk);
90    let prost_stream_chunk: risingwave_pb::data::StreamChunk = stream_chunk.to_protobuf();
91
92    let payload = Message::encode_to_vec(&prost_stream_chunk);
93
94    std::io::stdout()
95        .write_all(&payload)
96        .expect("should success");
97}