risingwave_connector/sink/elasticsearch_opensearch/
opensearch.rs1use anyhow::anyhow;
16use risingwave_common::catalog::Schema;
17use risingwave_common::session_config::sink_decouple::SinkDecouple;
18use tonic::async_trait;
19
20use super::super::writer::{AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriterExt};
21use super::super::{DummySinkCommitCoordinator, Sink, SinkError, SinkParam, SinkWriterParam};
22use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchSinkWriter;
23use super::elasticsearch_opensearch_config::ElasticSearchOpenSearchConfig;
24use crate::sink::Result;
25
/// Connector name of the OpenSearch sink.
///
/// `OpenSearchSink` exposes this value as its `Sink::SINK_NAME`.
pub const OPENSEARCH_SINK: &str = "opensearch";
27
28#[derive(Debug)]
29pub struct OpenSearchSink {
30 config: ElasticSearchOpenSearchConfig,
31 schema: Schema,
32 pk_indices: Vec<usize>,
33 is_append_only: bool,
34}
35
36#[async_trait]
37impl TryFrom<SinkParam> for OpenSearchSink {
38 type Error = SinkError;
39
40 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
41 let schema = param.schema();
42 let config = ElasticSearchOpenSearchConfig::from_btreemap(param.properties)?;
43 Ok(Self {
44 config,
45 schema,
46 pk_indices: param.downstream_pk,
47 is_append_only: param.sink_type.is_append_only(),
48 })
49 }
50}
51
52impl Sink for OpenSearchSink {
53 type Coordinator = DummySinkCommitCoordinator;
54 type LogSinker = AsyncTruncateLogSinkerOf<ElasticSearchOpenSearchSinkWriter>;
55
56 const SINK_NAME: &'static str = OPENSEARCH_SINK;
57
58 async fn validate(&self) -> Result<()> {
59 risingwave_common::license::Feature::OpenSearchSink
60 .check_available()
61 .map_err(|e| anyhow::anyhow!(e))?;
62 self.config.validate_config(&self.schema)?;
63 let client = self.config.build_client(Self::SINK_NAME)?;
64 client.ping().await?;
65 Ok(())
66 }
67
68 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
69 Ok(ElasticSearchOpenSearchSinkWriter::new(
70 self.config.clone(),
71 self.schema.clone(),
72 self.pk_indices.clone(),
73 Self::SINK_NAME,
74 self.is_append_only,
75 )?
76 .into_log_sinker(self.config.concurrent_requests))
77 }
78
79 fn set_default_commit_checkpoint_interval(
80 desc: &mut crate::sink::catalog::desc::SinkDesc,
81 user_specified: &risingwave_common::session_config::sink_decouple::SinkDecouple,
82 ) -> Result<()> {
83 if crate::sink::is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
84 match desc
85 .properties
86 .get(crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL)
87 {
88 Some(commit_checkpoint_interval) => {
89 let commit_checkpoint_interval = commit_checkpoint_interval
90 .parse::<u64>()
91 .map_err(|e| SinkError::Config(anyhow!(e)))?;
92 if std::matches!(user_specified, SinkDecouple::Disable)
93 && commit_checkpoint_interval > 1
94 {
95 return Err(SinkError::Config(anyhow!(
96 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
97 )));
98 }
99 }
100 None => match user_specified {
101 risingwave_common::session_config::sink_decouple::SinkDecouple::Default
102 | risingwave_common::session_config::sink_decouple::SinkDecouple::Enable => {
103 desc.properties.insert(
104 crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL.to_owned(),
105 crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
106 );
107 }
108 risingwave_common::session_config::sink_decouple::SinkDecouple::Disable => {
109 desc.properties.insert(
110 crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL.to_owned(),
111 crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
112 );
113 }
114 },
115 }
116 }
117 Ok(())
118 }
119
120 fn is_sink_decouple(
121 user_specified: &risingwave_common::session_config::sink_decouple::SinkDecouple,
122 ) -> Result<bool> {
123 match user_specified {
124 risingwave_common::session_config::sink_decouple::SinkDecouple::Default
125 | risingwave_common::session_config::sink_decouple::SinkDecouple::Enable => Ok(true),
126 risingwave_common::session_config::sink_decouple::SinkDecouple::Disable => Ok(false),
127 }
128 }
129}