risingwave_connector/sink/elasticsearch_opensearch/
elasticsearch_opensearch_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::anyhow;
18use futures::{FutureExt, TryFuture};
19use itertools::Itertools;
20use risingwave_common::array::StreamChunk;
21use risingwave_common::catalog::Schema;
22use serde_json::{Value, json};
23
24use super::super::SinkError;
25use super::elasticsearch_opensearch_config::ElasticSearchOpenSearchConfig;
26use super::elasticsearch_opensearch_formatter::{BuildBulkPara, ElasticSearchOpenSearchFormatter};
27use crate::sink::Result;
28use crate::sink::log_store::DeliveryFutureManagerAddFuture;
29use crate::sink::writer::AsyncTruncateSinkWriter;
30
/// Handle to the search backend a sink talks to.
///
/// Wraps either an Elasticsearch or an OpenSearch client so the rest of the sink can issue
/// requests without caring which of the two client crates is in use.
pub enum ElasticSearchOpenSearchClient {
    /// Client from the `elasticsearch` crate.
    ElasticSearch(elasticsearch::Elasticsearch),
    /// Client from the `opensearch` crate.
    OpenSearch(opensearch::OpenSearch),
}
35enum ElasticSearchOpenSearchBulk {
36    ElasticSearch(elasticsearch::BulkOperation<serde_json::Value>),
37    OpenSearch(opensearch::BulkOperation<serde_json::Value>),
38}
39
40impl ElasticSearchOpenSearchBulk {
41    pub fn into_elasticsearch_bulk(self) -> elasticsearch::BulkOperation<serde_json::Value> {
42        if let ElasticSearchOpenSearchBulk::ElasticSearch(bulk) = self {
43            bulk
44        } else {
45            panic!("not a elasticsearch bulk")
46        }
47    }
48
49    pub fn into_opensearch_bulk(self) -> opensearch::BulkOperation<serde_json::Value> {
50        if let ElasticSearchOpenSearchBulk::OpenSearch(bulk) = self {
51            bulk
52        } else {
53            panic!("not a opensearch bulk")
54        }
55    }
56}
57
58impl ElasticSearchOpenSearchClient {
59    async fn send(&self, bulks: Vec<ElasticSearchOpenSearchBulk>) -> Result<Value> {
60        match self {
61            ElasticSearchOpenSearchClient::ElasticSearch(client) => {
62                let bulks = bulks
63                    .into_iter()
64                    .map(ElasticSearchOpenSearchBulk::into_elasticsearch_bulk)
65                    .collect_vec();
66                let result = client
67                    .bulk(elasticsearch::BulkParts::None)
68                    .body(bulks)
69                    .send()
70                    .await?;
71                Ok(result.json::<Value>().await?)
72            }
73            ElasticSearchOpenSearchClient::OpenSearch(client) => {
74                let bulks = bulks
75                    .into_iter()
76                    .map(ElasticSearchOpenSearchBulk::into_opensearch_bulk)
77                    .collect_vec();
78                let result = client
79                    .bulk(opensearch::BulkParts::None)
80                    .body(bulks)
81                    .send()
82                    .await?;
83                Ok(result.json::<Value>().await?)
84            }
85        }
86    }
87
88    pub async fn ping(&self) -> Result<()> {
89        match self {
90            ElasticSearchOpenSearchClient::ElasticSearch(client) => {
91                client.ping().send().await?;
92            }
93            ElasticSearchOpenSearchClient::OpenSearch(client) => {
94                client.ping().send().await?;
95            }
96        }
97        Ok(())
98    }
99
100    fn new_update(
101        &self,
102        key: String,
103        index: String,
104        retry_on_conflict: i32,
105        routing_column: Option<String>,
106        value: serde_json::Value,
107    ) -> ElasticSearchOpenSearchBulk {
108        match self {
109            ElasticSearchOpenSearchClient::ElasticSearch(_) => {
110                let bulk = elasticsearch::BulkOperation::update(key, value)
111                    .index(index)
112                    .retry_on_conflict(retry_on_conflict);
113                if let Some(routing_column) = routing_column {
114                    ElasticSearchOpenSearchBulk::ElasticSearch(bulk.routing(routing_column).into())
115                } else {
116                    ElasticSearchOpenSearchBulk::ElasticSearch(bulk.into())
117                }
118            }
119            ElasticSearchOpenSearchClient::OpenSearch(_) => {
120                let bulk = opensearch::BulkOperation::update(key, value)
121                    .index(index)
122                    .retry_on_conflict(retry_on_conflict);
123                if let Some(routing_column) = routing_column {
124                    ElasticSearchOpenSearchBulk::OpenSearch(bulk.routing(routing_column).into())
125                } else {
126                    ElasticSearchOpenSearchBulk::OpenSearch(bulk.into())
127                }
128            }
129        }
130    }
131
132    fn new_delete(
133        &self,
134        key: String,
135        index: String,
136        routing_column: Option<String>,
137    ) -> ElasticSearchOpenSearchBulk {
138        match self {
139            ElasticSearchOpenSearchClient::ElasticSearch(_) => {
140                let bulk = elasticsearch::BulkOperation::delete(key).index(index);
141                if let Some(routing_column) = routing_column {
142                    ElasticSearchOpenSearchBulk::ElasticSearch(bulk.routing(routing_column).into())
143                } else {
144                    ElasticSearchOpenSearchBulk::ElasticSearch(bulk.into())
145                }
146            }
147            ElasticSearchOpenSearchClient::OpenSearch(_) => {
148                let bulk = opensearch::BulkOperation::delete(key).index(index);
149                if let Some(routing_column) = routing_column {
150                    ElasticSearchOpenSearchBulk::OpenSearch(bulk.routing(routing_column).into())
151                } else {
152                    ElasticSearchOpenSearchBulk::OpenSearch(bulk.into())
153                }
154            }
155        }
156    }
157}
158
159pub struct ElasticSearchOpenSearchSinkWriter {
160    client: Arc<ElasticSearchOpenSearchClient>,
161    formatter: ElasticSearchOpenSearchFormatter,
162    config: ElasticSearchOpenSearchConfig,
163    is_append_only: bool,
164}
165
166impl ElasticSearchOpenSearchSinkWriter {
167    pub fn new(
168        config: ElasticSearchOpenSearchConfig,
169        schema: Schema,
170        pk_indices: Vec<usize>,
171        connector: &str,
172        is_append_only: bool,
173    ) -> Result<Self> {
174        let client = Arc::new(config.build_client(connector)?);
175        let formatter = ElasticSearchOpenSearchFormatter::new(
176            pk_indices,
177            &schema,
178            config.delimiter.clone(),
179            config.get_index_column_index(&schema)?,
180            config.index.clone(),
181            config.get_routing_column_index(&schema)?,
182        )?;
183        Ok(Self {
184            client,
185            formatter,
186            config,
187            is_append_only,
188        })
189    }
190}
191
/// Future handed to the delivery-future manager for each bulk request issued by
/// [`ElasticSearchOpenSearchSinkWriter`].
///
/// It resolves to `Ok(())` on success or to a [`SinkError`] on failure.
pub type ElasticSearchOpenSearchSinkDeliveryFuture =
    impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
