/// Protobuf message describing a table schema: a repeated list of column
/// descriptors plus a repeated list of `u32` indices.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TableSchema {
    /// Column descriptors (repeated message field, tag 1).
    #[prost(message, repeated, tag = "1")]
    pub columns: ::prost::alloc::vec::Vec<super::plan_common::ColumnDesc>,
    /// Repeated `uint32` field, tag 2.
    /// NOTE(review): presumably positions into `columns` forming the primary key — confirm against the .proto.
    #[prost(uint32, repeated, tag = "2")]
    pub pk_indices: ::prost::alloc::vec::Vec<u32>,
}
/// Protobuf message wrapping a single error string; used as the optional
/// `error` field of `ValidateSinkResponse` and `ValidateSourceResponse`.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ValidationError {
    /// Error text (string field, tag 1).
    #[prost(string, tag = "1")]
    pub error_message: ::prost::alloc::string::String,
}
/// Protobuf message carrying the parameters of a sink. It is embedded in
/// `StartSink`, `ValidateSinkRequest`, `StartCoordinator` and
/// `StartCoordinationRequest`.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SinkParam {
    /// Sink identifier: a `uint32` on the wire (tag 1), exposed through the
    /// `crate::id::SinkId` wrapper type.
    #[prost(uint32, tag = "1", wrapper = "crate::id::SinkId")]
    pub sink_id: crate::id::SinkId,
    /// String-to-string property map (tag 2), stored as a `BTreeMap`.
    #[prost(btree_map = "string, string", tag = "2")]
    pub properties: ::prost::alloc::collections::BTreeMap<
        ::prost::alloc::string::String,
        ::prost::alloc::string::String,
    >,
    /// Optional table schema (message field, tag 3).
    #[prost(message, optional, tag = "3")]
    pub table_schema: ::core::option::Option<TableSchema>,
    /// Raw `i32` value of the `super::catalog::SinkType` enumeration (tag 4).
    #[prost(enumeration = "super::catalog::SinkType", tag = "4")]
    pub sink_type: i32,
    /// String field, tag 5.
    #[prost(string, tag = "5")]
    pub db_name: ::prost::alloc::string::String,
    /// String field, tag 6.
    #[prost(string, tag = "6")]
    pub sink_from_name: ::prost::alloc::string::String,
    /// Optional `super::catalog::SinkFormatDesc` message (tag 7).
    #[prost(message, optional, tag = "7")]
    pub format_desc: ::core::option::Option<super::catalog::SinkFormatDesc>,
    /// String field, tag 8.
    #[prost(string, tag = "8")]
    pub sink_name: ::prost::alloc::string::String,
    /// Boolean field, tag 9.
    /// NOTE(review): semantics are not visible in this file — see the .proto definition.
    #[prost(bool, tag = "9")]
    pub raw_ignore_delete: bool,
}
/// Request message of the `SinkWriterStream` RPC. Its only content is the
/// `request` oneof, which holds one of the variants of
/// `sink_writer_stream_request::Request` (tags 1, 3 and 4).
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SinkWriterStreamRequest {
    /// The selected request variant, or `None` when no variant is set.
    #[prost(oneof = "sink_writer_stream_request::Request", tags = "1, 3, 4")]
    pub request: ::core::option::Option<sink_writer_stream_request::Request>,
}
/// Nested message and oneof types used by `SinkWriterStreamRequest`.
pub mod sink_writer_stream_request {
    /// Payload of the `Request::Start` variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct StartSink {
        /// Optional sink parameters (message field, tag 1).
        #[prost(message, optional, tag = "1")]
        pub sink_param: ::core::option::Option<super::SinkParam>,
        /// Optional table schema (message field, tag 3; tag 2 is not used here).
        #[prost(message, optional, tag = "3")]
        pub payload_schema: ::core::option::Option<super::TableSchema>,
    }
    /// Payload of the `Request::WriteBatch` variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct WriteBatch {
        /// Batch identifier (`uint64`, tag 3).
        #[prost(uint64, tag = "3")]
        pub batch_id: u64,
        /// Epoch value (`uint64`, tag 4).
        #[prost(uint64, tag = "4")]
        pub epoch: u64,
        /// Batch content: one of the `write_batch::Payload` variants (tags 2 and 5).
        #[prost(oneof = "write_batch::Payload", tags = "2, 5")]
        pub payload: ::core::option::Option<write_batch::Payload>,
    }
    /// Nested message and oneof types used by `WriteBatch`.
    pub mod write_batch {
        /// Batch content carried inline as raw bytes.
        #[derive(prost_helpers::AnyPB)]
        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct StreamChunkPayload {
            /// Raw bytes (tag 1).
            /// NOTE(review): presumably a serialized stream chunk — the encoding is not visible here.
            #[prost(bytes = "vec", tag = "1")]
            pub binary_data: ::prost::alloc::vec::Vec<u8>,
        }
        /// The two alternative ways a `WriteBatch` can carry its content.
        #[derive(prost_helpers::AnyPB)]
        #[derive(Clone, PartialEq, ::prost::Oneof)]
        pub enum Payload {
            /// Inline payload message (tag 2).
            #[prost(message, tag = "2")]
            StreamChunkPayload(StreamChunkPayload),
            /// An `int64` value (tag 5).
            /// NOTE(review): the name suggests a pointer/handle to a chunk held elsewhere — confirm with the producer.
            #[prost(int64, tag = "5")]
            StreamChunkRefPointer(i64),
        }
    }
    /// Payload of the `Request::Barrier` variant; `Copy` because it holds only scalars.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct Barrier {
        /// Epoch value (`uint64`, tag 1).
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
        /// Boolean flag (tag 2).
        #[prost(bool, tag = "2")]
        pub is_checkpoint: bool,
    }
    /// Variants of the `SinkWriterStreamRequest::request` oneof.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Request {
        /// Start message (tag 1).
        #[prost(message, tag = "1")]
        Start(StartSink),
        /// Write-batch message (tag 3).
        #[prost(message, tag = "3")]
        WriteBatch(WriteBatch),
        /// Barrier message (tag 4).
        #[prost(message, tag = "4")]
        Barrier(Barrier),
    }
}
/// Response message of the `SinkWriterStream` RPC. Its only content is the
/// `response` oneof, which holds one of the variants of
/// `sink_writer_stream_response::Response` (tags 1, 2 and 3).
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SinkWriterStreamResponse {
    /// The selected response variant, or `None` when no variant is set.
    #[prost(oneof = "sink_writer_stream_response::Response", tags = "1, 2, 3")]
    pub response: ::core::option::Option<sink_writer_stream_response::Response>,
}
/// Nested message and oneof types used by `SinkWriterStreamResponse`.
pub mod sink_writer_stream_response {
    /// Payload of the `Response::Start` variant; carries no fields.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct StartResponse {}
    /// Payload of the `Response::Commit` variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct CommitResponse {
        /// Epoch value (`uint64`, tag 1).
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
        /// Optional sink metadata (message field, tag 2).
        #[prost(message, optional, tag = "2")]
        pub metadata: ::core::option::Option<super::SinkMetadata>,
    }
    /// Payload of the `Response::Batch` variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct BatchWrittenResponse {
        /// Epoch value (`uint64`, tag 1).
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
        /// Batch identifier (`uint64`, tag 2).
        #[prost(uint64, tag = "2")]
        pub batch_id: u64,
    }
    /// Variants of the `SinkWriterStreamResponse::response` oneof.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Response {
        /// Start response (tag 1).
        #[prost(message, tag = "1")]
        Start(StartResponse),
        /// Commit response (tag 2).
        #[prost(message, tag = "2")]
        Commit(CommitResponse),
        /// Batch-written response (tag 3).
        #[prost(message, tag = "3")]
        Batch(BatchWrittenResponse),
    }
}
/// Request message of the `ValidateSink` RPC.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ValidateSinkRequest {
    /// Optional sink parameters to validate (message field, tag 1).
    #[prost(message, optional, tag = "1")]
    pub sink_param: ::core::option::Option<SinkParam>,
}
/// Response message of the `ValidateSink` RPC.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ValidateSinkResponse {
    /// Optional validation error (message field, tag 1).
    /// NOTE(review): presumably `None` means validation succeeded — confirm with the service implementation.
    #[prost(message, optional, tag = "1")]
    pub error: ::core::option::Option<ValidationError>,
}
/// Protobuf message wrapping sink metadata. Its only content is the
/// `metadata` oneof, which currently has a single variant (tag 1).
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SinkMetadata {
    /// The selected metadata variant, or `None` when no variant is set.
    #[prost(oneof = "sink_metadata::Metadata", tags = "1")]
    pub metadata: ::core::option::Option<sink_metadata::Metadata>,
}
/// Nested message and oneof types used by `SinkMetadata`.
pub mod sink_metadata {
    /// Metadata carried as an opaque byte vector.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct SerializedMetadata {
        /// Raw metadata bytes (tag 1); the encoding is not defined in this file.
        #[prost(bytes = "vec", tag = "1")]
        pub metadata: ::prost::alloc::vec::Vec<u8>,
    }
    /// Variants of the `SinkMetadata::metadata` oneof.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Metadata {
        /// Serialized metadata message (tag 1).
        #[prost(message, tag = "1")]
        Serialized(SerializedMetadata),
    }
}
/// Request message of the `SinkCoordinatorStream` RPC. Its only content is
/// the `request` oneof, which holds one of the variants of
/// `sink_coordinator_stream_request::Request` (tags 1 and 2).
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SinkCoordinatorStreamRequest {
    /// The selected request variant, or `None` when no variant is set.
    #[prost(oneof = "sink_coordinator_stream_request::Request", tags = "1, 2")]
    pub request: ::core::option::Option<sink_coordinator_stream_request::Request>,
}
/// Nested message and oneof types used by `SinkCoordinatorStreamRequest`.
pub mod sink_coordinator_stream_request {
    /// Payload of the `Request::Start` variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct StartCoordinator {
        /// Optional sink parameters (message field, tag 1).
        #[prost(message, optional, tag = "1")]
        pub param: ::core::option::Option<super::SinkParam>,
    }
    /// Payload of the `Request::Commit` variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct CommitMetadata {
        /// Epoch value (`uint64`, tag 1).
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
        /// Zero or more sink metadata entries (repeated message field, tag 2).
        #[prost(message, repeated, tag = "2")]
        pub metadata: ::prost::alloc::vec::Vec<super::SinkMetadata>,
    }
    /// Variants of the `SinkCoordinatorStreamRequest::request` oneof.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Request {
        /// Start message (tag 1).
        #[prost(message, tag = "1")]
        Start(StartCoordinator),
        /// Commit message (tag 2).
        #[prost(message, tag = "2")]
        Commit(CommitMetadata),
    }
}
/// Response message of the `SinkCoordinatorStream` RPC. Its only content is
/// the `response` oneof (tags 1 and 2). The type is `Copy` because every
/// variant payload is itself `Copy`.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct SinkCoordinatorStreamResponse {
    /// The selected response variant, or `None` when no variant is set.
    #[prost(oneof = "sink_coordinator_stream_response::Response", tags = "1, 2")]
    pub response: ::core::option::Option<sink_coordinator_stream_response::Response>,
}
/// Nested message and oneof types used by `SinkCoordinatorStreamResponse`.
pub mod sink_coordinator_stream_response {
    /// Payload of the `Response::Start` variant; carries no fields.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct StartResponse {}
    /// Payload of the `Response::Commit` variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct CommitResponse {
        /// Epoch value (`uint64`, tag 1).
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
    }
    /// Variants of the `SinkCoordinatorStreamResponse::response` oneof.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
    pub enum Response {
        /// Start response (tag 1).
        #[prost(message, tag = "1")]
        Start(StartResponse),
        /// Commit response (tag 2).
        #[prost(message, tag = "2")]
        Commit(CommitResponse),
    }
}
/// Protobuf message for a single CDC event; `GetEventStreamResponse::events`
/// is a list of these.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CdcMessage {
    /// Event payload as a string (tag 1); the payload format is not defined in this file.
    #[prost(string, tag = "1")]
    pub payload: ::prost::alloc::string::String,
    /// String field, tag 2.
    #[prost(string, tag = "2")]
    pub partition: ::prost::alloc::string::String,
    /// String field, tag 3.
    #[prost(string, tag = "3")]
    pub offset: ::prost::alloc::string::String,
    /// String field, tag 4.
    #[prost(string, tag = "4")]
    pub full_table_name: ::prost::alloc::string::String,
    /// `int64` field, tag 5.
    /// NOTE(review): the `_ms` suffix suggests a millisecond timestamp from the source — confirm.
    #[prost(int64, tag = "5")]
    pub source_ts_ms: i64,
    /// Raw `i32` value of `cdc_message::CdcMessageType` (tag 6).
    #[prost(enumeration = "cdc_message::CdcMessageType", tag = "6")]
    pub msg_type: i32,
    /// String field, tag 7.
    #[prost(string, tag = "7")]
    pub key: ::prost::alloc::string::String,
    /// Raw `i32` value of `SourceType` (tag 8).
    #[prost(enumeration = "SourceType", tag = "8")]
    pub source_type: i32,
}
263pub mod cdc_message {
265 #[derive(prost_helpers::AnyPB)]
266 #[derive(
267 Clone,
268 Copy,
269 Debug,
270 PartialEq,
271 Eq,
272 Hash,
273 PartialOrd,
274 Ord,
275 ::prost::Enumeration
276 )]
277 #[repr(i32)]
278 pub enum CdcMessageType {
279 Unspecified = 0,
280 Heartbeat = 1,
281 Data = 2,
282 TransactionMeta = 3,
283 SchemaChange = 4,
284 }
285 impl CdcMessageType {
286 pub fn as_str_name(&self) -> &'static str {
291 match self {
292 Self::Unspecified => "UNSPECIFIED",
293 Self::Heartbeat => "HEARTBEAT",
294 Self::Data => "DATA",
295 Self::TransactionMeta => "TRANSACTION_META",
296 Self::SchemaChange => "SCHEMA_CHANGE",
297 }
298 }
299 pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
301 match value {
302 "UNSPECIFIED" => Some(Self::Unspecified),
303 "HEARTBEAT" => Some(Self::Heartbeat),
304 "DATA" => Some(Self::Data),
305 "TRANSACTION_META" => Some(Self::TransactionMeta),
306 "SCHEMA_CHANGE" => Some(Self::SchemaChange),
307 _ => None,
308 }
309 }
310 }
311}
/// Request message of the `GetEventStream` RPC.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetEventStreamRequest {
    /// Source identifier (`uint64`, tag 1).
    #[prost(uint64, tag = "1")]
    pub source_id: u64,
    /// Raw `i32` value of `SourceType` (tag 2).
    #[prost(enumeration = "SourceType", tag = "2")]
    pub source_type: i32,
    /// String field, tag 3.
    /// NOTE(review): presumably the offset to resume reading from — confirm with the connector.
    #[prost(string, tag = "3")]
    pub start_offset: ::prost::alloc::string::String,
    /// String-to-string property map (tag 4), stored as a `BTreeMap`.
    #[prost(btree_map = "string, string", tag = "4")]
    pub properties: ::prost::alloc::collections::BTreeMap<
        ::prost::alloc::string::String,
        ::prost::alloc::string::String,
    >,
    /// Boolean flag, tag 5.
    #[prost(bool, tag = "5")]
    pub snapshot_done: bool,
    /// Boolean flag, tag 6.
    #[prost(bool, tag = "6")]
    pub is_source_job: bool,
}
/// Response message streamed back by the `GetEventStream` RPC.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetEventStreamResponse {
    /// Source identifier (`uint64`, tag 1).
    #[prost(uint64, tag = "1")]
    pub source_id: u64,
    /// Zero or more CDC messages (repeated message field, tag 2).
    #[prost(message, repeated, tag = "2")]
    pub events: ::prost::alloc::vec::Vec<CdcMessage>,
    /// Optional control information (message field, tag 3).
    #[prost(message, optional, tag = "3")]
    pub control: ::core::option::Option<get_event_stream_response::ControlInfo>,
}
/// Nested message types used by `GetEventStreamResponse`.
pub mod get_event_stream_response {
    /// Control information carried in `GetEventStreamResponse::control`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct ControlInfo {
        /// Boolean flag, tag 1.
        /// NOTE(review): presumably reports whether the initial handshake succeeded — confirm.
        #[prost(bool, tag = "1")]
        pub handshake_ok: bool,
    }
}
/// Request message of the `ValidateSource` RPC.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ValidateSourceRequest {
    /// Source identifier (`uint64`, tag 1).
    #[prost(uint64, tag = "1")]
    pub source_id: u64,
    /// Raw `i32` value of `SourceType` (tag 2).
    #[prost(enumeration = "SourceType", tag = "2")]
    pub source_type: i32,
    /// String-to-string property map (tag 3), stored as a `BTreeMap`.
    #[prost(btree_map = "string, string", tag = "3")]
    pub properties: ::prost::alloc::collections::BTreeMap<
        ::prost::alloc::string::String,
        ::prost::alloc::string::String,
    >,
    /// Optional table schema (message field, tag 4).
    #[prost(message, optional, tag = "4")]
    pub table_schema: ::core::option::Option<TableSchema>,
    /// Boolean flag, tag 5.
    #[prost(bool, tag = "5")]
    pub is_source_job: bool,
    /// Boolean flag, tag 6.
    #[prost(bool, tag = "6")]
    pub is_backfill_table: bool,
}
/// Response message of the `ValidateSource` RPC.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ValidateSourceResponse {
    /// Optional validation error (message field, tag 1).
    /// NOTE(review): presumably `None` means validation succeeded — confirm with the service implementation.
    #[prost(message, optional, tag = "1")]
    pub error: ::core::option::Option<ValidationError>,
}
/// Request message of the `SinkCoordinationService/Coordinate` RPC. Its only
/// content is the `msg` oneof, which holds one of the variants of
/// `coordinate_request::Msg` (tags 1 through 5).
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CoordinateRequest {
    /// The selected message variant, or `None` when no variant is set.
    #[prost(oneof = "coordinate_request::Msg", tags = "1, 2, 3, 4, 5")]
    pub msg: ::core::option::Option<coordinate_request::Msg>,
}
/// Nested message and oneof types used by `CoordinateRequest`.
pub mod coordinate_request {
    /// Payload of the `Msg::StartRequest` variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct StartCoordinationRequest {
        /// Optional `common::Buffer` message (tag 1).
        /// NOTE(review): presumably an encoded vnode bitmap — the encoding is not visible here.
        #[prost(message, optional, tag = "1")]
        pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
        /// Optional sink parameters (message field, tag 2).
        #[prost(message, optional, tag = "2")]
        pub param: ::core::option::Option<super::SinkParam>,
    }
    /// Payload of the `Msg::CommitRequest` variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct CommitRequest {
        /// Epoch value (`uint64`, tag 1).
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
        /// Optional sink metadata (message field, tag 2).
        #[prost(message, optional, tag = "2")]
        pub metadata: ::core::option::Option<super::SinkMetadata>,
        /// Optional `stream_plan::SinkSchemaChange` message (tag 3).
        #[prost(message, optional, tag = "3")]
        pub schema_change: ::core::option::Option<
            super::super::stream_plan::SinkSchemaChange,
        >,
    }
    /// Payload of the `Msg::UpdateVnodeRequest` variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct UpdateVnodeBitmapRequest {
        /// Optional `common::Buffer` message (tag 1).
        #[prost(message, optional, tag = "1")]
        pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
    }
    /// Variants of the `CoordinateRequest::msg` oneof.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Msg {
        /// Start-coordination message (tag 1).
        #[prost(message, tag = "1")]
        StartRequest(StartCoordinationRequest),
        /// Commit message (tag 2).
        #[prost(message, tag = "2")]
        CommitRequest(CommitRequest),
        /// Vnode-bitmap update message (tag 3).
        #[prost(message, tag = "3")]
        UpdateVnodeRequest(UpdateVnodeBitmapRequest),
        /// Boolean scalar (tag 4).
        /// NOTE(review): the meaning of the bool value is not visible in this file.
        #[prost(bool, tag = "4")]
        Stop(bool),
        /// `uint64` scalar (tag 5).
        /// NOTE(review): presumably an epoch value — confirm against the .proto.
        #[prost(uint64, tag = "5")]
        AlignInitialEpochRequest(u64),
    }
}
/// Response message of the `SinkCoordinationService/Coordinate` RPC. Its only
/// content is the `msg` oneof, which holds one of the variants of
/// `coordinate_response::Msg` (tags 1 through 4). The type is `Copy`.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct CoordinateResponse {
    /// The selected message variant, or `None` when no variant is set.
    #[prost(oneof = "coordinate_response::Msg", tags = "1, 2, 3, 4")]
    pub msg: ::core::option::Option<coordinate_response::Msg>,
}
/// Nested message and oneof types used by `CoordinateResponse`.
pub mod coordinate_response {
    /// Payload of the `Msg::StartResponse` variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct StartCoordinationResponse {
        /// Optional `uint64` (tag 1); `None` when the field is absent.
        /// NOTE(review): presumably the epoch the log store should rewind to — confirm.
        #[prost(uint64, optional, tag = "1")]
        pub log_store_rewind_start_epoch: ::core::option::Option<u64>,
    }
    /// Payload of the `Msg::CommitResponse` variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct CommitResponse {
        /// Epoch value (`uint64`, tag 1).
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
    }
    /// Variants of the `CoordinateResponse::msg` oneof.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
    pub enum Msg {
        /// Start-coordination response (tag 1).
        #[prost(message, tag = "1")]
        StartResponse(StartCoordinationResponse),
        /// Commit response (tag 2).
        #[prost(message, tag = "2")]
        CommitResponse(CommitResponse),
        /// Boolean scalar (tag 3).
        /// NOTE(review): the meaning of the bool value is not visible in this file.
        #[prost(bool, tag = "3")]
        Stopped(bool),
        /// `uint64` scalar (tag 4).
        /// NOTE(review): presumably an epoch value — confirm against the .proto.
        #[prost(uint64, tag = "4")]
        AlignInitialEpochResponse(u64),
    }
}
/// Source kinds referenced by `CdcMessage`, `GetEventStreamRequest` and
/// `ValidateSourceRequest`, where the value is stored as a raw `i32`.
/// The discriminants below are the protobuf enum numbers.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum SourceType {
    Unspecified = 0,
    Mysql = 1,
    Postgres = 2,
    Citus = 3,
    Mongodb = 4,
    SqlServer = 5,
}
471impl SourceType {
472 pub fn as_str_name(&self) -> &'static str {
477 match self {
478 Self::Unspecified => "UNSPECIFIED",
479 Self::Mysql => "MYSQL",
480 Self::Postgres => "POSTGRES",
481 Self::Citus => "CITUS",
482 Self::Mongodb => "MONGODB",
483 Self::SqlServer => "SQL_SERVER",
484 }
485 }
486 pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
488 match value {
489 "UNSPECIFIED" => Some(Self::Unspecified),
490 "MYSQL" => Some(Self::Mysql),
491 "POSTGRES" => Some(Self::Postgres),
492 "CITUS" => Some(Self::Citus),
493 "MONGODB" => Some(Self::Mongodb),
494 "SQL_SERVER" => Some(Self::SqlServer),
495 _ => None,
496 }
497 }
498}
499pub mod connector_service_client {
501 #![allow(
502 unused_variables,
503 dead_code,
504 missing_docs,
505 clippy::wildcard_imports,
506 clippy::let_unit_value,
507 )]
508 use tonic::codegen::*;
509 use tonic::codegen::http::Uri;
510 #[derive(Debug, Clone)]
511 pub struct ConnectorServiceClient<T> {
512 inner: tonic::client::Grpc<T>,
513 }
514 impl ConnectorServiceClient<tonic::transport::Channel> {
515 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
517 where
518 D: TryInto<tonic::transport::Endpoint>,
519 D::Error: Into<StdError>,
520 {
521 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
522 Ok(Self::new(conn))
523 }
524 }
525 impl<T> ConnectorServiceClient<T>
526 where
527 T: tonic::client::GrpcService<tonic::body::BoxBody>,
528 T::Error: Into<StdError>,
529 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
530 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
531 {
532 pub fn new(inner: T) -> Self {
533 let inner = tonic::client::Grpc::new(inner);
534 Self { inner }
535 }
536 pub fn with_origin(inner: T, origin: Uri) -> Self {
537 let inner = tonic::client::Grpc::with_origin(inner, origin);
538 Self { inner }
539 }
540 pub fn with_interceptor<F>(
541 inner: T,
542 interceptor: F,
543 ) -> ConnectorServiceClient<InterceptedService<T, F>>
544 where
545 F: tonic::service::Interceptor,
546 T::ResponseBody: Default,
547 T: tonic::codegen::Service<
548 http::Request<tonic::body::BoxBody>,
549 Response = http::Response<
550 <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
551 >,
552 >,
553 <T as tonic::codegen::Service<
554 http::Request<tonic::body::BoxBody>,
555 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
556 {
557 ConnectorServiceClient::new(InterceptedService::new(inner, interceptor))
558 }
559 #[must_use]
564 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
565 self.inner = self.inner.send_compressed(encoding);
566 self
567 }
568 #[must_use]
570 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
571 self.inner = self.inner.accept_compressed(encoding);
572 self
573 }
574 #[must_use]
578 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
579 self.inner = self.inner.max_decoding_message_size(limit);
580 self
581 }
582 #[must_use]
586 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
587 self.inner = self.inner.max_encoding_message_size(limit);
588 self
589 }
590 pub async fn sink_writer_stream(
591 &mut self,
592 request: impl tonic::IntoStreamingRequest<
593 Message = super::SinkWriterStreamRequest,
594 >,
595 ) -> std::result::Result<
596 tonic::Response<tonic::codec::Streaming<super::SinkWriterStreamResponse>>,
597 tonic::Status,
598 > {
599 self.inner
600 .ready()
601 .await
602 .map_err(|e| {
603 tonic::Status::unknown(
604 format!("Service was not ready: {}", e.into()),
605 )
606 })?;
607 let codec = tonic::codec::ProstCodec::default();
608 let path = http::uri::PathAndQuery::from_static(
609 "/connector_service.ConnectorService/SinkWriterStream",
610 );
611 let mut req = request.into_streaming_request();
612 req.extensions_mut()
613 .insert(
614 GrpcMethod::new(
615 "connector_service.ConnectorService",
616 "SinkWriterStream",
617 ),
618 );
619 self.inner.streaming(req, path, codec).await
620 }
621 pub async fn sink_coordinator_stream(
622 &mut self,
623 request: impl tonic::IntoStreamingRequest<
624 Message = super::SinkCoordinatorStreamRequest,
625 >,
626 ) -> std::result::Result<
627 tonic::Response<
628 tonic::codec::Streaming<super::SinkCoordinatorStreamResponse>,
629 >,
630 tonic::Status,
631 > {
632 self.inner
633 .ready()
634 .await
635 .map_err(|e| {
636 tonic::Status::unknown(
637 format!("Service was not ready: {}", e.into()),
638 )
639 })?;
640 let codec = tonic::codec::ProstCodec::default();
641 let path = http::uri::PathAndQuery::from_static(
642 "/connector_service.ConnectorService/SinkCoordinatorStream",
643 );
644 let mut req = request.into_streaming_request();
645 req.extensions_mut()
646 .insert(
647 GrpcMethod::new(
648 "connector_service.ConnectorService",
649 "SinkCoordinatorStream",
650 ),
651 );
652 self.inner.streaming(req, path, codec).await
653 }
654 pub async fn validate_sink(
655 &mut self,
656 request: impl tonic::IntoRequest<super::ValidateSinkRequest>,
657 ) -> std::result::Result<
658 tonic::Response<super::ValidateSinkResponse>,
659 tonic::Status,
660 > {
661 self.inner
662 .ready()
663 .await
664 .map_err(|e| {
665 tonic::Status::unknown(
666 format!("Service was not ready: {}", e.into()),
667 )
668 })?;
669 let codec = tonic::codec::ProstCodec::default();
670 let path = http::uri::PathAndQuery::from_static(
671 "/connector_service.ConnectorService/ValidateSink",
672 );
673 let mut req = request.into_request();
674 req.extensions_mut()
675 .insert(
676 GrpcMethod::new("connector_service.ConnectorService", "ValidateSink"),
677 );
678 self.inner.unary(req, path, codec).await
679 }
680 pub async fn get_event_stream(
681 &mut self,
682 request: impl tonic::IntoRequest<super::GetEventStreamRequest>,
683 ) -> std::result::Result<
684 tonic::Response<tonic::codec::Streaming<super::GetEventStreamResponse>>,
685 tonic::Status,
686 > {
687 self.inner
688 .ready()
689 .await
690 .map_err(|e| {
691 tonic::Status::unknown(
692 format!("Service was not ready: {}", e.into()),
693 )
694 })?;
695 let codec = tonic::codec::ProstCodec::default();
696 let path = http::uri::PathAndQuery::from_static(
697 "/connector_service.ConnectorService/GetEventStream",
698 );
699 let mut req = request.into_request();
700 req.extensions_mut()
701 .insert(
702 GrpcMethod::new(
703 "connector_service.ConnectorService",
704 "GetEventStream",
705 ),
706 );
707 self.inner.server_streaming(req, path, codec).await
708 }
709 pub async fn validate_source(
710 &mut self,
711 request: impl tonic::IntoRequest<super::ValidateSourceRequest>,
712 ) -> std::result::Result<
713 tonic::Response<super::ValidateSourceResponse>,
714 tonic::Status,
715 > {
716 self.inner
717 .ready()
718 .await
719 .map_err(|e| {
720 tonic::Status::unknown(
721 format!("Service was not ready: {}", e.into()),
722 )
723 })?;
724 let codec = tonic::codec::ProstCodec::default();
725 let path = http::uri::PathAndQuery::from_static(
726 "/connector_service.ConnectorService/ValidateSource",
727 );
728 let mut req = request.into_request();
729 req.extensions_mut()
730 .insert(
731 GrpcMethod::new(
732 "connector_service.ConnectorService",
733 "ValidateSource",
734 ),
735 );
736 self.inner.unary(req, path, codec).await
737 }
738 }
739}
740pub mod sink_coordination_service_client {
742 #![allow(
743 unused_variables,
744 dead_code,
745 missing_docs,
746 clippy::wildcard_imports,
747 clippy::let_unit_value,
748 )]
749 use tonic::codegen::*;
750 use tonic::codegen::http::Uri;
751 #[derive(Debug, Clone)]
752 pub struct SinkCoordinationServiceClient<T> {
753 inner: tonic::client::Grpc<T>,
754 }
755 impl SinkCoordinationServiceClient<tonic::transport::Channel> {
756 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
758 where
759 D: TryInto<tonic::transport::Endpoint>,
760 D::Error: Into<StdError>,
761 {
762 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
763 Ok(Self::new(conn))
764 }
765 }
766 impl<T> SinkCoordinationServiceClient<T>
767 where
768 T: tonic::client::GrpcService<tonic::body::BoxBody>,
769 T::Error: Into<StdError>,
770 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
771 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
772 {
773 pub fn new(inner: T) -> Self {
774 let inner = tonic::client::Grpc::new(inner);
775 Self { inner }
776 }
777 pub fn with_origin(inner: T, origin: Uri) -> Self {
778 let inner = tonic::client::Grpc::with_origin(inner, origin);
779 Self { inner }
780 }
781 pub fn with_interceptor<F>(
782 inner: T,
783 interceptor: F,
784 ) -> SinkCoordinationServiceClient<InterceptedService<T, F>>
785 where
786 F: tonic::service::Interceptor,
787 T::ResponseBody: Default,
788 T: tonic::codegen::Service<
789 http::Request<tonic::body::BoxBody>,
790 Response = http::Response<
791 <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
792 >,
793 >,
794 <T as tonic::codegen::Service<
795 http::Request<tonic::body::BoxBody>,
796 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
797 {
798 SinkCoordinationServiceClient::new(
799 InterceptedService::new(inner, interceptor),
800 )
801 }
802 #[must_use]
807 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
808 self.inner = self.inner.send_compressed(encoding);
809 self
810 }
811 #[must_use]
813 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
814 self.inner = self.inner.accept_compressed(encoding);
815 self
816 }
817 #[must_use]
821 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
822 self.inner = self.inner.max_decoding_message_size(limit);
823 self
824 }
825 #[must_use]
829 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
830 self.inner = self.inner.max_encoding_message_size(limit);
831 self
832 }
833 pub async fn coordinate(
834 &mut self,
835 request: impl tonic::IntoStreamingRequest<Message = super::CoordinateRequest>,
836 ) -> std::result::Result<
837 tonic::Response<tonic::codec::Streaming<super::CoordinateResponse>>,
838 tonic::Status,
839 > {
840 self.inner
841 .ready()
842 .await
843 .map_err(|e| {
844 tonic::Status::unknown(
845 format!("Service was not ready: {}", e.into()),
846 )
847 })?;
848 let codec = tonic::codec::ProstCodec::default();
849 let path = http::uri::PathAndQuery::from_static(
850 "/connector_service.SinkCoordinationService/Coordinate",
851 );
852 let mut req = request.into_streaming_request();
853 req.extensions_mut()
854 .insert(
855 GrpcMethod::new(
856 "connector_service.SinkCoordinationService",
857 "Coordinate",
858 ),
859 );
860 self.inner.streaming(req, path, codec).await
861 }
862 }
863}
864pub mod connector_service_server {
866 #![allow(
867 unused_variables,
868 dead_code,
869 missing_docs,
870 clippy::wildcard_imports,
871 clippy::let_unit_value,
872 )]
873 use tonic::codegen::*;
    /// Server-side trait for `connector_service.ConnectorService`.
    /// `ConnectorServiceServer<T>` requires `T: ConnectorService` and
    /// dispatches incoming requests to these methods.
    #[async_trait]
    pub trait ConnectorService: std::marker::Send + std::marker::Sync + 'static {
        /// Stream type returned by `sink_writer_stream`; yields
        /// `SinkWriterStreamResponse` items or a `tonic::Status` error.
        type SinkWriterStreamStream: tonic::codegen::tokio_stream::Stream<
                Item = std::result::Result<
                    super::SinkWriterStreamResponse,
                    tonic::Status,
                >,
            >
            + std::marker::Send
            + 'static;
        /// Handler taking a stream of `SinkWriterStreamRequest` and
        /// returning a stream of responses.
        async fn sink_writer_stream(
            &self,
            request: tonic::Request<tonic::Streaming<super::SinkWriterStreamRequest>>,
        ) -> std::result::Result<
            tonic::Response<Self::SinkWriterStreamStream>,
            tonic::Status,
        >;
        /// Stream type returned by `sink_coordinator_stream`; yields
        /// `SinkCoordinatorStreamResponse` items or a `tonic::Status` error.
        type SinkCoordinatorStreamStream: tonic::codegen::tokio_stream::Stream<
                Item = std::result::Result<
                    super::SinkCoordinatorStreamResponse,
                    tonic::Status,
                >,
            >
            + std::marker::Send
            + 'static;
        /// Handler taking a stream of `SinkCoordinatorStreamRequest` and
        /// returning a stream of responses.
        async fn sink_coordinator_stream(
            &self,
            request: tonic::Request<
                tonic::Streaming<super::SinkCoordinatorStreamRequest>,
            >,
        ) -> std::result::Result<
            tonic::Response<Self::SinkCoordinatorStreamStream>,
            tonic::Status,
        >;
        /// Unary handler: one `ValidateSinkRequest` in, one
        /// `ValidateSinkResponse` out.
        async fn validate_sink(
            &self,
            request: tonic::Request<super::ValidateSinkRequest>,
        ) -> std::result::Result<
            tonic::Response<super::ValidateSinkResponse>,
            tonic::Status,
        >;
        /// Stream type returned by `get_event_stream`; yields
        /// `GetEventStreamResponse` items or a `tonic::Status` error.
        type GetEventStreamStream: tonic::codegen::tokio_stream::Stream<
                Item = std::result::Result<super::GetEventStreamResponse, tonic::Status>,
            >
            + std::marker::Send
            + 'static;
        /// Handler taking a single `GetEventStreamRequest` and returning a
        /// stream of responses.
        async fn get_event_stream(
            &self,
            request: tonic::Request<super::GetEventStreamRequest>,
        ) -> std::result::Result<
            tonic::Response<Self::GetEventStreamStream>,
            tonic::Status,
        >;
        /// Unary handler: one `ValidateSourceRequest` in, one
        /// `ValidateSourceResponse` out.
        async fn validate_source(
            &self,
            request: tonic::Request<super::ValidateSourceRequest>,
        ) -> std::result::Result<
            tonic::Response<super::ValidateSourceResponse>,
            tonic::Status,
        >;
    }
    /// Server wrapper holding a shared service implementation together with
    /// the compression and message-size settings applied to each call.
    #[derive(Debug)]
    pub struct ConnectorServiceServer<T> {
        /// Shared handle to the service implementation.
        inner: Arc<T>,
        /// Encodings enabled through `accept_compressed`.
        accept_compression_encodings: EnabledCompressionEncodings,
        /// Encodings enabled through `send_compressed`.
        send_compression_encodings: EnabledCompressionEncodings,
        /// Limit set through `max_decoding_message_size`; `None` until set.
        max_decoding_message_size: Option<usize>,
        /// Limit set through `max_encoding_message_size`; `None` until set.
        max_encoding_message_size: Option<usize>,
    }
