risingwave_connector/sink/elasticsearch_opensearch/
elasticsearch_opensearch_formatter.rs1use anyhow::anyhow;
16use risingwave_common::array::{Op, RowRef, StreamChunk};
17use risingwave_common::catalog::Schema;
18use risingwave_common::row::Row;
19use risingwave_common::util::iter_util::ZipEqDebug;
20use serde_json::{Map, Value};
21
22use super::super::SinkError;
23use super::super::encoder::template::TemplateEncoder;
24use super::super::encoder::{JsonEncoder, RowEncoder};
25use crate::sink::Result;
26
27pub struct ElasticSearchOpenSearchFormatter {
28 key_encoder: TemplateEncoder,
29 value_encoder: JsonEncoder,
30 index_column: Option<usize>,
31 index: Option<String>,
32 routing_column: Option<usize>,
33}
34
35pub struct BuildBulkPara {
36 pub index: String,
37 pub key: String,
38 pub value: Option<Map<String, Value>>,
39 pub mem_size_b: usize,
40 pub routing_column: Option<String>,
41}
42
43impl ElasticSearchOpenSearchFormatter {
44 pub fn new(
45 pk_indices: Vec<usize>,
46 schema: &Schema,
47 delimiter: Option<String>,
48 index_column: Option<usize>,
49 index: Option<String>,
50 routing_column: Option<usize>,
51 ) -> Result<Self> {
52 let key_format = if pk_indices.is_empty() {
53 let name = &schema
54 .fields()
55 .get(0)
56 .ok_or_else(|| {
57 SinkError::ElasticSearchOpenSearch(anyhow!(
58 "no value find in sink schema, index is 0"
59 ))
60 })?
61 .name;
62 format!("{{{}}}", name)
63 } else if pk_indices.len() == 1 {
64 let index = *pk_indices.get(0).unwrap();
65 let name = &schema
66 .fields()
67 .get(index)
68 .ok_or_else(|| {
69 SinkError::ElasticSearchOpenSearch(anyhow!(
70 "no value find in sink schema, index is {:?}",
71 index
72 ))
73 })?
74 .name;
75 format!("{{{}}}", name)
76 } else {
77 let delimiter = delimiter
78 .as_ref()
79 .ok_or_else(|| anyhow!("please set the separator in the with option, when there are multiple primary key values"))?
80 .clone();
81 let mut names = Vec::with_capacity(pk_indices.len());
82 for index in &pk_indices {
83 names.push(format!(
84 "{{{}}}",
85 schema
86 .fields()
87 .get(*index)
88 .ok_or_else(|| {
89 SinkError::ElasticSearchOpenSearch(anyhow!(
90 "no value find in sink schema, index is {:?}",
91 index
92 ))
93 })?
94 .name
95 ));
96 }
97 names.join(&delimiter)
98 };
99 let col_indices = if let Some(index) = index_column {
100 let mut col_indices: Vec<usize> = (0..schema.len()).collect();
101 col_indices.remove(index);
102 Some(col_indices)
103 } else {
104 None
105 };
106 let key_encoder =
107 TemplateEncoder::new_string(schema.clone(), col_indices.clone(), key_format);
108 let value_encoder = JsonEncoder::new_with_es(schema.clone(), col_indices.clone());
109 Ok(Self {
110 key_encoder,
111 value_encoder,
112 index_column,
113 index,
114 routing_column,
115 })
116 }
117
118 pub fn convert_chunk(
119 &self,
120 chunk: StreamChunk,
121 is_append_only: bool,
122 ) -> Result<Vec<BuildBulkPara>> {
123 let mut update_delete_row: Option<(String, RowRef<'_>)> = None;
124 let mut result_vec = Vec::with_capacity(chunk.capacity());
125 for (op, rows) in chunk.rows() {
126 let index = if let Some(index_column) = self.index_column {
127 rows.datum_at(index_column)
128 .ok_or_else(|| {
129 SinkError::ElasticSearchOpenSearch(anyhow!(
130 "no value find in sink schema, index is {:?}",
131 index_column
132 ))
133 })?
134 .into_utf8()
135 } else {
136 self.index.as_ref().unwrap()
137 };
138 let routing_column = self
139 .routing_column
140 .map(|routing_column| {
141 Ok::<String, SinkError>(
142 rows.datum_at(routing_column)
143 .ok_or_else(|| {
144 SinkError::ElasticSearchOpenSearch(anyhow!(
145 "no value find in sink schema, index is {:?}",
146 routing_column
147 ))
148 })?
149 .into_utf8()
150 .to_owned(),
151 )
152 })
153 .transpose()?;
154 match op {
155 Op::Insert => {
156 let key = self.key_encoder.encode(rows)?.into_string()?;
157 let value = self.value_encoder.encode(rows)?;
158 result_vec.push(BuildBulkPara {
159 index: index.to_owned(),
160 key,
161 value: Some(value),
162 mem_size_b: rows.value_estimate_size(),
163 routing_column,
164 });
165 }
166 Op::UpdateInsert => {
167 let key = self.key_encoder.encode(rows)?.into_string()?;
168 let mut modified_col_indices = Vec::with_capacity(rows.len());
169 let (delete_key, delete_row) =
170 update_delete_row.take().expect("update_delete_row is None");
171 if delete_key == key {
172 delete_row
173 .iter()
174 .enumerate()
175 .zip_eq_debug(rows.iter())
176 .for_each(|((index, delete_column), insert_column)| {
177 if insert_column == delete_column {
178 } else {
180 modified_col_indices.push(index);
181 }
182 });
183 }
184 let value = self
185 .value_encoder
186 .encode_cols(rows, modified_col_indices.into_iter())?;
187 result_vec.push(BuildBulkPara {
188 index: index.to_owned(),
189 key,
190 value: Some(value),
191 mem_size_b: rows.value_estimate_size(),
192 routing_column,
193 });
194 }
195 Op::Delete => {
196 if is_append_only {
197 return Err(SinkError::ElasticSearchOpenSearch(anyhow!(
198 "`Delete` operation is not supported in `append_only` mode"
199 )));
200 }
201 let key = self.key_encoder.encode(rows)?.into_string()?;
202 let mem_size_b = std::mem::size_of_val(&key);
203 result_vec.push(BuildBulkPara {
204 index: index.to_owned(),
205 key,
206 value: None,
207 mem_size_b,
208 routing_column,
209 });
210 }
211 Op::UpdateDelete => {
212 if is_append_only {
213 return Err(SinkError::ElasticSearchOpenSearch(anyhow!(
214 "`UpdateDelete` operation is not supported in `append_only` mode"
215 )));
216 } else {
217 let key = self.key_encoder.encode(rows)?.into_string()?;
218 update_delete_row = Some((key, rows));
219 }
220 }
221 }
222 }
223 Ok(result_vec)
224 }
225}