194
195impl AsyncTruncateSinkWriter for ElasticSearchOpenSearchSinkWriter {
196    type DeliveryFuture = ElasticSearchOpenSearchSinkDeliveryFuture;
197
198    async fn write_chunk<'a>(
199        &'a mut self,
200        chunk: StreamChunk,
201        mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
202    ) -> Result<()> {
203        let chunk_capacity = chunk.capacity();
204        let mut all_bulks: Vec<Vec<ElasticSearchOpenSearchBulk>> = vec![];
205        let mut bulks: Vec<ElasticSearchOpenSearchBulk> = Vec::with_capacity(chunk_capacity);
206
207        let mut bulks_size = 0;
208        for build_bulk_para in self.formatter.convert_chunk(chunk, self.is_append_only)? {
209            let BuildBulkPara {
210                key,
211                value,
212                index,
213                mem_size_b,
214                routing_column,
215            } = build_bulk_para;
216
217            bulks_size += mem_size_b;
218            if let Some(value) = value {
219                let value = json!({
220                    "doc": value,
221                    "doc_as_upsert": true
222                });
223                let bulk = self.client.new_update(
224                    key,
225                    index,
226                    self.config.retry_on_conflict,
227                    routing_column,
228                    value,
229                );
230                bulks.push(bulk);
231            } else {
232                let bulk = self.client.new_delete(key, index, routing_column);
233                bulks.push(bulk);
234            };
235
236            if bulks.len() >= self.config.batch_num_messages
237                || bulks_size >= self.config.batch_size_kb * 1024
238            {
239                all_bulks.push(bulks);
240                bulks = Vec::with_capacity(chunk_capacity);
241                bulks_size = 0;
242            }
243        }
244        if !bulks.is_empty() {
245            all_bulks.push(bulks);
246        }
247        for bulks in all_bulks {
248            let client_clone = self.client.clone();
249            let future = async move {
250                let result = client_clone.send(bulks).await?;
251                if result["errors"].as_bool().is_none() || result["errors"].as_bool().unwrap() {
252                    Err(SinkError::ElasticSearchOpenSearch(anyhow!(
253                        "send bulk to elasticsearch failed: {:?}",
254                        result
255                    )))
256                } else {
257                    Ok(())
258                }
259            }
260            .boxed();
261            add_future.add_future_may_await(future).await?;
262        }
263        Ok(())
264    }
265}