diff --git a/google/cloud/pubsublite/v1/common.proto b/google/cloud/pubsublite/v1/common.proto index 28597242..f7cabeb1 100644 --- a/google/cloud/pubsublite/v1/common.proto +++ b/google/cloud/pubsublite/v1/common.proto @@ -80,15 +80,31 @@ message Topic { // The settings for a topic's partitions. message PartitionConfig { + // The throughput capacity configuration for each partition. + message Capacity { + // Publish throughput capacity per partition in MiB/s. + // Must be >= 4 and <= 16. + int32 publish_mib_per_sec = 1; + + // Subscribe throughput capacity per partition in MiB/s. + // Must be >= 4 and <= 32. + int32 subscribe_mib_per_sec = 2; + } + // The number of partitions in the topic. Must be at least 1. int64 count = 1; - // Every partition in the topic is allocated throughput equivalent to - // `scale` times the standard partition throughput (4 MiB/s). This is also - // reflected in the cost of this topic; a topic with `scale` of 2 and count - // of 10 is charged for 20 partitions. This value must be in the range - // [1,4]. - int32 scale = 2; + oneof dimension { + // Every partition in the topic is allocated throughput equivalent to + // `scale` times the standard partition throughput (4 MiB/s). This is also + // reflected in the cost of this topic; a topic with `scale` of 2 and + // count of 10 is charged for 20 partitions. This value must be in the + // range [1,4]. + int32 scale = 2; + + // The capacity configuration. + Capacity capacity = 3; + } } // The settings for a topic's message retention. @@ -127,38 +143,14 @@ message Subscription { // The settings for a subscription's message delivery. message DeliveryConfig { // When this subscription should send messages to subscribers relative to - // messages persistence in storage. + // messages persistence in storage. For details, see [Creating Lite + // subscriptions](https://cloud.google.com/pubsub/lite/docs/subscriptions#creating_lite_subscriptions). enum DeliveryRequirement { // Default value. This value is unused. DELIVERY_REQUIREMENT_UNSPECIFIED = 0; // The server does not wait for a published message to be successfully - // written to storage before delivering it to subscribers. As such, a - // subscriber may receive a message for which the write to storage failed. - // If the subscriber re-reads the offset of that message later on (e.g., - // after a `Seek` operation), there may be a gap at that offset. Even if - // not re-reading messages, the delivery of messages for which the write - // to storage fails may be inconsistent across subscriptions, with some - // receiving the message (e.g., those connected at the time the message is - // published) and others not receiving it (e.g., those disconnected at - // publish time). Note that offsets are never reused, so even if - // DELIVER_IMMEDIATELY is used, subscribers will not receive different - // messages when re-reading, they will just see gaps. EXAMPLE: - // (0) Topic 'topic1' is created with a single partition. - // (1) Two subscriptions 'sub1' and 'sub2' are created on topic1. sub1 - // has 'DELIVER_IMMEDIATELY', sub2 has 'DELIVER_AFTER_STORED'. - // (2) A stream is opened for sub1 but not sub2. - // (3) A stream is opened for a publisher client using pub1. - // (4) pub1 successfully publishes m0 at offset 0 and m0 is delivered to - // sub1. - // (5) pub1 publishes m1 at offset 1 and m1 is delivered to sub1 but the - // write to storage fails (their stream then breaks). - // (6) A stream is reopened for pub1. - // (6) pub1 successfully publishes m2 at offset 2 and m2 is delivered to - // sub1. - // (some time elapses...) - // (7) A stream is opened for sub2 and it receives m0 and m2 but not m1. - // (8) sub1 seeks to offset 1 but only receives m2 and not m1. + // written to storage before delivering it to subscribers. DELIVER_IMMEDIATELY = 1; // The server will not deliver a published message to subscribers until diff --git a/google/cloud/pubsublite/v1/pubsublite_v1.yaml b/google/cloud/pubsublite/v1/pubsublite_v1.yaml index e4d2835b..82c3c332 100644 --- a/google/cloud/pubsublite/v1/pubsublite_v1.yaml +++ b/google/cloud/pubsublite/v1/pubsublite_v1.yaml @@ -6,8 +6,10 @@ title: Pub/Sub Lite API apis: - name: google.cloud.pubsublite.v1.AdminService - name: google.cloud.pubsublite.v1.CursorService +- name: google.cloud.pubsublite.v1.PartitionAssignmentService - name: google.cloud.pubsublite.v1.PublisherService - name: google.cloud.pubsublite.v1.SubscriberService +- name: google.cloud.pubsublite.v1.TopicStatsService authentication: rules: @@ -19,6 +21,10 @@ authentication: oauth: canonical_scopes: |- https://www.googleapis.com/auth/cloud-platform + - selector: google.cloud.pubsublite.v1.PartitionAssignmentService.AssignPartitions + oauth: + canonical_scopes: |- + https://www.googleapis.com/auth/cloud-platform - selector: google.cloud.pubsublite.v1.PublisherService.Publish oauth: canonical_scopes: |- @@ -27,3 +33,7 @@ authentication: oauth: canonical_scopes: |- https://www.googleapis.com/auth/cloud-platform + - selector: google.cloud.pubsublite.v1.TopicStatsService.ComputeMessageStats + oauth: + canonical_scopes: |- + https://www.googleapis.com/auth/cloud-platform diff --git a/google/cloud/pubsublite/v1/subscriber.proto b/google/cloud/pubsublite/v1/subscriber.proto index c16b34b8..98163bcd 100644 --- a/google/cloud/pubsublite/v1/subscriber.proto +++ b/google/cloud/pubsublite/v1/subscriber.proto @@ -36,6 +36,26 @@ service SubscriberService { } } +// The service that a subscriber client application uses to determine which +// partitions it should connect to. +// +// This is an under development API being published to build client libraries. +// Users will not be able to access it until fully launched. +service PartitionAssignmentService { + option (google.api.default_host) = "pubsublite.googleapis.com"; + option (google.api.oauth_scopes) = "https://www.googleapis.com/auth/cloud-platform"; + + // Assign partitions for this client to handle for the specified subscription. + // + // The client must send an InitialPartitionAssignmentRequest first. + // The server will then send at most one unacknowledged PartitionAssignment + // outstanding on the stream at a time. + // The client should send a PartitionAssignmentAck after updating the + // partitions it is connected to to reflect the new assignment. + rpc AssignPartitions(stream PartitionAssignmentRequest) returns (stream PartitionAssignment) { + } +} + // The first request that must be sent on a newly-opened stream. The client must // wait for the response before sending subsequent requests on the stream. message InitialSubscribeRequest { @@ -138,3 +158,50 @@ message SubscribeResponse { MessageResponse messages = 3; } } + +// The first request that must be sent on a newly-opened stream. The client must +// wait for the response before sending subsequent requests on the stream. +message InitialPartitionAssignmentRequest { + // The subscription name. Structured like: + // projects//locations//subscriptions/ + string subscription = 1; + + // An opaque, unique client identifier. This field must be exactly 16 bytes + // long and is interpreted as an unsigned 128 bit integer. Other size values + // will be rejected and the stream will be failed with a non-retryable error. + // + // This field is large enough to fit a uuid from standard uuid algorithms like + // uuid1 or uuid4, which should be used to generate this number. The same + // identifier should be reused following disconnections with retryable stream + // errors. + bytes client_id = 2; +} + +// PartitionAssignments should not race with acknowledgements. There +// should be exactly one unacknowledged PartitionAssignment at a time. If not, +// the client must break the stream. +message PartitionAssignment { + // The list of partition numbers this subscriber is assigned to. + repeated int64 partitions = 1; +} + +// Acknowledge receipt and handling of the previous assignment. +// If not sent within a short period after receiving the assignment, +// partitions may remain unassigned for a period of time until the +// client is known to be inactive, after which time the server will break the +// stream. +message PartitionAssignmentAck { + +} + +// A request on the PartitionAssignment stream. +message PartitionAssignmentRequest { + oneof request { + // Initial request on the stream. + InitialPartitionAssignmentRequest initial = 1; + + // Acknowledgement of a partition assignment. + PartitionAssignmentAck ack = 2; + } +} diff --git a/google/cloud/pubsublite/v1/topic_stats.proto b/google/cloud/pubsublite/v1/topic_stats.proto new file mode 100644 index 00000000..3ff3f82c --- /dev/null +++ b/google/cloud/pubsublite/v1/topic_stats.proto @@ -0,0 +1,85 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package google.cloud.pubsublite.v1; + +import "google/api/annotations.proto"; +import "google/api/field_behavior.proto"; +import "google/api/resource.proto"; +import "google/cloud/pubsublite/v1/common.proto"; +import "google/protobuf/timestamp.proto"; +import "google/api/client.proto"; + +option go_package = "google.golang.org/genproto/googleapis/cloud/pubsublite/v1;pubsublite"; +option java_multiple_files = true; +option java_outer_classname = "TopicStatsProto"; +option java_package = "com.google.cloud.pubsublite.proto"; + +// This service allows users to get stats about messages in their topic. +service TopicStatsService { + option (google.api.default_host) = "pubsublite.googleapis.com"; + option (google.api.oauth_scopes) = "https://www.googleapis.com/auth/cloud-platform"; + + // Compute statistics about a range of messages in a given topic and + // partition. + rpc ComputeMessageStats(ComputeMessageStatsRequest) returns (ComputeMessageStatsResponse) { + option (google.api.http) = { + post: "/v1/topicStats/{topic=projects/*/locations/*/topics/*}:computeMessageStats" + body: "*" + }; + } +} + +// Compute statistics about a range of messages in a given topic and partition. +message ComputeMessageStatsRequest { + // Required. The topic for which we should compute message stats. + string topic = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "pubsublite.googleapis.com/Topic" + } + ]; + + // Required. The partition for which we should compute message stats. + int64 partition = 2 [(google.api.field_behavior) = REQUIRED]; + + // The inclusive start of the range. + Cursor start_cursor = 3; + + // The exclusive end of the range. The range is empty if end_cursor <= + // start_cursor. Specifying a start_cursor before the first message and an + // end_cursor after the last message will retrieve all messages. + Cursor end_cursor = 4; +} + +// Response containing stats for messages in the requested topic and partition. +message ComputeMessageStatsResponse { + // The count of messages. + int64 message_count = 1; + + // The number of quota bytes accounted to these messages. + int64 message_bytes = 2; + + // The minimum publish timestamp across these messages. Note that publish + // timestamps within a partition are non-decreasing. The timestamp will be + // unset if there are no messages. + google.protobuf.Timestamp minimum_publish_time = 3; + + // The minimum event timestamp across these messages. For the purposes of this + // computation, if a message does not have an event time, we use the publish + // time. The timestamp will be unset if there are no messages. + google.protobuf.Timestamp minimum_event_time = 4; +}