risingwave_common/catalog/
schema.rsuse std::ops::Index;
use itertools::Itertools;
use risingwave_pb::plan_common::{PbColumnDesc, PbField};
use super::ColumnDesc;
use crate::array::ArrayBuilderImpl;
use crate::types::{DataType, StructType};
use crate::util::iter_util::ZipEqFast;
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Field {
pub data_type: DataType,
pub name: String,
pub sub_fields: Vec<Field>,
pub type_name: String,
}
impl std::fmt::Debug for Field {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}:{:?}", self.name, self.data_type)
}
}
impl Field {
pub fn to_prost(&self) -> PbField {
PbField {
data_type: Some(self.data_type.to_protobuf()),
name: self.name.to_string(),
}
}
}
impl From<&ColumnDesc> for Field {
fn from(desc: &ColumnDesc) -> Self {
Self {
data_type: desc.data_type.clone(),
name: desc.name.clone(),
sub_fields: desc.field_descs.iter().map(|d| d.into()).collect_vec(),
type_name: desc.type_name.clone(),
}
}
}
impl From<ColumnDesc> for Field {
fn from(column_desc: ColumnDesc) -> Self {
Self {
data_type: column_desc.data_type,
name: column_desc.name,
sub_fields: column_desc
.field_descs
.into_iter()
.map(Into::into)
.collect(),
type_name: column_desc.type_name,
}
}
}
impl From<&PbColumnDesc> for Field {
fn from(pb_column_desc: &PbColumnDesc) -> Self {
Self {
data_type: pb_column_desc.column_type.as_ref().unwrap().into(),
name: pb_column_desc.name.clone(),
sub_fields: pb_column_desc.field_descs.iter().map(Into::into).collect(),
type_name: pb_column_desc.type_name.clone(),
}
}
}
pub struct FieldDisplay<'a>(pub &'a Field);
impl std::fmt::Debug for FieldDisplay<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.name)
}
}
impl std::fmt::Display for FieldDisplay<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.name)
}
}
#[macro_export]
macro_rules! schema_unnamed {
($($t:expr),*) => {{
$crate::catalog::Schema {
fields: vec![
$( $crate::catalog::Field::unnamed($t) ),*
],
}
}};
}
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct Schema {
pub fields: Vec<Field>,
}
impl Schema {
pub fn empty() -> &'static Self {
static EMPTY: Schema = Schema { fields: Vec::new() };
&EMPTY
}
pub fn len(&self) -> usize {
self.fields.len()
}
pub fn is_empty(&self) -> bool {
self.fields.is_empty()
}
pub fn new(fields: Vec<Field>) -> Self {
Self { fields }
}
pub fn names(&self) -> Vec<String> {
self.fields().iter().map(|f| f.name.clone()).collect()
}
pub fn names_str(&self) -> Vec<&str> {
self.fields().iter().map(|f| f.name.as_str()).collect()
}
pub fn data_types(&self) -> Vec<DataType> {
self.fields
.iter()
.map(|field| field.data_type.clone())
.collect()
}
pub fn fields(&self) -> &[Field] {
&self.fields
}
pub fn into_fields(self) -> Vec<Field> {
self.fields
}
pub fn create_array_builders(&self, capacity: usize) -> Vec<ArrayBuilderImpl> {
self.fields
.iter()
.map(|field| field.data_type.create_array_builder(capacity))
.collect()
}
pub fn to_prost(&self) -> Vec<PbField> {
self.fields
.clone()
.into_iter()
.map(|field| field.to_prost())
.collect()
}
pub fn type_eq(&self, other: &Self) -> bool {
if self.len() != other.len() {
return false;
}
for (a, b) in self.fields.iter().zip_eq_fast(other.fields.iter()) {
if a.data_type != b.data_type {
return false;
}
}
true
}
pub fn all_type_eq<'a>(inputs: impl IntoIterator<Item = &'a Self>) -> bool {
let mut iter = inputs.into_iter();
if let Some(first) = iter.next() {
iter.all(|x| x.type_eq(first))
} else {
true
}
}
pub fn formatted_col_names(&self) -> String {
self.fields
.iter()
.map(|f| format!("\"{}\"", &f.name))
.collect::<Vec<_>>()
.join(", ")
}
}
impl Field {
pub fn with_name<S>(data_type: DataType, name: S) -> Self
where
S: Into<String>,
{
Self {
data_type,
name: name.into(),
sub_fields: vec![],
type_name: String::new(),
}
}
pub fn with_struct<S>(
data_type: DataType,
name: S,
sub_fields: Vec<Field>,
type_name: S,
) -> Self
where
S: Into<String>,
{
Self {
data_type,
name: name.into(),
sub_fields,
type_name: type_name.into(),
}
}
pub fn unnamed(data_type: DataType) -> Self {
Self {
data_type,
name: String::new(),
sub_fields: vec![],
type_name: String::new(),
}
}
pub fn data_type(&self) -> DataType {
self.data_type.clone()
}
pub fn from_with_table_name_prefix(desc: &ColumnDesc, table_name: &str) -> Self {
Self {
data_type: desc.data_type.clone(),
name: format!("{}.{}", table_name, desc.name),
sub_fields: desc.field_descs.iter().map(|d| d.into()).collect_vec(),
type_name: desc.type_name.clone(),
}
}
}
impl From<&PbField> for Field {
fn from(prost_field: &PbField) -> Self {
Self {
data_type: DataType::from(prost_field.get_data_type().expect("data type not found")),
name: prost_field.get_name().clone(),
sub_fields: vec![],
type_name: String::new(),
}
}
}
impl Index<usize> for Schema {
type Output = Field;
fn index(&self, index: usize) -> &Self::Output {
&self.fields[index]
}
}
impl FromIterator<Field> for Schema {
fn from_iter<I: IntoIterator<Item = Field>>(iter: I) -> Self {
Schema {
fields: iter.into_iter().collect::<Vec<_>>(),
}
}
}
impl From<&StructType> for Schema {
fn from(t: &StructType) -> Self {
Schema::new(
t.iter()
.map(|(s, d)| Field::with_name(d.clone(), s))
.collect(),
)
}
}
pub mod test_utils {
use super::*;
pub fn field_n<const N: usize>(data_type: DataType) -> Schema {
Schema::new(vec![Field::unnamed(data_type); N])
}
fn int32_n<const N: usize>() -> Schema {
field_n::<N>(DataType::Int32)
}
pub fn ii() -> Schema {
int32_n::<2>()
}
pub fn iii() -> Schema {
int32_n::<3>()
}
fn varchar_n<const N: usize>() -> Schema {
field_n::<N>(DataType::Varchar)
}
pub fn sss() -> Schema {
varchar_n::<3>()
}
fn decimal_n<const N: usize>() -> Schema {
field_n::<N>(DataType::Decimal)
}
pub fn ddd() -> Schema {
decimal_n::<3>()
}
}