risingwave_connector/sink/elasticsearch_opensearch/
elasticsearch_opensearch_client.rs1use std::sync::Arc;
16
17use anyhow::anyhow;
18use futures::{FutureExt, TryFuture};
19use itertools::Itertools;
20use risingwave_common::array::StreamChunk;
21use risingwave_common::catalog::Schema;
22use serde_json::{Value, json};
23
24use super::super::SinkError;
25use super::elasticsearch_opensearch_config::ElasticSearchOpenSearchConfig;
26use super::elasticsearch_opensearch_formatter::{BuildBulkPara, ElasticSearchOpenSearchFormatter};
27use crate::sink::Result;
28use crate::sink::log_store::DeliveryFutureManagerAddFuture;
29use crate::sink::writer::AsyncTruncateSinkWriter;
30
31pub enum ElasticSearchOpenSearchClient {
32 ElasticSearch(elasticsearch::Elasticsearch),
33 OpenSearch(opensearch::OpenSearch),
34}
35enum ElasticSearchOpenSearchBulk {
36 ElasticSearch(elasticsearch::BulkOperation<serde_json::Value>),
37 OpenSearch(opensearch::BulkOperation<serde_json::Value>),
38}
39
40impl ElasticSearchOpenSearchBulk {
41 pub fn into_elasticsearch_bulk(self) -> elasticsearch::BulkOperation<serde_json::Value> {
42 if let ElasticSearchOpenSearchBulk::ElasticSearch(bulk) = self {
43 bulk
44 } else {
45 panic!("not a elasticsearch bulk")
46 }
47 }
48
49 pub fn into_opensearch_bulk(self) -> opensearch::BulkOperation<serde_json::Value> {
50 if let ElasticSearchOpenSearchBulk::OpenSearch(bulk) = self {
51 bulk
52 } else {
53 panic!("not a opensearch bulk")
54 }
55 }
56}
57
58impl ElasticSearchOpenSearchClient {
59 async fn send(&self, bulks: Vec<ElasticSearchOpenSearchBulk>) -> Result<Value> {
60 match self {
61 ElasticSearchOpenSearchClient::ElasticSearch(client) => {
62 let bulks = bulks
63 .into_iter()
64 .map(ElasticSearchOpenSearchBulk::into_elasticsearch_bulk)
65 .collect_vec();
66 let result = client
67 .bulk(elasticsearch::BulkParts::None)
68 .body(bulks)
69 .send()
70 .await?;
71 Ok(result.json::<Value>().await?)
72 }
73 ElasticSearchOpenSearchClient::OpenSearch(client) => {
74 let bulks = bulks
75 .into_iter()
76 .map(ElasticSearchOpenSearchBulk::into_opensearch_bulk)
77 .collect_vec();
78 let result = client
79 .bulk(opensearch::BulkParts::None)
80 .body(bulks)
81 .send()
82 .await?;
83 Ok(result.json::<Value>().await?)
84 }
85 }
86 }
87
88 pub async fn ping(&self) -> Result<()> {
89 match self {
90 ElasticSearchOpenSearchClient::ElasticSearch(client) => {
91 client.ping().send().await?;
92 }
93 ElasticSearchOpenSearchClient::OpenSearch(client) => {
94 client.ping().send().await?;
95 }
96 }
97 Ok(())
98 }
99
100 fn new_update(
101 &self,
102 key: String,
103 index: String,
104 retry_on_conflict: i32,
105 routing_column: Option<String>,
106 value: serde_json::Value,
107 ) -> ElasticSearchOpenSearchBulk {
108 match self {
109 ElasticSearchOpenSearchClient::ElasticSearch(_) => {
110 let bulk = elasticsearch::BulkOperation::update(key, value)
111 .index(index)
112 .retry_on_conflict(retry_on_conflict);
113 if let Some(routing_column) = routing_column {
114 ElasticSearchOpenSearchBulk::ElasticSearch(bulk.routing(routing_column).into())
115 } else {
116 ElasticSearchOpenSearchBulk::ElasticSearch(bulk.into())
117 }
118 }
119 ElasticSearchOpenSearchClient::OpenSearch(_) => {
120 let bulk = opensearch::BulkOperation::update(key, value)
121 .index(index)
122 .retry_on_conflict(retry_on_conflict);
123 if let Some(routing_column) = routing_column {
124 ElasticSearchOpenSearchBulk::OpenSearch(bulk.routing(routing_column).into())
125 } else {
126 ElasticSearchOpenSearchBulk::OpenSearch(bulk.into())
127 }
128 }
129 }
130 }
131
132 fn new_delete(
133 &self,
134 key: String,
135 index: String,
136 routing_column: Option<String>,
137 ) -> ElasticSearchOpenSearchBulk {
138 match self {
139 ElasticSearchOpenSearchClient::ElasticSearch(_) => {
140 let bulk = elasticsearch::BulkOperation::delete(key).index(index);
141 if let Some(routing_column) = routing_column {
142 ElasticSearchOpenSearchBulk::ElasticSearch(bulk.routing(routing_column).into())
143 } else {
144 ElasticSearchOpenSearchBulk::ElasticSearch(bulk.into())
145 }
146 }
147 ElasticSearchOpenSearchClient::OpenSearch(_) => {
148 let bulk = opensearch::BulkOperation::delete(key).index(index);
149 if let Some(routing_column) = routing_column {
150 ElasticSearchOpenSearchBulk::OpenSearch(bulk.routing(routing_column).into())
151 } else {
152 ElasticSearchOpenSearchBulk::OpenSearch(bulk.into())
153 }
154 }
155 }
156 }
157}
158
159pub struct ElasticSearchOpenSearchSinkWriter {
160 client: Arc<ElasticSearchOpenSearchClient>,
161 formatter: ElasticSearchOpenSearchFormatter,
162 config: ElasticSearchOpenSearchConfig,
163 is_append_only: bool,
164}
165
166impl ElasticSearchOpenSearchSinkWriter {
167 pub fn new(
168 config: ElasticSearchOpenSearchConfig,
169 schema: Schema,
170 pk_indices: Vec<usize>,
171 connector: &str,
172 is_append_only: bool,
173 ) -> Result<Self> {
174 let client = Arc::new(config.build_client(connector)?);
175 let formatter = ElasticSearchOpenSearchFormatter::new(
176 pk_indices,
177 &schema,
178 config.delimiter.clone(),
179 config.get_index_column_index(&schema)?,
180 config.index.clone(),
181 config.get_routing_column_index(&schema)?,
182 )?;
183 Ok(Self {
184 client,
185 formatter,
186 config,
187 is_append_only,
188 })
189 }
190}
191
192pub type ElasticSearchOpenSearchSinkDeliveryFuture =
193 impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
194
195impl AsyncTruncateSinkWriter for ElasticSearchOpenSearchSinkWriter {
196 type DeliveryFuture = ElasticSearchOpenSearchSinkDeliveryFuture;
197
198 async fn write_chunk<'a>(
199 &'a mut self,
200 chunk: StreamChunk,
201 mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
202 ) -> Result<()> {
203 let chunk_capacity = chunk.capacity();
204 let mut all_bulks: Vec<Vec<ElasticSearchOpenSearchBulk>> = vec![];
205 let mut bulks: Vec<ElasticSearchOpenSearchBulk> = Vec::with_capacity(chunk_capacity);
206
207 let mut bulks_size = 0;
208 for build_bulk_para in self.formatter.convert_chunk(chunk, self.is_append_only)? {
209 let BuildBulkPara {
210 key,
211 value,
212 index,
213 mem_size_b,
214 routing_column,
215 } = build_bulk_para;
216
217 bulks_size += mem_size_b;
218 if let Some(value) = value {
219 let value = json!({
220 "doc": value,
221 "doc_as_upsert": true
222 });
223 let bulk = self.client.new_update(
224 key,
225 index,
226 self.config.retry_on_conflict,
227 routing_column,
228 value,
229 );
230 bulks.push(bulk);
231 } else {
232 let bulk = self.client.new_delete(key, index, routing_column);
233 bulks.push(bulk);
234 };
235
236 if bulks.len() >= self.config.batch_num_messages
237 || bulks_size >= self.config.batch_size_kb * 1024
238 {
239 all_bulks.push(bulks);
240 bulks = Vec::with_capacity(chunk_capacity);
241 bulks_size = 0;
242 }
243 }
244 if !bulks.is_empty() {
245 all_bulks.push(bulks);
246 }
247 for bulks in all_bulks {
248 let client_clone = self.client.clone();
249 let future = async move {
250 let result = client_clone.send(bulks).await?;
251 if result["errors"].as_bool().is_none() || result["errors"].as_bool().unwrap() {
252 Err(SinkError::ElasticSearchOpenSearch(anyhow!(
253 "send bulk to elasticsearch failed: {:?}",
254 result
255 )))
256 } else {
257 Ok(())
258 }
259 }
260 .boxed();
261 add_future.add_future_may_await(future).await?;
262 }
263 Ok(())
264 }
265}