diff --git a/src/cmd/src/cli/upgrade.rs b/src/cmd/src/cli/upgrade.rs index 0aa787fbe9..8d1be2c3c5 100644 --- a/src/cmd/src/cli/upgrade.rs +++ b/src/cmd/src/cli/upgrade.rs @@ -192,10 +192,10 @@ impl MigrateTableMetadata { let key = v1SchemaKey::parse(key_str) .unwrap_or_else(|e| panic!("schema key is corrupted: {e}, key: {key_str}")); - Ok((key, ())) + Ok(key) }), ); - while let Some((key, _)) = stream.try_next().await.context(error::IterStreamSnafu)? { + while let Some(key) = stream.try_next().await.context(error::IterStreamSnafu)? { let _ = self.migrate_schema_key(&key).await; keys.push(key.to_string().as_bytes().to_vec()); } @@ -244,10 +244,10 @@ impl MigrateTableMetadata { let key = v1CatalogKey::parse(key_str) .unwrap_or_else(|e| panic!("catalog key is corrupted: {e}, key: {key_str}")); - Ok((key, ())) + Ok(key) }), ); - while let Some((key, _)) = stream.try_next().await.context(error::IterStreamSnafu)? { + while let Some(key) = stream.try_next().await.context(error::IterStreamSnafu)? { let _ = self.migrate_catalog_key(&key).await; keys.push(key.to_string().as_bytes().to_vec()); } diff --git a/src/common/meta/src/key/catalog_name.rs b/src/common/meta/src/key/catalog_name.rs index 4bbfb367b9..63873177b1 100644 --- a/src/common/meta/src/key/catalog_name.rs +++ b/src/common/meta/src/key/catalog_name.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use common_catalog::consts::DEFAULT_CATALOG_NAME; use futures::stream::BoxStream; -use futures::StreamExt; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -84,11 +83,11 @@ impl<'a> TryFrom<&'a str> for CatalogNameKey<'a> { } /// Decoder `KeyValue` to ({catalog},()) -pub fn catalog_decoder(kv: KeyValue) -> Result<(String, ())> { +pub fn catalog_decoder(kv: KeyValue) -> Result { let str = std::str::from_utf8(&kv.key).context(error::ConvertRawKeySnafu)?; let catalog_name = CatalogNameKey::try_from(str)?; - Ok((catalog_name.catalog.to_string(), ())) + Ok(catalog_name.catalog.to_string()) } pub struct CatalogManager { @@ -134,7 +133,7 @@ impl CatalogManager { Arc::new(catalog_decoder), ); - Box::pin(stream.map(|kv| kv.map(|kv| kv.0))) + Box::pin(stream) } } diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs index aa63eef09e..96bebb7466 100644 --- a/src/common/meta/src/key/datanode_table.rs +++ b/src/common/meta/src/key/datanode_table.rs @@ -16,7 +16,6 @@ use std::collections::HashMap; use std::sync::Arc; use futures::stream::BoxStream; -use futures::StreamExt; use serde::{Deserialize, Serialize}; use snafu::OptionExt; use store_api::storage::RegionNumber; @@ -126,10 +125,8 @@ impl DatanodeTableValue { } /// Decodes `KeyValue` to ((),`DatanodeTableValue`) -pub fn datanode_table_value_decoder(kv: KeyValue) -> Result<((), DatanodeTableValue)> { - let value = DatanodeTableValue::try_from_raw_value(&kv.value)?; - - Ok(((), value)) +pub fn datanode_table_value_decoder(kv: KeyValue) -> Result { + DatanodeTableValue::try_from_raw_value(&kv.value) } pub struct DatanodeTableManager { @@ -163,7 +160,7 @@ impl DatanodeTableManager { Arc::new(datanode_table_value_decoder), ); - Box::pin(stream.map(|kv| kv.map(|kv| kv.1))) + Box::pin(stream) } /// Builds the create datanode table transactions. It only executes while the primary keys comparing successes. diff --git a/src/common/meta/src/key/schema_name.rs b/src/common/meta/src/key/schema_name.rs index 17fdb6a4c2..f56adbaec4 100644 --- a/src/common/meta/src/key/schema_name.rs +++ b/src/common/meta/src/key/schema_name.rs @@ -19,7 +19,6 @@ use std::time::Duration; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use futures::stream::BoxStream; -use futures::StreamExt; use humantime_serde::re::humantime; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -103,11 +102,11 @@ impl TableMetaKey for SchemaNameKey<'_> { } /// Decodes `KeyValue` to ({schema},()) -pub fn schema_decoder(kv: KeyValue) -> Result<(String, ())> { +pub fn schema_decoder(kv: KeyValue) -> Result { let str = std::str::from_utf8(&kv.key).context(error::ConvertRawKeySnafu)?; let schema_name = SchemaNameKey::try_from(str)?; - Ok((schema_name.schema.to_string(), ())) + Ok(schema_name.schema.to_string()) } impl<'a> TryFrom<&'a str> for SchemaNameKey<'a> { @@ -193,7 +192,7 @@ impl SchemaManager { Arc::new(schema_decoder), ); - Box::pin(stream.map(|kv| kv.map(|kv| kv.0))) + Box::pin(stream) } } diff --git a/src/common/meta/src/range_stream.rs b/src/common/meta/src/range_stream.rs index 878d5c0c9f..b14a2bf8f1 100644 --- a/src/common/meta/src/range_stream.rs +++ b/src/common/meta/src/range_stream.rs @@ -28,13 +28,13 @@ use crate::rpc::store::{RangeRequest, RangeResponse}; use crate::rpc::KeyValue; use crate::util::get_next_prefix_key; -pub type KeyValueDecoderFn = dyn Fn(KeyValue) -> Result<(K, V)> + Send + Sync; +pub type KeyValueDecoderFn = dyn Fn(KeyValue) -> Result + Send + Sync; -enum PaginationStreamState { +enum PaginationStreamState { /// At the start of reading. Init, /// Decoding key value pairs. - Decoding(SimpleKeyValueDecoder), + Decoding(SimpleKeyValueDecoder), /// Retrieving data from backend. Reading(BoxFuture<'static, Result<(PaginationStreamFactory, Option)>>), /// Error @@ -77,7 +77,7 @@ struct PaginationStreamFactory { } impl PaginationStreamFactory { - pub fn new( + fn new( kv: &KvBackendRef, key: Vec, range_end: Vec, @@ -137,7 +137,7 @@ impl PaginationStreamFactory { } } - pub async fn read_next(mut self) -> Result<(Self, Option)> { + async fn read_next(mut self) -> Result<(Self, Option)> { if self.more { let resp = self .adaptive_range(RangeRequest { @@ -174,18 +174,19 @@ impl PaginationStreamFactory { } } -pub struct PaginationStream { - state: PaginationStreamState, - decoder_fn: Arc>, +pub struct PaginationStream { + state: PaginationStreamState, + decoder_fn: Arc>, factory: Option, } -impl PaginationStream { +impl PaginationStream { + /// Returns a new [PaginationStream]. pub fn new( kv: KvBackendRef, req: RangeRequest, page_size: usize, - decoder_fn: Arc>, + decoder_fn: Arc>, ) -> Self { Self { state: PaginationStreamState::Init, @@ -202,13 +203,13 @@ impl PaginationStream { } } -struct SimpleKeyValueDecoder { +struct SimpleKeyValueDecoder { kv: VecDeque, - decoder: Arc>, + decoder: Arc>, } -impl Iterator for SimpleKeyValueDecoder { - type Item = Result<(K, V)>; +impl Iterator for SimpleKeyValueDecoder { + type Item = Result; fn next(&mut self) -> Option { if let Some(kv) = self.kv.pop_front() { @@ -219,8 +220,8 @@ impl Iterator for SimpleKeyValueDecoder { } } -impl Stream for PaginationStream { - type Item = Result<(K, V)>; +impl Stream for PaginationStream { + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { diff --git a/src/meta-srv/src/service/store/cached_kv.rs b/src/meta-srv/src/service/store/cached_kv.rs index 8772c47a1d..8bf02dc047 100644 --- a/src/meta-srv/src/service/store/cached_kv.rs +++ b/src/meta-srv/src/service/store/cached_kv.rs @@ -102,15 +102,10 @@ impl LeaderCachedKvBackend { self.store.clone(), RangeRequest::new().with_prefix(prefix.as_bytes()), DEFAULT_PAGE_SIZE, - Arc::new(|kv| Ok((kv, ()))), + Arc::new(Ok), ); - let kvs = stream - .try_collect::>() - .await? - .into_iter() - .map(|(kv, _)| kv) - .collect(); + let kvs = stream.try_collect::>().await?.into_iter().collect(); self.cache .batch_put(BatchPutRequest {