risingwave_connector/sink/elasticsearch_opensearch/elasticsearch_opensearch_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::anyhow;
18use futures::{FutureExt, TryFuture};
19use itertools::Itertools;
20use risingwave_common::array::StreamChunk;
21use risingwave_common::catalog::Schema;
22use serde_json::{Value, json};
23
24use super::super::SinkError;
25use super::elasticsearch_opensearch_config::ElasticSearchOpenSearchConfig;
26use super::elasticsearch_opensearch_formatter::{BuildBulkPara, ElasticSearchOpenSearchFormatter};
27use crate::sink::Result;
28use crate::sink::log_store::DeliveryFutureManagerAddFuture;
29use crate::sink::writer::AsyncTruncateSinkWriter;
30
31pub enum ElasticSearchOpenSearchClient {
32    ElasticSearch(elasticsearch::Elasticsearch),
33    OpenSearch(opensearch::OpenSearch),
34}
35enum ElasticSearchOpenSearchBulk {
36    ElasticSearch(elasticsearch::BulkOperation<serde_json::Value>),
37    OpenSearch(opensearch::BulkOperation<serde_json::Value>),
38}
39
40impl ElasticSearchOpenSearchBulk {
41    pub fn into_elasticsearch_bulk(self) -> elasticsearch::BulkOperation<serde_json::Value> {
42        if let ElasticSearchOpenSearchBulk::ElasticSearch(bulk) = self {
43            bulk
44        } else {
45            panic!("not a elasticsearch bulk")
46        }
47    }
48
49    pub fn into_opensearch_bulk(self) -> opensearch::BulkOperation<serde_json::Value> {
50        if let ElasticSearchOpenSearchBulk::OpenSearch(bulk) = self {
51            bulk
52        } else {
53            panic!("not a opensearch bulk")
54        }
55    }
56}
57
58impl ElasticSearchOpenSearchClient {
59    async fn send(&self, bulks: Vec<ElasticSearchOpenSearchBulk>) -> Result<Value> {
60        match self {
61            ElasticSearchOpenSearchClient::ElasticSearch(client) => {
62                let bulks = bulks
63                    .into_iter()
64                    .map(ElasticSearchOpenSearchBulk::into_elasticsearch_bulk)
65                    .collect_vec();
66                let result = client
67                    .bulk(elasticsearch::BulkParts::None)
68                    .body(bulks)
69                    .send()
70                    .await?;
71                Ok(result.json::<Value>().await?)
72            }
73            ElasticSearchOpenSearchClient::OpenSearch(client) => {
74                let bulks = bulks
75                    .into_iter()
76                    .map(ElasticSearchOpenSearchBulk::into_opensearch_bulk)
77                    .collect_vec();
78                let result = client
79                    .bulk(opensearch::BulkParts::None)
80                    .body(bulks)
81                    .send()
82                    .await?;
83                Ok(result.json::<Value>().await?)
84            }
85        }
86    }
87
88    pub async fn ping(&self) -> Result<()> {
89        match self {
90            ElasticSearchOpenSearchClient::ElasticSearch(client) => {
91                client.ping().send().await?;
92            }
93            ElasticSearchOpenSearchClient::OpenSearch(client) => {
94                client.ping().send().await?;
95            }
96        }
97        Ok(())
98    }
99
100    fn new_update(
101        &self,
102        key: String,
103        index: String,
104        retry_on_conflict: i32,
105        routing_column: Option<String>,
106        value: serde_json::Value,
107    ) -> ElasticSearchOpenSearchBulk {
108        match self {
109            ElasticSearchOpenSearchClient::ElasticSearch(_) => {
110                let bulk = elasticsearch::BulkOperation::update(key, value)
111                    .index(index)
112                    .retry_on_conflict(retry_on_conflict);
113                if let Some(routing_column) = routing_column {
114                    ElasticSearchOpenSearchBulk::ElasticSearch(bulk.routing(routing_column).into())
115                } else {
116                    ElasticSearchOpenSearchBulk::ElasticSearch(bulk.into())
117                }
118            }
119            ElasticSearchOpenSearchClient::OpenSearch(_) => {
120                let bulk = opensearch::BulkOperation::update(key, value)
121                    .index(index)
122                    .retry_on_conflict(retry_on_conflict);
123                if let Some(routing_column) = routing_column {
124                    ElasticSearchOpenSearchBulk::OpenSearch(bulk.routing(routing_column).into())
125                } else {
126                    ElasticSearchOpenSearchBulk::OpenSearch(bulk.into())
127                }
128            }
129        }
130    }
131
132    fn new_delete(
133        &self,
134        key: String,
135        index: String,
136        routing_column: Option<String>,
137    ) -> ElasticSearchOpenSearchBulk {
138        match self {
139            ElasticSearchOpenSearchClient::ElasticSearch(_) => {
140                let bulk = elasticsearch::BulkOperation::delete(key).index(index);
141                if let Some(routing_column) = routing_column {
142                    ElasticSearchOpenSearchBulk::ElasticSearch(bulk.routing(routing_column).into())
143                } else {
144                    ElasticSearchOpenSearchBulk::ElasticSearch(bulk.into())
145                }
146            }
147            ElasticSearchOpenSearchClient::OpenSearch(_) => {
148                let bulk = opensearch::BulkOperation::delete(key).index(index);
149                if let Some(routing_column) = routing_column {
150                    ElasticSearchOpenSearchBulk::OpenSearch(bulk.routing(routing_column).into())
151                } else {
152                    ElasticSearchOpenSearchBulk::OpenSearch(bulk.into())
153                }
154            }
155        }
156    }
157}
158
159pub struct ElasticSearchOpenSearchSinkWriter {
160    client: Arc<ElasticSearchOpenSearchClient>,
161    formatter: ElasticSearchOpenSearchFormatter,
162    config: ElasticSearchOpenSearchConfig,
163    is_append_only: bool,
164}
165
166impl ElasticSearchOpenSearchSinkWriter {
167    pub fn new(
168        config: ElasticSearchOpenSearchConfig,
169        schema: Schema,
170        pk_indices: Vec<usize>,
171        connector: &str,
172        is_append_only: bool,
173    ) -> Result<Self> {
174        let client = Arc::new(config.build_client(connector)?);
175        let formatter = ElasticSearchOpenSearchFormatter::new(
176            pk_indices,
177            &schema,
178            config.delimiter.clone(),
179            config.get_index_column_index(&schema)?,
180            config.index.clone(),
181            config.get_routing_column_index(&schema)?,
182        )?;
183        Ok(Self {
184            client,
185            formatter,
186            config,
187            is_append_only,
188        })
189    }
190}
191
192pub type ElasticSearchOpenSearchSinkDeliveryFuture =
193    impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
194
195impl AsyncTruncateSinkWriter for ElasticSearchOpenSearchSinkWriter {
196    type DeliveryFuture = ElasticSearchOpenSearchSinkDeliveryFuture;
197
198    #[define_opaque(ElasticSearchOpenSearchSinkDeliveryFuture)]
199    async fn write_chunk<'a>(
200        &'a mut self,
201        chunk: StreamChunk,
202        mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
203    ) -> Result<()> {
204        let chunk_capacity = chunk.capacity();
205        let mut all_bulks: Vec<Vec<ElasticSearchOpenSearchBulk>> = vec![];
206        let mut bulks: Vec<ElasticSearchOpenSearchBulk> = Vec::with_capacity(chunk_capacity);
207
208        let mut bulks_size = 0;
209        for build_bulk_para in self.formatter.convert_chunk(chunk, self.is_append_only)? {
210            let BuildBulkPara {
211                key,
212                value,
213                index,
214                mem_size_b,
215                routing_column,
216            } = build_bulk_para;
217
218            bulks_size += mem_size_b;
219            if let Some(value) = value {
220                let value = json!({
221                    "doc": value,
222                    "doc_as_upsert": true
223                });
224                let bulk = self.client.new_update(
225                    key,
226                    index,
227                    self.config.retry_on_conflict,
228                    routing_column,
229                    value,
230                );
231                bulks.push(bulk);
232            } else {
233                let bulk = self.client.new_delete(key, index, routing_column);
234                bulks.push(bulk);
235            };
236
237            if bulks.len() >= self.config.batch_num_messages
238                || bulks_size >= self.config.batch_size_kb * 1024
239            {
240                all_bulks.push(bulks);
241                bulks = Vec::with_capacity(chunk_capacity);
242                bulks_size = 0;
243            }
244        }
245        if !bulks.is_empty() {
246            all_bulks.push(bulks);
247        }
248        for bulks in all_bulks {
249            let client_clone = self.client.clone();
250            let future = async move {
251                let result = client_clone.send(bulks).await?;
252                if result["errors"].as_bool().is_none() || result["errors"].as_bool().unwrap() {
253                    Err(SinkError::ElasticSearchOpenSearch(anyhow!(
254                        "send bulk to elasticsearch failed: {:?}",
255                        result
256                    )))
257                } else {
258                    Ok(())
259                }
260            }
261            .boxed();
262            add_future.add_future_may_await(future).await?;
263        }
264        Ok(())
265    }
266}