chore: avoid double future (#3890)

This commit is contained in:
Jeremyhi
2024-05-09 15:11:22 +08:00
committed by GitHub
parent 7de62ef5d0
commit f995f6099f
9 changed files with 10 additions and 26 deletions

View File

@@ -258,7 +258,7 @@ impl InformationSchemaColumnsBuilder {
let predicates = Predicates::from_scan_request(&request);
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;
let mut stream = catalog_manager.tables(&catalog_name, &schema_name);
while let Some(table) = stream.try_next().await? {
let keys = &table.table_info().meta.primary_key_indices;

View File

@@ -243,7 +243,6 @@ impl InformationSchemaPartitionsBuilder {
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
let table_info_stream = catalog_manager
.tables(&catalog_name, &schema_name)
.await
.try_filter_map(|t| async move {
let table_info = t.table_info();
if table_info.table_type == TableType::Temporary {

View File

@@ -179,7 +179,6 @@ impl InformationSchemaRegionPeersBuilder {
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
let table_id_stream = catalog_manager
.tables(&catalog_name, &schema_name)
.await
.try_filter_map(|t| async move {
let table_info = t.table_info();
if table_info.table_type == TableType::Temporary {

View File

@@ -177,7 +177,7 @@ impl InformationSchemaTableConstraintsBuilder {
let predicates = Predicates::from_scan_request(&request);
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;
let mut stream = catalog_manager.tables(&catalog_name, &schema_name);
while let Some(table) = stream.try_next().await? {
let keys = &table.table_info().meta.primary_key_indices;

View File

@@ -161,7 +161,7 @@ impl InformationSchemaTablesBuilder {
let predicates = Predicates::from_scan_request(&request);
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;
let mut stream = catalog_manager.tables(&catalog_name, &schema_name);
while let Some(table) = stream.try_next().await? {
let table_info = table.table_info();

View File

@@ -301,11 +301,7 @@ impl CatalogManager for KvBackendCatalogManager {
})
}
async fn tables<'a>(
&'a self,
catalog: &'a str,
schema: &'a str,
) -> BoxStream<'a, Result<TableRef>> {
fn tables<'a>(&'a self, catalog: &'a str, schema: &'a str) -> BoxStream<'a, Result<TableRef>> {
let sys_tables = try_stream!({
// System tables
let sys_table_names = self.system_catalog.table_names(schema);

View File

@@ -59,11 +59,7 @@ pub trait CatalogManager: Send + Sync {
) -> Result<Option<TableRef>>;
/// Returns all tables with a stream by catalog and schema.
async fn tables<'a>(
&'a self,
catalog: &'a str,
schema: &'a str,
) -> BoxStream<'a, Result<TableRef>>;
fn tables<'a>(&'a self, catalog: &'a str, schema: &'a str) -> BoxStream<'a, Result<TableRef>>;
}
pub type CatalogManagerRef = Arc<dyn CatalogManager>;

View File

@@ -117,11 +117,7 @@ impl CatalogManager for MemoryCatalogManager {
Ok(result)
}
async fn tables<'a>(
&'a self,
catalog: &'a str,
schema: &'a str,
) -> BoxStream<'a, Result<TableRef>> {
fn tables<'a>(&'a self, catalog: &'a str, schema: &'a str) -> BoxStream<'a, Result<TableRef>> {
let catalogs = self.catalogs.read().unwrap();
let Some(schemas) = catalogs.get(catalog) else {
@@ -141,11 +137,11 @@ impl CatalogManager for MemoryCatalogManager {
let tables = tables.values().cloned().collect::<Vec<_>>();
return Box::pin(try_stream!({
Box::pin(try_stream!({
for table in tables {
yield table;
}
}));
}))
}
}
@@ -368,9 +364,7 @@ mod tests {
.await
.unwrap()
.unwrap();
let stream = catalog_list
.tables(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
.await;
let stream = catalog_list.tables(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
let tables = stream.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(tables.len(), 1);
assert_eq!(

View File

@@ -771,7 +771,7 @@ async fn retrieve_field_names(
if matches.is_empty() {
// query all tables if no matcher is provided
while let Some(table) = manager.tables(catalog, schema).await.next().await {
while let Some(table) = manager.tables(catalog, schema).next().await {
let table = table.context(CatalogSnafu)?;
for column in table.field_columns() {
field_columns.insert(column.name);