947 impl<T> ConnectorServiceServer<T> {
948 pub fn new(inner: T) -> Self {
949 Self::from_arc(Arc::new(inner))
950 }
951 pub fn from_arc(inner: Arc<T>) -> Self {
952 Self {
953 inner,
954 accept_compression_encodings: Default::default(),
955 send_compression_encodings: Default::default(),
956 max_decoding_message_size: None,
957 max_encoding_message_size: None,
958 }
959 }
960 pub fn with_interceptor<F>(
961 inner: T,
962 interceptor: F,
963 ) -> InterceptedService<Self, F>
964 where
965 F: tonic::service::Interceptor,
966 {
967 InterceptedService::new(Self::new(inner), interceptor)
968 }
969 #[must_use]
971 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
972 self.accept_compression_encodings.enable(encoding);
973 self
974 }
975 #[must_use]
977 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
978 self.send_compression_encodings.enable(encoding);
979 self
980 }
981 #[must_use]
985 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
986 self.max_decoding_message_size = Some(limit);
987 self
988 }
989 #[must_use]
993 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
994 self.max_encoding_message_size = Some(limit);
995 self
996 }
997 }
998 impl<T, B> tonic::codegen::Service<http::Request<B>> for ConnectorServiceServer<T>
999 where
1000 T: ConnectorService,
1001 B: Body + std::marker::Send + 'static,
1002 B::Error: Into<StdError> + std::marker::Send + 'static,
1003 {
1004 type Response = http::Response<tonic::body::BoxBody>;
1005 type Error = std::convert::Infallible;
1006 type Future = BoxFuture<Self::Response, Self::Error>;
1007 fn poll_ready(
1008 &mut self,
1009 _cx: &mut Context<'_>,
1010 ) -> Poll<std::result::Result<(), Self::Error>> {
1011 Poll::Ready(Ok(()))
1012 }
1013 fn call(&mut self, req: http::Request<B>) -> Self::Future {
1014 match req.uri().path() {
1015 "/connector_service.ConnectorService/SinkWriterStream" => {
1016 #[allow(non_camel_case_types)]
1017 struct SinkWriterStreamSvc<T: ConnectorService>(pub Arc<T>);
1018 impl<
1019 T: ConnectorService,
1020 > tonic::server::StreamingService<super::SinkWriterStreamRequest>
1021 for SinkWriterStreamSvc<T> {
1022 type Response = super::SinkWriterStreamResponse;
1023 type ResponseStream = T::SinkWriterStreamStream;
1024 type Future = BoxFuture<
1025 tonic::Response<Self::ResponseStream>,
1026 tonic::Status,
1027 >;
1028 fn call(
1029 &mut self,
1030 request: tonic::Request<
1031 tonic::Streaming<super::SinkWriterStreamRequest>,
1032 >,
1033 ) -> Self::Future {
1034 let inner = Arc::clone(&self.0);
1035 let fut = async move {
1036 <T as ConnectorService>::sink_writer_stream(&inner, request)
1037 .await
1038 };
1039 Box::pin(fut)
1040 }
1041 }
1042 let accept_compression_encodings = self.accept_compression_encodings;
1043 let send_compression_encodings = self.send_compression_encodings;
1044 let max_decoding_message_size = self.max_decoding_message_size;
1045 let max_encoding_message_size = self.max_encoding_message_size;
1046 let inner = self.inner.clone();
1047 let fut = async move {
1048 let method = SinkWriterStreamSvc(inner);
1049 let codec = tonic::codec::ProstCodec::default();
1050 let mut grpc = tonic::server::Grpc::new(codec)
1051 .apply_compression_config(
1052 accept_compression_encodings,
1053 send_compression_encodings,
1054 )
1055 .apply_max_message_size_config(
1056 max_decoding_message_size,
1057 max_encoding_message_size,
1058 );
1059 let res = grpc.streaming(method, req).await;
1060 Ok(res)
1061 };
1062 Box::pin(fut)
1063 }
1064 "/connector_service.ConnectorService/SinkCoordinatorStream" => {
1065 #[allow(non_camel_case_types)]
1066 struct SinkCoordinatorStreamSvc<T: ConnectorService>(pub Arc<T>);
1067 impl<
1068 T: ConnectorService,
1069 > tonic::server::StreamingService<
1070 super::SinkCoordinatorStreamRequest,
1071 > for SinkCoordinatorStreamSvc<T> {
1072 type Response = super::SinkCoordinatorStreamResponse;
1073 type ResponseStream = T::SinkCoordinatorStreamStream;
1074 type Future = BoxFuture<
1075 tonic::Response<Self::ResponseStream>,
1076 tonic::Status,
1077 >;
1078 fn call(
1079 &mut self,
1080 request: tonic::Request<
1081 tonic::Streaming<super::SinkCoordinatorStreamRequest>,
1082 >,
1083 ) -> Self::Future {
1084 let inner = Arc::clone(&self.0);
1085 let fut = async move {
1086 <T as ConnectorService>::sink_coordinator_stream(
1087 &inner,
1088 request,
1089 )
1090 .await
1091 };
1092 Box::pin(fut)
1093 }
1094 }
1095 let accept_compression_encodings = self.accept_compression_encodings;
1096 let send_compression_encodings = self.send_compression_encodings;
1097 let max_decoding_message_size = self.max_decoding_message_size;
1098 let max_encoding_message_size = self.max_encoding_message_size;
1099 let inner = self.inner.clone();
1100 let fut = async move {
1101 let method = SinkCoordinatorStreamSvc(inner);
1102 let codec = tonic::codec::ProstCodec::default();
1103 let mut grpc = tonic::server::Grpc::new(codec)
1104 .apply_compression_config(
1105 accept_compression_encodings,
1106 send_compression_encodings,
1107 )
1108 .apply_max_message_size_config(
1109 max_decoding_message_size,
1110 max_encoding_message_size,
1111 );
1112 let res = grpc.streaming(method, req).await;
1113 Ok(res)
1114 };
1115 Box::pin(fut)
1116 }
1117 "/connector_service.ConnectorService/ValidateSink" => {
1118 #[allow(non_camel_case_types)]
1119 struct ValidateSinkSvc<T: ConnectorService>(pub Arc<T>);
1120 impl<
1121 T: ConnectorService,
1122 > tonic::server::UnaryService<super::ValidateSinkRequest>
1123 for ValidateSinkSvc<T> {
1124 type Response = super::ValidateSinkResponse;
1125 type Future = BoxFuture<
1126 tonic::Response<Self::Response>,
1127 tonic::Status,
1128 >;
1129 fn call(
1130 &mut self,
1131 request: tonic::Request<super::ValidateSinkRequest>,
1132 ) -> Self::Future {
1133 let inner = Arc::clone(&self.0);
1134 let fut = async move {
1135 <T as ConnectorService>::validate_sink(&inner, request)
1136 .await
1137 };
1138 Box::pin(fut)
1139 }
1140 }
1141 let accept_compression_encodings = self.accept_compression_encodings;
1142 let send_compression_encodings = self.send_compression_encodings;
1143 let max_decoding_message_size = self.max_decoding_message_size;
1144 let max_encoding_message_size = self.max_encoding_message_size;
1145 let inner = self.inner.clone();
1146 let fut = async move {
1147 let method = ValidateSinkSvc(inner);
1148 let codec = tonic::codec::ProstCodec::default();
1149 let mut grpc = tonic::server::Grpc::new(codec)
1150 .apply_compression_config(
1151 accept_compression_encodings,
1152 send_compression_encodings,
1153 )
1154 .apply_max_message_size_config(
1155 max_decoding_message_size,
1156 max_encoding_message_size,
1157 );
1158 let res = grpc.unary(method, req).await;
1159 Ok(res)
1160 };
1161 Box::pin(fut)
1162 }
1163 "/connector_service.ConnectorService/GetEventStream" => {
1164 #[allow(non_camel_case_types)]
1165 struct GetEventStreamSvc<T: ConnectorService>(pub Arc<T>);
1166 impl<
1167 T: ConnectorService,
1168 > tonic::server::ServerStreamingService<super::GetEventStreamRequest>
1169 for GetEventStreamSvc<T> {
1170 type Response = super::GetEventStreamResponse;
1171 type ResponseStream = T::GetEventStreamStream;
1172 type Future = BoxFuture<
1173 tonic::Response<Self::ResponseStream>,
1174 tonic::Status,
1175 >;
1176 fn call(
1177 &mut self,
1178 request: tonic::Request<super::GetEventStreamRequest>,
1179 ) -> Self::Future {
1180 let inner = Arc::clone(&self.0);
1181 let fut = async move {
1182 <T as ConnectorService>::get_event_stream(&inner, request)
1183 .await
1184 };
1185 Box::pin(fut)
1186 }
1187 }
1188 let accept_compression_encodings = self.accept_compression_encodings;
1189 let send_compression_encodings = self.send_compression_encodings;
1190 let max_decoding_message_size = self.max_decoding_message_size;
1191 let max_encoding_message_size = self.max_encoding_message_size;
1192 let inner = self.inner.clone();
1193 let fut = async move {
1194 let method = GetEventStreamSvc(inner);
1195 let codec = tonic::codec::ProstCodec::default();
1196 let mut grpc = tonic::server::Grpc::new(codec)
1197 .apply_compression_config(
1198 accept_compression_encodings,
1199 send_compression_encodings,
1200 )
1201 .apply_max_message_size_config(
1202 max_decoding_message_size,
1203 max_encoding_message_size,
1204 );
1205 let res = grpc.server_streaming(method, req).await;
1206 Ok(res)
1207 };
1208 Box::pin(fut)
1209 }
1210 "/connector_service.ConnectorService/ValidateSource" => {
1211 #[allow(non_camel_case_types)]
1212 struct ValidateSourceSvc<T: ConnectorService>(pub Arc<T>);
1213 impl<
1214 T: ConnectorService,
1215 > tonic::server::UnaryService<super::ValidateSourceRequest>
1216 for ValidateSourceSvc<T> {
1217 type Response = super::ValidateSourceResponse;
1218 type Future = BoxFuture<
1219 tonic::Response<Self::Response>,
1220 tonic::Status,
1221 >;
1222 fn call(
1223 &mut self,
1224 request: tonic::Request<super::ValidateSourceRequest>,
1225 ) -> Self::Future {
1226 let inner = Arc::clone(&self.0);
1227 let fut = async move {
1228 <T as ConnectorService>::validate_source(&inner, request)
1229 .await
1230 };
1231 Box::pin(fut)
1232 }
1233 }
1234 let accept_compression_encodings = self.accept_compression_encodings;
1235 let send_compression_encodings = self.send_compression_encodings;
1236 let max_decoding_message_size = self.max_decoding_message_size;
1237 let max_encoding_message_size = self.max_encoding_message_size;
1238 let inner = self.inner.clone();
1239 let fut = async move {
1240 let method = ValidateSourceSvc(inner);
1241 let codec = tonic::codec::ProstCodec::default();
1242 let mut grpc = tonic::server::Grpc::new(codec)
1243 .apply_compression_config(
1244 accept_compression_encodings,
1245 send_compression_encodings,
1246 )
1247 .apply_max_message_size_config(
1248 max_decoding_message_size,
1249 max_encoding_message_size,
1250 );
1251 let res = grpc.unary(method, req).await;
1252 Ok(res)
1253 };
1254 Box::pin(fut)
1255 }
1256 _ => {
1257 Box::pin(async move {
1258 let mut response = http::Response::new(empty_body());
1259 let headers = response.headers_mut();
1260 headers
1261 .insert(
1262 tonic::Status::GRPC_STATUS,
1263 (tonic::Code::Unimplemented as i32).into(),
1264 );
1265 headers
1266 .insert(
1267 http::header::CONTENT_TYPE,
1268 tonic::metadata::GRPC_CONTENT_TYPE,
1269 );
1270 Ok(response)
1271 })
1272 }
1273 }
1274 }
1275 }
1276 impl<T> Clone for ConnectorServiceServer<T> {
1277 fn clone(&self) -> Self {
1278 let inner = self.inner.clone();
1279 Self {
1280 inner,
1281 accept_compression_encodings: self.accept_compression_encodings,
1282 send_compression_encodings: self.send_compression_encodings,
1283 max_decoding_message_size: self.max_decoding_message_size,
1284 max_encoding_message_size: self.max_encoding_message_size,
1285 }
1286 }
1287 }
1288 pub const SERVICE_NAME: &str = "connector_service.ConnectorService";
1290 impl<T> tonic::server::NamedService for ConnectorServiceServer<T> {
1291 const NAME: &'static str = SERVICE_NAME;
1292 }
1293}
1294pub mod sink_coordination_service_server {
1296 #![allow(
1297 unused_variables,
1298 dead_code,
1299 missing_docs,
1300 clippy::wildcard_imports,
1301 clippy::let_unit_value,
1302 )]
1303 use tonic::codegen::*;
1304 #[async_trait]
1306 pub trait SinkCoordinationService: std::marker::Send + std::marker::Sync + 'static {
1307 type CoordinateStream: tonic::codegen::tokio_stream::Stream<
1309 Item = std::result::Result<super::CoordinateResponse, tonic::Status>,
1310 >
1311 + std::marker::Send
1312 + 'static;
1313 async fn coordinate(
1314 &self,
1315 request: tonic::Request<tonic::Streaming<super::CoordinateRequest>>,
1316 ) -> std::result::Result<tonic::Response<Self::CoordinateStream>, tonic::Status>;
1317 }
1318 #[derive(Debug)]
1319 pub struct SinkCoordinationServiceServer<T> {
1320 inner: Arc<T>,
1321 accept_compression_encodings: EnabledCompressionEncodings,
1322 send_compression_encodings: EnabledCompressionEncodings,
1323 max_decoding_message_size: Option<usize>,
1324 max_encoding_message_size: Option<usize>,
1325 }
1326 impl<T> SinkCoordinationServiceServer<T> {
1327 pub fn new(inner: T) -> Self {
1328 Self::from_arc(Arc::new(inner))
1329 }
1330 pub fn from_arc(inner: Arc<T>) -> Self {
1331 Self {
1332 inner,
1333 accept_compression_encodings: Default::default(),
1334 send_compression_encodings: Default::default(),
1335 max_decoding_message_size: None,
1336 max_encoding_message_size: None,
1337 }
1338 }
1339 pub fn with_interceptor<F>(
1340 inner: T,
1341 interceptor: F,
1342 ) -> InterceptedService<Self, F>
1343 where
1344 F: tonic::service::Interceptor,
1345 {
1346 InterceptedService::new(Self::new(inner), interceptor)
1347 }
1348 #[must_use]
1350 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
1351 self.accept_compression_encodings.enable(encoding);
1352 self
1353 }
1354 #[must_use]
1356 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
1357 self.send_compression_encodings.enable(encoding);
1358 self
1359 }
1360 #[must_use]
1364 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
1365 self.max_decoding_message_size = Some(limit);
1366 self
1367 }
1368 #[must_use]
1372 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
1373 self.max_encoding_message_size = Some(limit);
1374 self
1375 }
1376 }
1377 impl<T, B> tonic::codegen::Service<http::Request<B>>
1378 for SinkCoordinationServiceServer<T>
1379 where
1380 T: SinkCoordinationService,
1381 B: Body + std::marker::Send + 'static,
1382 B::Error: Into<StdError> + std::marker::Send + 'static,
1383 {
1384 type Response = http::Response<tonic::body::BoxBody>;
1385 type Error = std::convert::Infallible;
1386 type Future = BoxFuture<Self::Response, Self::Error>;
1387 fn poll_ready(
1388 &mut self,
1389 _cx: &mut Context<'_>,
1390 ) -> Poll<std::result::Result<(), Self::Error>> {
1391 Poll::Ready(Ok(()))
1392 }
1393 fn call(&mut self, req: http::Request<B>) -> Self::Future {
1394 match req.uri().path() {
1395 "/connector_service.SinkCoordinationService/Coordinate" => {
1396 #[allow(non_camel_case_types)]
1397 struct CoordinateSvc<T: SinkCoordinationService>(pub Arc<T>);
1398 impl<
1399 T: SinkCoordinationService,
1400 > tonic::server::StreamingService<super::CoordinateRequest>
1401 for CoordinateSvc<T> {
1402 type Response = super::CoordinateResponse;
1403 type ResponseStream = T::CoordinateStream;
1404 type Future = BoxFuture<
1405 tonic::Response<Self::ResponseStream>,
1406 tonic::Status,
1407 >;
1408 fn call(
1409 &mut self,
1410 request: tonic::Request<
1411 tonic::Streaming<super::CoordinateRequest>,
1412 >,
1413 ) -> Self::Future {
1414 let inner = Arc::clone(&self.0);
1415 let fut = async move {
1416 <T as SinkCoordinationService>::coordinate(&inner, request)
1417 .await
1418 };
1419 Box::pin(fut)
1420 }
1421 }
1422 let accept_compression_encodings = self.accept_compression_encodings;
1423 let send_compression_encodings = self.send_compression_encodings;
1424 let max_decoding_message_size = self.max_decoding_message_size;
1425 let max_encoding_message_size = self.max_encoding_message_size;
1426 let inner = self.inner.clone();
1427 let fut = async move {
1428 let method = CoordinateSvc(inner);
1429 let codec = tonic::codec::ProstCodec::default();
1430 let mut grpc = tonic::server::Grpc::new(codec)
1431 .apply_compression_config(
1432 accept_compression_encodings,
1433 send_compression_encodings,
1434 )
1435 .apply_max_message_size_config(
1436 max_decoding_message_size,
1437 max_encoding_message_size,
1438 );
1439 let res = grpc.streaming(method, req).await;
1440 Ok(res)
1441 };
1442 Box::pin(fut)
1443 }
1444 _ => {
1445 Box::pin(async move {
1446 let mut response = http::Response::new(empty_body());
1447 let headers = response.headers_mut();
1448 headers
1449 .insert(
1450 tonic::Status::GRPC_STATUS,
1451 (tonic::Code::Unimplemented as i32).into(),
1452 );
1453 headers
1454 .insert(
1455 http::header::CONTENT_TYPE,
1456 tonic::metadata::GRPC_CONTENT_TYPE,
1457 );
1458 Ok(response)
1459 })
1460 }
1461 }
1462 }
1463 }
1464 impl<T> Clone for SinkCoordinationServiceServer<T> {
1465 fn clone(&self) -> Self {
1466 let inner = self.inner.clone();
1467 Self {
1468 inner,
1469 accept_compression_encodings: self.accept_compression_encodings,
1470 send_compression_encodings: self.send_compression_encodings,
1471 max_decoding_message_size: self.max_decoding_message_size,
1472 max_encoding_message_size: self.max_encoding_message_size,
1473 }
1474 }
1475 }
1476 pub const SERVICE_NAME: &str = "connector_service.SinkCoordinationService";
1478 impl<T> tonic::server::NamedService for SinkCoordinationServiceServer<T> {
1479 const NAME: &'static str = SERVICE_NAME;
1480 }
1481}