risingwave_stream/executor/dispatch/
output_mapping.rs1use risingwave_common::util::iter_util::ZipEqFast as _;
16use risingwave_pb::stream_plan::PbDispatchOutputMapping;
17
18use crate::executor::prelude::*;
19
20#[derive(Debug)]
24pub enum DispatchOutputMapping {
25 Simple(Vec<usize>),
27 TypeMapping {
29 indices: Vec<usize>,
30 types: Vec<Option<(DataType, DataType)>>,
32 },
33}
34
35impl DispatchOutputMapping {
36 pub(super) fn from_protobuf(proto: PbDispatchOutputMapping) -> Self {
37 let indices = proto.indices.into_iter().map(|i| i as usize).collect();
38
39 if proto.types.is_empty() {
40 Self::Simple(indices)
41 } else {
42 let types = (proto.types.into_iter())
43 .map(|t| {
44 t.upstream.and_then(|u| {
45 let d = t.downstream.unwrap();
46 if u == d {
48 None
49 } else {
50 Some((u.into(), d.into()))
51 }
52 })
53 })
54 .collect();
55 Self::TypeMapping { indices, types }
56 }
57 }
58
59 pub(super) fn apply(&self, chunk: StreamChunk) -> StreamChunk {
61 match self {
62 Self::Simple(indices) => {
63 if indices.len() < chunk.columns().len() {
65 chunk.project(indices).eliminate_adjacent_noop_update()
66 } else {
67 chunk.project(indices)
68 }
69 }
70
71 Self::TypeMapping { indices, types } => {
72 let (ops, columns, visibility) = chunk.into_inner();
73
74 let mut new_columns = Vec::with_capacity(indices.len());
75 for (i, t) in indices.iter().zip_eq_fast(types) {
76 let mut column = columns[*i].clone();
77
78 if let Some((from_type, into_type)) = t {
79 let mut builder = into_type.create_array_builder(column.len());
80 for (datum, vis) in column.iter().zip_eq_fast(visibility.iter()) {
81 if !vis {
82 builder.append_null();
83 } else {
84 let datum = type_mapping::do_map(datum, from_type, into_type);
85 builder.append(datum);
86 }
87 }
88 column = builder.finish().into();
89 }
90
91 new_columns.push(column);
92 }
93
94 StreamChunk::with_visibility(ops, new_columns, visibility)
97 .eliminate_adjacent_noop_update()
98 }
99 }
100 }
101
102 pub(super) fn apply_watermark(&self, watermark: Watermark) -> Option<Watermark> {
104 let indices = match self {
105 Self::Simple(indices) => indices,
106 Self::TypeMapping { indices, types } => {
109 if let Some(pos) = indices.iter().position(|p| *p == watermark.col_idx) {
110 assert!(
111 types[pos].is_none(),
112 "watermark column should not have type changed"
113 );
114 }
115 indices
116 }
117 };
118 watermark.transform_with_indices(indices)
119 }
120}
121
122mod type_mapping {
123 use risingwave_common::types::{
124 DataType, DatumCow, DatumRef, ListValue, MapValue, ScalarImpl, StructValue, ToOwnedDatum,
125 data_types,
126 };
127 use risingwave_common::util::iter_util::ZipEqFast;
128
129 pub fn do_map<'a>(
135 datum: DatumRef<'a>,
136 from_type: &DataType,
137 into_type: &DataType,
138 ) -> DatumCow<'a> {
139 let Some(scalar) = datum else {
140 return DatumCow::NULL;
141 };
142
143 if from_type == into_type {
144 return DatumCow::Borrowed(datum);
145 }
146
147 match (from_type, into_type) {
148 (data_types::simple!(), data_types::simple!()) => DatumCow::Borrowed(Some(scalar)),
149 (DataType::Vector(_), DataType::Vector(_)) => todo!("VECTOR_PLACEHOLDER"),
150
151 (DataType::List(from_inner_type), DataType::List(into_inner_type)) => {
152 let list = scalar.into_list();
153
154 let mut builder = into_inner_type.create_array_builder(list.len());
156 for datum in list.iter() {
157 let datum = do_map(datum, from_inner_type, into_inner_type);
158 builder.append(datum);
159 }
160 let list = ListValue::new(builder.finish());
161
162 DatumCow::Owned(Some(ScalarImpl::List(list)))
163 }
164
165 (DataType::Map(from_map_type), DataType::Map(into_map_type)) => {
166 assert_eq!(
167 from_map_type.key(),
168 into_map_type.key(),
169 "key type should not be changed"
170 );
171
172 let map = scalar.into_map();
173 let (keys, values) = map.into_kv();
174
175 let mut value_builder = into_map_type.value().create_array_builder(map.len());
177 for value in values.iter() {
178 let value = do_map(value, from_map_type.value(), into_map_type.value());
179 value_builder.append(value);
180 }
181 let values = ListValue::new(value_builder.finish());
182
183 let map = MapValue::try_from_kv(keys.to_owned(), values).unwrap();
184
185 DatumCow::Owned(Some(ScalarImpl::Map(map)))
186 }
187
188 (DataType::Struct(from_struct_type), DataType::Struct(into_struct_type)) => {
189 let struct_value = scalar.into_struct();
190 let mut fields = Vec::with_capacity(into_struct_type.len());
191
192 for (id, into_field_type) in into_struct_type
193 .ids()
194 .unwrap()
195 .zip_eq_fast(into_struct_type.types())
196 {
197 let index = from_struct_type
199 .ids()
200 .expect("ids of struct type should be set in dispatcher mapping context")
201 .position(|x| x == id);
202
203 let field = if let Some(index) = index {
204 let from_field_type = from_struct_type.type_at(index);
206 let field = struct_value.field_at(index);
207 do_map(field, from_field_type, into_field_type).to_owned_datum()
208 } else {
209 None
211 };
212
213 fields.push(field);
214 }
215
216 let struct_value = StructValue::new(fields);
217
218 DatumCow::Owned(Some(ScalarImpl::Struct(struct_value)))
219 }
220
221 _ => panic!("mismatched types: {from_type:?} -> {into_type:?}"),
222 }
223 }
224}