risingwave_connector/sink/elasticsearch_opensearch/
elasticsearch.rs1use risingwave_common::catalog::Schema;
16use tonic::async_trait;
17
18use super::super::writer::{AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriterExt};
19use super::super::{DummySinkCommitCoordinator, Sink, SinkError, SinkParam, SinkWriterParam};
20use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchSinkWriter;
21use super::elasticsearch_opensearch_config::ElasticSearchOpenSearchConfig;
22use crate::sink::Result;
23
24pub const ES_SINK: &str = "elasticsearch";
25
26#[derive(Debug)]
27pub struct ElasticSearchSink {
28 config: ElasticSearchOpenSearchConfig,
29 schema: Schema,
30 pk_indices: Vec<usize>,
31 is_append_only: bool,
32}
33
34#[async_trait]
35impl TryFrom<SinkParam> for ElasticSearchSink {
36 type Error = SinkError;
37
38 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
39 let schema = param.schema();
40 let config = ElasticSearchOpenSearchConfig::from_btreemap(param.properties)?;
41 Ok(Self {
42 config,
43 schema,
44 pk_indices: param.downstream_pk,
45 is_append_only: param.sink_type.is_append_only(),
46 })
47 }
48}
49
50impl Sink for ElasticSearchSink {
51 type Coordinator = DummySinkCommitCoordinator;
52 type LogSinker = AsyncTruncateLogSinkerOf<ElasticSearchOpenSearchSinkWriter>;
53
54 const SINK_NAME: &'static str = ES_SINK;
55
56 async fn validate(&self) -> Result<()> {
57 self.config.validate_config(&self.schema)?;
58 let client = self.config.build_client(Self::SINK_NAME)?;
59 client.ping().await?;
60 Ok(())
61 }
62
63 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
64 Ok(ElasticSearchOpenSearchSinkWriter::new(
65 self.config.clone(),
66 self.schema.clone(),
67 self.pk_indices.clone(),
68 Self::SINK_NAME,
69 self.is_append_only,
70 )?
71 .into_log_sinker(self.config.concurrent_requests))
72 }
73}