risingwave_connector/sink/elasticsearch_opensearch/elasticsearch_opensearch_formatter.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use anyhow::anyhow;
16use risingwave_common::array::{Op, RowRef, StreamChunk};
17use risingwave_common::catalog::Schema;
18use risingwave_common::row::Row;
19use risingwave_common::util::iter_util::ZipEqDebug;
20use serde_json::{Map, Value};
21
22use super::super::SinkError;
23use super::super::encoder::template::TemplateEncoder;
24use super::super::encoder::{JsonEncoder, RowEncoder};
25use crate::sink::Result;
26
27pub struct ElasticSearchOpenSearchFormatter {
28    key_encoder: TemplateEncoder,
29    value_encoder: JsonEncoder,
30    index_column: Option<usize>,
31    index: Option<String>,
32    routing_column: Option<usize>,
33}
34
35pub struct BuildBulkPara {
36    pub index: String,
37    pub key: String,
38    pub value: Option<Map<String, Value>>,
39    pub mem_size_b: usize,
40    pub routing_column: Option<String>,
41}
42
43impl ElasticSearchOpenSearchFormatter {
44    pub fn new(
45        pk_indices: Vec<usize>,
46        schema: &Schema,
47        delimiter: Option<String>,
48        index_column: Option<usize>,
49        index: Option<String>,
50        routing_column: Option<usize>,
51    ) -> Result<Self> {
52        let key_format = if pk_indices.is_empty() {
53            let name = &schema
54                .fields()
55                .get(0)
56                .ok_or_else(|| {
57                    SinkError::ElasticSearchOpenSearch(anyhow!(
58                        "no value find in sink schema, index is 0"
59                    ))
60                })?
61                .name;
62            format!("{{{}}}", name)
63        } else if pk_indices.len() == 1 {
64            let index = *pk_indices.get(0).unwrap();
65            let name = &schema
66                .fields()
67                .get(index)
68                .ok_or_else(|| {
69                    SinkError::ElasticSearchOpenSearch(anyhow!(
70                        "no value find in sink schema, index is {:?}",
71                        index
72                    ))
73                })?
74                .name;
75            format!("{{{}}}", name)
76        } else {
77            let delimiter = delimiter
78                .as_ref()
79                .ok_or_else(|| anyhow!("please set the separator in the with option, when there are multiple primary key values"))?
80                .clone();
81            let mut names = Vec::with_capacity(pk_indices.len());
82            for index in &pk_indices {
83                names.push(format!(
84                    "{{{}}}",
85                    schema
86                        .fields()
87                        .get(*index)
88                        .ok_or_else(|| {
89                            SinkError::ElasticSearchOpenSearch(anyhow!(
90                                "no value find in sink schema, index is {:?}",
91                                index
92                            ))
93                        })?
94                        .name
95                ));
96            }
97            names.join(&delimiter)
98        };
99        let col_indices = if let Some(index) = index_column {
100            let mut col_indices: Vec<usize> = (0..schema.len()).collect();
101            col_indices.remove(index);
102            Some(col_indices)
103        } else {
104            None
105        };
106        let key_encoder =
107            TemplateEncoder::new_string(schema.clone(), col_indices.clone(), key_format);
108        let value_encoder = JsonEncoder::new_with_es(schema.clone(), col_indices.clone());
109        Ok(Self {
110            key_encoder,
111            value_encoder,
112            index_column,
113            index,
114            routing_column,
115        })
116    }
117
118    pub fn convert_chunk(
119        &self,
120        chunk: StreamChunk,
121        is_append_only: bool,
122    ) -> Result<Vec<BuildBulkPara>> {
123        let mut update_delete_row: Option<(String, RowRef<'_>)> = None;
124        let mut result_vec = Vec::with_capacity(chunk.capacity());
125        for (op, rows) in chunk.rows() {
126            let index = if let Some(index_column) = self.index_column {
127                rows.datum_at(index_column)
128                    .ok_or_else(|| {
129                        SinkError::ElasticSearchOpenSearch(anyhow!(
130                            "no value find in sink schema, index is {:?}",
131                            index_column
132                        ))
133                    })?
134                    .into_utf8()
135            } else {
136                self.index.as_ref().unwrap()
137            };
138            let routing_column = self
139                .routing_column
140                .map(|routing_column| {
141                    Ok::<String, SinkError>(
142                        rows.datum_at(routing_column)
143                            .ok_or_else(|| {
144                                SinkError::ElasticSearchOpenSearch(anyhow!(
145                                    "no value find in sink schema, index is {:?}",
146                                    routing_column
147                                ))
148                            })?
149                            .into_utf8()
150                            .to_owned(),
151                    )
152                })
153                .transpose()?;
154            match op {
155                Op::Insert => {
156                    let key = self.key_encoder.encode(rows)?.into_string()?;
157                    let value = self.value_encoder.encode(rows)?;
158                    result_vec.push(BuildBulkPara {
159                        index: index.to_owned(),
160                        key,
161                        value: Some(value),
162                        mem_size_b: rows.value_estimate_size(),
163                        routing_column,
164                    });
165                }
166                Op::UpdateInsert => {
167                    let key = self.key_encoder.encode(rows)?.into_string()?;
168                    let mut modified_col_indices = Vec::with_capacity(rows.len());
169                    let (delete_key, delete_row) =
170                        update_delete_row.take().expect("update_delete_row is None");
171                    if delete_key == key {
172                        delete_row
173                            .iter()
174                            .enumerate()
175                            .zip_eq_debug(rows.iter())
176                            .for_each(|((index, delete_column), insert_column)| {
177                                if insert_column == delete_column {
178                                    // do nothing
179                                } else {
180                                    modified_col_indices.push(index);
181                                }
182                            });
183                    }
184                    let value = self
185                        .value_encoder
186                        .encode_cols(rows, modified_col_indices.into_iter())?;
187                    result_vec.push(BuildBulkPara {
188                        index: index.to_owned(),
189                        key,
190                        value: Some(value),
191                        mem_size_b: rows.value_estimate_size(),
192                        routing_column,
193                    });
194                }
195                Op::Delete => {
196                    if is_append_only {
197                        return Err(SinkError::ElasticSearchOpenSearch(anyhow!(
198                            "`Delete` operation is not supported in `append_only` mode"
199                        )));
200                    }
201                    let key = self.key_encoder.encode(rows)?.into_string()?;
202                    let mem_size_b = std::mem::size_of_val(&key);
203                    result_vec.push(BuildBulkPara {
204                        index: index.to_owned(),
205                        key,
206                        value: None,
207                        mem_size_b,
208                        routing_column,
209                    });
210                }
211                Op::UpdateDelete => {
212                    if is_append_only {
213                        return Err(SinkError::ElasticSearchOpenSearch(anyhow!(
214                            "`UpdateDelete` operation is not supported in `append_only` mode"
215                        )));
216                    } else {
217                        let key = self.key_encoder.encode(rows)?.into_string()?;
218                        update_delete_row = Some((key, rows));
219                    }
220                }
221            }
222        }
223        Ok(result_vec)
224    }
225}