use std::fmt::Display;
use std::mem::size_of;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::{fmt, mem};
use either::Either;
use enum_as_inner::EnumAsInner;
use itertools::Itertools;
use rand::prelude::SmallRng;
use rand::{Rng, SeedableRng};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::data::{PbOp, PbStreamChunk};
use super::stream_chunk_builder::StreamChunkBuilder;
use super::{ArrayImpl, ArrayRef, ArrayResult, DataChunkTestExt, RowRef};
use crate::array::DataChunk;
// NOTE: assumed import for the `bail!` macro used in `Op::from_protobuf`; it is not
// imported anywhere else in this file.
use crate::bail;
use crate::bitmap::{Bitmap, BitmapBuilder};
use crate::catalog::Schema;
use crate::field_generator::VarcharProperty;
use crate::row::Row;
use crate::types::{DataType, DefaultOrdered, ToText};
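/// `Op` describes the kind of change a row in a [`StreamChunk`] represents.
///
/// `UpdateDelete` and `UpdateInsert` are semantically a `Delete` and an `Insert`, but they
/// always appear as an adjacent pair (old row first, new row second) so that downstream
/// consumers can treat the pair as a single update.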
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, EnumAsInner)]
pub enum Op {
Insert,
Delete,
UpdateDelete,
UpdateInsert,
}
impl Op {
pub fn to_protobuf(self) -> PbOp {
match self {
Op::Insert => PbOp::Insert,
Op::Delete => PbOp::Delete,
Op::UpdateInsert => PbOp::UpdateInsert,
Op::UpdateDelete => PbOp::UpdateDelete,
}
}
pub fn from_protobuf(prost: &i32) -> ArrayResult<Op> {
let op = match PbOp::try_from(*prost) {
Ok(PbOp::Insert) => Op::Insert,
Ok(PbOp::Delete) => Op::Delete,
Ok(PbOp::UpdateInsert) => Op::UpdateInsert,
Ok(PbOp::UpdateDelete) => Op::UpdateDelete,
Ok(PbOp::Unspecified) => unreachable!(),
Err(_) => bail!("No such op type"),
};
Ok(op)
}
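/// Collapses the update variants: `UpdateDelete` becomes `Delete` and `UpdateInsert`
/// becomes `Insert`; plain `Insert` and `Delete` are returned unchanged.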
pub fn normalize_update(self) -> Op {
match self {
Op::Insert => Op::Insert,
Op::Delete => Op::Delete,
Op::UpdateDelete => Op::Delete,
Op::UpdateInsert => Op::Insert,
}
}
pub fn to_i16(self) -> i16 {
match self {
Op::Insert => 1,
Op::Delete => 2,
Op::UpdateInsert => 3,
Op::UpdateDelete => 4,
}
}
pub fn to_varchar(self) -> String {
match self {
Op::Insert => "Insert",
Op::Delete => "Delete",
Op::UpdateInsert => "UpdateInsert",
Op::UpdateDelete => "UpdateDelete",
}
.to_string()
}
}
pub type Ops<'a> = &'a [Op];
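/// `StreamChunk` is a [`DataChunk`] paired with one [`Op`] per row. It is the unit of data
/// passed between streaming operators; the visibility bitmap of the underlying `DataChunk`
/// marks which rows are currently active.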
#[derive(Clone, PartialEq)]
pub struct StreamChunk {
ops: Arc<[Op]>,
data: DataChunk,
}
impl Default for StreamChunk {
fn default() -> Self {
Self {
ops: Arc::new([]),
data: DataChunk::new(vec![], 0),
}
}
}
impl StreamChunk {
pub fn new(ops: impl Into<Arc<[Op]>>, columns: Vec<ArrayRef>) -> Self {
let ops = ops.into();
let visibility = Bitmap::ones(ops.len());
Self::with_visibility(ops, columns, visibility)
}
pub fn with_visibility(
ops: impl Into<Arc<[Op]>>,
columns: Vec<ArrayRef>,
visibility: Bitmap,
) -> Self {
let ops = ops.into();
for col in &columns {
assert_eq!(col.len(), ops.len());
}
let data = DataChunk::new(columns, visibility);
StreamChunk { ops, data }
}
pub fn from_rows(rows: &[(Op, impl Row)], data_types: &[DataType]) -> Self {
let mut builder = StreamChunkBuilder::unlimited(data_types.to_vec(), Some(rows.len()));
for (op, row) in rows {
let none = builder.append_row(*op, row);
debug_assert!(none.is_none());
}
builder.take().expect("chunk should not be empty")
}
pub fn data_chunk(&self) -> &DataChunk {
&self.data
}
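/// Physically removes the rows hidden by the visibility bitmap and returns a chunk in
/// which every row is visible. Returns `self` unchanged if it is already compacted.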
pub fn compact(self) -> Self {
if self.is_compacted() {
return self;
}
let (ops, columns, visibility) = self.into_inner();
let cardinality = visibility
.iter()
.fold(0, |vis_cnt, vis| vis_cnt + vis as usize);
let columns: Vec<_> = columns
.into_iter()
.map(|col| col.compact(&visibility, cardinality).into())
.collect();
let mut new_ops = Vec::with_capacity(cardinality);
for idx in visibility.iter_ones() {
new_ops.push(ops[idx]);
}
StreamChunk::new(new_ops, columns)
}
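/// Re-packs the visible rows into chunks of at most `size` rows each.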
pub fn split(&self, size: usize) -> Vec<Self> {
let mut builder = StreamChunkBuilder::new(size, self.data_types());
let mut outputs = Vec::new();
for (op, row) in self.rows() {
if let Some(chunk) = builder.append_row(op, row) {
outputs.push(chunk);
}
}
if let Some(output) = builder.take() {
outputs.push(output);
}
outputs
}
pub fn into_parts(self) -> (DataChunk, Arc<[Op]>) {
(self.data, self.ops)
}
pub fn from_parts(ops: impl Into<Arc<[Op]>>, data_chunk: DataChunk) -> Self {
let (columns, vis) = data_chunk.into_parts();
Self::with_visibility(ops, columns, vis)
}
pub fn into_inner(self) -> (Arc<[Op]>, Vec<ArrayRef>, Bitmap) {
let (columns, vis) = self.data.into_parts();
(self.ops, columns, vis)
}
pub fn to_protobuf(&self) -> PbStreamChunk {
if !self.is_compacted() {
return self.clone().compact().to_protobuf();
}
PbStreamChunk {
cardinality: self.cardinality() as u32,
ops: self.ops.iter().map(|op| op.to_protobuf() as i32).collect(),
columns: self.columns().iter().map(|col| col.to_protobuf()).collect(),
}
}
pub fn from_protobuf(prost: &PbStreamChunk) -> ArrayResult<Self> {
let cardinality = prost.get_cardinality() as usize;
let mut ops = Vec::with_capacity(cardinality);
for op in prost.get_ops() {
ops.push(Op::from_protobuf(op)?);
}
let mut columns = vec![];
for column in prost.get_columns() {
columns.push(ArrayImpl::from_protobuf(column, cardinality)?.into());
}
Ok(StreamChunk::new(ops, columns))
}
pub fn ops(&self) -> &[Op] {
&self.ops
}
pub fn to_pretty(&self) -> impl Display {
self.to_pretty_inner(None)
}
pub fn to_pretty_with_schema(&self, schema: &Schema) -> impl Display {
self.to_pretty_inner(Some(schema))
}
fn to_pretty_inner(&self, schema: Option<&Schema>) -> impl Display {
use comfy_table::{Cell, CellAlignment, Table};
if self.cardinality() == 0 {
return Either::Left("(empty)");
}
let mut table = Table::new();
table.load_preset(DataChunk::PRETTY_TABLE_PRESET);
if let Some(schema) = schema {
assert_eq!(self.dimension(), schema.len());
let cells = std::iter::once(String::new())
.chain(schema.fields().iter().map(|f| f.name.clone()));
table.set_header(cells);
}
for (op, row_ref) in self.rows() {
let mut cells = Vec::with_capacity(row_ref.len() + 1);
cells.push(
Cell::new(match op {
Op::Insert => "+",
Op::Delete => "-",
Op::UpdateDelete => "U-",
Op::UpdateInsert => "U+",
})
.set_alignment(CellAlignment::Right),
);
for datum in row_ref.iter() {
let str = match datum {
None => "".to_owned(), Some(scalar) => scalar.to_text(),
};
cells.push(Cell::new(str));
}
table.add_row(cells);
}
Either::Right(table)
}
pub fn project(&self, indices: &[usize]) -> Self {
Self {
ops: self.ops.clone(),
data: self.data.project(indices),
}
}
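/// Hides adjacent `(Delete | UpdateDelete, Insert | UpdateInsert)` pairs whose old and new
/// rows are identical, since such a pair describes an update that changes nothing.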
pub fn eliminate_adjacent_noop_update(self) -> Self {
let len = self.data_chunk().capacity();
let mut c: StreamChunkMut = self.into();
let mut prev_r = None;
for curr in 0..len {
if !c.vis(curr) {
continue;
}
if let Some(prev) = prev_r {
if matches!(c.op(prev), Op::UpdateDelete | Op::Delete)
&& matches!(c.op(curr), Op::UpdateInsert | Op::Insert)
&& c.row_ref(prev) == c.row_ref(curr)
{
c.set_vis(prev, false);
c.set_vis(curr, false);
prev_r = None;
} else {
prev_r = Some(curr)
}
} else {
prev_r = Some(curr);
}
}
c.into()
}
pub fn project_with_vis(&self, indices: &[usize], vis: Bitmap) -> Self {
Self {
ops: self.ops.clone(),
data: self.data.project_with_vis(indices, vis),
}
}
pub fn clone_with_vis(&self, vis: Bitmap) -> Self {
Self {
ops: self.ops.clone(),
data: self.data.with_visibility(vis),
}
}
}
impl Deref for StreamChunk {
type Target = DataChunk;
fn deref(&self) -> &Self::Target {
&self.data
}
}
impl DerefMut for StreamChunk {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.data
}
}
impl From<DataChunk> for StreamChunk {
fn from(data: DataChunk) -> Self {
Self::from_parts(vec![Op::Insert; data.capacity()], data)
}
}
impl fmt::Debug for StreamChunk {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if f.alternate() {
write!(
f,
"StreamChunk {{ cardinality: {}, capacity: {}, data:\n{}\n }}",
self.cardinality(),
self.capacity(),
self.to_pretty()
)
} else {
f.debug_struct("StreamChunk")
.field("cardinality", &self.cardinality())
.field("capacity", &self.capacity())
.finish_non_exhaustive()
}
}
}
impl EstimateSize for StreamChunk {
fn estimated_heap_size(&self) -> usize {
self.data.estimated_heap_size() + self.ops.len() * size_of::<Op>()
}
}
enum OpsMutState {
ArcRef(Arc<[Op]>),
Mut(Vec<Op>),
}
impl OpsMutState {
const UNDEFINED: Self = Self::Mut(Vec::new());
}
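/// A copy-on-write wrapper around the op column: reads go through the shared `Arc<[Op]>`
/// until the first `set`, which clones the ops into an owned `Vec<Op>`.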
pub struct OpsMut {
state: OpsMutState,
}
impl OpsMut {
pub fn new(ops: Arc<[Op]>) -> Self {
Self {
state: OpsMutState::ArcRef(ops),
}
}
pub fn len(&self) -> usize {
match &self.state {
OpsMutState::ArcRef(v) => v.len(),
OpsMutState::Mut(v) => v.len(),
}
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn set(&mut self, n: usize, val: Op) {
debug_assert!(n < self.len());
if let OpsMutState::Mut(v) = &mut self.state {
v[n] = val;
} else {
let state = mem::replace(&mut self.state, OpsMutState::UNDEFINED);
let mut v = match state {
OpsMutState::ArcRef(v) => v.to_vec(),
OpsMutState::Mut(_) => unreachable!(),
};
v[n] = val;
self.state = OpsMutState::Mut(v);
}
}
pub fn get(&self, n: usize) -> Op {
debug_assert!(n < self.len());
match &self.state {
OpsMutState::ArcRef(v) => v[n],
OpsMutState::Mut(v) => v[n],
}
}
}
impl From<OpsMut> for Arc<[Op]> {
fn from(v: OpsMut) -> Self {
match v.state {
OpsMutState::ArcRef(a) => a,
OpsMutState::Mut(v) => v.into(),
}
}
}
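/// A mutable view of a `StreamChunk` in which the ops and the visibility bitmap can be
/// updated in place while the column arrays stay shared.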
pub struct StreamChunkMut {
columns: Arc<[ArrayRef]>,
ops: OpsMut,
vis: BitmapBuilder,
}
impl From<StreamChunk> for StreamChunkMut {
fn from(c: StreamChunk) -> Self {
let (c, ops) = c.into_parts();
let (columns, vis) = c.into_parts_v2();
Self {
columns,
ops: OpsMut::new(ops),
vis: vis.into(),
}
}
}
impl From<StreamChunkMut> for StreamChunk {
fn from(c: StreamChunkMut) -> Self {
StreamChunk::from_parts(c.ops, DataChunk::from_parts(c.columns, c.vis.finish()))
}
}
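/// A handle to a single row of a `StreamChunkMut` that can read the row's values and
/// read or update its op and visibility.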
pub struct OpRowMutRef<'a> {
c: &'a mut StreamChunkMut,
i: usize,
}
impl OpRowMutRef<'_> {
pub fn index(&self) -> usize {
self.i
}
pub fn vis(&self) -> bool {
self.c.vis.is_set(self.i)
}
pub fn op(&self) -> Op {
self.c.ops.get(self.i)
}
pub fn set_vis(&mut self, val: bool) {
self.c.set_vis(self.i, val);
}
pub fn set_op(&mut self, val: Op) {
self.c.set_op(self.i, val);
}
pub fn row_ref(&self) -> RowRef<'_> {
RowRef::with_columns(self.c.columns(), self.i)
}
pub fn same_chunk(&self, other: &Self) -> bool {
std::ptr::eq(self.c, other.c)
}
}
impl StreamChunkMut {
pub fn capacity(&self) -> usize {
self.vis.len()
}
pub fn vis(&self, i: usize) -> bool {
self.vis.is_set(i)
}
pub fn op(&self, i: usize) -> Op {
self.ops.get(i)
}
pub fn row_ref(&self, i: usize) -> RowRef<'_> {
RowRef::with_columns(self.columns(), i)
}
pub fn set_vis(&mut self, n: usize, val: bool) {
self.vis.set(n, val);
}
pub fn set_op(&mut self, n: usize, val: Op) {
self.ops.set(n, val);
}
pub fn columns(&self) -> &[ArrayRef] {
&self.columns
}
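/// Yields each visible row together with a mutable handle (`OpRowMutRef`) to its op and
/// visibility. The `unsafe` raw-pointer cast hands out one aliasing `&mut StreamChunkMut`
/// per row; this relies on `OpRowMutRef` only touching the ops and the visibility builder,
/// never the shared columns that the returned `RowRef`s borrow.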
pub fn to_rows_mut(&mut self) -> impl Iterator<Item = (RowRef<'_>, OpRowMutRef<'_>)> {
unsafe {
(0..self.vis.len())
.filter(|i| self.vis.is_set(*i))
.map(|i| {
let p = self as *const StreamChunkMut;
let p = p as *mut StreamChunkMut;
(
RowRef::with_columns(self.columns(), i),
OpRowMutRef { c: &mut *p, i },
)
})
}
}
}
#[easy_ext::ext(StreamChunkTestExt)]
impl StreamChunk {
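/// Parses a `StreamChunk` from a human-readable string, mirroring
/// `DataChunkTestExt::from_pretty`: the first line lists the column types, and each
/// following line starts with an op (`+`, `-`, `U+`, `U-`) followed by the row values,
/// with `.` standing for NULL and a trailing `D` marking the row as invisible.
///
/// A minimal sketch of the format (see the tests below for full examples):
///
/// ```text
///  I I
/// + 1 6
/// - 2 .
/// ```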
pub fn from_pretty(s: &str) -> Self {
let mut chunk_str = String::new();
let mut ops = vec![];
let (header, body) = match s.split_once('\n') {
Some(pair) => pair,
None => {
return StreamChunk {
ops: Arc::new([]),
data: DataChunk::from_pretty(s),
};
}
};
chunk_str.push_str(header);
chunk_str.push('\n');
for line in body.split_inclusive('\n') {
if line.trim_start().is_empty() {
continue;
}
let (op, row) = line
.trim_start()
.split_once(|c: char| c.is_ascii_whitespace())
.ok_or_else(|| panic!("missing operation: {line:?}"))
.unwrap();
ops.push(match op {
"+" => Op::Insert,
"-" => Op::Delete,
"U+" => Op::UpdateInsert,
"U-" => Op::UpdateDelete,
t => panic!("invalid op: {t:?}"),
});
chunk_str.push_str(row);
}
StreamChunk {
ops: ops.into(),
data: DataChunk::from_pretty(&chunk_str),
}
}
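/// Returns `true` if the number of ops matches both the visibility bitmap and the length
/// of every column.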
pub fn valid(&self) -> bool {
let len = self.ops.len();
let data = &self.data;
data.visibility().len() == len && data.columns().iter().all(|col| col.len() == len)
}
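/// Concatenates the visible rows of several chunks into one chunk. Panics if `chunks` is
/// empty or if the result would contain no rows.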
pub fn concat(chunks: Vec<StreamChunk>) -> StreamChunk {
let data_types = chunks[0].data_types();
let size = chunks.iter().map(|c| c.cardinality()).sum::<usize>();
let mut builder = StreamChunkBuilder::unlimited(data_types, Some(size));
for chunk in chunks {
for (op, row) in chunk.rows() {
let none = builder.append_row(op, row);
debug_assert!(none.is_none());
}
}
builder.take().expect("chunk should not be empty")
}
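/// Reorders the rows by `(op, row)` so that two chunks can be compared regardless of their
/// row order, which is mainly useful in tests.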
pub fn sort_rows(self) -> Self {
if self.capacity() == 0 {
return self;
}
let rows = self.rows().collect_vec();
let mut idx = (0..self.capacity()).collect_vec();
idx.sort_by_key(|&i| {
let (op, row_ref) = rows[i];
(op, DefaultOrdered(row_ref))
});
StreamChunk {
ops: idx.iter().map(|&i| self.ops[i]).collect(),
data: self.data.reorder_rows(&idx),
}
}
pub fn gen_stream_chunks(
num_of_chunks: usize,
chunk_size: usize,
data_types: &[DataType],
varchar_properties: &VarcharProperty,
) -> Vec<StreamChunk> {
Self::gen_stream_chunks_inner(
num_of_chunks,
chunk_size,
data_types,
varchar_properties,
1.0,
1.0,
)
}
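/// Generates `num_of_chunks` random chunks for testing. `visibility_percent` is forwarded
/// to `DataChunk::gen_data_chunks` to control how many rows stay visible, and
/// `inserts_percent` is the fraction of `Insert` ops (the rest are `Delete`); fractions
/// strictly between `0.0` and `1.0` use a fixed-seed RNG for reproducibility.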
pub fn gen_stream_chunks_inner(
num_of_chunks: usize,
chunk_size: usize,
data_types: &[DataType],
varchar_properties: &VarcharProperty,
visibility_percent: f64,
inserts_percent: f64,
) -> Vec<StreamChunk> {
let ops = if inserts_percent == 0.0 {
vec![Op::Delete; chunk_size]
} else if inserts_percent == 1.0 {
vec![Op::Insert; chunk_size]
} else {
let mut rng = SmallRng::from_seed([0; 32]);
let mut ops = vec![];
for _ in 0..chunk_size {
ops.push(if rng.gen_bool(inserts_percent) {
Op::Insert
} else {
Op::Delete
});
}
ops
};
DataChunk::gen_data_chunks(
num_of_chunks,
chunk_size,
data_types,
varchar_properties,
visibility_percent,
)
.into_iter()
.map(|chunk| StreamChunk::from_parts(ops.clone(), chunk))
.collect()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_to_pretty_string() {
let chunk = StreamChunk::from_pretty(
" I I
+ 1 6
- 2 .
U- 3 7
U+ 4 .",
);
assert_eq!(
chunk.to_pretty().to_string(),
"\
+----+---+---+
|  + | 1 | 6 |
|  - | 2 |   |
| U- | 3 | 7 |
| U+ | 4 |   |
+----+---+---+"
);
}
#[test]
fn test_split_1() {
let chunk = StreamChunk::from_pretty(
" I I
+ 1 6
- 2 .
U- 3 7
U+ 4 .",
);
let results = chunk.split(2);
assert_eq!(2, results.len());
assert_eq!(
results[0].to_pretty().to_string(),
"\
+---+---+---+
| + | 1 | 6 |
| - | 2 |   |
+---+---+---+"
);
assert_eq!(
results[1].to_pretty().to_string(),
"\
+----+---+---+
| U- | 3 | 7 |
| U+ | 4 |   |
+----+---+---+"
);
}
#[test]
fn test_split_2() {
let chunk = StreamChunk::from_pretty(
" I I
+ 1 6
U- 3 7
U+ 4 .
- 2 .",
);
let results = chunk.split(2);
assert_eq!(2, results.len());
assert_eq!(
results[0].to_pretty().to_string(),
"\
+----+---+---+
|  + | 1 | 6 |
| U- | 3 | 7 |
| U+ | 4 |   |
+----+---+---+"
);
assert_eq!(
results[1].to_pretty().to_string(),
"\
+---+---+---+
| - | 2 |   |
+---+---+---+"
);
}
#[test]
fn test_eliminate_adjacent_noop_update() {
let c = StreamChunk::from_pretty(
" I I
- 1 6 D
- 2 2
+ 2 3
- 2 3
+ 1 6
- 1 7
+ 1 10 D
+ 1 7
U- 3 7
U+ 3 7
+ 2 3",
);
let c = c.eliminate_adjacent_noop_update();
assert_eq!(
c.to_pretty().to_string(),
"\
+---+---+---+
| - | 2 | 2 |
| + | 2 | 3 |
| - | 2 | 3 |
| + | 1 | 6 |
| + | 2 | 3 |
+---+---+---+"
);
}
}