diff --git a/Cargo.lock b/Cargo.lock index 836d461a8d..3b6bdd8082 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1800,6 +1800,7 @@ version = "0.4.2" dependencies = [ "api", "arrow-flight", + "async-recursion", "async-stream", "async-trait", "base64 0.21.5", @@ -1830,6 +1831,7 @@ dependencies = [ "strum 0.25.0", "table", "tokio", + "tonic 0.10.2", ] [[package]] @@ -3066,16 +3068,16 @@ dependencies = [ [[package]] name = "etcd-client" -version = "0.11.1" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4b0ea5ef6dc2388a4b1669fa32097249bc03a15417b97cb75e38afb309e4a89" +checksum = "3d982a3b3088a5f95d19882d298b352a2e0be20703e3080c1e6767731d5dec79" dependencies = [ "http", - "prost 0.11.9", + "prost 0.12.1", "tokio", "tokio-stream", - "tonic 0.9.2", - "tonic-build 0.9.2", + "tonic 0.10.2", + "tonic-build 0.10.2", "tower", "tower-service", ] diff --git a/Cargo.toml b/Cargo.toml index 4843f15713..7944b098aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,7 +77,7 @@ datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.g datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } derive_builder = "0.12" -etcd-client = "0.11" +etcd-client = "0.12" futures = "0.3" futures-util = "0.3" greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5da72f1cae6b24315e5afc87520aaf7b4d6bb872" } diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 2f30773c28..c192ba6691 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -10,6 +10,7 @@ testing = [] [dependencies] api = { workspace = true } arrow-flight.workspace = true +async-recursion = "1.0" async-stream.workspace = true async-trait.workspace = true base64 = "0.21" @@ -38,6 +39,7 @@ store-api = { workspace = true } strum.workspace = true table = { workspace = true } tokio.workspace = true +tonic.workspace = true [dev-dependencies] chrono.workspace = true diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 7fdc256d84..4e3c712082 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -343,4 +343,16 @@ impl Error { pub fn is_retry_later(&self) -> bool { matches!(self, Error::RetryLater { .. }) } + + /// Returns true if the response exceeds the size limit. + pub fn is_exceeded_size_limit(&self) -> bool { + if let Error::EtcdFailed { + error: etcd_client::Error::GRpcStatus(status), + .. + } = self + { + return status.code() == tonic::Code::OutOfRange; + } + false + } } diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index fabb9559aa..950d59d1dd 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(assert_matches)] #![feature(btree_extract_if)] #![feature(async_closure)] diff --git a/src/common/meta/src/range_stream.rs b/src/common/meta/src/range_stream.rs index 4aa60b00c3..dc9deaa31d 100644 --- a/src/common/meta/src/range_stream.rs +++ b/src/common/meta/src/range_stream.rs @@ -17,10 +17,12 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use common_telemetry::debug; use futures::future::BoxFuture; use futures::{ready, FutureExt, Stream}; +use snafu::ensure; -use crate::error::Result; +use crate::error::{self, Result}; use crate::kv_backend::KvBackendRef; use crate::rpc::store::{RangeRequest, RangeResponse}; use crate::rpc::KeyValue; @@ -64,10 +66,13 @@ struct PaginationStreamFactory { pub range_end: Vec, /// page_size is the pagination page size. - pub page_size: usize, + page_size: usize, /// keys_only when set returns only the keys and not the values. pub keys_only: bool, + /// It reduces the page size if the response size exceeds the limit. + pub adaptive_page_size: usize, + pub more: bool, } @@ -87,19 +92,58 @@ impl PaginationStreamFactory { page_size, keys_only, more, + adaptive_page_size: if page_size == 0 { + DEFAULT_ADAPTIVE_PAGE_SIZE + } else { + page_size + }, } } } +const DEFAULT_ADAPTIVE_PAGE_SIZE: usize = 1024; + impl PaginationStreamFactory { - pub async fn read_next(self) -> Result<(Self, Option)> { + fn try_reduce_adaptive_page_size(&mut self) -> Result<()> { + self.adaptive_page_size /= 2; + + ensure!( + self.adaptive_page_size != 0, + error::UnexpectedSnafu { + err_msg: "Exceeded maximum number of adaptive range retries" + } + ); + + Ok(()) + } + + /// Decreases the `page size` if the response message size exceeds the limitation. + /// TODO(weny): Considers to add an E2e test. + #[async_recursion::async_recursion] + async fn adaptive_range(&mut self, req: RangeRequest) -> Result { + match self.kv.range(req.clone()).await { + Ok(resp) => Ok(resp), + Err(err) => { + if err.is_exceeded_size_limit() { + self.try_reduce_adaptive_page_size()?; + debug!("Reset page_size to {}", self.adaptive_page_size); + + self.adaptive_range(req.with_limit(self.adaptive_page_size as i64)) + .await + } else { + Err(err) + } + } + } + } + + pub async fn read_next(mut self) -> Result<(Self, Option)> { if self.more { let resp = self - .kv - .range(RangeRequest { + .adaptive_range(RangeRequest { key: self.key.clone(), range_end: self.range_end.clone(), - limit: self.page_size as i64, + limit: self.adaptive_page_size as i64, keys_only: self.keys_only, }) .await?; @@ -120,6 +164,7 @@ impl PaginationStreamFactory { page_size: self.page_size, keys_only: self.keys_only, more: resp.more, + adaptive_page_size: self.adaptive_page_size, }, Some(resp), )) @@ -223,6 +268,7 @@ impl Stream for PaginationStream { #[cfg(test)] mod tests { + use std::assert_matches::assert_matches; use std::collections::BTreeMap; use futures::TryStreamExt; @@ -237,6 +283,41 @@ mod tests { Ok((kv.key.clone(), kv.value)) } + #[test] + fn test_try_reduce_page_size() { + let kv_backend = Arc::new(MemoryKvBackend::::new()) as _; + + let mut factory = + PaginationStreamFactory::new(&kv_backend, vec![], vec![], 2, false, false); + + // new adaptive page size: 1 + factory.try_reduce_adaptive_page_size().unwrap(); + + // new adaptive page size: 0 + assert_matches!( + factory.try_reduce_adaptive_page_size().unwrap_err(), + error::Error::Unexpected { .. } + ); + + let mut factory = + PaginationStreamFactory::new(&kv_backend, vec![], vec![], 1024, false, false); + + factory.try_reduce_adaptive_page_size().unwrap(); + + assert_eq!(factory.adaptive_page_size, 512); + + factory.try_reduce_adaptive_page_size().unwrap(); + + assert_eq!(factory.adaptive_page_size, 256); + + let mut factory = + PaginationStreamFactory::new(&kv_backend, vec![], vec![], 0, false, false); + + factory.try_reduce_adaptive_page_size().unwrap(); + + assert_eq!(factory.adaptive_page_size, DEFAULT_ADAPTIVE_PAGE_SIZE / 2); + } + #[tokio::test] async fn test_range_empty() { let kv_backend = Arc::new(MemoryKvBackend::::new());