refactor: remove unnecessary wrap (#5221)

* chore: remove unnecessary arc

* chore: remove unnecessary box
Weny Xu
2024-12-24 16:43:14 +08:00
committed by Yingwen
parent 3fec71b5c0
commit d4cae6af1e
22 changed files with 81 additions and 53 deletions
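All of the changes follow one pattern: callbacks that never capture any state are now stored as plain function pointers (`fn(...) -> ...`) instead of `Box<dyn Fn>` / `Arc<dyn Fn>` trait objects, which makes the `Box::new` / `Arc::new` wrapping at every call site unnecessary. A minimal, self-contained sketch of the difference, using a concrete `String` token for simplicity (the struct and field names below are illustrative, not the actual GreptimeDB definitions):

```rust
// Before: a trait-object callback is unsized, so it has to live behind a Box (or Arc).
struct BoxedContainer {
    filter: Box<dyn Fn(&String) -> bool + Send + Sync>,
}

// After: a plain function pointer is a sized, `Copy` value; no heap allocation
// or wrapper is needed to store or pass it.
struct FnPtrContainer {
    filter: fn(&String) -> bool,
}

fn always_true(_: &String) -> bool {
    true
}

fn main() {
    // The boxed form forces a `Box::new(...)` at every construction site.
    let boxed = BoxedContainer {
        filter: Box::new(|_| true),
    };

    // The fn-pointer form accepts a named function or a non-capturing closure,
    // both of which coerce to `fn(&String) -> bool` with no wrapper.
    let named = FnPtrContainer { filter: always_true };
    let closure = FnPtrContainer { filter: |_| true };

    assert!((boxed.filter)(&"token".to_string()));
    assert!((named.filter)(&"token".to_string()));
    assert!((closure.filter)(&"token".to_string()));
}
```

A fn pointer is `Copy`, has no allocation and no drop glue, and still supports any free function or non-capturing closure, so it covers every filter and decoder touched by this commit.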

View File

@@ -38,7 +38,7 @@ pub fn new_table_cache(
) -> TableCache {
let init = init_factory(table_info_cache, table_name_cache);
- CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
+ CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
}
fn init_factory(

View File

@@ -43,7 +43,7 @@ pub struct CacheContainer<K, V, CacheToken> {
cache: Cache<K, V>,
invalidator: Invalidator<K, V, CacheToken>,
initializer: Initializer<K, V>,
- token_filter: TokenFilter<CacheToken>,
+ token_filter: fn(&CacheToken) -> bool,
}
impl<K, V, CacheToken> CacheContainer<K, V, CacheToken>
@@ -58,7 +58,7 @@ where
cache: Cache<K, V>,
invalidator: Invalidator<K, V, CacheToken>,
initializer: Initializer<K, V>,
- token_filter: TokenFilter<CacheToken>,
+ token_filter: fn(&CacheToken) -> bool,
) -> Self {
Self {
name,
@@ -206,10 +206,13 @@ mod tests {
name: &'a str,
}
+ fn always_true_filter(_: &String) -> bool {
+ true
+ }
#[tokio::test]
async fn test_get() {
let cache: Cache<NameKey, String> = CacheBuilder::new(128).build();
- let filter: TokenFilter<String> = Box::new(|_| true);
let counter = Arc::new(AtomicI32::new(0));
let moved_counter = counter.clone();
let init: Initializer<NameKey, String> = Arc::new(move |_| {
@@ -219,7 +222,13 @@ mod tests {
let invalidator: Invalidator<NameKey, String, String> =
Box::new(|_, _| Box::pin(async { Ok(()) }));
- let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter);
+ let adv_cache = CacheContainer::new(
+ "test".to_string(),
+ cache,
+ invalidator,
+ init,
+ always_true_filter,
+ );
let key = NameKey { name: "key" };
let value = adv_cache.get(key).await.unwrap().unwrap();
assert_eq!(value, "hi");
@@ -233,7 +242,6 @@ mod tests {
#[tokio::test]
async fn test_get_by_ref() {
let cache: Cache<String, String> = CacheBuilder::new(128).build();
- let filter: TokenFilter<String> = Box::new(|_| true);
let counter = Arc::new(AtomicI32::new(0));
let moved_counter = counter.clone();
let init: Initializer<String, String> = Arc::new(move |_| {
@@ -243,7 +251,13 @@ mod tests {
let invalidator: Invalidator<String, String, String> =
Box::new(|_, _| Box::pin(async { Ok(()) }));
- let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter);
+ let adv_cache = CacheContainer::new(
+ "test".to_string(),
+ cache,
+ invalidator,
+ init,
+ always_true_filter,
+ );
let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap();
assert_eq!(value, "hi");
let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap();
@@ -257,13 +271,18 @@ mod tests {
#[tokio::test]
async fn test_get_value_not_exits() {
let cache: Cache<String, String> = CacheBuilder::new(128).build();
- let filter: TokenFilter<String> = Box::new(|_| true);
let init: Initializer<String, String> =
Arc::new(move |_| Box::pin(async { error::ValueNotExistSnafu {}.fail() }));
let invalidator: Invalidator<String, String, String> =
Box::new(|_, _| Box::pin(async { Ok(()) }));
- let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter);
+ let adv_cache = CacheContainer::new(
+ "test".to_string(),
+ cache,
+ invalidator,
+ init,
+ always_true_filter,
+ );
let value = adv_cache.get_by_ref("foo").await.unwrap();
assert!(value.is_none());
}
@@ -271,7 +290,6 @@ mod tests {
#[tokio::test]
async fn test_invalidate() {
let cache: Cache<String, String> = CacheBuilder::new(128).build();
- let filter: TokenFilter<String> = Box::new(|_| true);
let counter = Arc::new(AtomicI32::new(0));
let moved_counter = counter.clone();
let init: Initializer<String, String> = Arc::new(move |_| {
@@ -285,7 +303,13 @@ mod tests {
})
});
- let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter);
+ let adv_cache = CacheContainer::new(
+ "test".to_string(),
+ cache,
+ invalidator,
+ init,
+ always_true_filter,
+ );
let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap();
assert_eq!(value, "hi");
let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap();
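In the `cache_container` tests, the boxed `TokenFilter` closures are replaced by a shared `always_true_filter` function passed by name. A standalone sketch of the coercions this relies on, using a hypothetical `takes_filter` helper (not the crate's API): named functions and non-capturing closures coerce to a `fn` pointer, while a capturing closure is the one case that would still need a boxed `dyn Fn`:

```rust
fn always_true_filter(_: &String) -> bool {
    true
}

// Stand-in for a constructor that now takes a fn pointer instead of a boxed closure.
fn takes_filter(filter: fn(&String) -> bool, token: &String) -> bool {
    filter(token)
}

fn main() {
    let token = "table_name".to_string();

    // A named function coerces to the fn-pointer parameter directly.
    assert!(takes_filter(always_true_filter, &token));

    // A non-capturing closure coerces as well.
    assert!(takes_filter(|_| true, &token));

    // A capturing closure does not coerce to `fn(...)`; that case would still
    // need a `Box<dyn Fn(&String) -> bool>`, which is exactly what these call
    // sites no longer require.
    let allowed = "table_name".to_string();
    let capturing = move |t: &String| *t == allowed;
    assert!(capturing(&token));
}
```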

View File

@@ -45,7 +45,7 @@ pub fn new_table_flownode_set_cache(
let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend));
let init = init_factory(table_flow_manager);
- CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
+ CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
}
fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeSet> {

View File

@@ -151,12 +151,15 @@ mod tests {
use crate::cache::*;
use crate::instruction::CacheIdent;
+ fn always_true_filter(_: &CacheIdent) -> bool {
+ true
+ }
fn test_cache(
name: &str,
invalidator: Invalidator<String, String, CacheIdent>,
) -> CacheContainer<String, String, CacheIdent> {
let cache: Cache<String, String> = CacheBuilder::new(128).build();
- let filter: TokenFilter<CacheIdent> = Box::new(|_| true);
let counter = Arc::new(AtomicI32::new(0));
let moved_counter = counter.clone();
let init: Initializer<String, String> = Arc::new(move |_| {
@@ -164,7 +167,13 @@ mod tests {
Box::pin(async { Ok(Some("hi".to_string())) })
});
- CacheContainer::new(name.to_string(), cache, invalidator, init, filter)
+ CacheContainer::new(
+ name.to_string(),
+ cache,
+ invalidator,
+ init,
+ always_true_filter,
+ )
}
fn test_i32_cache(
@@ -172,7 +181,6 @@ mod tests {
invalidator: Invalidator<i32, String, CacheIdent>,
) -> CacheContainer<i32, String, CacheIdent> {
let cache: Cache<i32, String> = CacheBuilder::new(128).build();
- let filter: TokenFilter<CacheIdent> = Box::new(|_| true);
let counter = Arc::new(AtomicI32::new(0));
let moved_counter = counter.clone();
let init: Initializer<i32, String> = Arc::new(move |_| {
@@ -180,7 +188,13 @@ mod tests {
Box::pin(async { Ok(Some("foo".to_string())) })
});
- CacheContainer::new(name.to_string(), cache, invalidator, init, filter)
+ CacheContainer::new(
+ name.to_string(),
+ cache,
+ invalidator,
+ init,
+ always_true_filter,
+ )
}
#[tokio::test]

View File

@@ -36,7 +36,7 @@ pub fn new_schema_cache(
let schema_manager = SchemaManager::new(kv_backend.clone());
let init = init_factory(schema_manager);
- CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
+ CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
}
fn init_factory(schema_manager: SchemaManager) -> Initializer<SchemaName, Arc<SchemaNameValue>> {

View File

@@ -41,7 +41,7 @@ pub fn new_table_info_cache(
let table_info_manager = Arc::new(TableInfoManager::new(kv_backend));
let init = init_factory(table_info_manager);
- CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
+ CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
}
fn init_factory(table_info_manager: TableInfoManagerRef) -> Initializer<TableId, Arc<TableInfo>> {

View File

@@ -41,7 +41,7 @@ pub fn new_table_name_cache(
let table_name_manager = Arc::new(TableNameManager::new(kv_backend));
let init = init_factory(table_name_manager);
- CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
+ CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
}
fn init_factory(table_name_manager: TableNameManagerRef) -> Initializer<TableName, TableId> {

View File

@@ -65,7 +65,7 @@ pub fn new_table_route_cache(
let table_info_manager = Arc::new(TableRouteManager::new(kv_backend));
let init = init_factory(table_info_manager);
- CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
+ CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
}
fn init_factory(

View File

@@ -40,7 +40,7 @@ pub fn new_table_schema_cache(
let table_info_manager = TableInfoManager::new(kv_backend);
let init = init_factory(table_info_manager);
- CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
+ CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
}
fn init_factory(table_info_manager: TableInfoManager) -> Initializer<TableId, Arc<SchemaName>> {

View File

@@ -40,7 +40,7 @@ pub fn new_view_info_cache(
let view_info_manager = Arc::new(ViewInfoManager::new(kv_backend));
let init = init_factory(view_info_manager);
- CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
+ CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
}
fn init_factory(view_info_manager: ViewInfoManagerRef) -> Initializer<TableId, Arc<ViewInfoValue>> {

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::fmt::Display;
- use std::sync::Arc;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use futures::stream::BoxStream;
@@ -146,7 +145,7 @@ impl CatalogManager {
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
- Arc::new(catalog_decoder),
+ catalog_decoder,
)
.into_stream();
@@ -156,6 +155,8 @@ impl CatalogManager {
#[cfg(test)]
mod tests {
+ use std::sync::Arc;
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
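A side effect shows up in the test modules: the file-level `use std::sync::Arc;` goes away, but the `#[cfg(test)]` module still needs `Arc`, and a child module's `use super::*;` only pulls in names the parent actually imports, so the test module gains its own import. A small self-contained sketch of that visibility rule (module and function names are made up):

```rust
mod parent {
    // The only `Arc` import in this file used to live here...
    use std::sync::Arc;

    pub fn make() -> Arc<i32> {
        Arc::new(1)
    }

    mod child {
        // ...and `use super::*;` made it visible here too. Removing the
        // parent-level import means a child module (e.g. a #[cfg(test)] mod)
        // needs its own `use std::sync::Arc;`.
        use super::*;

        #[allow(dead_code)]
        fn takes(v: Arc<i32>) -> i32 {
            *v
        }
    }
}

fn main() {
    assert_eq!(*parent::make(), 1);
}
```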

View File

@@ -14,7 +14,6 @@
use std::collections::HashMap;
use std::fmt::Display;
- use std::sync::Arc;
use futures::stream::BoxStream;
use serde::{Deserialize, Serialize};
@@ -166,7 +165,7 @@ impl DatanodeTableManager {
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
- Arc::new(datanode_table_value_decoder),
+ datanode_table_value_decoder,
)
.into_stream();

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
- use std::sync::Arc;
use futures::stream::BoxStream;
use lazy_static::lazy_static;
use regex::Regex;
@@ -201,7 +199,7 @@ impl FlowNameManager {
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
- Arc::new(flow_name_decoder),
+ flow_name_decoder,
)
.into_stream();

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
- use std::sync::Arc;
use futures::stream::BoxStream;
use lazy_static::lazy_static;
use regex::Regex;
@@ -179,7 +177,7 @@ impl FlowRouteManager {
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
- Arc::new(flow_route_decoder),
+ flow_route_decoder,
)
.into_stream();

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
- use std::sync::Arc;
use futures::stream::BoxStream;
use futures::TryStreamExt;
use lazy_static::lazy_static;
@@ -179,7 +177,7 @@ impl FlownodeFlowManager {
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
- Arc::new(flownode_flow_key_decoder),
+ flownode_flow_key_decoder,
)
.into_stream();

View File

@@ -206,7 +206,7 @@ impl TableFlowManager {
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
- Arc::new(table_flow_decoder),
+ table_flow_decoder,
)
.into_stream();

View File

@@ -14,7 +14,6 @@
use std::collections::HashMap;
use std::fmt::Display;
- use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_time::DatabaseTimeToLive;
@@ -283,7 +282,7 @@ impl SchemaManager {
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
- Arc::new(schema_decoder),
+ schema_decoder,
)
.into_stream();
@@ -308,6 +307,7 @@ impl<'a> From<&'a SchemaName> for SchemaNameKey<'a> {
#[cfg(test)]
mod tests {
+ use std::sync::Arc;
use std::time::Duration;
use super::*;

View File

@@ -269,7 +269,7 @@ impl TableNameManager {
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
- Arc::new(table_decoder),
+ table_decoder,
)
.into_stream();

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
- use std::sync::Arc;
use async_stream::try_stream;
use common_telemetry::debug;
use futures::Stream;
@@ -148,7 +146,7 @@ impl PaginationStreamFactory {
}
pub struct PaginationStream<T> {
- decoder_fn: Arc<KeyValueDecoderFn<T>>,
+ decoder_fn: fn(KeyValue) -> Result<T>,
factory: PaginationStreamFactory,
}
@@ -158,7 +156,7 @@ impl<T> PaginationStream<T> {
kv: KvBackendRef,
req: RangeRequest,
page_size: usize,
- decoder_fn: Arc<KeyValueDecoderFn<T>>,
+ decoder_fn: fn(KeyValue) -> Result<T>,
) -> Self {
Self {
decoder_fn,
@@ -191,6 +189,7 @@ mod tests {
use std::assert_matches::assert_matches;
use std::collections::BTreeMap;
+ use std::sync::Arc;
use futures::TryStreamExt;
@@ -250,7 +249,7 @@ mod tests {
..Default::default()
},
DEFAULT_PAGE_SIZE,
- Arc::new(decoder),
+ decoder,
)
.into_stream();
let kv = stream.try_collect::<Vec<_>>().await.unwrap();
@@ -290,7 +289,7 @@ mod tests {
..Default::default()
},
2,
- Arc::new(decoder),
+ decoder,
);
let kv = stream
.into_stream()
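`PaginationStream` follows the same pattern: the decoder used to be stored as `Arc<KeyValueDecoderFn<T>>`, but every decoder passed to it is a free function, so a `fn(KeyValue) -> Result<T>` field is sufficient. A rough sketch of the shape, with simplified stand-ins for `KeyValue`, the error type, and the paging machinery (none of these are the crate's real definitions):

```rust
// Simplified stand-ins; the real `KeyValue`, error type, and paging machinery differ.
struct KeyValue {
    key: Vec<u8>,
    value: Vec<u8>,
}

type Result<T> = std::result::Result<T, String>;

// Previously the field was `decoder_fn: Arc<KeyValueDecoderFn<T>>`, i.e. an
// `Arc`'d `dyn Fn`. A fn pointer is `Copy`, so it can be copied into whatever
// the stream machinery needs, with no reference counting.
struct PaginationStream<T> {
    decoder_fn: fn(KeyValue) -> Result<T>,
}

impl<T> PaginationStream<T> {
    fn new(decoder_fn: fn(KeyValue) -> Result<T>) -> Self {
        Self { decoder_fn }
    }

    // Decode one fetched page of key-value pairs with the stored fn pointer.
    fn decode_page(&self, page: Vec<KeyValue>) -> Result<Vec<T>> {
        page.into_iter().map(self.decoder_fn).collect()
    }
}

fn decode_len(kv: KeyValue) -> Result<usize> {
    Ok(kv.key.len() + kv.value.len())
}

fn main() -> Result<()> {
    let stream = PaginationStream::new(decode_len);
    let page = vec![KeyValue { key: b"k1".to_vec(), value: b"v1".to_vec() }];
    assert_eq!(stream.decode_page(page)?, vec![4usize]);
    Ok(())
}
```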

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
- use std::sync::Arc;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_procedure::error::{DeleteStatesSnafu, ListStateSnafu, PutStateSnafu};
@@ -171,7 +169,7 @@ impl StateStore for KvStateStore {
self.kv_backend.clone(),
req,
self.max_num_per_range_request.unwrap_or_default(),
- Arc::new(decode_kv),
+ decode_kv,
)
.into_stream();

View File

@@ -326,8 +326,8 @@ impl ClusterInfo for MetaClient {
let cluster_kv_backend = Arc::new(self.cluster_client()?);
let range_prefix = DatanodeStatKey::key_prefix_with_cluster_id(self.id.0);
let req = RangeRequest::new().with_prefix(range_prefix);
- let stream = PaginationStream::new(cluster_kv_backend, req, 256, Arc::new(decode_stats))
- .into_stream();
+ let stream =
+ PaginationStream::new(cluster_kv_backend, req, 256, decode_stats).into_stream();
let mut datanode_stats = stream
.try_collect::<Vec<_>>()
.await
@@ -994,8 +994,7 @@ mod tests {
let req = RangeRequest::new().with_prefix(b"__prefix/");
let stream =
- PaginationStream::new(Arc::new(cluster_client), req, 10, Arc::new(mock_decoder))
- .into_stream();
+ PaginationStream::new(Arc::new(cluster_client), req, 10, mock_decoder).into_stream();
let res = stream.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(10, res.len());

View File

@@ -102,7 +102,7 @@ impl LeaderCachedKvBackend {
self.store.clone(),
RangeRequest::new().with_prefix(prefix.as_bytes()),
DEFAULT_PAGE_SIZE,
- Arc::new(Ok),
+ Ok,
)
.into_stream();
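The final hunk passes `Ok` itself as the decoder: an enum variant constructor is a function, so once the parameter is a plain `fn` pointer, `Ok` coerces to the expected `fn(KeyValue) -> Result<KeyValue>` directly, and the old `Arc::new(Ok)` wrapper existed only to satisfy the `Arc<dyn Fn>` signature. A standalone illustration with generic stand-in types (not the crate's):

```rust
// A pagination-style helper whose decoder parameter is a plain fn pointer.
fn decode_all<T, E>(items: Vec<T>, decoder: fn(T) -> Result<T, E>) -> Result<Vec<T>, E> {
    items.into_iter().map(decoder).collect()
}

fn main() {
    // `Ok` is the tuple-variant constructor of `Result`, and variant
    // constructors are themselves functions, so `Ok` can be passed wherever a
    // `fn(T) -> Result<T, E>` is expected, with no wrapper at all.
    let decoded: Result<Vec<u32>, ()> = decode_all(vec![1, 2, 3], Ok);
    assert_eq!(decoded.unwrap(), vec![1u32, 2, 3]);
}
```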