risingwave_stream/executor/dispatch/
output_mapping.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use risingwave_common::util::iter_util::ZipEqFast as _;
16use risingwave_pb::stream_plan::PbDispatchOutputMapping;
17
18use crate::executor::prelude::*;
19
20/// Map the output before dispatching.
21///
22/// See documentation of [`PbDispatchOutputMapping`] for more details.
23#[derive(Debug)]
24pub enum DispatchOutputMapping {
25    /// Mapping by indices only.
26    Simple(Vec<usize>),
27    /// Mapping by indices, then transform the data type to fit the downstream.
28    TypeMapping {
29        indices: Vec<usize>,
30        /// `None` means no type change for this column.
31        types: Vec<Option<(DataType, DataType)>>,
32    },
33}
34
35impl DispatchOutputMapping {
36    pub(super) fn from_protobuf(proto: PbDispatchOutputMapping) -> Self {
37        let indices = proto.indices.into_iter().map(|i| i as usize).collect();
38
39        if proto.types.is_empty() {
40            Self::Simple(indices)
41        } else {
42            let types = (proto.types.into_iter())
43                .map(|t| {
44                    t.upstream.and_then(|u| {
45                        let d = t.downstream.unwrap();
46                        // Do an extra filter to avoid unnecessary mapping overhead.
47                        if u == d {
48                            None
49                        } else {
50                            Some((u.into(), d.into()))
51                        }
52                    })
53                })
54                .collect();
55            Self::TypeMapping { indices, types }
56        }
57    }
58
59    /// Apply the mapping to the chunk.
60    pub(super) fn apply(&self, chunk: StreamChunk) -> StreamChunk {
61        match self {
62            Self::Simple(indices) => {
63                // Only eliminate noop update when the number of columns is reduced.
64                if indices.len() < chunk.columns().len() {
65                    chunk.project(indices).eliminate_adjacent_noop_update()
66                } else {
67                    chunk.project(indices)
68                }
69            }
70
71            Self::TypeMapping { indices, types } => {
72                let (ops, columns, visibility) = chunk.into_inner();
73
74                let mut new_columns = Vec::with_capacity(indices.len());
75                for (i, t) in indices.iter().zip_eq_fast(types) {
76                    let mut column = columns[*i].clone();
77
78                    if let Some((from_type, into_type)) = t {
79                        let mut builder = into_type.create_array_builder(column.len());
80                        for (datum, vis) in column.iter().zip_eq_fast(visibility.iter()) {
81                            if !vis {
82                                builder.append_null();
83                            } else {
84                                let datum = type_mapping::do_map(datum, from_type, into_type);
85                                builder.append(datum);
86                            }
87                        }
88                        column = builder.finish().into();
89                    }
90
91                    new_columns.push(column);
92                }
93
94                // Always eliminate noop update since there's always type change, and updates to some struct
95                // fields may not be visible to the downstream.
96                StreamChunk::with_visibility(ops, new_columns, visibility)
97                    .eliminate_adjacent_noop_update()
98            }
99        }
100    }
101
102    /// Apply the mapping to the watermark.
103    pub(super) fn apply_watermark(&self, watermark: Watermark) -> Option<Watermark> {
104        let indices = match self {
105            Self::Simple(indices) => indices,
106            // Type change is only supported on composite types, while watermark must be a simple type.
107            // So we simply ignore type mapping here.
108            Self::TypeMapping { indices, types } => {
109                if let Some(pos) = indices.iter().position(|p| *p == watermark.col_idx) {
110                    assert!(
111                        types[pos].is_none(),
112                        "watermark column should not have type changed"
113                    );
114                }
115                indices
116            }
117        };
118        watermark.transform_with_indices(indices)
119    }
120}
121
122mod type_mapping {
123    use risingwave_common::types::{
124        DataType, DatumCow, DatumRef, ListValue, MapValue, ScalarImpl, StructValue, ToOwnedDatum,
125        data_types,
126    };
127    use risingwave_common::util::iter_util::ZipEqFast;
128
129    /// Map the datum from `from_type` to `into_type`. Struct types must have `ids` set.
130    ///
131    /// The only allowed difference between given types is adding or removing fields in struct.
132    /// We will compare the ID of fields to find the corresponding field. If the field is not found,
133    /// it will be set to NULL.
134    pub fn do_map<'a>(
135        datum: DatumRef<'a>,
136        from_type: &DataType,
137        into_type: &DataType,
138    ) -> DatumCow<'a> {
139        let Some(scalar) = datum else {
140            return DatumCow::NULL;
141        };
142
143        if from_type == into_type {
144            return DatumCow::Borrowed(datum);
145        }
146
147        match (from_type, into_type) {
148            (data_types::simple!(), data_types::simple!()) => DatumCow::Borrowed(Some(scalar)),
149            (DataType::Vector(_), DataType::Vector(_)) => todo!("VECTOR_PLACEHOLDER"),
150
151            (DataType::List(from_inner_type), DataType::List(into_inner_type)) => {
152                let list = scalar.into_list();
153
154                // Recursively map each element.
155                let mut builder = into_inner_type.create_array_builder(list.len());
156                for datum in list.iter() {
157                    let datum = do_map(datum, from_inner_type, into_inner_type);
158                    builder.append(datum);
159                }
160                let list = ListValue::new(builder.finish());
161
162                DatumCow::Owned(Some(ScalarImpl::List(list)))
163            }
164
165            (DataType::Map(from_map_type), DataType::Map(into_map_type)) => {
166                assert_eq!(
167                    from_map_type.key(),
168                    into_map_type.key(),
169                    "key type should not be changed"
170                );
171
172                let map = scalar.into_map();
173                let (keys, values) = map.into_kv();
174
175                // Recursively map each value.
176                let mut value_builder = into_map_type.value().create_array_builder(map.len());
177                for value in values.iter() {
178                    let value = do_map(value, from_map_type.value(), into_map_type.value());
179                    value_builder.append(value);
180                }
181                let values = ListValue::new(value_builder.finish());
182
183                let map = MapValue::try_from_kv(keys.to_owned(), values).unwrap();
184
185                DatumCow::Owned(Some(ScalarImpl::Map(map)))
186            }
187
188            (DataType::Struct(from_struct_type), DataType::Struct(into_struct_type)) => {
189                let struct_value = scalar.into_struct();
190                let mut fields = Vec::with_capacity(into_struct_type.len());
191
192                for (id, into_field_type) in into_struct_type
193                    .ids()
194                    .unwrap()
195                    .zip_eq_fast(into_struct_type.types())
196                {
197                    // Find the field in the original struct.
198                    let index = from_struct_type
199                        .ids()
200                        .expect("ids of struct type should be set in dispatcher mapping context")
201                        .position(|x| x == id);
202
203                    let field = if let Some(index) = index {
204                        // Found, recursively map the field.
205                        let from_field_type = from_struct_type.type_at(index);
206                        let field = struct_value.field_at(index);
207                        do_map(field, from_field_type, into_field_type).to_owned_datum()
208                    } else {
209                        // Not found, set to NULL.
210                        None
211                    };
212
213                    fields.push(field);
214                }
215
216                let struct_value = StructValue::new(fields);
217
218                DatumCow::Owned(Some(ScalarImpl::Struct(struct_value)))
219            }
220
221            _ => panic!("mismatched types: {from_type:?} -> {into_type:?}"),
222        }
223    }
224}