feat: decrease the page size if the response size exceeds the limit (#2689)

* feat: decrease the `page size` if the response message size exceeds the limit

* chore: apply suggestions from CR

* feat: prefer to use adaptive_page_size

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2023-11-06 12:36:15 +09:00
committed by GitHub
parent 395632c874
commit 060864d0c1
6 changed files with 110 additions and 12 deletions

12
Cargo.lock generated
View File

@@ -1800,6 +1800,7 @@ version = "0.4.2"
dependencies = [
"api",
"arrow-flight",
"async-recursion",
"async-stream",
"async-trait",
"base64 0.21.5",
@@ -1830,6 +1831,7 @@ dependencies = [
"strum 0.25.0",
"table",
"tokio",
"tonic 0.10.2",
]
[[package]]
@@ -3066,16 +3068,16 @@ dependencies = [
[[package]]
name = "etcd-client"
version = "0.11.1"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4b0ea5ef6dc2388a4b1669fa32097249bc03a15417b97cb75e38afb309e4a89"
checksum = "3d982a3b3088a5f95d19882d298b352a2e0be20703e3080c1e6767731d5dec79"
dependencies = [
"http",
"prost 0.11.9",
"prost 0.12.1",
"tokio",
"tokio-stream",
"tonic 0.9.2",
"tonic-build 0.9.2",
"tonic 0.10.2",
"tonic-build 0.10.2",
"tower",
"tower-service",
]

View File

@@ -77,7 +77,7 @@ datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.g
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
derive_builder = "0.12"
etcd-client = "0.11"
etcd-client = "0.12"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5da72f1cae6b24315e5afc87520aaf7b4d6bb872" }

View File

@@ -10,6 +10,7 @@ testing = []
[dependencies]
api = { workspace = true }
arrow-flight.workspace = true
async-recursion = "1.0"
async-stream.workspace = true
async-trait.workspace = true
base64 = "0.21"
@@ -38,6 +39,7 @@ store-api = { workspace = true }
strum.workspace = true
table = { workspace = true }
tokio.workspace = true
tonic.workspace = true
[dev-dependencies]
chrono.workspace = true

View File

@@ -343,4 +343,16 @@ impl Error {
pub fn is_retry_later(&self) -> bool {
matches!(self, Error::RetryLater { .. })
}
/// Returns true if the response exceeds the size limit.
pub fn is_exceeded_size_limit(&self) -> bool {
if let Error::EtcdFailed {
error: etcd_client::Error::GRpcStatus(status),
..
} = self
{
return status.code() == tonic::Code::OutOfRange;
}
false
}
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(assert_matches)]
#![feature(btree_extract_if)]
#![feature(async_closure)]

View File

@@ -17,10 +17,12 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use common_telemetry::debug;
use futures::future::BoxFuture;
use futures::{ready, FutureExt, Stream};
use snafu::ensure;
use crate::error::Result;
use crate::error::{self, Result};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{RangeRequest, RangeResponse};
use crate::rpc::KeyValue;
@@ -64,10 +66,13 @@ struct PaginationStreamFactory {
pub range_end: Vec<u8>,
/// page_size is the pagination page size.
pub page_size: usize,
page_size: usize,
/// keys_only when set returns only the keys and not the values.
pub keys_only: bool,
/// adaptive_page_size is the page size actually used for range requests; it is reduced if the response size exceeds the limit.
pub adaptive_page_size: usize,
pub more: bool,
}
@@ -87,19 +92,58 @@ impl PaginationStreamFactory {
page_size,
keys_only,
more,
adaptive_page_size: if page_size == 0 {
DEFAULT_ADAPTIVE_PAGE_SIZE
} else {
page_size
},
}
}
}
const DEFAULT_ADAPTIVE_PAGE_SIZE: usize = 1024;
impl PaginationStreamFactory {
pub async fn read_next(self) -> Result<(Self, Option<RangeResponse>)> {
/// Halves `adaptive_page_size`.
///
/// Returns an `Unexpected` error when the halved value is zero, i.e. the
/// page size cannot be reduced any further.
fn try_reduce_adaptive_page_size(&mut self) -> Result<()> {
    let halved = self.adaptive_page_size / 2;
    // The field is written before the check, so it is left at zero on failure.
    self.adaptive_page_size = halved;
    ensure!(
        halved != 0,
        error::UnexpectedSnafu {
            err_msg: "Exceeded maximum number of adaptive range retries"
        }
    );
    Ok(())
}
/// Sends `req` to the kv backend, decreasing the `page size` and retrying
/// whenever the response message size exceeds the limitation.
///
/// Any other error is returned unchanged. Retrying stops with an error once
/// the adaptive page size can no longer be reduced.
/// TODO(weny): Consider adding an E2E test.
async fn adaptive_range(&mut self, req: RangeRequest) -> Result<RangeResponse> {
    let mut request = req;
    loop {
        let err = match self.kv.range(request.clone()).await {
            Ok(resp) => return Ok(resp),
            Err(err) => err,
        };
        if !err.is_exceeded_size_limit() {
            return Err(err);
        }
        // Shrink the page and try again with the smaller limit.
        self.try_reduce_adaptive_page_size()?;
        debug!("Reset page_size to {}", self.adaptive_page_size);
        request = request.with_limit(self.adaptive_page_size as i64);
    }
}
pub async fn read_next(mut self) -> Result<(Self, Option<RangeResponse>)> {
if self.more {
let resp = self
.kv
.range(RangeRequest {
.adaptive_range(RangeRequest {
key: self.key.clone(),
range_end: self.range_end.clone(),
limit: self.page_size as i64,
limit: self.adaptive_page_size as i64,
keys_only: self.keys_only,
})
.await?;
@@ -120,6 +164,7 @@ impl PaginationStreamFactory {
page_size: self.page_size,
keys_only: self.keys_only,
more: resp.more,
adaptive_page_size: self.adaptive_page_size,
},
Some(resp),
))
@@ -223,6 +268,7 @@ impl<K, V> Stream for PaginationStream<K, V> {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::BTreeMap;
use futures::TryStreamExt;
@@ -237,6 +283,41 @@ mod tests {
Ok((kv.key.clone(), kv.value))
}
/// Checks that the adaptive page size is halved on every reduction and that
/// reducing it to zero is reported as an `Unexpected` error.
#[test]
fn test_try_reduce_page_size() {
    let kv_backend = Arc::new(MemoryKvBackend::<Error>::new()) as _;

    // Page size 2: the first reduction gives 1, the second would give 0 and must fail.
    let mut tiny = PaginationStreamFactory::new(&kv_backend, vec![], vec![], 2, false, false);
    tiny.try_reduce_adaptive_page_size().unwrap();
    let failure = tiny.try_reduce_adaptive_page_size().unwrap_err();
    assert_matches!(failure, error::Error::Unexpected { .. });

    // Page size 1024: each reduction halves the adaptive page size.
    let mut regular =
        PaginationStreamFactory::new(&kv_backend, vec![], vec![], 1024, false, false);
    for expected in [512, 256] {
        regular.try_reduce_adaptive_page_size().unwrap();
        assert_eq!(regular.adaptive_page_size, expected);
    }

    // Page size 0: the adaptive page size starts from the default value.
    let mut unlimited =
        PaginationStreamFactory::new(&kv_backend, vec![], vec![], 0, false, false);
    unlimited.try_reduce_adaptive_page_size().unwrap();
    assert_eq!(
        unlimited.adaptive_page_size,
        DEFAULT_ADAPTIVE_PAGE_SIZE / 2
    );
}
#[tokio::test]
async fn test_range_empty() {
let kv_backend = Arc::new(MemoryKvBackend::<Error>::new());