risingwave_connector/parser/unified/
util.rsuse super::{AccessResult, ChangeEvent};
use crate::parser::unified::ChangeEventOperation;
use crate::parser::SourceStreamChunkRowWriter;
use crate::source::SourceColumnDesc;
pub fn apply_row_operation_on_stream_chunk_writer_with_op(
row_op: impl ChangeEvent,
writer: &mut SourceStreamChunkRowWriter<'_>,
op: ChangeEventOperation,
) -> AccessResult<()> {
let f = |column: &SourceColumnDesc| row_op.access_field(column);
match op {
ChangeEventOperation::Upsert => writer.do_insert(f),
ChangeEventOperation::Delete => writer.do_delete(f),
}
}
pub fn apply_row_operation_on_stream_chunk_writer(
row_op: impl ChangeEvent,
writer: &mut SourceStreamChunkRowWriter<'_>,
) -> AccessResult<()> {
let op = row_op.op()?;
apply_row_operation_on_stream_chunk_writer_with_op(row_op, writer, op)
}