// risingwave_connector/sink/elasticsearch_opensearch/elasticsearch_opensearch_config.rs
use std::collections::BTreeMap;
use anyhow::anyhow;
use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use url::Url;
use with_options::WithOptions;
use super::super::SinkError;
use super::elasticsearch::ES_SINK;
use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchClient;
use super::opensearch::OPENSEARCH_SINK;
use crate::sink::Result;
/// Property key of the `delimiter` option.
pub const ES_OPTION_DELIMITER: &str = "delimiter";
/// Property key of the `index_column` option: the name of a varchar column in the sink schema.
/// Exactly one of `index_column` / `index` may be configured.
pub const ES_OPTION_INDEX_COLUMN: &str = "index_column";
/// Property key of the `index` option (a fixed index name).
/// Exactly one of `index_column` / `index` may be configured.
pub const ES_OPTION_INDEX: &str = "index";
/// Property key of the `routing_column` option: the name of a varchar column in the sink schema.
pub const ES_OPTION_ROUTING_COLUMN: &str = "routing_column";
/// Options shared by the Elasticsearch and OpenSearch sinks.
///
/// Deserialized from a string-to-string property map (see `from_btreemap`). The numeric options
/// are parsed from their string form and fall back to defaults when absent.
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct ElasticSearchOpenSearchConfig {
    // Endpoint URL; it must be accepted by `Url::parse` when the client is built.
    #[serde(rename = "url")]
    pub url: String,
    // Fixed index name. Exactly one of `index` / `index_column` must be set.
    #[serde(rename = "index")]
    pub index: Option<String>,
    // Optional `delimiter` option; how it is applied is decided outside this file.
    #[serde(rename = "delimiter")]
    pub delimiter: Option<String>,
    // Username for the HTTP Basic credentials used by the client.
    #[serde(rename = "username")]
    pub username: String,
    // Password for the HTTP Basic credentials used by the client.
    #[serde(rename = "password")]
    pub password: String,
    // Name of a varchar column in the sink schema. Exactly one of `index` / `index_column`
    // must be set.
    #[serde(rename = "index_column")]
    pub index_column: Option<String>,
    // Name of a varchar column in the sink schema; presumably supplies the per-row routing
    // value (TODO confirm against the writer).
    #[serde(rename = "routing_column")]
    pub routing_column: Option<String>,
    // Parsed from a string; defaults to 3 (see `default_retry_on_conflict`).
    #[serde(rename = "retry_on_conflict")]
    #[serde_as(as = "DisplayFromStr")]
    #[serde(default = "default_retry_on_conflict")]
    pub retry_on_conflict: i32,
    // Parsed from a string; defaults to 512 (see `default_batch_num_messages`).
    #[serde(rename = "batch_num_messages")]
    #[serde_as(as = "DisplayFromStr")]
    #[serde(default = "default_batch_num_messages")]
    pub batch_num_messages: usize,
    // Parsed from a string; defaults to 5 * 1024 (see `default_batch_size_kb`).
    #[serde(rename = "batch_size_kb")]
    #[serde_as(as = "DisplayFromStr")]
    #[serde(default = "default_batch_size_kb")]
    pub batch_size_kb: usize,
    // Parsed from a string; defaults to 1024 (see `default_concurrent_requests`).
    #[serde(rename = "concurrent_requests")]
    #[serde_as(as = "DisplayFromStr")]
    #[serde(default = "default_concurrent_requests")]
    pub concurrent_requests: usize,
}
/// Serde default for `retry_on_conflict` when the property is absent.
fn default_retry_on_conflict() -> i32 {
    const RETRIES: i32 = 3;
    RETRIES
}

/// Serde default for `batch_num_messages` when the property is absent.
fn default_batch_num_messages() -> usize {
    const MESSAGES_PER_BATCH: usize = 512;
    MESSAGES_PER_BATCH
}

/// Serde default for `batch_size_kb` when the property is absent: 5 * 1024 KB.
fn default_batch_size_kb() -> usize {
    const KB_PER_MB: usize = 1024;
    5 * KB_PER_MB
}

/// Serde default for `concurrent_requests` when the property is absent.
fn default_concurrent_requests() -> usize {
    const MAX_IN_FLIGHT: usize = 1024;
    MAX_IN_FLIGHT
}
impl ElasticSearchOpenSearchConfig {
pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
let config = serde_json::from_value::<ElasticSearchOpenSearchConfig>(
serde_json::to_value(properties).unwrap(),
)
.map_err(|e| SinkError::Config(anyhow!(e)))?;
Ok(config)
}
pub fn build_client(&self, connector: &str) -> Result<ElasticSearchOpenSearchClient> {
let url =
Url::parse(&self.url).map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
if connector.eq(ES_SINK) {
let transport = elasticsearch::http::transport::TransportBuilder::new(
elasticsearch::http::transport::SingleNodeConnectionPool::new(url),
)
.auth(elasticsearch::auth::Credentials::Basic(
self.username.clone(),
self.password.clone(),
))
.build()
.map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
let client = elasticsearch::Elasticsearch::new(transport);
Ok(ElasticSearchOpenSearchClient::ElasticSearch(client))
} else if connector.eq(OPENSEARCH_SINK) {
let transport = opensearch::http::transport::TransportBuilder::new(
opensearch::http::transport::SingleNodeConnectionPool::new(url),
)
.auth(opensearch::auth::Credentials::Basic(
self.username.clone(),
self.password.clone(),
))
.build()
.map_err(|e| SinkError::ElasticSearchOpenSearch(anyhow!(e)))?;
let client = opensearch::OpenSearch::new(transport);
Ok(ElasticSearchOpenSearchClient::OpenSearch(client))
} else {
panic!(
"connector type must be {} or {}, but get {}",
ES_SINK, OPENSEARCH_SINK, connector
);
}
}
pub fn validate_config(&self, schema: &Schema) -> Result<()> {
if self.index_column.is_some() && self.index.is_some()
|| self.index_column.is_none() && self.index.is_none()
{
return Err(SinkError::Config(anyhow!(
"please set only one of the 'index_column' or 'index' properties."
)));
}
if let Some(index_column) = &self.index_column {
let filed = schema
.fields()
.iter()
.find(|f| &f.name == index_column)
.unwrap();
if filed.data_type() != DataType::Varchar {
return Err(SinkError::Config(anyhow!(
"please ensure the data type of {} is varchar.",
index_column
)));
}
}
if let Some(routing_column) = &self.routing_column {
let filed = schema
.fields()
.iter()
.find(|f| &f.name == routing_column)
.unwrap();
if filed.data_type() != DataType::Varchar {
return Err(SinkError::Config(anyhow!(
"please ensure the data type of {} is varchar.",
routing_column
)));
}
}
Ok(())
}
pub fn get_index_column_index(&self, schema: &Schema) -> Result<Option<usize>> {
let index_column_idx = self
.index_column
.as_ref()
.map(|n| {
schema
.fields()
.iter()
.position(|s| &s.name == n)
.ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_INDEX_COLUMN))
})
.transpose()?;
Ok(index_column_idx)
}
pub fn get_routing_column_index(&self, schema: &Schema) -> Result<Option<usize>> {
let routing_column_idx = self
.routing_column
.as_ref()
.map(|n| {
schema
.fields()
.iter()
.position(|s| &s.name == n)
.ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_ROUTING_COLUMN))
})
.transpose()?;
Ok(routing_column_idx)
}
}