From b5f7138d337ee2229c3b45633e9ccc38f8600104 Mon Sep 17 00:00:00 2001
From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com>
Date: Thu, 19 Sep 2024 12:44:02 +0800
Subject: [PATCH] refactor(tables): improve `tables` performance (#4737)

* chore: cherrypick 52e8eebb2dbbbe81179583c05094004a5eedd7fd

* refactor/tables: Change variable from immutable to mutable in KvBackendCatalogManager's method

* refactor/tables: Replace unbounded channel with bounded and use semaphore for concurrency control in KvBackendCatalogManager

* refactor/tables: Add common-runtime dependency and update KvBackendCatalogManager to use common_runtime::spawn_global

* refactor/tables: Await on sending error through channel in KvBackendCatalogManager
---
 Cargo.lock                                    |   3 +
 src/catalog/Cargo.toml                        |   2 +
 src/catalog/src/kvbackend/manager.rs          |  83 ++++++++----
 src/common/meta/Cargo.toml                    |   1 +
 src/common/meta/src/key/catalog_name.rs       |   3 +-
 src/common/meta/src/key/datanode_table.rs     |   3 +-
 src/common/meta/src/key/flow/flow_name.rs     |   3 +-
 src/common/meta/src/key/flow/flow_route.rs    |   3 +-
 src/common/meta/src/key/flow/flownode_flow.rs |   3 +-
 src/common/meta/src/key/flow/table_flow.rs    |   3 +-
 src/common/meta/src/key/schema_name.rs        |   3 +-
 src/common/meta/src/key/table_name.rs         |   3 +-
 src/common/meta/src/range_stream.rs           | 126 ++++--------------
 src/common/meta/src/state_store.rs            |   3 +-
 src/log-store/src/raft_engine/backend.rs      |   3 +-
 src/meta-srv/src/service/store/cached_kv.rs   |   5 +-
 16 files changed, 108 insertions(+), 142 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 3176657706..786c1c3a8b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1381,6 +1381,7 @@ dependencies = [
  "common-meta",
  "common-query",
  "common-recordbatch",
+ "common-runtime",
  "common-telemetry",
  "common-test-util",
  "common-time",
@@ -1408,6 +1409,7 @@ dependencies = [
  "store-api",
  "table",
  "tokio",
+ "tokio-stream",
 ]
 
 [[package]]
@@ -2109,6 +2111,7 @@ dependencies = [
  "anymap2",
  "api",
  "async-recursion",
+ "async-stream",
  "async-trait",
  "base64 0.21.7",
  "bytes",
diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml
index a484171733..cfea5e02c6 100644
--- a/src/catalog/Cargo.toml
+++ b/src/catalog/Cargo.toml
@@ -24,6 +24,7 @@ common-macro.workspace = true
 common-meta.workspace = true
 common-query.workspace = true
 common-recordbatch.workspace = true
+common-runtime.workspace = true
 common-telemetry.workspace = true
 common-time.workspace = true
 common-version.workspace = true
@@ -48,6 +49,7 @@ sql.workspace = true
 store-api.workspace = true
 table.workspace = true
 tokio.workspace = true
+tokio-stream = "0.1"
 
 [dev-dependencies]
 cache.workspace = true
diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs
index feb5e31d09..1559022514 100644
--- a/src/catalog/src/kvbackend/manager.rs
+++ b/src/catalog/src/kvbackend/manager.rs
@@ -42,6 +42,8 @@ use table::dist_table::DistTable;
 use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
 use table::table_name::TableName;
 use table::TableRef;
+use tokio::sync::Semaphore;
+use tokio_stream::wrappers::ReceiverStream;
 
 use crate::error::{
     CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu,
@@ -179,21 +181,18 @@ impl CatalogManager for KvBackendCatalogManager {
         schema: &str,
         query_ctx: Option<&QueryContext>,
     ) -> Result<Vec<String>> {
-        let stream = self
+        let mut tables = self
             .table_metadata_manager
             .table_name_manager()
-            .tables(catalog, schema);
-        let mut tables = stream
+            .tables(catalog, schema)
+            .map_ok(|(table_name, _)| table_name)
             .try_collect::<Vec<_>>()
             .await
             .map_err(BoxedError::new)
-            .context(ListTablesSnafu { catalog, schema })?
-            .into_iter()
-            .map(|(k, _)| k)
-            .collect::<Vec<_>>();
-        tables.extend_from_slice(&self.system_catalog.table_names(schema, query_ctx));
+            .context(ListTablesSnafu { catalog, schema })?;
 
-        Ok(tables.into_iter().collect())
+        tables.extend(self.system_catalog.table_names(schema, query_ctx));
+        Ok(tables)
     }
 
     async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
@@ -303,36 +302,68 @@ impl CatalogManager for KvBackendCatalogManager {
             }
         });
 
-        let table_id_stream = self
-            .table_metadata_manager
-            .table_name_manager()
-            .tables(catalog, schema)
-            .map_ok(|(_, v)| v.table_id());
         const BATCH_SIZE: usize = 128;
-        let user_tables = try_stream!({
+        const CONCURRENCY: usize = 8;
+
+        let (tx, rx) = tokio::sync::mpsc::channel(64);
+        let metadata_manager = self.table_metadata_manager.clone();
+        let catalog = catalog.to_string();
+        let schema = schema.to_string();
+        let semaphore = Arc::new(Semaphore::new(CONCURRENCY));
+
+        common_runtime::spawn_global(async move {
+            let table_id_stream = metadata_manager
+                .table_name_manager()
+                .tables(&catalog, &schema)
+                .map_ok(|(_, v)| v.table_id());
             // Split table ids into chunks
             let mut table_id_chunks = table_id_stream.ready_chunks(BATCH_SIZE);
             while let Some(table_ids) = table_id_chunks.next().await {
-                let table_ids = table_ids
+                let table_ids = match table_ids
                     .into_iter()
                     .collect::<std::result::Result<Vec<_>, _>>()
                     .map_err(BoxedError::new)
-                    .context(ListTablesSnafu { catalog, schema })?;
+                    .context(ListTablesSnafu {
+                        catalog: &catalog,
+                        schema: &schema,
+                    }) {
+                    Ok(table_ids) => table_ids,
+                    Err(e) => {
+                        let _ = tx.send(Err(e)).await;
+                        return;
+                    }
+                };
 
-                let table_info_values = self
-                    .table_metadata_manager
-                    .table_info_manager()
-                    .batch_get(&table_ids)
-                    .await
-                    .context(TableMetadataManagerSnafu)?;
+                let metadata_manager = metadata_manager.clone();
+                let tx = tx.clone();
+                let semaphore = semaphore.clone();
+                common_runtime::spawn_global(async move {
+                    // we don't explicitly close the semaphore so just ignore the potential error.
+                    let _ = semaphore.acquire().await;
+                    let table_info_values = match metadata_manager
+                        .table_info_manager()
+                        .batch_get(&table_ids)
+                        .await
+                        .context(TableMetadataManagerSnafu)
+                    {
+                        Ok(table_info_values) => table_info_values,
+                        Err(e) => {
+                            let _ = tx.send(Err(e)).await;
+                            return;
+                        }
+                    };
 
-                for table_info_value in table_info_values.into_values() {
-                    yield build_table(table_info_value)?;
-                }
+                    for table in table_info_values.into_values().map(build_table) {
+                        if tx.send(table).await.is_err() {
+                            return;
+                        }
+                    }
+                });
             }
         });
+        let user_tables = ReceiverStream::new(rx);
 
         Box::pin(sys_tables.chain(user_tables))
     }
 }
diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml
index 28f518190a..591055d472 100644
--- a/src/common/meta/Cargo.toml
+++ b/src/common/meta/Cargo.toml
@@ -15,6 +15,7 @@ workspace = true
 anymap2 = "0.13.0"
 api.workspace = true
 async-recursion = "1.0"
+async-stream = "0.3"
 async-trait.workspace = true
 base64.workspace = true
 bytes.workspace = true
diff --git a/src/common/meta/src/key/catalog_name.rs b/src/common/meta/src/key/catalog_name.rs
index a3a9841618..9238f95c82 100644
--- a/src/common/meta/src/key/catalog_name.rs
+++ b/src/common/meta/src/key/catalog_name.rs
@@ -147,7 +147,8 @@ impl CatalogManager {
             req,
             DEFAULT_PAGE_SIZE,
             Arc::new(catalog_decoder),
-        );
+        )
+        .into_stream();
 
         Box::pin(stream)
     }
diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs
index b53060cd16..b10d536672 100644
--- a/src/common/meta/src/key/datanode_table.rs
+++ b/src/common/meta/src/key/datanode_table.rs
@@ -167,7 +167,8 @@ impl DatanodeTableManager {
             req,
             DEFAULT_PAGE_SIZE,
             Arc::new(datanode_table_value_decoder),
-        );
+        )
+        .into_stream();
 
         Box::pin(stream)
     }
diff --git a/src/common/meta/src/key/flow/flow_name.rs b/src/common/meta/src/key/flow/flow_name.rs
index 201c354f99..3a331be800 100644
--- a/src/common/meta/src/key/flow/flow_name.rs
+++ b/src/common/meta/src/key/flow/flow_name.rs
@@ -200,7 +200,8 @@ impl FlowNameManager {
             req,
             DEFAULT_PAGE_SIZE,
             Arc::new(flow_name_decoder),
-        );
+        )
+        .into_stream();
 
         Box::pin(stream)
     }
diff --git a/src/common/meta/src/key/flow/flow_route.rs b/src/common/meta/src/key/flow/flow_route.rs
index 96db01aef0..47ee94ce95 100644
--- a/src/common/meta/src/key/flow/flow_route.rs
+++ b/src/common/meta/src/key/flow/flow_route.rs
@@ -180,7 +180,8 @@ impl FlowRouteManager {
             req,
             DEFAULT_PAGE_SIZE,
             Arc::new(flow_route_decoder),
-        );
+        )
+        .into_stream();
 
         Box::pin(stream)
     }
diff --git a/src/common/meta/src/key/flow/flownode_flow.rs b/src/common/meta/src/key/flow/flownode_flow.rs
index 4b4c31a7b0..552abfcdbe 100644
--- a/src/common/meta/src/key/flow/flownode_flow.rs
+++ b/src/common/meta/src/key/flow/flownode_flow.rs
@@ -180,7 +180,8 @@ impl FlownodeFlowManager {
             req,
             DEFAULT_PAGE_SIZE,
             Arc::new(flownode_flow_key_decoder),
-        );
+        )
+        .into_stream();
 
         Box::pin(stream.map_ok(|key| (key.flow_id(), key.partition_id())))
     }
diff --git a/src/common/meta/src/key/flow/table_flow.rs b/src/common/meta/src/key/flow/table_flow.rs
index 5297734a23..c4f47cde51 100644
--- a/src/common/meta/src/key/flow/table_flow.rs
+++ b/src/common/meta/src/key/flow/table_flow.rs
@@ -207,7 +207,8 @@ impl TableFlowManager {
             req,
             DEFAULT_PAGE_SIZE,
             Arc::new(table_flow_decoder),
-        );
+        )
+        .into_stream();
 
         Box::pin(stream)
     }
diff --git a/src/common/meta/src/key/schema_name.rs b/src/common/meta/src/key/schema_name.rs
index 7c9822e18d..f9cbc4a9f2 100644
--- a/src/common/meta/src/key/schema_name.rs
+++ b/src/common/meta/src/key/schema_name.rs
@@ -230,7 +230,8 @@ impl SchemaManager {
             req,
             DEFAULT_PAGE_SIZE,
             Arc::new(schema_decoder),
-        );
+        )
+        .into_stream();
 
         Box::pin(stream)
     }
diff --git a/src/common/meta/src/key/table_name.rs b/src/common/meta/src/key/table_name.rs
index 7054ffc4e9..a632e3a233 100644
--- a/src/common/meta/src/key/table_name.rs
+++ b/src/common/meta/src/key/table_name.rs
@@ -259,7 +259,8 @@ impl TableNameManager {
             req,
             DEFAULT_PAGE_SIZE,
             Arc::new(table_decoder),
-        );
+        )
+        .into_stream();
 
         Box::pin(stream)
     }
diff --git a/src/common/meta/src/range_stream.rs b/src/common/meta/src/range_stream.rs
index b14a2bf8f1..be54865281 100644
--- a/src/common/meta/src/range_stream.rs
+++ b/src/common/meta/src/range_stream.rs
@@ -12,14 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::VecDeque;
-use std::pin::Pin;
 use std::sync::Arc;
-use std::task::{Context, Poll};
 
+use async_stream::try_stream;
 use common_telemetry::debug;
-use futures::future::BoxFuture;
-use futures::{ready, FutureExt, Stream};
+use futures::Stream;
 use snafu::ensure;
 
 use crate::error::{self, Result};
@@ -30,17 +27,6 @@ use crate::util::get_next_prefix_key;
 
 pub type KeyValueDecoderFn<T> = dyn Fn(KeyValue) -> Result<T> + Send + Sync;
 
-enum PaginationStreamState<T> {
-    /// At the start of reading.
-    Init,
-    /// Decoding key value pairs.
-    Decoding(SimpleKeyValueDecoder<T>),
-    /// Retrieving data from backend.
-    Reading(BoxFuture<'static, Result<(PaginationStreamFactory, Option<RangeResponse>)>>),
-    /// Error
-    Error,
-}
-
 /// The Range Request's default page size.
 ///
 /// It dependents on upstream KvStore server side grpc message size limitation.
@@ -65,8 +51,6 @@ struct PaginationStreamFactory {
     /// keys.
     pub range_end: Vec<u8>,
 
-    /// page_size is the pagination page size.
-    page_size: usize,
     /// keys_only when set returns only the keys and not the values.
     pub keys_only: bool,
 
@@ -89,7 +73,6 @@ impl PaginationStreamFactory {
             kv: kv.clone(),
             key,
             range_end,
-            page_size,
             keys_only,
             more,
             adaptive_page_size: if page_size == 0 {
@@ -137,7 +120,7 @@ impl PaginationStreamFactory {
         }
     }
 
-    async fn read_next(mut self) -> Result<(Self, Option<RangeResponse>)> {
+    async fn read_next(&mut self) -> Result<Option<RangeResponse>> {
         if self.more {
             let resp = self
                 .adaptive_range(RangeRequest {
@@ -151,33 +134,22 @@ impl PaginationStreamFactory {
             let key = resp
                 .kvs
                 .last()
-                .map(|kv| kv.key.clone())
-                .unwrap_or_else(Vec::new);
+                .map(|kv| kv.key.as_slice())
+                .unwrap_or_default();
 
-            let next_key = get_next_prefix_key(&key);
-
-            Ok((
-                Self {
-                    kv: self.kv,
-                    key: next_key,
-                    range_end: self.range_end,
-                    page_size: self.page_size,
-                    keys_only: self.keys_only,
-                    more: resp.more,
-                    adaptive_page_size: self.adaptive_page_size,
-                },
-                Some(resp),
-            ))
+            let next_key = get_next_prefix_key(key);
+            self.key = next_key;
+            self.more = resp.more;
+            Ok(Some(resp))
         } else {
-            Ok((self, None))
+            Ok(None)
         }
     }
 }
 
 pub struct PaginationStream<T> {
-    state: PaginationStreamState<T>,
     decoder_fn: Arc<KeyValueDecoderFn<T>>,
-    factory: Option<PaginationStreamFactory>,
+    factory: PaginationStreamFactory,
 }
 
 impl<T> PaginationStream<T> {
@@ -189,82 +161,28 @@ impl<T> PaginationStream<T> {
         decoder_fn: Arc<KeyValueDecoderFn<T>>,
     ) -> Self {
         Self {
-            state: PaginationStreamState::Init,
             decoder_fn,
-            factory: Some(PaginationStreamFactory::new(
+            factory: PaginationStreamFactory::new(
                 &kv,
                 req.key,
                 req.range_end,
                 page_size,
                 req.keys_only,
                 true,
-            )),
+            ),
         }
     }
 }
 
-struct SimpleKeyValueDecoder<T> {
-    kv: VecDeque<KeyValue>,
-    decoder: Arc<KeyValueDecoderFn<T>>,
-}
-
-impl<T> Iterator for SimpleKeyValueDecoder<T> {
-    type Item = Result<T>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        if let Some(kv) = self.kv.pop_front() {
-            Some((self.decoder)(kv))
-        } else {
-            None
-        }
-    }
-}
-
-impl<T> Stream for PaginationStream<T> {
-    type Item = Result<T>;
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        loop {
-            match &mut self.state {
-                PaginationStreamState::Decoding(decoder) => match decoder.next() {
-                    Some(Ok(result)) => return Poll::Ready(Some(Ok(result))),
-                    Some(Err(e)) => {
-                        self.state = PaginationStreamState::Error;
-                        return Poll::Ready(Some(Err(e)));
-                    }
-                    None => self.state = PaginationStreamState::Init,
-                },
-                PaginationStreamState::Init => {
-                    let factory = self.factory.take().expect("lost factory");
-                    if !factory.more {
-                        // Ensures the factory always exists.
-                        self.factory = Some(factory);
-                        return Poll::Ready(None);
-                    }
-                    let fut = factory.read_next().boxed();
-                    self.state = PaginationStreamState::Reading(fut);
+impl<T> PaginationStream<T> {
+    pub fn into_stream(mut self) -> impl Stream<Item = Result<T>> {
+        try_stream!({
+            while let Some(resp) = self.factory.read_next().await? {
+                for kv in resp.kvs {
+                    yield (self.decoder_fn)(kv)?
                 }
-                PaginationStreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
-                    Ok((factory, Some(resp))) => {
-                        self.factory = Some(factory);
-                        let decoder = SimpleKeyValueDecoder {
-                            kv: resp.kvs.into(),
-                            decoder: self.decoder_fn.clone(),
-                        };
-                        self.state = PaginationStreamState::Decoding(decoder);
-                    }
-                    Ok((factory, None)) => {
-                        self.factory = Some(factory);
-                        self.state = PaginationStreamState::Init;
-                    }
-                    Err(e) => {
-                        self.state = PaginationStreamState::Error;
-                        return Poll::Ready(Some(Err(e)));
-                    }
-                },
-                PaginationStreamState::Error => return Poll::Ready(None), // Ends the stream as error happens.
             }
-        }
+        })
     }
 }
 
@@ -333,7 +251,8 @@ mod tests {
             },
             DEFAULT_PAGE_SIZE,
             Arc::new(decoder),
-        );
+        )
+        .into_stream();
 
         let kv = stream.try_collect::<Vec<_>>().await.unwrap();
         assert!(kv.is_empty());
@@ -374,6 +293,7 @@ mod tests {
             Arc::new(decoder),
         );
         let kv = stream
+            .into_stream()
             .try_collect::<Vec<_>>()
            .await
            .unwrap()
diff --git a/src/common/meta/src/state_store.rs b/src/common/meta/src/state_store.rs
index 1cf1ea8649..89d5dfd0ff 100644
--- a/src/common/meta/src/state_store.rs
+++ b/src/common/meta/src/state_store.rs
@@ -172,7 +172,8 @@ impl StateStore for KvStateStore {
             req,
             self.max_num_per_range_request.unwrap_or_default(),
             Arc::new(decode_kv),
-        );
+        )
+        .into_stream();
 
         let stream = stream.map(move |r| {
             let path = path.clone();
diff --git a/src/log-store/src/raft_engine/backend.rs b/src/log-store/src/raft_engine/backend.rs
index e2cd65c8fa..33cb64a2e8 100644
--- a/src/log-store/src/raft_engine/backend.rs
+++ b/src/log-store/src/raft_engine/backend.rs
@@ -264,9 +264,8 @@ impl KvBackend for RaftEngineBackend {
         let mut response = BatchGetResponse {
             kvs: Vec::with_capacity(req.keys.len()),
         };
-        let engine = self.engine.read().unwrap();
         for key in req.keys {
-            let Some(value) = engine.get(SYSTEM_NAMESPACE, &key) else {
+            let Some(value) = self.engine.read().unwrap().get(SYSTEM_NAMESPACE, &key) else {
                 continue;
             };
             response.kvs.push(KeyValue { key, value });
diff --git a/src/meta-srv/src/service/store/cached_kv.rs b/src/meta-srv/src/service/store/cached_kv.rs
index ddf7b3e516..d4b6f84f58 100644
--- a/src/meta-srv/src/service/store/cached_kv.rs
+++ b/src/meta-srv/src/service/store/cached_kv.rs
@@ -103,9 +103,10 @@ impl LeaderCachedKvBackend {
             RangeRequest::new().with_prefix(prefix.as_bytes()),
             DEFAULT_PAGE_SIZE,
             Arc::new(Ok),
-        );
+        )
+        .into_stream();
 
-        let kvs = stream.try_collect::<Vec<_>>().await?.into_iter().collect();
+        let kvs = stream.try_collect::<Vec<_>>().await?;
 
         self.cache
             .batch_put(BatchPutRequest {
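For reference, the sketch below shows the concurrency pattern that the new `tables()` implementation in src/catalog/src/kvbackend/manager.rs is built on: ids are processed in chunks, each chunk is fetched by a spawned task, a semaphore caps how many fetches run at the same time, and results reach the caller through a bounded mpsc channel exposed as a `ReceiverStream`. It is an illustrative stand-in only, not code from the repository: `fetch_all`, the `u32`/`String` item types, the fake lookup, and `tokio::spawn` (in place of `common_runtime::spawn_global`) are all assumptions made for the example.

// Minimal sketch of the bounded-channel + semaphore fan-out, under the
// assumptions stated above. Requires tokio (features "macros", "rt-multi-thread",
// "sync") and tokio-stream.
use std::sync::Arc;

use tokio::sync::{mpsc, Semaphore};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};

const BATCH_SIZE: usize = 128;
const CONCURRENCY: usize = 8;

fn fetch_all(ids: Vec<u32>) -> impl Stream<Item = Result<String, String>> {
    // Bounded channel: once 64 results are buffered, producers wait, so a slow
    // consumer of the stream applies backpressure to the fetch tasks.
    let (tx, rx) = mpsc::channel(64);
    let semaphore = Arc::new(Semaphore::new(CONCURRENCY));

    tokio::spawn(async move {
        for chunk in ids.chunks(BATCH_SIZE) {
            let chunk = chunk.to_vec();
            let tx = tx.clone();
            let semaphore = semaphore.clone();
            tokio::spawn(async move {
                // The semaphore caps how many chunks are fetched concurrently.
                // It is never closed, so the acquire error can be ignored.
                let _permit = semaphore.acquire_owned().await;
                for id in chunk {
                    // Stand-in for a batched metadata lookup (e.g. a batch_get).
                    let item: Result<String, String> = Ok(format!("table-{id}"));
                    // Receiver dropped: the consumer is gone, stop producing.
                    if tx.send(item).await.is_err() {
                        return;
                    }
                }
            });
        }
        // The sender clones held by the fetch tasks are dropped as the tasks
        // finish; the channel then closes and the stream terminates.
    });

    ReceiverStream::new(rx)
}

#[tokio::main]
async fn main() {
    let mut tables = fetch_all((0..300).collect());
    let mut count = 0;
    while let Some(item) = tables.next().await {
        item.unwrap();
        count += 1;
    }
    assert_eq!(count, 300);
}

The bounded capacity (64, as in the patch) is what turns a slow consumer into backpressure on the producers instead of letting fetched values pile up in memory the way an unbounded channel would, while the semaphore independently limits how many batched lookups are in flight; that is the intent of the "Replace unbounded channel with bounded and use semaphore for concurrency control" commit listed above.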