refactor(tables): improve tables performance (#4737)

* chore: cherrypick 52e8eebb2dbbbe81179583c05094004a5eedd7fd

* refactor/tables: Change variable from immutable to mutable in KvBackendCatalogManager's method

* refactor/tables: Replace unbounded channel with bounded and use semaphore for concurrency control in KvBackendCatalogManager

* refactor/tables: Add common-runtime dependency and update KvBackendCatalogManager to use common_runtime::spawn_global

* refactor/tables: Await on sending error through channel in KvBackendCatalogManager
This commit is contained in:
Lei, HUANG
2024-09-19 12:44:02 +08:00
committed by GitHub
parent 08bd40333c
commit b5f7138d33
16 changed files with 108 additions and 142 deletions

3
Cargo.lock generated
View File

@@ -1381,6 +1381,7 @@ dependencies = [
"common-meta",
"common-query",
"common-recordbatch",
"common-runtime",
"common-telemetry",
"common-test-util",
"common-time",
@@ -1408,6 +1409,7 @@ dependencies = [
"store-api",
"table",
"tokio",
"tokio-stream",
]
[[package]]
@@ -2109,6 +2111,7 @@ dependencies = [
"anymap2",
"api",
"async-recursion",
"async-stream",
"async-trait",
"base64 0.21.7",
"bytes",

View File

@@ -24,6 +24,7 @@ common-macro.workspace = true
common-meta.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-version.workspace = true
@@ -48,6 +49,7 @@ sql.workspace = true
store-api.workspace = true
table.workspace = true
tokio.workspace = true
tokio-stream = "0.1"
[dev-dependencies]
cache.workspace = true

View File

@@ -42,6 +42,8 @@ use table::dist_table::DistTable;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::table_name::TableName;
use table::TableRef;
use tokio::sync::Semaphore;
use tokio_stream::wrappers::ReceiverStream;
use crate::error::{
CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu,
@@ -179,21 +181,18 @@ impl CatalogManager for KvBackendCatalogManager {
schema: &str,
query_ctx: Option<&QueryContext>,
) -> Result<Vec<String>> {
let stream = self
let mut tables = self
.table_metadata_manager
.table_name_manager()
.tables(catalog, schema);
let mut tables = stream
.tables(catalog, schema)
.map_ok(|(table_name, _)| table_name)
.try_collect::<Vec<_>>()
.await
.map_err(BoxedError::new)
.context(ListTablesSnafu { catalog, schema })?
.into_iter()
.map(|(k, _)| k)
.collect::<Vec<_>>();
tables.extend_from_slice(&self.system_catalog.table_names(schema, query_ctx));
.context(ListTablesSnafu { catalog, schema })?;
Ok(tables.into_iter().collect())
tables.extend(self.system_catalog.table_names(schema, query_ctx));
Ok(tables)
}
async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
@@ -303,36 +302,68 @@ impl CatalogManager for KvBackendCatalogManager {
}
});
let table_id_stream = self
.table_metadata_manager
.table_name_manager()
.tables(catalog, schema)
.map_ok(|(_, v)| v.table_id());
const BATCH_SIZE: usize = 128;
let user_tables = try_stream!({
const CONCURRENCY: usize = 8;
let (tx, rx) = tokio::sync::mpsc::channel(64);
let metadata_manager = self.table_metadata_manager.clone();
let catalog = catalog.to_string();
let schema = schema.to_string();
let semaphore = Arc::new(Semaphore::new(CONCURRENCY));
common_runtime::spawn_global(async move {
let table_id_stream = metadata_manager
.table_name_manager()
.tables(&catalog, &schema)
.map_ok(|(_, v)| v.table_id());
// Split table ids into chunks
let mut table_id_chunks = table_id_stream.ready_chunks(BATCH_SIZE);
while let Some(table_ids) = table_id_chunks.next().await {
let table_ids = table_ids
let table_ids = match table_ids
.into_iter()
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(BoxedError::new)
.context(ListTablesSnafu { catalog, schema })?;
.context(ListTablesSnafu {
catalog: &catalog,
schema: &schema,
}) {
Ok(table_ids) => table_ids,
Err(e) => {
let _ = tx.send(Err(e)).await;
return;
}
};
let table_info_values = self
.table_metadata_manager
.table_info_manager()
.batch_get(&table_ids)
.await
.context(TableMetadataManagerSnafu)?;
let metadata_manager = metadata_manager.clone();
let tx = tx.clone();
let semaphore = semaphore.clone();
common_runtime::spawn_global(async move {
// we don't explicitly close the semaphore so just ignore the potential error.
let _ = semaphore.acquire().await;
let table_info_values = match metadata_manager
.table_info_manager()
.batch_get(&table_ids)
.await
.context(TableMetadataManagerSnafu)
{
Ok(table_info_values) => table_info_values,
Err(e) => {
let _ = tx.send(Err(e)).await;
return;
}
};
for table_info_value in table_info_values.into_values() {
yield build_table(table_info_value)?;
}
for table in table_info_values.into_values().map(build_table) {
if tx.send(table).await.is_err() {
return;
}
}
});
}
});
let user_tables = ReceiverStream::new(rx);
Box::pin(sys_tables.chain(user_tables))
}
}

View File

@@ -15,6 +15,7 @@ workspace = true
anymap2 = "0.13.0"
api.workspace = true
async-recursion = "1.0"
async-stream = "0.3"
async-trait.workspace = true
base64.workspace = true
bytes.workspace = true

View File

@@ -147,7 +147,8 @@ impl CatalogManager {
req,
DEFAULT_PAGE_SIZE,
Arc::new(catalog_decoder),
);
)
.into_stream();
Box::pin(stream)
}

