risingwave_stream/executor/dispatch/
output_mapping.rs1use risingwave_common::util::iter_util::ZipEqFast as _;
16use risingwave_pb::stream_plan::PbDispatchOutputMapping;
17
18use crate::executor::prelude::*;
19
20#[derive(Debug)]
24pub enum DispatchOutputMapping {
25 Simple(Vec<usize>),
27 TypeMapping {
29 indices: Vec<usize>,
30 types: Vec<Option<(DataType, DataType)>>,
32 },
33}
34
35impl DispatchOutputMapping {
36 pub(super) fn from_protobuf(proto: PbDispatchOutputMapping) -> Self {
37 let indices = proto.indices.into_iter().map(|i| i as usize).collect();
38
39 if proto.types.is_empty() {
40 Self::Simple(indices)
41 } else {
42 let types = (proto.types.into_iter())
43 .map(|t| {
44 t.upstream.and_then(|u| {
45 let d = t.downstream.unwrap();
46 if u == d {
48 None
49 } else {
50 Some((u.into(), d.into()))
51 }
52 })
53 })
54 .collect();
55 Self::TypeMapping { indices, types }
56 }
57 }
58
59 pub(super) fn apply(&self, chunk: StreamChunk) -> StreamChunk {
61 match self {
62 Self::Simple(indices) => {
63 if indices.len() < chunk.columns().len() {
65 chunk.project(indices).eliminate_adjacent_noop_update()
66 } else {
67 chunk.project(indices)
68 }
69 }
70
71 Self::TypeMapping { indices, types } => {
72 let (ops, columns, visibility) = chunk.into_inner();
73
74 let mut new_columns = Vec::with_capacity(indices.len());
75 for (i, t) in indices.iter().zip_eq_fast(types) {
76 let mut column = columns[*i].clone();
77
78 if let Some((from_type, into_type)) = t {
79 let mut builder = into_type.create_array_builder(column.len());
80 for (datum, vis) in column.iter().zip_eq_fast(visibility.iter()) {
81 if !vis {
82 builder.append_null();
83 } else {
84 let datum = type_mapping::do_map(datum, from_type, into_type);
85 builder.append(datum);
86 }
87 }
88 column = builder.finish().into();
89 }
90
91 new_columns.push(column);
92 }
93
94 StreamChunk::with_visibility(ops, new_columns, visibility)
97 .eliminate_adjacent_noop_update()
98 }
99 }
100 }
101
102 pub(super) fn apply_watermark(&self, watermark: Watermark) -> Option<Watermark> {
104 let indices = match self {
105 Self::Simple(indices) => indices,
106 Self::TypeMapping { indices, types } => {
109 if let Some(pos) = indices.iter().position(|p| *p == watermark.col_idx) {
110 assert!(
111 types[pos].is_none(),
112 "watermark column should not have type changed"
113 );
114 }
115 indices
116 }
117 };
118 watermark.transform_with_indices(indices)
119 }
120}
121
122mod type_mapping {
123 use risingwave_common::types::{
124 DataType, DatumCow, DatumRef, ListValue, MapValue, ScalarImpl, StructValue, ToOwnedDatum,
125 data_types,
126 };
127 use risingwave_common::util::iter_util::ZipEqFast;
128
129 pub fn do_map<'a>(
135 datum: DatumRef<'a>,
136 from_type: &DataType,
137 into_type: &DataType,
138 ) -> DatumCow<'a> {
139 let Some(scalar) = datum else {
140 return DatumCow::NULL;
141 };
142
143 if from_type == into_type {
144 return DatumCow::Borrowed(datum);
145 }
146
147 match (from_type, into_type) {
148 (DataType::Vector(dim1), DataType::Vector(dim2)) => {
149 assert_eq!(dim1, dim2);
150 DatumCow::Borrowed(Some(scalar))
151 }
152 (data_types::simple!(), data_types::simple!()) => DatumCow::Borrowed(Some(scalar)),
153 (DataType::List(from_inner_type), DataType::List(into_inner_type)) => {
154 let list = scalar.into_list();
155
156 let mut builder = into_inner_type.create_array_builder(list.len());
158 for datum in list.iter() {
159 let datum = do_map(datum, from_inner_type, into_inner_type);
160 builder.append(datum);
161 }
162 let list = ListValue::new(builder.finish());
163
164 DatumCow::Owned(Some(ScalarImpl::List(list)))
165 }
166
167 (DataType::Map(from_map_type), DataType::Map(into_map_type)) => {
168 assert_eq!(
169 from_map_type.key(),
170 into_map_type.key(),
171 "key type should not be changed"
172 );
173
174 let map = scalar.into_map();
175 let (keys, values) = map.into_kv();
176
177 let mut value_builder = into_map_type.value().create_array_builder(map.len());
179 for value in values.iter() {
180 let value = do_map(value, from_map_type.value(), into_map_type.value());
181 value_builder.append(value);
182 }
183 let values = ListValue::new(value_builder.finish());
184
185 let map = MapValue::try_from_kv(keys.to_owned(), values).unwrap();
186
187 DatumCow::Owned(Some(ScalarImpl::Map(map)))
188 }
189
190 (DataType::Struct(from_struct_type), DataType::Struct(into_struct_type)) => {
191 let struct_value = scalar.into_struct();
192 let mut fields = Vec::with_capacity(into_struct_type.len());
193
194 for (id, into_field_type) in into_struct_type
195 .ids()
196 .unwrap()
197 .zip_eq_fast(into_struct_type.types())
198 {
199 let index = from_struct_type
201 .ids()
202 .expect("ids of struct type should be set in dispatcher mapping context")
203 .position(|x| x == id);
204
205 let field = if let Some(index) = index {
206 let from_field_type = from_struct_type.type_at(index);
208 let field = struct_value.field_at(index);
209 do_map(field, from_field_type, into_field_type).to_owned_datum()
210 } else {
211 None
213 };
214
215 fields.push(field);
216 }
217
218 let struct_value = StructValue::new(fields);
219
220 DatumCow::Owned(Some(ScalarImpl::Struct(struct_value)))
221 }
222
223 _ => panic!("mismatched types: {from_type:?} -> {into_type:?}"),
224 }
225 }
226}