mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-10 07:12:54 +00:00
feat: let tables API return a stream (#3170)
This commit is contained in:
@@ -41,6 +41,14 @@ pub enum Error {
|
||||
source: BoxedError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to list {}.{}'s tables", catalog, schema))]
|
||||
ListTables {
|
||||
location: Location,
|
||||
catalog: String,
|
||||
schema: String,
|
||||
source: BoxedError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to re-compile script due to internal error"))]
|
||||
CompileScriptInternal {
|
||||
location: Location,
|
||||
@@ -270,9 +278,9 @@ impl ErrorExt for Error {
|
||||
StatusCode::InvalidArguments
|
||||
}
|
||||
|
||||
Error::ListCatalogs { source, .. } | Error::ListSchemas { source, .. } => {
|
||||
source.status_code()
|
||||
}
|
||||
Error::ListCatalogs { source, .. }
|
||||
| Error::ListSchemas { source, .. }
|
||||
| Error::ListTables { source, .. } => source.status_code(),
|
||||
|
||||
Error::OpenSystemCatalog { source, .. }
|
||||
| Error::CreateSystemCatalog { source, .. }
|
||||
|
||||
@@ -35,8 +35,8 @@ use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error::{
|
||||
self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, Result as CatalogResult,
|
||||
TableMetadataManagerSnafu,
|
||||
self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu,
|
||||
Result as CatalogResult, TableMetadataManagerSnafu,
|
||||
};
|
||||
use crate::information_schema::InformationSchemaProvider;
|
||||
use crate::CatalogManager;
|
||||
@@ -135,18 +135,22 @@ impl CatalogManager for KvBackendCatalogManager {
|
||||
}
|
||||
|
||||
async fn table_names(&self, catalog: &str, schema: &str) -> CatalogResult<Vec<String>> {
|
||||
let mut tables = self
|
||||
let stream = self
|
||||
.table_metadata_manager
|
||||
.table_name_manager()
|
||||
.tables(catalog, schema)
|
||||
.await;
|
||||
let mut tables = stream
|
||||
.try_collect::<Vec<_>>()
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
.map_err(BoxedError::new)
|
||||
.context(ListTablesSnafu { catalog, schema })?
|
||||
.into_iter()
|
||||
.map(|(k, _)| k)
|
||||
.collect::<Vec<String>>();
|
||||
.collect::<Vec<_>>();
|
||||
tables.extend_from_slice(&self.system_catalog.table_names(schema));
|
||||
|
||||
Ok(tables)
|
||||
Ok(tables.into_iter().collect())
|
||||
}
|
||||
|
||||
async fn catalog_exists(&self, catalog: &str) -> CatalogResult<bool> {
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures_util::stream::BoxStream;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::OptionExt;
|
||||
use table::metadata::TableId;
|
||||
@@ -24,7 +25,9 @@ use crate::key::{to_removed_key, TableMetaKey};
|
||||
use crate::kv_backend::memory::MemoryKvBackend;
|
||||
use crate::kv_backend::txn::{Txn, TxnOp};
|
||||
use crate::kv_backend::KvBackendRef;
|
||||
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
|
||||
use crate::rpc::store::RangeRequest;
|
||||
use crate::rpc::KeyValue;
|
||||
use crate::table_name::TableName;
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
@@ -79,6 +82,14 @@ impl TableMetaKey for TableNameKey<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Decodes `KeyValue` to ({table_name}, TableNameValue)
|
||||
pub fn table_decoder(kv: KeyValue) -> Result<(String, TableNameValue)> {
|
||||
let table_name = TableNameKey::strip_table_name(kv.key())?;
|
||||
let table_name_value = TableNameValue::try_from_raw_value(&kv.value)?;
|
||||
|
||||
Ok((table_name, table_name_value))
|
||||
}
|
||||
|
||||
impl<'a> From<&'a TableName> for TableNameKey<'a> {
|
||||
fn from(value: &'a TableName) -> Self {
|
||||
Self {
|
||||
@@ -218,19 +229,18 @@ impl TableNameManager {
|
||||
&self,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
) -> Result<Vec<(String, TableNameValue)>> {
|
||||
) -> BoxStream<'static, Result<(String, TableNameValue)>> {
|
||||
let key = TableNameKey::prefix_to_table(catalog, schema).into_bytes();
|
||||
let req = RangeRequest::new().with_prefix(key);
|
||||
let resp = self.kv_backend.range(req).await?;
|
||||
|
||||
let mut res = Vec::with_capacity(resp.kvs.len());
|
||||
for kv in resp.kvs {
|
||||
res.push((
|
||||
TableNameKey::strip_table_name(kv.key())?,
|
||||
TableNameValue::try_from_raw_value(&kv.value)?,
|
||||
))
|
||||
}
|
||||
Ok(res)
|
||||
let stream = PaginationStream::new(
|
||||
self.kv_backend.clone(),
|
||||
req,
|
||||
DEFAULT_PAGE_SIZE,
|
||||
Arc::new(table_decoder),
|
||||
);
|
||||
|
||||
Box::pin(stream)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -111,6 +111,14 @@ pub enum Error {
|
||||
source: BoxedError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to list {}.{}'s tables", catalog, schema))]
|
||||
ListTables {
|
||||
location: Location,
|
||||
catalog: String,
|
||||
schema: String,
|
||||
source: BoxedError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to join a future"))]
|
||||
Join {
|
||||
location: Location,
|
||||
@@ -732,9 +740,9 @@ impl ErrorExt for Error {
|
||||
Error::StartProcedureManager { source, .. }
|
||||
| Error::StopProcedureManager { source, .. } => source.status_code(),
|
||||
|
||||
Error::ListCatalogs { source, .. } | Error::ListSchemas { source, .. } => {
|
||||
source.status_code()
|
||||
}
|
||||
Error::ListCatalogs { source, .. }
|
||||
| Error::ListSchemas { source, .. }
|
||||
| Error::ListTables { source, .. } => source.status_code(),
|
||||
Error::StartTelemetryTask { source, .. } => source.status_code(),
|
||||
|
||||
Error::RegionFailoverCandidatesNotFound { .. } => StatusCode::RuntimeResourcesExhausted,
|
||||
|
||||
@@ -115,15 +115,19 @@ impl HttpHandler for TablesHandler {
|
||||
let catalog = util::get_value(params, "catalog")?;
|
||||
let schema = util::get_value(params, "schema")?;
|
||||
|
||||
let tables = self
|
||||
let stream = self
|
||||
.table_metadata_manager
|
||||
.table_name_manager()
|
||||
.tables(catalog, schema)
|
||||
.await;
|
||||
let tables = stream
|
||||
.try_collect::<Vec<_>>()
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ListTablesSnafu { catalog, schema })?
|
||||
.into_iter()
|
||||
.map(|(k, _)| k)
|
||||
.collect();
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
to_http_response(tables)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user