risingwave_connector/sink/elasticsearch_opensearch/
elasticsearch.rs1use risingwave_common::catalog::Schema;
16use tonic::async_trait;
17
18use super::super::writer::{AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriterExt};
19use super::super::{Sink, SinkError, SinkParam, SinkWriterParam};
20use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchSinkWriter;
21use super::elasticsearch_opensearch_config::{ElasticSearchConfig, ElasticSearchOpenSearchConfig};
22use crate::enforce_secret::EnforceSecret;
23use crate::sink::Result;
24
25pub const ES_SINK: &str = "elasticsearch";
26
27#[derive(Debug)]
28pub struct ElasticSearchSink {
29 config: ElasticSearchOpenSearchConfig,
30 schema: Schema,
31 pk_indices: Vec<usize>,
32 is_append_only: bool,
33}
34
35impl EnforceSecret for ElasticSearchSink {
36 fn enforce_secret<'a>(
37 prop_iter: impl Iterator<Item = &'a str>,
38 ) -> crate::error::ConnectorResult<()> {
39 for prop in prop_iter {
40 ElasticSearchOpenSearchConfig::enforce_one(prop)?;
41 }
42 Ok(())
43 }
44}
45
46#[async_trait]
47impl TryFrom<SinkParam> for ElasticSearchSink {
48 type Error = SinkError;
49
50 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
51 let schema = param.schema();
52 let config = ElasticSearchConfig::from_btreemap(param.properties)?.inner;
53 Ok(Self {
54 config,
55 schema,
56 pk_indices: param.downstream_pk,
57 is_append_only: param.sink_type.is_append_only(),
58 })
59 }
60}
61
62impl Sink for ElasticSearchSink {
63 type LogSinker = AsyncTruncateLogSinkerOf<ElasticSearchOpenSearchSinkWriter>;
64
65 const SINK_NAME: &'static str = ES_SINK;
66
67 async fn validate(&self) -> Result<()> {
68 self.config.validate_config(&self.schema)?;
69 let client = self.config.build_client(Self::SINK_NAME)?;
70 client.ping().await?;
71 Ok(())
72 }
73
74 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
75 Ok(ElasticSearchOpenSearchSinkWriter::new(
76 self.config.clone(),
77 self.schema.clone(),
78 self.pk_indices.clone(),
79 Self::SINK_NAME,
80 self.is_append_only,
81 )?
82 .into_log_sinker(self.config.concurrent_requests))
83 }
84}