View File

@@ -167,7 +167,8 @@ impl DatanodeTableManager {
req,
DEFAULT_PAGE_SIZE,
Arc::new(datanode_table_value_decoder),
);
)
.into_stream();
Box::pin(stream)
}

View File

@@ -200,7 +200,8 @@ impl FlowNameManager {
req,
DEFAULT_PAGE_SIZE,
Arc::new(flow_name_decoder),
);
)
.into_stream();
Box::pin(stream)
}

View File

@@ -180,7 +180,8 @@ impl FlowRouteManager {
req,
DEFAULT_PAGE_SIZE,
Arc::new(flow_route_decoder),
);
)
.into_stream();
Box::pin(stream)
}

View File

@@ -180,7 +180,8 @@ impl FlownodeFlowManager {
req,
DEFAULT_PAGE_SIZE,
Arc::new(flownode_flow_key_decoder),
);
)
.into_stream();
Box::pin(stream.map_ok(|key| (key.flow_id(), key.partition_id())))
}

View File

@@ -207,7 +207,8 @@ impl TableFlowManager {
req,
DEFAULT_PAGE_SIZE,
Arc::new(table_flow_decoder),
);
)
.into_stream();
Box::pin(stream)
}

View File

@@ -230,7 +230,8 @@ impl SchemaManager {
req,
DEFAULT_PAGE_SIZE,
Arc::new(schema_decoder),
);
)
.into_stream();
Box::pin(stream)
}

View File

@@ -259,7 +259,8 @@ impl TableNameManager {
req,
DEFAULT_PAGE_SIZE,
Arc::new(table_decoder),
);
)
.into_stream();
Box::pin(stream)
}

View File

