Skip to main content

risingwave_pb/
connector_service.rs

// This file is @generated by prost-build.
2#[derive(prost_helpers::AnyPB)]
3#[derive(Clone, PartialEq, ::prost::Message)]
4pub struct TableSchema {
5    #[prost(message, repeated, tag = "1")]
6    pub columns: ::prost::alloc::vec::Vec<super::plan_common::ColumnDesc>,
7    #[prost(uint32, repeated, tag = "2")]
8    pub pk_indices: ::prost::alloc::vec::Vec<u32>,
9}
10#[derive(prost_helpers::AnyPB)]
11#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
12pub struct ValidationError {
13    #[prost(string, tag = "1")]
14    pub error_message: ::prost::alloc::string::String,
15}
16#[derive(prost_helpers::AnyPB)]
17#[derive(Clone, PartialEq, ::prost::Message)]
18pub struct SinkParam {
19    #[prost(uint32, tag = "1", wrapper = "crate::id::SinkId")]
20    pub sink_id: crate::id::SinkId,
21    #[prost(btree_map = "string, string", tag = "2")]
22    pub properties: ::prost::alloc::collections::BTreeMap<
23        ::prost::alloc::string::String,
24        ::prost::alloc::string::String,
25    >,
26    #[prost(message, optional, tag = "3")]
27    pub table_schema: ::core::option::Option<TableSchema>,
28    /// to be deprecated
29    #[prost(enumeration = "super::catalog::SinkType", tag = "4")]
30    pub sink_type: i32,
31    #[prost(string, tag = "5")]
32    pub db_name: ::prost::alloc::string::String,
33    #[prost(string, tag = "6")]
34    pub sink_from_name: ::prost::alloc::string::String,
35    #[prost(message, optional, tag = "7")]
36    pub format_desc: ::core::option::Option<super::catalog::SinkFormatDesc>,
37    #[prost(string, tag = "8")]
38    pub sink_name: ::prost::alloc::string::String,
39    /// Backward-compatible replacement for `SINK_TYPE_FORCE_APPEND_ONLY`.
40    ///
41    /// Should not directly access this field. Use method `ignore_delete()` instead.
42    #[prost(bool, tag = "9")]
43    pub raw_ignore_delete: bool,
44}
45#[derive(prost_helpers::AnyPB)]
46#[derive(Clone, PartialEq, ::prost::Message)]
47pub struct SinkWriterStreamRequest {
48    #[prost(oneof = "sink_writer_stream_request::Request", tags = "1, 3, 4")]
49    pub request: ::core::option::Option<sink_writer_stream_request::Request>,
50}
51/// Nested message and enum types in `SinkWriterStreamRequest`.
52pub mod sink_writer_stream_request {
53    #[derive(prost_helpers::AnyPB)]
54    #[derive(Clone, PartialEq, ::prost::Message)]
55    pub struct StartSink {
56        #[prost(message, optional, tag = "1")]
57        pub sink_param: ::core::option::Option<super::SinkParam>,
58        #[prost(message, optional, tag = "3")]
59        pub payload_schema: ::core::option::Option<super::TableSchema>,
60    }
61    #[derive(prost_helpers::AnyPB)]
62    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
63    pub struct WriteBatch {
64        #[prost(uint64, tag = "3")]
65        pub batch_id: u64,
66        #[prost(uint64, tag = "4")]
67        pub epoch: u64,
68        #[prost(oneof = "write_batch::Payload", tags = "2, 5")]
69        pub payload: ::core::option::Option<write_batch::Payload>,
70    }
71    /// Nested message and enum types in `WriteBatch`.
72    pub mod write_batch {
73        #[derive(prost_helpers::AnyPB)]
74        #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
75        pub struct StreamChunkPayload {
76            #[prost(bytes = "vec", tag = "1")]
77            pub binary_data: ::prost::alloc::vec::Vec<u8>,
78        }
79        #[derive(prost_helpers::AnyPB)]
80        #[derive(Clone, PartialEq, Eq, Hash, ::prost::Oneof)]
81        pub enum Payload {
82            #[prost(message, tag = "2")]
83            StreamChunkPayload(StreamChunkPayload),
84            /// This is a reference pointer to a StreamChunk. The StreamChunk is owned
85            /// by the JniSinkWriterStreamRequest, which should handle the release of StreamChunk.
86            /// Index set to 5 because 3 and 4 have been occupied by `batch_id` and `epoch`
87            #[prost(int64, tag = "5")]
88            StreamChunkRefPointer(i64),
89        }
90    }
91    #[derive(prost_helpers::AnyPB)]
92    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
93    pub struct Barrier {
94        #[prost(uint64, tag = "1")]
95        pub epoch: u64,
96        #[prost(bool, tag = "2")]
97        pub is_checkpoint: bool,
98    }
99    #[derive(prost_helpers::AnyPB)]
100    #[derive(Clone, PartialEq, ::prost::Oneof)]
101    pub enum Request {
102        #[prost(message, tag = "1")]
103        Start(StartSink),
104        #[prost(message, tag = "3")]
105        WriteBatch(WriteBatch),
106        #[prost(message, tag = "4")]
107        Barrier(Barrier),
108    }
109}
110#[derive(prost_helpers::AnyPB)]
111#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
112pub struct SinkWriterStreamResponse {
113    #[prost(oneof = "sink_writer_stream_response::Response", tags = "1, 2, 3")]
114    pub response: ::core::option::Option<sink_writer_stream_response::Response>,
115}
116/// Nested message and enum types in `SinkWriterStreamResponse`.
117pub mod sink_writer_stream_response {
118    #[derive(prost_helpers::AnyPB)]
119    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
120    pub struct StartResponse {}
121    #[derive(prost_helpers::AnyPB)]
122    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
123    pub struct CommitResponse {
124        #[prost(uint64, tag = "1")]
125        pub epoch: u64,
126        #[prost(message, optional, tag = "2")]
127        pub metadata: ::core::option::Option<super::SinkMetadata>,
128    }
129    #[derive(prost_helpers::AnyPB)]
130    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
131    pub struct BatchWrittenResponse {
132        #[prost(uint64, tag = "1")]
133        pub epoch: u64,
134        #[prost(uint64, tag = "2")]
135        pub batch_id: u64,
136    }
137    #[derive(prost_helpers::AnyPB)]
138    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Oneof)]
139    pub enum Response {
140        #[prost(message, tag = "1")]
141        Start(StartResponse),
142        #[prost(message, tag = "2")]
143        Commit(CommitResponse),
144        #[prost(message, tag = "3")]
145        Batch(BatchWrittenResponse),
146    }
147}
148#[derive(prost_helpers::AnyPB)]
149#[derive(Clone, PartialEq, ::prost::Message)]
150pub struct ValidateSinkRequest {
151    #[prost(message, optional, tag = "1")]
152    pub sink_param: ::core::option::Option<SinkParam>,
153}
154#[derive(prost_helpers::AnyPB)]
155#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
156pub struct ValidateSinkResponse {
157    /// On validation failure, we return the error.
158    #[prost(message, optional, tag = "1")]
159    pub error: ::core::option::Option<ValidationError>,
160}
161#[derive(prost_helpers::AnyPB)]
162#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
163pub struct SinkMetadata {
164    #[prost(oneof = "sink_metadata::Metadata", tags = "1")]
165    pub metadata: ::core::option::Option<sink_metadata::Metadata>,
166}
167/// Nested message and enum types in `SinkMetadata`.
168pub mod sink_metadata {
169    #[derive(prost_helpers::AnyPB)]
170    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
171    pub struct SerializedMetadata {
172        #[prost(bytes = "vec", tag = "1")]
173        pub metadata: ::prost::alloc::vec::Vec<u8>,
174    }
175    #[derive(prost_helpers::AnyPB)]
176    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Oneof)]
177    pub enum Metadata {
178        #[prost(message, tag = "1")]
179        Serialized(SerializedMetadata),
180    }
181}
182#[derive(prost_helpers::AnyPB)]
183#[derive(Clone, PartialEq, ::prost::Message)]
184pub struct SinkCoordinatorStreamRequest {
185    #[prost(oneof = "sink_coordinator_stream_request::Request", tags = "1, 2")]
186    pub request: ::core::option::Option<sink_coordinator_stream_request::Request>,
187}
188/// Nested message and enum types in `SinkCoordinatorStreamRequest`.
189pub mod sink_coordinator_stream_request {
190    #[derive(prost_helpers::AnyPB)]
191    #[derive(Clone, PartialEq, ::prost::Message)]
192    pub struct StartCoordinator {
193        #[prost(message, optional, tag = "1")]
194        pub param: ::core::option::Option<super::SinkParam>,
195    }
196    #[derive(prost_helpers::AnyPB)]
197    #[derive(Clone, PartialEq, ::prost::Message)]
198    pub struct CommitMetadata {
199        #[prost(uint64, tag = "1")]
200        pub epoch: u64,
201        #[prost(message, repeated, tag = "2")]
202        pub metadata: ::prost::alloc::vec::Vec<super::SinkMetadata>,
203    }
204    #[derive(prost_helpers::AnyPB)]
205    #[derive(Clone, PartialEq, ::prost::Oneof)]
206    pub enum Request {
207        #[prost(message, tag = "1")]
208        Start(StartCoordinator),
209        #[prost(message, tag = "2")]
210        Commit(CommitMetadata),
211    }
212}
213#[derive(prost_helpers::AnyPB)]
214#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
215pub struct SinkCoordinatorStreamResponse {
216    #[prost(oneof = "sink_coordinator_stream_response::Response", tags = "1, 2")]
217    pub response: ::core::option::Option<sink_coordinator_stream_response::Response>,
218}
219/// Nested message and enum types in `SinkCoordinatorStreamResponse`.
220pub mod sink_coordinator_stream_response {
221    #[derive(prost_helpers::AnyPB)]
222    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
223    pub struct StartResponse {}
224    #[derive(prost_helpers::AnyPB)]
225    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
226    pub struct CommitResponse {
227        #[prost(uint64, tag = "1")]
228        pub epoch: u64,
229    }
230    #[derive(prost_helpers::AnyPB)]
231    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)]
232    pub enum Response {
233        #[prost(message, tag = "1")]
234        Start(StartResponse),
235        #[prost(message, tag = "2")]
236        Commit(CommitResponse),
237    }
238}
239#[derive(prost_helpers::AnyPB)]
240#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
241pub struct CdcMessage {
242    /// The value of the Debezium message
243    #[prost(string, tag = "1")]
244    pub payload: ::prost::alloc::string::String,
245    #[prost(string, tag = "2")]
246    pub partition: ::prost::alloc::string::String,
247    #[prost(string, tag = "3")]
248    pub offset: ::prost::alloc::string::String,
249    #[prost(string, tag = "4")]
250    pub full_table_name: ::prost::alloc::string::String,
251    #[prost(int64, tag = "5")]
252    pub source_ts_ms: i64,
253    #[prost(enumeration = "cdc_message::CdcMessageType", tag = "6")]
254    pub msg_type: i32,
255    /// The key of the Debezium message, which only used by `mongodb-cdc` connector.
256    #[prost(string, tag = "7")]
257    pub key: ::prost::alloc::string::String,
258    /// The CDC source type of this message. This is required to disambiguate the parsing logic for
259    /// `full_table_name` (e.g. `db.table` in MySQL vs `schema.table` in Postgres/SQL Server).
260    #[prost(enumeration = "SourceType", tag = "8")]
261    pub source_type: i32,
262}
263/// Nested message and enum types in `CdcMessage`.
264pub mod cdc_message {
265    #[derive(prost_helpers::AnyPB)]
266    #[derive(
267        Clone,
268        Copy,
269        Debug,
270        PartialEq,
271        Eq,
272        Hash,
273        PartialOrd,
274        Ord,
275        ::prost::Enumeration
276    )]
277    #[repr(i32)]
278    pub enum CdcMessageType {
279        Unspecified = 0,
280        Heartbeat = 1,
281        Data = 2,
282        TransactionMeta = 3,
283        SchemaChange = 4,
284    }
285    impl CdcMessageType {
286        /// String value of the enum field names used in the ProtoBuf definition.
287        ///
288        /// The values are not transformed in any way and thus are considered stable
289        /// (if the ProtoBuf definition does not change) and safe for programmatic use.
290        pub fn as_str_name(&self) -> &'static str {
291            match self {
292                Self::Unspecified => "UNSPECIFIED",
293                Self::Heartbeat => "HEARTBEAT",
294                Self::Data => "DATA",
295                Self::TransactionMeta => "TRANSACTION_META",
296                Self::SchemaChange => "SCHEMA_CHANGE",
297            }
298        }
299        /// Creates an enum from field names used in the ProtoBuf definition.
300        pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
301            match value {
302                "UNSPECIFIED" => Some(Self::Unspecified),
303                "HEARTBEAT" => Some(Self::Heartbeat),
304                "DATA" => Some(Self::Data),
305                "TRANSACTION_META" => Some(Self::TransactionMeta),
306                "SCHEMA_CHANGE" => Some(Self::SchemaChange),
307                _ => None,
308            }
309        }
310    }
311}
312#[derive(prost_helpers::AnyPB)]
313#[derive(Clone, PartialEq, ::prost::Message)]
314pub struct GetEventStreamRequest {
315    #[prost(uint64, tag = "1")]
316    pub source_id: u64,
317    #[prost(enumeration = "SourceType", tag = "2")]
318    pub source_type: i32,
319    #[prost(string, tag = "3")]
320    pub start_offset: ::prost::alloc::string::String,
321    #[prost(btree_map = "string, string", tag = "4")]
322    pub properties: ::prost::alloc::collections::BTreeMap<
323        ::prost::alloc::string::String,
324        ::prost::alloc::string::String,
325    >,
326    #[prost(bool, tag = "5")]
327    pub snapshot_done: bool,
328    #[prost(bool, tag = "6")]
329    pub is_source_job: bool,
330}
331#[derive(prost_helpers::AnyPB)]
332#[derive(Clone, PartialEq, ::prost::Message)]
333pub struct GetEventStreamResponse {
334    #[prost(uint64, tag = "1")]
335    pub source_id: u64,
336    #[prost(message, repeated, tag = "2")]
337    pub events: ::prost::alloc::vec::Vec<CdcMessage>,
338    #[prost(message, optional, tag = "3")]
339    pub control: ::core::option::Option<get_event_stream_response::ControlInfo>,
340}
341/// Nested message and enum types in `GetEventStreamResponse`.
342pub mod get_event_stream_response {
343    #[derive(prost_helpers::AnyPB)]
344    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
345    pub struct ControlInfo {
346        #[prost(bool, tag = "1")]
347        pub handshake_ok: bool,
348    }
349}
350#[derive(prost_helpers::AnyPB)]
351#[derive(Clone, PartialEq, ::prost::Message)]
352pub struct ValidateSourceRequest {
353    #[prost(uint64, tag = "1")]
354    pub source_id: u64,
355    #[prost(enumeration = "SourceType", tag = "2")]
356    pub source_type: i32,
357    #[prost(btree_map = "string, string", tag = "3")]
358    pub properties: ::prost::alloc::collections::BTreeMap<
359        ::prost::alloc::string::String,
360        ::prost::alloc::string::String,
361    >,
362    #[prost(message, optional, tag = "4")]
363    pub table_schema: ::core::option::Option<TableSchema>,
364    #[prost(bool, tag = "5")]
365    pub is_source_job: bool,
366    #[prost(bool, tag = "6")]
367    pub is_backfill_table: bool,
368}
369#[derive(prost_helpers::AnyPB)]
370#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
371pub struct ValidateSourceResponse {
372    /// On validation failure, we return the error.
373    #[prost(message, optional, tag = "1")]
374    pub error: ::core::option::Option<ValidationError>,
375}
376#[derive(prost_helpers::AnyPB)]
377#[derive(Clone, PartialEq, ::prost::Message)]
378pub struct CoordinateRequest {
379    #[prost(oneof = "coordinate_request::Msg", tags = "1, 2, 3, 4, 5")]
380    pub msg: ::core::option::Option<coordinate_request::Msg>,
381}
382/// Nested message and enum types in `CoordinateRequest`.
383pub mod coordinate_request {
384    /// The first request that starts a coordination between sink writer and coordinator.
385    /// The service will respond after sink writers of all vnodes have sent the request.
386    #[derive(prost_helpers::AnyPB)]
387    #[derive(Clone, PartialEq, ::prost::Message)]
388    pub struct StartCoordinationRequest {
389        #[prost(message, optional, tag = "1")]
390        pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
391        #[prost(message, optional, tag = "2")]
392        pub param: ::core::option::Option<super::SinkParam>,
393    }
394    #[derive(prost_helpers::AnyPB)]
395    #[derive(Clone, PartialEq, ::prost::Message)]
396    pub struct CommitRequest {
397        #[prost(uint64, tag = "1")]
398        pub epoch: u64,
399        #[prost(message, optional, tag = "2")]
400        pub metadata: ::core::option::Option<super::SinkMetadata>,
401        #[prost(message, optional, tag = "3")]
402        pub schema_change: ::core::option::Option<
403            super::super::stream_plan::SinkSchemaChange,
404        >,
405    }
406    #[derive(prost_helpers::AnyPB)]
407    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
408    pub struct UpdateVnodeBitmapRequest {
409        #[prost(message, optional, tag = "1")]
410        pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
411    }
412    #[derive(prost_helpers::AnyPB)]
413    #[derive(Clone, PartialEq, ::prost::Oneof)]
414    pub enum Msg {
415        #[prost(message, tag = "1")]
416        StartRequest(StartCoordinationRequest),
417        #[prost(message, tag = "2")]
418        CommitRequest(CommitRequest),
419        #[prost(message, tag = "3")]
420        UpdateVnodeRequest(UpdateVnodeBitmapRequest),
421        #[prost(bool, tag = "4")]
422        Stop(bool),
423        #[prost(uint64, tag = "5")]
424        AlignInitialEpochRequest(u64),
425    }
426}
427#[derive(prost_helpers::AnyPB)]
428#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
429pub struct CoordinateResponse {
430    #[prost(oneof = "coordinate_response::Msg", tags = "1, 2, 3, 4")]
431    pub msg: ::core::option::Option<coordinate_response::Msg>,
432}
433/// Nested message and enum types in `CoordinateResponse`.
434pub mod coordinate_response {
435    #[derive(prost_helpers::AnyPB)]
436    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
437    pub struct StartCoordinationResponse {
438        #[prost(uint64, optional, tag = "1")]
439        pub log_store_rewind_start_epoch: ::core::option::Option<u64>,
440    }
441    #[derive(prost_helpers::AnyPB)]
442    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
443    pub struct CommitResponse {
444        #[prost(uint64, tag = "1")]
445        pub epoch: u64,
446    }
447    #[derive(prost_helpers::AnyPB)]
448    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)]
449    pub enum Msg {
450        #[prost(message, tag = "1")]
451        StartResponse(StartCoordinationResponse),
452        #[prost(message, tag = "2")]
453        CommitResponse(CommitResponse),
454        #[prost(bool, tag = "3")]
455        Stopped(bool),
456        #[prost(uint64, tag = "4")]
457        AlignInitialEpochResponse(u64),
458    }
459}
460#[derive(prost_helpers::AnyPB)]
461#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
462pub struct IcebergV3PreCommitState {
463    #[prost(bytes = "vec", tag = "1")]
464    pub agg_result: ::prost::alloc::vec::Vec<u8>,
465    #[prost(int64, tag = "2")]
466    pub snapshot_id: i64,
467}
468#[derive(prost_helpers::AnyPB)]
469#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
470#[repr(i32)]
471pub enum SourceType {
472    Unspecified = 0,
473    Mysql = 1,
474    Postgres = 2,
475    Citus = 3,
476    Mongodb = 4,
477    SqlServer = 5,
478}
479impl SourceType {
480    /// String value of the enum field names used in the ProtoBuf definition.
481    ///
482    /// The values are not transformed in any way and thus are considered stable
483    /// (if the ProtoBuf definition does not change) and safe for programmatic use.
484    pub fn as_str_name(&self) -> &'static str {
485        match self {
486            Self::Unspecified => "UNSPECIFIED",
487            Self::Mysql => "MYSQL",
488            Self::Postgres => "POSTGRES",
489            Self::Citus => "CITUS",
490            Self::Mongodb => "MONGODB",
491            Self::SqlServer => "SQL_SERVER",
492        }
493    }
494    /// Creates an enum from field names used in the ProtoBuf definition.
495    pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
496        match value {
497            "UNSPECIFIED" => Some(Self::Unspecified),
498            "MYSQL" => Some(Self::Mysql),
499            "POSTGRES" => Some(Self::Postgres),
500            "CITUS" => Some(Self::Citus),
501            "MONGODB" => Some(Self::Mongodb),
502            "SQL_SERVER" => Some(Self::SqlServer),
503            _ => None,
504        }
505    }
506}
507/// Generated client implementations.
508pub mod connector_service_client {
509    #![allow(
510        unused_variables,
511        dead_code,
512        missing_docs,
513        clippy::wildcard_imports,
514        clippy::let_unit_value,
515    )]
516    use tonic::codegen::*;
517    use tonic::codegen::http::Uri;
518    #[derive(Debug, Clone)]
519    pub struct ConnectorServiceClient<T> {
520        inner: tonic::client::Grpc<T>,
521    }
522    impl ConnectorServiceClient<tonic::transport::Channel> {
523        /// Attempt to create a new client by connecting to a given endpoint.
524        pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
525        where
526            D: TryInto<tonic::transport::Endpoint>,
527            D::Error: Into<StdError>,
528        {
529            let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
530            Ok(Self::new(conn))
531        }
532    }
533    impl<T> ConnectorServiceClient<T>
534    where
535        T: tonic::client::GrpcService<tonic::body::Body>,
536        T::Error: Into<StdError>,
537        T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
538        <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
539    {
540        pub fn new(inner: T) -> Self {
541            let inner = tonic::client::Grpc::new(inner);
542            Self { inner }
543        }
544        pub fn with_origin(inner: T, origin: Uri) -> Self {
545            let inner = tonic::client::Grpc::with_origin(inner, origin);
546            Self { inner }
547        }
548        pub fn with_interceptor<F>(
549            inner: T,
550            interceptor: F,
551        ) -> ConnectorServiceClient<InterceptedService<T, F>>
552        where
553            F: tonic::service::Interceptor,
554            T::ResponseBody: Default,
555            T: tonic::codegen::Service<
556                http::Request<tonic::body::Body>,
557                Response = http::Response<
558                    <T as tonic::client::GrpcService<tonic::body::Body>>::ResponseBody,
559                >,
560            >,
561            <T as tonic::codegen::Service<
562                http::Request<tonic::body::Body>,
563            >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
564        {
565            ConnectorServiceClient::new(InterceptedService::new(inner, interceptor))
566        }
567        /// Compress requests with the given encoding.
568        ///
569        /// This requires the server to support it otherwise it might respond with an
570        /// error.
571        #[must_use]
572        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
573            self.inner = self.inner.send_compressed(encoding);
574            self
575        }
576        /// Enable decompressing responses.
577        #[must_use]
578        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
579            self.inner = self.inner.accept_compressed(encoding);
580            self
581        }
582        /// Limits the maximum size of a decoded message.
583        ///
584        /// Default: `4MB`
585        #[must_use]
586        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
587            self.inner = self.inner.max_decoding_message_size(limit);
588            self
589        }
590        /// Limits the maximum size of an encoded message.
591        ///
592        /// Default: `usize::MAX`
593        #[must_use]
594        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
595            self.inner = self.inner.max_encoding_message_size(limit);
596            self
597        }
598        pub async fn sink_writer_stream(
599            &mut self,
600            request: impl tonic::IntoStreamingRequest<
601                Message = super::SinkWriterStreamRequest,
602            >,
603        ) -> std::result::Result<
604            tonic::Response<tonic::codec::Streaming<super::SinkWriterStreamResponse>>,
605            tonic::Status,
606        > {
607            self.inner
608                .ready()
609                .await
610                .map_err(|e| {
611                    tonic::Status::unknown(
612                        format!("Service was not ready: {}", e.into()),
613                    )
614                })?;
615            let codec = tonic_prost::ProstCodec::default();
616            let path = http::uri::PathAndQuery::from_static(
617                "/connector_service.ConnectorService/SinkWriterStream",
618            );
619            let mut req = request.into_streaming_request();
620            req.extensions_mut()
621                .insert(
622                    GrpcMethod::new(
623                        "connector_service.ConnectorService",
624                        "SinkWriterStream",
625                    ),
626                );
627            self.inner.streaming(req, path, codec).await
628        }
629        pub async fn sink_coordinator_stream(
630            &mut self,
631            request: impl tonic::IntoStreamingRequest<
632                Message = super::SinkCoordinatorStreamRequest,
633            >,
634        ) -> std::result::Result<
635            tonic::Response<
636                tonic::codec::Streaming<super::SinkCoordinatorStreamResponse>,
637            >,
638            tonic::Status,
639        > {
640            self.inner
641                .ready()
642                .await
643                .map_err(|e| {
644                    tonic::Status::unknown(
645                        format!("Service was not ready: {}", e.into()),
646                    )
647                })?;
648            let codec = tonic_prost::ProstCodec::default();
649            let path = http::uri::PathAndQuery::from_static(
650                "/connector_service.ConnectorService/SinkCoordinatorStream",
651            );
652            let mut req = request.into_streaming_request();
653            req.extensions_mut()
654                .insert(
655                    GrpcMethod::new(
656                        "connector_service.ConnectorService",
657                        "SinkCoordinatorStream",
658                    ),
659                );
660            self.inner.streaming(req, path, codec).await
661        }
662        pub async fn validate_sink(
663            &mut self,
664            request: impl tonic::IntoRequest<super::ValidateSinkRequest>,
665        ) -> std::result::Result<
666            tonic::Response<super::ValidateSinkResponse>,
667            tonic::Status,
668        > {
669            self.inner
670                .ready()
671                .await
672                .map_err(|e| {
673                    tonic::Status::unknown(
674                        format!("Service was not ready: {}", e.into()),
675                    )
676                })?;
677            let codec = tonic_prost::ProstCodec::default();
678            let path = http::uri::PathAndQuery::from_static(
679                "/connector_service.ConnectorService/ValidateSink",
680            );
681            let mut req = request.into_request();
682            req.extensions_mut()
683                .insert(
684                    GrpcMethod::new("connector_service.ConnectorService", "ValidateSink"),
685                );
686            self.inner.unary(req, path, codec).await
687        }
688        pub async fn get_event_stream(
689            &mut self,
690            request: impl tonic::IntoRequest<super::GetEventStreamRequest>,
691        ) -> std::result::Result<
692            tonic::Response<tonic::codec::Streaming<super::GetEventStreamResponse>>,
693            tonic::Status,
694        > {
695            self.inner
696                .ready()
697                .await
698                .map_err(|e| {
699                    tonic::Status::unknown(
700                        format!("Service was not ready: {}", e.into()),
701                    )
702                })?;
703            let codec = tonic_prost::ProstCodec::default();
704            let path = http::uri::PathAndQuery::from_static(
705                "/connector_service.ConnectorService/GetEventStream",
706            );
707            let mut req = request.into_request();
708            req.extensions_mut()
709                .insert(
710                    GrpcMethod::new(
711                        "connector_service.ConnectorService",
712                        "GetEventStream",
713                    ),
714                );
715            self.inner.server_streaming(req, path, codec).await
716        }
717        pub async fn validate_source(
718            &mut self,
719            request: impl tonic::IntoRequest<super::ValidateSourceRequest>,
720        ) -> std::result::Result<
721            tonic::Response<super::ValidateSourceResponse>,
722            tonic::Status,
723        > {
724            self.inner
725                .ready()
726                .await
727                .map_err(|e| {
728                    tonic::Status::unknown(
729                        format!("Service was not ready: {}", e.into()),
730                    )
731                })?;
732            let codec = tonic_prost::ProstCodec::default();
733            let path = http::uri::PathAndQuery::from_static(
734                "/connector_service.ConnectorService/ValidateSource",
735            );
736            let mut req = request.into_request();
737            req.extensions_mut()
738                .insert(
739                    GrpcMethod::new(
740                        "connector_service.ConnectorService",
741                        "ValidateSource",
742                    ),
743                );
744            self.inner.unary(req, path, codec).await
745        }
746    }
747}
748/// Generated server implementations.
749pub mod connector_service_server {
750    #![allow(
751        unused_variables,
752        dead_code,
753        missing_docs,
754        clippy::wildcard_imports,
755        clippy::let_unit_value,
756    )]
757    use tonic::codegen::*;
758    /// Generated trait containing gRPC methods that should be implemented for use with ConnectorServiceServer.
759    #[async_trait]
760    pub trait ConnectorService: std::marker::Send + std::marker::Sync + 'static {
761        /// Server streaming response type for the SinkWriterStream method.
762        type SinkWriterStreamStream: tonic::codegen::tokio_stream::Stream<
763                Item = std::result::Result<
764                    super::SinkWriterStreamResponse,
765                    tonic::Status,
766                >,
767            >
768            + std::marker::Send
769            + 'static;
770        async fn sink_writer_stream(
771            &self,
772            request: tonic::Request<tonic::Streaming<super::SinkWriterStreamRequest>>,
773        ) -> std::result::Result<
774            tonic::Response<Self::SinkWriterStreamStream>,
775            tonic::Status,
776        >;
777        /// Server streaming response type for the SinkCoordinatorStream method.
778        type SinkCoordinatorStreamStream: tonic::codegen::tokio_stream::Stream<
779                Item = std::result::Result<
780                    super::SinkCoordinatorStreamResponse,
781                    tonic::Status,
782                >,
783            >
784            + std::marker::Send
785            + 'static;
786        async fn sink_coordinator_stream(
787            &self,
788            request: tonic::Request<
789                tonic::Streaming<super::SinkCoordinatorStreamRequest>,
790            >,
791        ) -> std::result::Result<
792            tonic::Response<Self::SinkCoordinatorStreamStream>,
793            tonic::Status,
794        >;
795        async fn validate_sink(
796            &self,
797            request: tonic::Request<super::ValidateSinkRequest>,
798        ) -> std::result::Result<
799            tonic::Response<super::ValidateSinkResponse>,
800            tonic::Status,
801        >;
802        /// Server streaming response type for the GetEventStream method.
803        type GetEventStreamStream: tonic::codegen::tokio_stream::Stream<
804                Item = std::result::Result<super::GetEventStreamResponse, tonic::Status>,
805            >
806            + std::marker::Send
807            + 'static;
808        async fn get_event_stream(
809            &self,
810            request: tonic::Request<super::GetEventStreamRequest>,
811        ) -> std::result::Result<
812            tonic::Response<Self::GetEventStreamStream>,
813            tonic::Status,
814        >;
815        async fn validate_source(
816            &self,
817            request: tonic::Request<super::ValidateSourceRequest>,
818        ) -> std::result::Result<
819            tonic::Response<super::ValidateSourceResponse>,
820            tonic::Status,
821        >;
822    }
823    #[derive(Debug)]
824    pub struct ConnectorServiceServer<T> {
825        inner: Arc<T>,
826        accept_compression_encodings: EnabledCompressionEncodings,
827        send_compression_encodings: EnabledCompressionEncodings,
828        max_decoding_message_size: Option<usize>,
829        max_encoding_message_size: Option<usize>,
830    }
831    impl<T> ConnectorServiceServer<T> {
832        pub fn new(inner: T) -> Self {
833            Self::from_arc(Arc::new(inner))
834        }
835        pub fn from_arc(inner: Arc<T>) -> Self {
836            Self {
837                inner,
838                accept_compression_encodings: Default::default(),
839                send_compression_encodings: Default::default(),
840                max_decoding_message_size: None,
841                max_encoding_message_size: None,
842            }
843        }
844        pub fn with_interceptor<F>(
845            inner: T,
846            interceptor: F,
847        ) -> InterceptedService<Self, F>
848        where
849            F: tonic::service::Interceptor,
850        {
851            InterceptedService::new(Self::new(inner), interceptor)
852        }
853        /// Enable decompressing requests with the given encoding.
854        #[must_use]
855        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
856            self.accept_compression_encodings.enable(encoding);
857            self
858        }
859        /// Compress responses with the given encoding, if the client supports it.
860        #[must_use]
861        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
862            self.send_compression_encodings.enable(encoding);
863            self
864        }
865        /// Limits the maximum size of a decoded message.
866        ///
867        /// Default: `4MB`
868        #[must_use]
869        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
870            self.max_decoding_message_size = Some(limit);
871            self
872        }
873        /// Limits the maximum size of an encoded message.
874        ///
875        /// Default: `usize::MAX`
876        #[must_use]
877        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
878            self.max_encoding_message_size = Some(limit);
879            self
880        }
881    }
882    impl<T, B> tonic::codegen::Service<http::Request<B>> for ConnectorServiceServer<T>
883    where
884        T: ConnectorService,
885        B: Body + std::marker::Send + 'static,
886        B::Error: Into<StdError> + std::marker::Send + 'static,
887    {
888        type Response = http::Response<tonic::body::Body>;
889        type Error = std::convert::Infallible;
890        type Future = BoxFuture<Self::Response, Self::Error>;
891        fn poll_ready(
892            &mut self,
893            _cx: &mut Context<'_>,
894        ) -> Poll<std::result::Result<(), Self::Error>> {
895            Poll::Ready(Ok(()))
896        }
897        fn call(&mut self, req: http::Request<B>) -> Self::Future {
898            match req.uri().path() {
899                "/connector_service.ConnectorService/SinkWriterStream" => {
900                    #[allow(non_camel_case_types)]
901                    struct SinkWriterStreamSvc<T: ConnectorService>(pub Arc<T>);
902                    impl<
903                        T: ConnectorService,
904                    > tonic::server::StreamingService<super::SinkWriterStreamRequest>
905                    for SinkWriterStreamSvc<T> {
906                        type Response = super::SinkWriterStreamResponse;
907                        type ResponseStream = T::SinkWriterStreamStream;
908                        type Future = BoxFuture<
909                            tonic::Response<Self::ResponseStream>,
910                            tonic::Status,
911                        >;
912                        fn call(
913                            &mut self,
914                            request: tonic::Request<
915                                tonic::Streaming<super::SinkWriterStreamRequest>,
916                            >,
917                        ) -> Self::Future {
918                            let inner = Arc::clone(&self.0);
919                            let fut = async move {
920                                <T as ConnectorService>::sink_writer_stream(&inner, request)
921                                    .await
922                            };
923                            Box::pin(fut)
924                        }
925                    }
926                    let accept_compression_encodings = self.accept_compression_encodings;
927                    let send_compression_encodings = self.send_compression_encodings;
928                    let max_decoding_message_size = self.max_decoding_message_size;
929                    let max_encoding_message_size = self.max_encoding_message_size;
930                    let inner = self.inner.clone();
931                    let fut = async move {
932                        let method = SinkWriterStreamSvc(inner);
933                        let codec = tonic_prost::ProstCodec::default();
934                        let mut grpc = tonic::server::Grpc::new(codec)
935                            .apply_compression_config(
936                                accept_compression_encodings,
937                                send_compression_encodings,
938                            )
939                            .apply_max_message_size_config(
940                                max_decoding_message_size,
941                                max_encoding_message_size,
942                            );
943                        let res = grpc.streaming(method, req).await;
944                        Ok(res)
945                    };
946                    Box::pin(fut)
947                }
948                "/connector_service.ConnectorService/SinkCoordinatorStream" => {
949                    #[allow(non_camel_case_types)]
950                    struct SinkCoordinatorStreamSvc<T: ConnectorService>(pub Arc<T>);
951                    impl<
952                        T: ConnectorService,
953                    > tonic::server::StreamingService<
954                        super::SinkCoordinatorStreamRequest,
955                    > for SinkCoordinatorStreamSvc<T> {
956                        type Response = super::SinkCoordinatorStreamResponse;
957                        type ResponseStream = T::SinkCoordinatorStreamStream;
958                        type Future = BoxFuture<
959                            tonic::Response<Self::ResponseStream>,
960                            tonic::Status,
961                        >;
962                        fn call(
963                            &mut self,
964                            request: tonic::Request<
965                                tonic::Streaming<super::SinkCoordinatorStreamRequest>,
966                            >,
967                        ) -> Self::Future {
968                            let inner = Arc::clone(&self.0);
969                            let fut = async move {
970                                <T as ConnectorService>::sink_coordinator_stream(
971                                        &inner,
972                                        request,
973                                    )
974                                    .await
975                            };
976                            Box::pin(fut)
977                        }
978                    }
979                    let accept_compression_encodings = self.accept_compression_encodings;
980                    let send_compression_encodings = self.send_compression_encodings;
981                    let max_decoding_message_size = self.max_decoding_message_size;
982                    let max_encoding_message_size = self.max_encoding_message_size;
983                    let inner = self.inner.clone();
984                    let fut = async move {
985                        let method = SinkCoordinatorStreamSvc(inner);
986                        let codec = tonic_prost::ProstCodec::default();
987                        let mut grpc = tonic::server::Grpc::new(codec)
988                            .apply_compression_config(
989                                accept_compression_encodings,
990                                send_compression_encodings,
991                            )
992                            .apply_max_message_size_config(
993                                max_decoding_message_size,
994                                max_encoding_message_size,
995                            );
996                        let res = grpc.streaming(method, req).await;
997                        Ok(res)
998                    };
999                    Box::pin(fut)
1000                }
1001                "/connector_service.ConnectorService/ValidateSink" => {
1002                    #[allow(non_camel_case_types)]
1003                    struct ValidateSinkSvc<T: ConnectorService>(pub Arc<T>);
1004                    impl<
1005                        T: ConnectorService,
1006                    > tonic::server::UnaryService<super::ValidateSinkRequest>
1007                    for ValidateSinkSvc<T> {
1008                        type Response = super::ValidateSinkResponse;
1009                        type Future = BoxFuture<
1010                            tonic::Response<Self::Response>,
1011                            tonic::Status,
1012                        >;
1013                        fn call(
1014                            &mut self,
1015                            request: tonic::Request<super::ValidateSinkRequest>,
1016                        ) -> Self::Future {
1017                            let inner = Arc::clone(&self.0);
1018                            let fut = async move {
1019                                <T as ConnectorService>::validate_sink(&inner, request)
1020                                    .await
1021                            };
1022                            Box::pin(fut)
1023                        }
1024                    }
1025                    let accept_compression_encodings = self.accept_compression_encodings;
1026                    let send_compression_encodings = self.send_compression_encodings;
1027                    let max_decoding_message_size = self.max_decoding_message_size;
1028                    let max_encoding_message_size = self.max_encoding_message_size;
1029                    let inner = self.inner.clone();
1030                    let fut = async move {
1031                        let method = ValidateSinkSvc(inner);
1032                        let codec = tonic_prost::ProstCodec::default();
1033                        let mut grpc = tonic::server::Grpc::new(codec)
1034                            .apply_compression_config(
1035                                accept_compression_encodings,
1036                                send_compression_encodings,
1037                            )
1038                            .apply_max_message_size_config(
1039                                max_decoding_message_size,
1040                                max_encoding_message_size,
1041                            );
1042                        let res = grpc.unary(method, req).await;
1043                        Ok(res)
1044                    };
1045                    Box::pin(fut)
1046                }
1047                "/connector_service.ConnectorService/GetEventStream" => {
1048                    #[allow(non_camel_case_types)]
1049                    struct GetEventStreamSvc<T: ConnectorService>(pub Arc<T>);
1050                    impl<
1051                        T: ConnectorService,
1052                    > tonic::server::ServerStreamingService<super::GetEventStreamRequest>
1053                    for GetEventStreamSvc<T> {
1054                        type Response = super::GetEventStreamResponse;
1055                        type ResponseStream = T::GetEventStreamStream;
1056                        type Future = BoxFuture<
1057                            tonic::Response<Self::ResponseStream>,
1058                            tonic::Status,
1059                        >;
1060                        fn call(
1061                            &mut self,
1062                            request: tonic::Request<super::GetEventStreamRequest>,
1063                        ) -> Self::Future {
1064                            let inner = Arc::clone(&self.0);
1065                            let fut = async move {
1066                                <T as ConnectorService>::get_event_stream(&inner, request)
1067                                    .await
1068                            };
1069                            Box::pin(fut)
1070                        }
1071                    }
1072                    let accept_compression_encodings = self.accept_compression_encodings;
1073                    let send_compression_encodings = self.send_compression_encodings;
1074                    let max_decoding_message_size = self.max_decoding_message_size;
1075                    let max_encoding_message_size = self.max_encoding_message_size;
1076                    let inner = self.inner.clone();
1077                    let fut = async move {
1078                        let method = GetEventStreamSvc(inner);
1079                        let codec = tonic_prost::ProstCodec::default();
1080                        let mut grpc = tonic::server::Grpc::new(codec)
1081                            .apply_compression_config(
1082                                accept_compression_encodings,
1083                                send_compression_encodings,
1084                            )
1085                            .apply_max_message_size_config(
1086                                max_decoding_message_size,
1087                                max_encoding_message_size,
1088                            );
1089                        let res = grpc.server_streaming(method, req).await;
1090                        Ok(res)
1091                    };
1092                    Box::pin(fut)
1093                }
1094                "/connector_service.ConnectorService/ValidateSource" => {
1095                    #[allow(non_camel_case_types)]
1096                    struct ValidateSourceSvc<T: ConnectorService>(pub Arc<T>);
1097                    impl<
1098                        T: ConnectorService,
1099                    > tonic::server::UnaryService<super::ValidateSourceRequest>
1100                    for ValidateSourceSvc<T> {
1101                        type Response = super::ValidateSourceResponse;
1102                        type Future = BoxFuture<
1103                            tonic::Response<Self::Response>,
1104                            tonic::Status,
1105                        >;
1106                        fn call(
1107                            &mut self,
1108                            request: tonic::Request<super::ValidateSourceRequest>,
1109                        ) -> Self::Future {
1110                            let inner = Arc::clone(&self.0);
1111                            let fut = async move {
1112                                <T as ConnectorService>::validate_source(&inner, request)
1113                                    .await
1114                            };
1115                            Box::pin(fut)
1116                        }
1117                    }
1118                    let accept_compression_encodings = self.accept_compression_encodings;
1119                    let send_compression_encodings = self.send_compression_encodings;
1120                    let max_decoding_message_size = self.max_decoding_message_size;
1121                    let max_encoding_message_size = self.max_encoding_message_size;
1122                    let inner = self.inner.clone();
1123                    let fut = async move {
1124                        let method = ValidateSourceSvc(inner);
1125                        let codec = tonic_prost::ProstCodec::default();
1126                        let mut grpc = tonic::server::Grpc::new(codec)
1127                            .apply_compression_config(
1128                                accept_compression_encodings,
1129                                send_compression_encodings,
1130                            )
1131                            .apply_max_message_size_config(
1132                                max_decoding_message_size,
1133                                max_encoding_message_size,
1134                            );
1135                        let res = grpc.unary(method, req).await;
1136                        Ok(res)
1137                    };
1138                    Box::pin(fut)
1139                }
1140                _ => {
1141                    Box::pin(async move {
1142                        let mut response = http::Response::new(
1143                            tonic::body::Body::default(),
1144                        );
1145                        let headers = response.headers_mut();
1146                        headers
1147                            .insert(
1148                                tonic::Status::GRPC_STATUS,
1149                                (tonic::Code::Unimplemented as i32).into(),
1150                            );
1151                        headers
1152                            .insert(
1153                                http::header::CONTENT_TYPE,
1154                                tonic::metadata::GRPC_CONTENT_TYPE,
1155                            );
1156                        Ok(response)
1157                    })
1158                }
1159            }
1160        }
1161    }
1162    impl<T> Clone for ConnectorServiceServer<T> {
1163        fn clone(&self) -> Self {
1164            let inner = self.inner.clone();
1165            Self {
1166                inner,
1167                accept_compression_encodings: self.accept_compression_encodings,
1168                send_compression_encodings: self.send_compression_encodings,
1169                max_decoding_message_size: self.max_decoding_message_size,
1170                max_encoding_message_size: self.max_encoding_message_size,
1171            }
1172        }
1173    }
1174    /// Generated gRPC service name
1175    pub const SERVICE_NAME: &str = "connector_service.ConnectorService";
1176    impl<T> tonic::server::NamedService for ConnectorServiceServer<T> {
1177        const NAME: &'static str = SERVICE_NAME;
1178    }
1179}
1180/// Generated client implementations.
1181pub mod sink_coordination_service_client {
1182    #![allow(
1183        unused_variables,
1184        dead_code,
1185        missing_docs,
1186        clippy::wildcard_imports,
1187        clippy::let_unit_value,
1188    )]
1189    use tonic::codegen::*;
1190    use tonic::codegen::http::Uri;
1191    #[derive(Debug, Clone)]
1192    pub struct SinkCoordinationServiceClient<T> {
1193        inner: tonic::client::Grpc<T>,
1194    }
1195    impl SinkCoordinationServiceClient<tonic::transport::Channel> {
1196        /// Attempt to create a new client by connecting to a given endpoint.
1197        pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
1198        where
1199            D: TryInto<tonic::transport::Endpoint>,
1200            D::Error: Into<StdError>,
1201        {
1202            let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
1203            Ok(Self::new(conn))
1204        }
1205    }
1206    impl<T> SinkCoordinationServiceClient<T>
1207    where
1208        T: tonic::client::GrpcService<tonic::body::Body>,
1209        T::Error: Into<StdError>,
1210        T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
1211        <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
1212    {
1213        pub fn new(inner: T) -> Self {
1214            let inner = tonic::client::Grpc::new(inner);
1215            Self { inner }
1216        }
1217        pub fn with_origin(inner: T, origin: Uri) -> Self {
1218            let inner = tonic::client::Grpc::with_origin(inner, origin);
1219            Self { inner }
1220        }
1221        pub fn with_interceptor<F>(
1222            inner: T,
1223            interceptor: F,
1224        ) -> SinkCoordinationServiceClient<InterceptedService<T, F>>
1225        where
1226            F: tonic::service::Interceptor,
1227            T::ResponseBody: Default,
1228            T: tonic::codegen::Service<
1229                http::Request<tonic::body::Body>,
1230                Response = http::Response<
1231                    <T as tonic::client::GrpcService<tonic::body::Body>>::ResponseBody,
1232                >,
1233            >,
1234            <T as tonic::codegen::Service<
1235                http::Request<tonic::body::Body>,
1236            >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
1237        {
1238            SinkCoordinationServiceClient::new(
1239                InterceptedService::new(inner, interceptor),
1240            )
1241        }
1242        /// Compress requests with the given encoding.
1243        ///
1244        /// This requires the server to support it otherwise it might respond with an
1245        /// error.
1246        #[must_use]
1247        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
1248            self.inner = self.inner.send_compressed(encoding);
1249            self
1250        }
1251        /// Enable decompressing responses.
1252        #[must_use]
1253        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
1254            self.inner = self.inner.accept_compressed(encoding);
1255            self
1256        }
1257        /// Limits the maximum size of a decoded message.
1258        ///
1259        /// Default: `4MB`
1260        #[must_use]
1261        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
1262            self.inner = self.inner.max_decoding_message_size(limit);
1263            self
1264        }
1265        /// Limits the maximum size of an encoded message.
1266        ///
1267        /// Default: `usize::MAX`
1268        #[must_use]
1269        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
1270            self.inner = self.inner.max_encoding_message_size(limit);
1271            self
1272        }
1273        pub async fn coordinate(
1274            &mut self,
1275            request: impl tonic::IntoStreamingRequest<Message = super::CoordinateRequest>,
1276        ) -> std::result::Result<
1277            tonic::Response<tonic::codec::Streaming<super::CoordinateResponse>>,
1278            tonic::Status,
1279        > {
1280            self.inner
1281                .ready()
1282                .await
1283                .map_err(|e| {
1284                    tonic::Status::unknown(
1285                        format!("Service was not ready: {}", e.into()),
1286                    )
1287                })?;
1288            let codec = tonic_prost::ProstCodec::default();
1289            let path = http::uri::PathAndQuery::from_static(
1290                "/connector_service.SinkCoordinationService/Coordinate",
1291            );
1292            let mut req = request.into_streaming_request();
1293            req.extensions_mut()
1294                .insert(
1295                    GrpcMethod::new(
1296                        "connector_service.SinkCoordinationService",
1297                        "Coordinate",
1298                    ),
1299                );
1300            self.inner.streaming(req, path, codec).await
1301        }
1302    }
1303}
1304/// Generated server implementations.
1305pub mod sink_coordination_service_server {
1306    #![allow(
1307        unused_variables,
1308        dead_code,
1309        missing_docs,
1310        clippy::wildcard_imports,
1311        clippy::let_unit_value,
1312    )]
1313    use tonic::codegen::*;
1314    /// Generated trait containing gRPC methods that should be implemented for use with SinkCoordinationServiceServer.
1315    #[async_trait]
1316    pub trait SinkCoordinationService: std::marker::Send + std::marker::Sync + 'static {
1317        /// Server streaming response type for the Coordinate method.
1318        type CoordinateStream: tonic::codegen::tokio_stream::Stream<
1319                Item = std::result::Result<super::CoordinateResponse, tonic::Status>,
1320            >
1321            + std::marker::Send
1322            + 'static;
1323        async fn coordinate(
1324            &self,
1325            request: tonic::Request<tonic::Streaming<super::CoordinateRequest>>,
1326        ) -> std::result::Result<tonic::Response<Self::CoordinateStream>, tonic::Status>;
1327    }
1328    #[derive(Debug)]
1329    pub struct SinkCoordinationServiceServer<T> {
1330        inner: Arc<T>,
1331        accept_compression_encodings: EnabledCompressionEncodings,
1332        send_compression_encodings: EnabledCompressionEncodings,
1333        max_decoding_message_size: Option<usize>,
1334        max_encoding_message_size: Option<usize>,
1335    }
1336    impl<T> SinkCoordinationServiceServer<T> {
1337        pub fn new(inner: T) -> Self {
1338            Self::from_arc(Arc::new(inner))
1339        }
1340        pub fn from_arc(inner: Arc<T>) -> Self {
1341            Self {
1342                inner,
1343                accept_compression_encodings: Default::default(),
1344                send_compression_encodings: Default::default(),
1345                max_decoding_message_size: None,
1346                max_encoding_message_size: None,
1347            }
1348        }
1349        pub fn with_interceptor<F>(
1350            inner: T,
1351            interceptor: F,
1352        ) -> InterceptedService<Self, F>
1353        where
1354            F: tonic::service::Interceptor,
1355        {
1356            InterceptedService::new(Self::new(inner), interceptor)
1357        }
1358        /// Enable decompressing requests with the given encoding.
1359        #[must_use]
1360        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
1361            self.accept_compression_encodings.enable(encoding);
1362            self
1363        }
1364        /// Compress responses with the given encoding, if the client supports it.
1365        #[must_use]
1366        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
1367            self.send_compression_encodings.enable(encoding);
1368            self
1369        }
1370        /// Limits the maximum size of a decoded message.
1371        ///
1372        /// Default: `4MB`
1373        #[must_use]
1374        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
1375            self.max_decoding_message_size = Some(limit);
1376            self
1377        }
1378        /// Limits the maximum size of an encoded message.
1379        ///
1380        /// Default: `usize::MAX`
1381        #[must_use]
1382        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
1383            self.max_encoding_message_size = Some(limit);
1384            self
1385        }
1386    }
1387    impl<T, B> tonic::codegen::Service<http::Request<B>>
1388    for SinkCoordinationServiceServer<T>
1389    where
1390        T: SinkCoordinationService,
1391        B: Body + std::marker::Send + 'static,
1392        B::Error: Into<StdError> + std::marker::Send + 'static,
1393    {
1394        type Response = http::Response<tonic::body::Body>;
1395        type Error = std::convert::Infallible;
1396        type Future = BoxFuture<Self::Response, Self::Error>;
1397        fn poll_ready(
1398            &mut self,
1399            _cx: &mut Context<'_>,
1400        ) -> Poll<std::result::Result<(), Self::Error>> {
1401            Poll::Ready(Ok(()))
1402        }
1403        fn call(&mut self, req: http::Request<B>) -> Self::Future {
1404            match req.uri().path() {
1405                "/connector_service.SinkCoordinationService/Coordinate" => {
1406                    #[allow(non_camel_case_types)]
1407                    struct CoordinateSvc<T: SinkCoordinationService>(pub Arc<T>);
1408                    impl<
1409                        T: SinkCoordinationService,
1410                    > tonic::server::StreamingService<super::CoordinateRequest>
1411                    for CoordinateSvc<T> {
1412                        type Response = super::CoordinateResponse;
1413                        type ResponseStream = T::CoordinateStream;
1414                        type Future = BoxFuture<
1415                            tonic::Response<Self::ResponseStream>,
1416                            tonic::Status,
1417                        >;
1418                        fn call(
1419                            &mut self,
1420                            request: tonic::Request<
1421                                tonic::Streaming<super::CoordinateRequest>,
1422                            >,
1423                        ) -> Self::Future {
1424                            let inner = Arc::clone(&self.0);
1425                            let fut = async move {
1426                                <T as SinkCoordinationService>::coordinate(&inner, request)
1427                                    .await
1428                            };
1429                            Box::pin(fut)
1430                        }
1431                    }
1432                    let accept_compression_encodings = self.accept_compression_encodings;
1433                    let send_compression_encodings = self.send_compression_encodings;
1434                    let max_decoding_message_size = self.max_decoding_message_size;
1435                    let max_encoding_message_size = self.max_encoding_message_size;
1436                    let inner = self.inner.clone();
1437                    let fut = async move {
1438                        let method = CoordinateSvc(inner);
1439                        let codec = tonic_prost::ProstCodec::default();
1440                        let mut grpc = tonic::server::Grpc::new(codec)
1441                            .apply_compression_config(
1442                                accept_compression_encodings,
1443                                send_compression_encodings,
1444                            )
1445                            .apply_max_message_size_config(
1446                                max_decoding_message_size,
1447                                max_encoding_message_size,
1448                            );
1449                        let res = grpc.streaming(method, req).await;
1450                        Ok(res)
1451                    };
1452                    Box::pin(fut)
1453                }
1454                _ => {
1455                    Box::pin(async move {
1456                        let mut response = http::Response::new(
1457                            tonic::body::Body::default(),
1458                        );
1459                        let headers = response.headers_mut();
1460                        headers
1461                            .insert(
1462                                tonic::Status::GRPC_STATUS,
1463                                (tonic::Code::Unimplemented as i32).into(),
1464                            );
1465                        headers
1466                            .insert(
1467                                http::header::CONTENT_TYPE,
1468                                tonic::metadata::GRPC_CONTENT_TYPE,
1469                            );
1470                        Ok(response)
1471                    })
1472                }
1473            }
1474        }
1475    }
1476    impl<T> Clone for SinkCoordinationServiceServer<T> {
1477        fn clone(&self) -> Self {
1478            let inner = self.inner.clone();
1479            Self {
1480                inner,
1481                accept_compression_encodings: self.accept_compression_encodings,
1482                send_compression_encodings: self.send_compression_encodings,
1483                max_decoding_message_size: self.max_decoding_message_size,
1484                max_encoding_message_size: self.max_encoding_message_size,
1485            }
1486        }
1487    }
1488    /// Generated gRPC service name
1489    pub const SERVICE_NAME: &str = "connector_service.SinkCoordinationService";
1490    impl<T> tonic::server::NamedService for SinkCoordinationServiceServer<T> {
1491        const NAME: &'static str = SERVICE_NAME;
1492    }
1493}