@@ -12,14 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use async_stream::try_stream;
use common_telemetry::debug;
use futures::future::BoxFuture;
use futures::{ready, FutureExt, Stream};
use futures::Stream;
use snafu::ensure;
use crate::error::{self, Result};
@@ -30,17 +27,6 @@ use crate::util::get_next_prefix_key;
pub type KeyValueDecoderFn<T> = dyn Fn(KeyValue) -> Result<T> + Send + Sync;
enum PaginationStreamState<T> {
/// At the start of reading.
Init,
/// Decoding key value pairs.
Decoding(SimpleKeyValueDecoder<T>),
/// Retrieving data from backend.
Reading(BoxFuture<'static, Result<(PaginationStreamFactory, Option<RangeResponse>)>>),
/// Error
Error,
}
/// The Range Request's default page size.
///
/// It dependents on upstream KvStore server side grpc message size limitation.
@@ -65,8 +51,6 @@ struct PaginationStreamFactory {
/// keys.
pub range_end: Vec<u8>,
/// page_size is the pagination page size.
page_size: usize,
/// keys_only when set returns only the keys and not the values.
pub keys_only: bool,
@@ -89,7 +73,6 @@ impl PaginationStreamFactory {
kv: kv.clone(),
key,
range_end,
page_size,
keys_only,
more,
adaptive_page_size: if page_size == 0 {
@@ -137,7 +120,7 @@ impl PaginationStreamFactory {
}
}
async fn read_next(mut self) -> Result<(Self, Option<RangeResponse>)> {
async fn read_next(&mut self) -> Result<Option<RangeResponse>> {
if self.more {
let resp = self
.adaptive_range(RangeRequest {
@@ -151,33 +134,22 @@ impl PaginationStreamFactory {
let key = resp
.kvs
.last()
.map(|kv| kv.key.clone())
.unwrap_or_else(Vec::new);
.map(|kv| kv.key.as_slice())
.unwrap_or_default();
let next_key = get_next_prefix_key(&key);
Ok((
Self {
kv: self.kv,
key: next_key,
range_end: self.range_end,
page_size: self.page_size,
keys_only: self.keys_only,
more: resp.more,
adaptive_page_size: self.adaptive_page_size,
},
Some(resp),
))
let next_key = get_next_prefix_key(key);
self.key = next_key;
self.more = resp.more;
Ok(Some(resp))
} else {
Ok((self, None))
Ok(None)
}
}
}
pub struct PaginationStream<T> {
state: PaginationStreamState<T>,
decoder_fn: Arc<KeyValueDecoderFn<T>>,
factory: Option<PaginationStreamFactory>,
factory: PaginationStreamFactory,
}
impl<T> PaginationStream<T> {
@@ -189,82 +161,28 @@ impl<T> PaginationStream<T> {
decoder_fn: Arc<KeyValueDecoderFn<T>>,
) -> Self {
Self {
state: PaginationStreamState::Init,
decoder_fn,
factory: Some(PaginationStreamFactory::new(
factory: PaginationStreamFactory::new(
&kv,
req.key,
req.range_end,
page_size,
req.keys_only,
true,
)),
),
}
}
}
struct SimpleKeyValueDecoder<T> {
kv: VecDeque<KeyValue>,
decoder: Arc<KeyValueDecoderFn<T>>,
}
impl<T> Iterator for SimpleKeyValueDecoder<T> {
type Item = Result<T>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(kv) = self.kv.pop_front() {
Some((self.decoder)(kv))
} else {
None
}
}
}
impl<T> Stream for PaginationStream<T> {
type Item = Result<T>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match &mut self.state {
PaginationStreamState::Decoding(decoder) => match decoder.next() {
Some(Ok(result)) => return Poll::Ready(Some(Ok(result))),
Some(Err(e)) => {
self.state = PaginationStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
None => self.state = PaginationStreamState::Init,
},
PaginationStreamState::Init => {
let factory = self.factory.take().expect("lost factory");
if !factory.more {
// Ensures the factory always exists.
self.factory = Some(factory);
return Poll::Ready(None);
}
let fut = factory.read_next().boxed();
self.state = PaginationStreamState::Reading(fut);
impl<T> PaginationStream<T> {
pub fn into_stream(mut self) -> impl Stream<Item = Result<T>> {
try_stream!({
while let Some(resp) = self.factory.read_next().await? {
for kv in resp.kvs {
yield (self.decoder_fn)(kv)?
}
PaginationStreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
Ok((factory, Some(resp))) => {
self.factory = Some(factory);
let decoder = SimpleKeyValueDecoder {
kv: resp.kvs.into(),
decoder: self.decoder_fn.clone(),
};
self.state = PaginationStreamState::Decoding(decoder);
}
Ok((factory, None)) => {
self.factory = Some(factory);
self.state = PaginationStreamState::Init;
}
Err(e) => {
self.state = PaginationStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
},
PaginationStreamState::Error => return Poll::Ready(None), // Ends the stream as error happens.
}
}
})
}
}
@@ -333,7 +251,8 @@ mod tests {
},
DEFAULT_PAGE_SIZE,
Arc::new(decoder),
);
)
.into_stream();
let kv = stream.try_collect::<Vec<_>>().await.unwrap();
assert!(kv.is_empty());
@@ -374,6 +293,7 @@ mod tests {
Arc::new(decoder),
);
let kv = stream
.into_stream()
.try_collect::<Vec<_>>()
.await
.unwrap()

View File

@@ -172,7 +172,8 @@ impl StateStore for KvStateStore {
req,
self.max_num_per_range_request.unwrap_or_default(),
Arc::new(decode_kv),
);
)
.into_stream();
let stream = stream.map(move |r| {
let path = path.clone();

View File

@@ -264,9 +264,8 @@ impl KvBackend for RaftEngineBackend {
let mut response = BatchGetResponse {
kvs: Vec::with_capacity(req.keys.len()),
};
let engine = self.engine.read().unwrap();
for key in req.keys {
let Some(value) = engine.get(SYSTEM_NAMESPACE, &key) else {
let Some(value) = self.engine.read().unwrap().get(SYSTEM_NAMESPACE, &key) else {
continue;
};
response.kvs.push(KeyValue { key, value });

View File

@@ -103,9 +103,10 @@ impl LeaderCachedKvBackend {
RangeRequest::new().with_prefix(prefix.as_bytes()),
DEFAULT_PAGE_SIZE,
Arc::new(Ok),
);
)
.into_stream();
let kvs = stream.try_collect::<Vec<_>>().await?.into_iter().collect();
let kvs = stream.try_collect::<Vec<_>>().await?;
self.cache
.batch_put(BatchPutRequest {