diff --git a/Cargo.lock b/Cargo.lock index e37712f389..5ceb3fcfcb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -659,11 +659,14 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" name = "catalog" version = "0.1.0" dependencies = [ + "api", + "arc-swap", "async-stream", "async-trait", "chrono", "common-catalog", "common-error", + "common-grpc", "common-query", "common-recordbatch", "common-runtime", @@ -675,6 +678,7 @@ dependencies = [ "futures-util", "lazy_static", "log-store", + "meta-client", "object-store", "opendal", "regex", diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 29935319c9..33248231d8 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -5,10 +5,13 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +api = { path = "../api" } +arc-swap = "1.0" async-stream = "0.3" async-trait = "0.1" common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } +common-grpc = { path = "../common/grpc" } common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } common-runtime = { path = "../common/runtime" } @@ -21,6 +24,7 @@ datatypes = { path = "../datatypes" } futures = "0.3" futures-util = "0.3" lazy_static = "1.4" +meta-client = { path = "../meta-client" } opendal = "0.17" regex = "1.6" serde = "1.0" diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 0943aa7525..4263c94de0 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -126,6 +126,15 @@ pub enum Error { backtrace: Backtrace, source: std::io::Error, }, + + #[snafu(display("Local and remote catalog data are inconsistent, msg: {}", msg))] + CatalogStateInconsistent { msg: String, backtrace: Backtrace }, + + #[snafu(display("Failed to perform metasrv operation, source: {}", source))] + MetaSrv { + #[snafu(backtrace)] + source: meta_client::error::Error, + }, } pub type Result = std::result::Result; @@ -138,7 +147,8 @@ impl ErrorExt for Error { | Error::TableNotFound { .. } | Error::IllegalManagerState { .. } | Error::CatalogNotFound { .. } - | Error::InvalidEntryType { .. } => StatusCode::Unexpected, + | Error::InvalidEntryType { .. } + | Error::CatalogStateInconsistent { .. } => StatusCode::Unexpected, Error::SystemCatalog { .. } | Error::EmptyValue @@ -156,9 +166,9 @@ impl ErrorExt for Error { | Error::CreateSystemCatalog { source, .. } | Error::InsertTableRecord { source, .. } | Error::OpenTable { source, .. } - | Error::CreateTable { source, .. } - | Error::SystemCatalogTableScan { source } => source.status_code(), - + | Error::CreateTable { source, .. } => source.status_code(), + Error::MetaSrv { source, .. } => source.status_code(), + Error::SystemCatalogTableScan { source } => source.status_code(), Error::SystemCatalogTableScanExec { source } => source.status_code(), } } diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 355dbfda78..2c6cbb254e 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -15,6 +15,7 @@ pub use crate::schema::{SchemaProvider, SchemaProviderRef}; pub mod error; pub mod local; +pub mod remote; pub mod schema; pub mod system; pub mod tables; diff --git a/src/catalog/src/remote.rs b/src/catalog/src/remote.rs new file mode 100644 index 0000000000..d0f343932a --- /dev/null +++ b/src/catalog/src/remote.rs @@ -0,0 +1,94 @@ +use std::fmt::Debug; +use std::pin::Pin; +use std::sync::Arc; + +pub use client::MetaKvBackend; +use futures::Stream; +use futures_util::StreamExt; +pub use manager::{RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider}; + +use crate::error::Error; + +mod client; +mod manager; + +#[derive(Debug, Clone)] +pub struct Kv(pub Vec, pub Vec); + +pub type ValueIter<'a, E> = Pin> + Send + 'a>>; + +#[async_trait::async_trait] +pub trait KvBackend: Send + Sync { + fn range<'a, 'b>(&'a self, key: &[u8]) -> ValueIter<'b, crate::error::Error> + where + 'a: 'b; + + async fn set(&self, key: &[u8], val: &[u8]) -> Result<(), crate::error::Error>; + + async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), crate::error::Error>; + + async fn delete(&self, key: &[u8]) -> Result<(), Error> { + self.delete_range(key, &[]).await + } + + /// Default get is implemented based on `range` method. + async fn get(&self, key: &[u8]) -> Result, Error> { + let mut iter = self.range(key); + while let Some(r) = iter.next().await { + let kv = r?; + if kv.0 == key { + return Ok(Some(kv)); + } + } + return Ok(None); + } +} + +pub type KvBackendRef = Arc; + +#[cfg(test)] +mod tests { + use async_stream::stream; + + use super::*; + + struct MockKvBackend {} + + #[async_trait::async_trait] + impl KvBackend for MockKvBackend { + fn range<'a, 'b>(&'a self, _key: &[u8]) -> ValueIter<'b, Error> + where + 'a: 'b, + { + Box::pin(stream!({ + for i in 0..3 { + yield Ok(Kv( + i.to_string().as_bytes().to_vec(), + i.to_string().as_bytes().to_vec(), + )) + } + })) + } + + async fn set(&self, _key: &[u8], _val: &[u8]) -> Result<(), Error> { + unimplemented!() + } + + async fn delete_range(&self, _key: &[u8], _end: &[u8]) -> Result<(), Error> { + unimplemented!() + } + } + + #[tokio::test] + async fn test_get() { + let backend = MockKvBackend {}; + let result = backend.get(0.to_string().as_bytes()).await; + assert_eq!(0.to_string().as_bytes(), result.unwrap().unwrap().0); + let result = backend.get(1.to_string().as_bytes()).await; + assert_eq!(1.to_string().as_bytes(), result.unwrap().unwrap().0); + let result = backend.get(2.to_string().as_bytes()).await; + assert_eq!(2.to_string().as_bytes(), result.unwrap().unwrap().0); + let result = backend.get(3.to_string().as_bytes()).await; + assert!(result.unwrap().is_none()); + } +} diff --git a/src/catalog/src/remote/client.rs b/src/catalog/src/remote/client.rs new file mode 100644 index 0000000000..a9df193a00 --- /dev/null +++ b/src/catalog/src/remote/client.rs @@ -0,0 +1,71 @@ +use std::fmt::Debug; + +use async_stream::stream; +use common_telemetry::info; +use meta_client::client::MetaClient; +use meta_client::rpc::{DeleteRangeRequest, PutRequest, RangeRequest}; +use snafu::ResultExt; + +use crate::error::{Error, MetaSrvSnafu}; +use crate::remote::{Kv, KvBackend, ValueIter}; +#[derive(Debug)] +pub struct MetaKvBackend { + pub client: MetaClient, +} + +/// Implement `KvBackend` trait for `MetaKvBackend` instead of opendal's `Accessor` since +/// `MetaClient`'s range method can return both keys and values, which can reduce IO overhead +/// comparing to `Accessor`'s list and get method. +#[async_trait::async_trait] +impl KvBackend for MetaKvBackend { + fn range<'a, 'b>(&'a self, key: &[u8]) -> ValueIter<'b, Error> + where + 'a: 'b, + { + let key = key.to_vec(); + Box::pin(stream!({ + let mut resp = self + .client + .range(RangeRequest::new().with_prefix(key)) + .await + .context(MetaSrvSnafu)?; + let kvs = resp.take_kvs(); + for mut kv in kvs.into_iter() { + yield Ok(Kv(kv.take_key(), kv.take_value())) + } + })) + } + + async fn get(&self, key: &[u8]) -> Result, Error> { + let mut response = self + .client + .range(RangeRequest::new().with_key(key)) + .await + .context(MetaSrvSnafu)?; + Ok(response + .take_kvs() + .get_mut(0) + .map(|kv| Kv(kv.take_key(), kv.take_value()))) + } + + async fn set(&self, key: &[u8], val: &[u8]) -> Result<(), Error> { + let req = PutRequest::new() + .with_key(key.to_vec()) + .with_value(val.to_vec()); + let _ = self.client.put(req).await.context(MetaSrvSnafu)?; + Ok(()) + } + + async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), Error> { + let req = DeleteRangeRequest::new().with_range(key.to_vec(), end.to_vec()); + let resp = self.client.delete_range(req).await.context(MetaSrvSnafu)?; + info!( + "Delete range, key: {}, end: {}, deleted: {}", + String::from_utf8_lossy(key), + String::from_utf8_lossy(end), + resp.deleted() + ); + + Ok(()) + } +} diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs new file mode 100644 index 0000000000..92236e9244 --- /dev/null +++ b/src/catalog/src/remote/manager.rs @@ -0,0 +1,669 @@ +use std::any::Any; +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; + +use arc_swap::ArcSwap; +use async_stream::stream; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; +use common_catalog::{ + build_catalog_prefix, build_schema_prefix, build_table_prefix, CatalogKey, CatalogValue, + SchemaKey, SchemaValue, TableKey, TableValue, +}; +use common_telemetry::{debug, info}; +use datatypes::schema::Schema; +use futures::Stream; +use futures_util::StreamExt; +use snafu::{OptionExt, ResultExt}; +use table::engine::{EngineContext, TableEngineRef}; +use table::metadata::{TableId, TableVersion}; +use table::requests::{CreateTableRequest, OpenTableRequest}; +use table::TableRef; +use tokio::sync::Mutex; + +use crate::error::Result; +use crate::error::{ + CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu, + SchemaNotFoundSnafu, TableExistsSnafu, +}; +use crate::remote::{Kv, KvBackendRef}; +use crate::{ + handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, + RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, SchemaProviderRef, +}; + +/// Catalog manager based on metasrv. +pub struct RemoteCatalogManager { + node_id: u64, + backend: KvBackendRef, + catalogs: Arc>>, + next_table_id: Arc, + engine: TableEngineRef, + system_table_requests: Mutex>, + mutex: Arc>, +} + +impl RemoteCatalogManager { + pub fn new(engine: TableEngineRef, node_id: u64, backend: KvBackendRef) -> Self { + Self { + engine, + node_id, + backend, + catalogs: Default::default(), + next_table_id: Default::default(), + system_table_requests: Default::default(), + mutex: Default::default(), + } + } + + fn build_catalog_key(&self, catalog_name: impl AsRef) -> CatalogKey { + CatalogKey { + catalog_name: catalog_name.as_ref().to_string(), + node_id: self.node_id, + } + } + + fn new_catalog_provider(&self, catalog_name: &str) -> CatalogProviderRef { + Arc::new(RemoteCatalogProvider { + catalog_name: catalog_name.to_string(), + node_id: self.node_id, + backend: self.backend.clone(), + schemas: Default::default(), + mutex: Default::default(), + }) as _ + } + + fn new_schema_provider(&self, catalog_name: &str, schema_name: &str) -> SchemaProviderRef { + Arc::new(RemoteSchemaProvider { + catalog_name: catalog_name.to_string(), + schema_name: schema_name.to_string(), + tables: Default::default(), + node_id: self.node_id, + backend: self.backend.clone(), + mutex: Default::default(), + }) as _ + } + + async fn iter_remote_catalogs( + &self, + ) -> Pin> + Send + '_>> { + let catalog_range_prefix = build_catalog_prefix(); + info!("catalog_range_prefix: {}", catalog_range_prefix); + let mut catalogs = self.backend.range(catalog_range_prefix.as_bytes()); + Box::pin(stream!({ + while let Some(r) = catalogs.next().await { + let Kv(k, _) = r?; + if !k.starts_with(catalog_range_prefix.as_bytes()) { + debug!("Ignoring non-catalog key: {}", String::from_utf8_lossy(&k)); + continue; + } + let key = CatalogKey::parse(&String::from_utf8_lossy(&k)) + .context(InvalidCatalogValueSnafu)?; + if key.node_id == self.node_id { + yield Ok(key) + } + } + })) + } + + async fn iter_remote_schemas( + &self, + catalog_name: &str, + ) -> Pin> + Send + '_>> { + let schema_prefix = build_schema_prefix(catalog_name); + let mut schemas = self.backend.range(schema_prefix.as_bytes()); + + Box::pin(stream!({ + while let Some(r) = schemas.next().await { + let Kv(k, _) = r?; + if !k.starts_with(schema_prefix.as_bytes()) { + debug!("Ignoring non-schema key: {}", String::from_utf8_lossy(&k)); + continue; + } + + let schema_key = SchemaKey::parse(&String::from_utf8_lossy(&k)) + .context(InvalidCatalogValueSnafu)?; + + if schema_key.node_id == self.node_id { + yield Ok(schema_key) + } + } + })) + } + + /// Iterate over all table entries on metasrv + /// TODO(hl): table entries with different version is not currently considered. + /// Ideally deprecated table entry must be deleted when deregistering from catalog. + async fn iter_remote_tables( + &self, + catalog_name: &str, + schema_name: &str, + ) -> Pin> + Send + '_>> { + let table_prefix = build_table_prefix(catalog_name, schema_name); + let mut tables = self.backend.range(table_prefix.as_bytes()); + Box::pin(stream!({ + while let Some(r) = tables.next().await { + let Kv(k, v) = r?; + if !k.starts_with(table_prefix.as_bytes()) { + debug!("Ignoring non-table prefix: {}", String::from_utf8_lossy(&k)); + continue; + } + let table_key = TableKey::parse(&String::from_utf8_lossy(&k)) + .context(InvalidCatalogValueSnafu)?; + let table_value = TableValue::parse(&String::from_utf8_lossy(&v)) + .context(InvalidCatalogValueSnafu)?; + + if table_value.node_id == self.node_id { + yield Ok((table_key, table_value)) + } + } + })) + } + + /// Fetch catalogs/schemas/tables from remote catalog manager along with max table id allocated. + async fn initiate_catalogs(&self) -> Result<(HashMap, TableId)> { + let mut res = HashMap::new(); + let max_table_id = MIN_USER_TABLE_ID; + + // initiate default catalog and schema + let default_catalog = self.initiate_default_catalog().await?; + res.insert(DEFAULT_CATALOG_NAME.to_string(), default_catalog); + info!("Default catalog and schema registered"); + + let mut catalogs = self.iter_remote_catalogs().await; + while let Some(r) = catalogs.next().await { + let CatalogKey { catalog_name, .. } = r?; + info!("Fetch catalog from metasrv: {}", catalog_name); + let catalog = res + .entry(catalog_name.clone()) + .or_insert_with(|| self.new_catalog_provider(&catalog_name)) + .clone(); + + self.initiate_schemas(catalog_name, catalog, max_table_id) + .await?; + } + + Ok((res, max_table_id)) + } + + async fn initiate_schemas( + &self, + catalog_name: String, + catalog: CatalogProviderRef, + max_table_id: TableId, + ) -> Result<()> { + let mut schemas = self.iter_remote_schemas(&catalog_name).await; + while let Some(r) = schemas.next().await { + let SchemaKey { + catalog_name, + schema_name, + .. + } = r?; + info!("Found schema: {}.{}", catalog_name, schema_name); + let schema = match catalog.schema(&schema_name)? { + None => { + let schema = self.new_schema_provider(&catalog_name, &schema_name); + catalog.register_schema(schema_name.clone(), schema.clone())?; + info!("Registered schema: {}", &schema_name); + schema + } + Some(schema) => schema, + }; + + info!( + "Fetch schema from metasrv: {}.{}", + &catalog_name, &schema_name + ); + self.initiate_tables(&catalog_name, &schema_name, schema, max_table_id) + .await?; + } + Ok(()) + } + + /// Initiates all tables inside a catalog by fetching data from metasrv. + async fn initiate_tables<'a>( + &'a self, + catalog_name: &'a str, + schema_name: &'a str, + schema: SchemaProviderRef, + mut max_table_id: TableId, + ) -> Result<()> { + let mut tables = self.iter_remote_tables(catalog_name, schema_name).await; + while let Some(r) = tables.next().await { + let (table_key, table_value) = r?; + let table_ref = self.open_or_create_table(&table_key, &table_value).await?; + schema.register_table(table_key.table_name.to_string(), table_ref)?; + info!("Registered table {}", &table_key.table_name); + if table_value.id > max_table_id { + info!("Max table id: {} -> {}", max_table_id, table_value.id); + max_table_id = table_value.id; + } + } + Ok(()) + } + + async fn initiate_default_catalog(&self) -> Result { + let default_catalog = self.new_catalog_provider(DEFAULT_CATALOG_NAME); + let default_schema = self.new_schema_provider(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); + default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema)?; + let schema_key = SchemaKey { + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + node_id: self.node_id, + } + .to_string(); + self.backend + .set( + schema_key.as_bytes(), + &SchemaValue {} + .to_bytes() + .context(InvalidCatalogValueSnafu)?, + ) + .await?; + info!("Registered default schema"); + + let catalog_key = CatalogKey { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + node_id: self.node_id, + } + .to_string(); + self.backend + .set( + catalog_key.as_bytes(), + &CatalogValue {} + .to_bytes() + .context(InvalidCatalogValueSnafu)?, + ) + .await?; + info!("Registered default catalog"); + Ok(default_catalog) + } + + async fn open_or_create_table( + &self, + table_key: &TableKey, + table_value: &TableValue, + ) -> Result { + let context = EngineContext {}; + let TableKey { + catalog_name, + schema_name, + table_name, + .. + } = table_key; + + let TableValue { id, meta, .. } = table_value; + + let request = OpenTableRequest { + catalog_name: catalog_name.clone(), + schema_name: schema_name.clone(), + table_name: table_name.clone(), + table_id: *id, + }; + match self + .engine + .open_table(&context, request) + .await + .with_context(|_| OpenTableSnafu { + table_info: format!("{}.{}.{}, id:{}", catalog_name, schema_name, table_name, id,), + })? { + Some(table) => Ok(table), + None => { + let req = CreateTableRequest { + id: *id, + catalog_name: catalog_name.clone(), + schema_name: schema_name.clone(), + table_name: table_name.clone(), + desc: None, + schema: Arc::new(Schema::new(meta.schema.column_schemas.clone())), + primary_key_indices: meta.primary_key_indices.clone(), + create_if_not_exists: true, + table_options: meta.options.clone(), + }; + + self.engine + .create_table(&context, req) + .await + .context(CreateTableSnafu { + table_info: format!( + "{}.{}.{}, id:{}", + &catalog_name, &schema_name, &table_name, id + ), + }) + } + } + } +} + +#[async_trait::async_trait] +impl CatalogManager for RemoteCatalogManager { + async fn start(&self) -> Result<()> { + let (catalogs, max_table_id) = self.initiate_catalogs().await?; + info!( + "Initialized catalogs: {:?}", + catalogs.keys().cloned().collect::>() + ); + self.catalogs.store(Arc::new(catalogs)); + self.next_table_id + .store(max_table_id + 1, Ordering::Relaxed); + info!("Max table id allocated: {}", max_table_id); + + let mut system_table_requests = self.system_table_requests.lock().await; + handle_system_table_request(self, self.engine.clone(), &mut system_table_requests).await?; + info!("All system table opened"); + Ok(()) + } + + fn next_table_id(&self) -> TableId { + self.next_table_id.fetch_add(1, Ordering::Relaxed) + } + + async fn register_table(&self, request: RegisterTableRequest) -> Result { + let catalog_name = request.catalog; + let schema_name = request.schema; + let catalog_provider = self.catalog(&catalog_name)?.context(CatalogNotFoundSnafu { + catalog_name: &catalog_name, + })?; + let schema_provider = + catalog_provider + .schema(&schema_name)? + .with_context(|| SchemaNotFoundSnafu { + schema_info: format!("{}.{}", &catalog_name, &schema_name), + })?; + if schema_provider.table_exist(&request.table_name)? { + return TableExistsSnafu { + table: format!("{}.{}.{}", &catalog_name, &schema_name, &request.table_name), + } + .fail(); + } + schema_provider.register_table(request.table_name, request.table)?; + Ok(1) + } + + async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> { + let mut requests = self.system_table_requests.lock().await; + requests.push(request); + Ok(()) + } + + fn table( + &self, + catalog_name: &str, + schema_name: &str, + table_name: &str, + ) -> Result> { + let catalog = self + .catalog(catalog_name)? + .with_context(|| CatalogNotFoundSnafu { catalog_name })?; + let schema = catalog + .schema(schema_name)? + .with_context(|| SchemaNotFoundSnafu { + schema_info: format!("{}.{}", catalog_name, schema_name), + })?; + schema.table(table_name) + } +} + +impl CatalogList for RemoteCatalogManager { + fn as_any(&self) -> &dyn Any { + self + } + + fn register_catalog( + &self, + name: String, + catalog: CatalogProviderRef, + ) -> Result> { + let key = self.build_catalog_key(&name).to_string(); + let backend = self.backend.clone(); + let mutex = self.mutex.clone(); + let catalogs = self.catalogs.clone(); + + std::thread::spawn(|| { + common_runtime::block_on_write(async move { + let _guard = mutex.lock().await; + backend + .set( + key.as_bytes(), + &CatalogValue {} + .to_bytes() + .context(InvalidCatalogValueSnafu)?, + ) + .await?; + let prev_catalogs = catalogs.load(); + let mut new_catalogs = HashMap::with_capacity(prev_catalogs.len() + 1); + new_catalogs.clone_from(&prev_catalogs); + let prev = new_catalogs.insert(name, catalog); + catalogs.store(Arc::new(new_catalogs)); + Ok(prev) + }) + }) + .join() + .unwrap() + } + + /// List all catalogs from metasrv + fn catalog_names(&self) -> Result> { + Ok(self.catalogs.load().keys().cloned().collect::>()) + } + + /// Read catalog info of given name from metasrv. + fn catalog(&self, name: &str) -> Result> { + Ok(self.catalogs.load().get(name).cloned()) + } +} + +pub struct RemoteCatalogProvider { + catalog_name: String, + node_id: u64, + backend: KvBackendRef, + schemas: Arc>>, + mutex: Arc>, +} + +impl RemoteCatalogProvider { + pub fn new(catalog_name: String, node_id: u64, backend: KvBackendRef) -> Self { + Self { + catalog_name, + node_id, + backend, + schemas: Default::default(), + mutex: Default::default(), + } + } + + fn build_schema_key(&self, schema_name: impl AsRef) -> SchemaKey { + SchemaKey { + catalog_name: self.catalog_name.clone(), + schema_name: schema_name.as_ref().to_string(), + node_id: self.node_id, + } + } +} + +impl CatalogProvider for RemoteCatalogProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Result> { + Ok(self.schemas.load().keys().cloned().collect::>()) + } + + fn register_schema( + &self, + name: String, + schema: SchemaProviderRef, + ) -> Result> { + let key = self.build_schema_key(&name).to_string(); + let backend = self.backend.clone(); + let mutex = self.mutex.clone(); + let schemas = self.schemas.clone(); + + std::thread::spawn(|| { + common_runtime::block_on_write(async move { + let _guard = mutex.lock().await; + backend + .set( + key.as_bytes(), + &SchemaValue {} + .to_bytes() + .context(InvalidCatalogValueSnafu)?, + ) + .await?; + let prev_schemas = schemas.load(); + let mut new_schemas = HashMap::with_capacity(prev_schemas.len() + 1); + new_schemas.clone_from(&prev_schemas); + let prev_schema = new_schemas.insert(name, schema); + schemas.store(Arc::new(new_schemas)); + Ok(prev_schema) + }) + }) + .join() + .unwrap() + } + + fn schema(&self, name: &str) -> Result>> { + Ok(self.schemas.load().get(name).cloned()) + } +} + +pub struct RemoteSchemaProvider { + catalog_name: String, + schema_name: String, + node_id: u64, + backend: KvBackendRef, + tables: Arc>>, + mutex: Arc>, +} + +impl RemoteSchemaProvider { + pub fn new( + catalog_name: String, + schema_name: String, + node_id: u64, + backend: KvBackendRef, + ) -> Self { + Self { + catalog_name, + schema_name, + node_id, + backend, + tables: Default::default(), + mutex: Default::default(), + } + } + + fn build_table_key( + &self, + table_name: impl AsRef, + table_version: TableVersion, + ) -> TableKey { + TableKey { + catalog_name: self.catalog_name.clone(), + schema_name: self.schema_name.clone(), + table_name: table_name.as_ref().to_string(), + version: table_version, + node_id: self.node_id, + } + } +} + +impl SchemaProvider for RemoteSchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Result> { + Ok(self.tables.load().keys().cloned().collect::>()) + } + + fn table(&self, name: &str) -> Result> { + Ok(self.tables.load().get(name).cloned()) + } + + fn register_table(&self, name: String, table: TableRef) -> Result> { + let table_info = table.table_info(); + let table_version = table_info.ident.version; + let table_value = TableValue { + meta: table_info.meta.clone().into(), + id: table_info.ident.table_id, + node_id: self.node_id, + regions_ids: vec![], + }; + let backend = self.backend.clone(); + let mutex = self.mutex.clone(); + let tables = self.tables.clone(); + + let table_key = self + .build_table_key(name.clone(), table_version) + .to_string(); + + let prev = std::thread::spawn(move || { + common_runtime::block_on_read(async move { + let _guard = mutex.lock().await; + backend + .set( + table_key.as_bytes(), + &table_value.as_bytes().context(InvalidCatalogValueSnafu)?, + ) + .await?; + debug!( + "Successfully set catalog table entry, key: {}, table value: {:?}", + table_key, table_value + ); + + let prev_tables = tables.load(); + let mut new_tables = HashMap::with_capacity(prev_tables.len() + 1); + new_tables.clone_from(&prev_tables); + let prev = new_tables.insert(name, table); + tables.store(Arc::new(new_tables)); + Ok(prev) + }) + }) + .join() + .unwrap(); + prev + } + + fn deregister_table(&self, name: &str) -> Result> { + let table_version = match self.tables.load().get(name) { + None => return Ok(None), + Some(t) => t.table_info().ident.version, + }; + + let table_name = name.to_string(); + let table_key = self.build_table_key(&table_name, table_version).to_string(); + + let backend = self.backend.clone(); + let mutex = self.mutex.clone(); + let tables = self.tables.clone(); + + let prev = std::thread::spawn(move || { + common_runtime::block_on_read(async move { + let _guard = mutex.lock().await; + backend.delete(table_key.as_bytes()).await?; + debug!( + "Successfully deleted catalog table entry, key: {}", + table_key + ); + + let prev_tables = tables.load(); + let mut new_tables = HashMap::with_capacity(prev_tables.len() + 1); + new_tables.clone_from(&prev_tables); + let prev = new_tables.remove(&table_name); + tables.store(Arc::new(new_tables)); + Ok(prev) + }) + }) + .join() + .unwrap(); + prev + } + + /// Checks if table exists in schema provider based on locally opened table map. + fn table_exist(&self, name: &str) -> Result { + Ok(self.tables.load().contains_key(name)) + } +} diff --git a/src/catalog/tests/mock.rs b/src/catalog/tests/mock.rs new file mode 100644 index 0000000000..e19a5c914c --- /dev/null +++ b/src/catalog/tests/mock.rs @@ -0,0 +1,163 @@ +use std::collections::{BTreeMap, HashMap}; +use std::fmt::{Display, Formatter}; +use std::str::FromStr; +use std::sync::Arc; + +use async_stream::stream; +use catalog::error::Error; +use catalog::remote::{Kv, KvBackend, ValueIter}; +use common_recordbatch::RecordBatch; +use common_telemetry::logging::info; +use datatypes::data_type::ConcreteDataType; +use datatypes::schema::{ColumnSchema, Schema}; +use datatypes::vectors::StringVector; +use serde::Serializer; +use table::engine::{EngineContext, TableEngine}; +use table::metadata::TableId; +use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest}; +use table::test_util::MemTable; +use table::TableRef; +use tokio::sync::RwLock; + +#[derive(Default)] +pub struct MockKvBackend { + map: RwLock, Vec>>, +} + +impl Display for MockKvBackend { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + futures::executor::block_on(async { + let map = self.map.read().await; + for (k, v) in map.iter() { + f.serialize_str(&String::from_utf8_lossy(k))?; + f.serialize_str(" -> ")?; + f.serialize_str(&String::from_utf8_lossy(v))?; + f.serialize_str("\n")?; + } + Ok(()) + }) + } +} + +#[async_trait::async_trait] +impl KvBackend for MockKvBackend { + fn range<'a, 'b>(&'a self, key: &[u8]) -> ValueIter<'b, Error> + where + 'a: 'b, + { + let prefix = key.to_vec(); + let prefix_string = String::from_utf8_lossy(&prefix).to_string(); + Box::pin(stream!({ + let maps = self.map.read().await.clone(); + for (k, v) in maps.range(prefix.clone()..) { + let key_string = String::from_utf8_lossy(k).to_string(); + let matches = key_string.starts_with(&prefix_string); + if matches { + yield Ok(Kv(k.clone(), v.clone())) + } else { + info!("Stream finished"); + return; + } + } + })) + } + + async fn set(&self, key: &[u8], val: &[u8]) -> Result<(), Error> { + let mut map = self.map.write().await; + map.insert(key.to_vec(), val.to_vec()); + Ok(()) + } + + async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), Error> { + let start = key.to_vec(); + let end = end.to_vec(); + let range = start..end; + + let mut map = self.map.write().await; + map.retain(|k, _| !range.contains(k)); + Ok(()) + } +} + +#[derive(Default)] +pub struct MockTableEngine { + tables: RwLock>, +} + +#[async_trait::async_trait] +impl TableEngine for MockTableEngine { + fn name(&self) -> &str { + "MockTableEngine" + } + + /// Create a table with only one column + async fn create_table( + &self, + _ctx: &EngineContext, + request: CreateTableRequest, + ) -> table::Result { + let table_name = request.table_name.clone(); + let catalog_name = request.catalog_name.clone(); + let schema_name = request.schema_name.clone(); + + let default_table_id = "0".to_owned(); + let table_id = TableId::from_str( + request + .table_options + .get("table_id") + .unwrap_or(&default_table_id), + ) + .unwrap(); + let schema = Arc::new(Schema::new(vec![ColumnSchema::new( + "name", + ConcreteDataType::string_datatype(), + true, + )])); + + let data = vec![Arc::new(StringVector::from(vec!["a", "b", "c"])) as _]; + let record_batch = RecordBatch::new(schema, data).unwrap(); + let table: TableRef = Arc::new(MemTable::new_with_catalog( + &table_name, + record_batch, + table_id, + catalog_name, + schema_name, + )) as Arc<_>; + + let mut tables = self.tables.write().await; + tables.insert(table_name, table.clone() as TableRef); + Ok(table) + } + + async fn open_table( + &self, + _ctx: &EngineContext, + request: OpenTableRequest, + ) -> table::Result> { + Ok(self.tables.read().await.get(&request.table_name).cloned()) + } + + async fn alter_table( + &self, + _ctx: &EngineContext, + _request: AlterTableRequest, + ) -> table::Result { + unimplemented!() + } + + fn get_table(&self, _ctx: &EngineContext, name: &str) -> table::Result> { + futures::executor::block_on(async { Ok(self.tables.read().await.get(name).cloned()) }) + } + + fn table_exists(&self, _ctx: &EngineContext, name: &str) -> bool { + futures::executor::block_on(async { self.tables.read().await.contains_key(name) }) + } + + async fn drop_table( + &self, + _ctx: &EngineContext, + _request: DropTableRequest, + ) -> table::Result<()> { + unimplemented!() + } +} diff --git a/src/catalog/tests/remote_catalog_tests.rs b/src/catalog/tests/remote_catalog_tests.rs new file mode 100644 index 0000000000..40b9de8eb3 --- /dev/null +++ b/src/catalog/tests/remote_catalog_tests.rs @@ -0,0 +1,274 @@ +#![feature(assert_matches)] + +mod mock; + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::collections::HashSet; + use std::sync::Arc; + + use catalog::remote::{ + KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider, + }; + use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest}; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use common_catalog::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; + use datatypes::schema::Schema; + use futures_util::StreamExt; + use table::engine::{EngineContext, TableEngineRef}; + use table::requests::CreateTableRequest; + + use crate::mock::{MockKvBackend, MockTableEngine}; + + #[tokio::test] + async fn test_backend() { + common_telemetry::init_default_ut_logging(); + let node_id = 42; + let backend = MockKvBackend::default(); + + let default_catalog_key = CatalogKey { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + node_id, + } + .to_string(); + + backend + .set( + default_catalog_key.as_bytes(), + &CatalogValue {}.to_bytes().unwrap(), + ) + .await + .unwrap(); + + let schema_key = SchemaKey { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + node_id, + } + .to_string(); + backend + .set(schema_key.as_bytes(), &SchemaValue {}.to_bytes().unwrap()) + .await + .unwrap(); + + let mut iter = backend.range("__c-".as_bytes()); + let mut res = HashSet::new(); + while let Some(r) = iter.next().await { + let kv = r.unwrap(); + res.insert(String::from_utf8_lossy(&kv.0).to_string()); + } + assert_eq!( + vec!["__c-greptime-42".to_string()], + res.into_iter().collect::>() + ); + } + + async fn prepare_components(node_id: u64) -> (KvBackendRef, TableEngineRef, CatalogManagerRef) { + let backend = Arc::new(MockKvBackend::default()) as KvBackendRef; + let table_engine = Arc::new(MockTableEngine::default()); + let catalog_manager = + RemoteCatalogManager::new(table_engine.clone(), node_id, backend.clone()); + catalog_manager.start().await.unwrap(); + (backend, table_engine, Arc::new(catalog_manager)) + } + + #[tokio::test] + async fn test_remote_catalog_default() { + common_telemetry::init_default_ut_logging(); + let node_id = 42; + let (_, _, catalog_manager) = prepare_components(node_id).await; + assert_eq!( + vec![DEFAULT_CATALOG_NAME.to_string()], + catalog_manager.catalog_names().unwrap() + ); + + let default_catalog = catalog_manager + .catalog(DEFAULT_CATALOG_NAME) + .unwrap() + .unwrap(); + assert_eq!( + vec![DEFAULT_SCHEMA_NAME.to_string()], + default_catalog.schema_names().unwrap() + ); + } + + #[tokio::test] + async fn test_remote_catalog_register_nonexistent() { + common_telemetry::init_default_ut_logging(); + let node_id = 42; + let (_, table_engine, catalog_manager) = prepare_components(node_id).await; + // register a new table with an nonexistent catalog + let catalog_name = "nonexistent_catalog".to_string(); + let schema_name = "nonexistent_schema".to_string(); + let table_name = "fail_table".to_string(); + // this schema has no effect + let table_schema = Arc::new(Schema::new(vec![])); + let table = table_engine + .create_table( + &EngineContext {}, + CreateTableRequest { + id: 1, + catalog_name: catalog_name.clone(), + schema_name: schema_name.clone(), + table_name: table_name.clone(), + desc: None, + schema: table_schema.clone(), + primary_key_indices: vec![], + create_if_not_exists: false, + table_options: Default::default(), + }, + ) + .await + .unwrap(); + let reg_req = RegisterTableRequest { + catalog: catalog_name, + schema: schema_name, + table_name, + table_id: 1, + table, + }; + let res = catalog_manager.register_table(reg_req).await; + + // because nonexistent_catalog does not exist yet. + assert_matches!( + res.err().unwrap(), + catalog::error::Error::CatalogNotFound { .. } + ); + } + + #[tokio::test] + async fn test_register_table() { + let node_id = 42; + let (_, table_engine, catalog_manager) = prepare_components(node_id).await; + let default_catalog = catalog_manager + .catalog(DEFAULT_CATALOG_NAME) + .unwrap() + .unwrap(); + assert_eq!( + vec![DEFAULT_SCHEMA_NAME.to_string()], + default_catalog.schema_names().unwrap() + ); + + let default_schema = default_catalog + .schema(DEFAULT_SCHEMA_NAME) + .unwrap() + .unwrap(); + assert_eq!(Vec::::new(), default_schema.table_names().unwrap()); + + // register a new table with an nonexistent catalog + let catalog_name = DEFAULT_CATALOG_NAME.to_string(); + let schema_name = DEFAULT_SCHEMA_NAME.to_string(); + let table_name = "test_table".to_string(); + let table_id = 1; + // this schema has no effect + let table_schema = Arc::new(Schema::new(vec![])); + let table = table_engine + .create_table( + &EngineContext {}, + CreateTableRequest { + id: table_id, + catalog_name: catalog_name.clone(), + schema_name: schema_name.clone(), + table_name: table_name.clone(), + desc: None, + schema: table_schema.clone(), + primary_key_indices: vec![], + create_if_not_exists: false, + table_options: Default::default(), + }, + ) + .await + .unwrap(); + let reg_req = RegisterTableRequest { + catalog: catalog_name, + schema: schema_name, + table_name: table_name.clone(), + table_id, + table, + }; + assert_eq!(1, catalog_manager.register_table(reg_req).await.unwrap()); + assert_eq!(vec![table_name], default_schema.table_names().unwrap()); + } + + #[tokio::test] + async fn test_register_catalog_schema_table() { + let node_id = 42; + let (backend, table_engine, catalog_manager) = prepare_components(node_id).await; + + let catalog_name = "test_catalog".to_string(); + let schema_name = "nonexistent_schema".to_string(); + let catalog = Arc::new(RemoteCatalogProvider::new( + catalog_name.clone(), + node_id, + backend.clone(), + )); + + // register catalog to catalog manager + catalog_manager + .register_catalog(catalog_name.clone(), catalog) + .unwrap(); + assert_eq!( + HashSet::::from_iter( + vec![DEFAULT_CATALOG_NAME.to_string(), catalog_name.clone()].into_iter() + ), + HashSet::from_iter(catalog_manager.catalog_names().unwrap().into_iter()) + ); + + let table_to_register = table_engine + .create_table( + &EngineContext {}, + CreateTableRequest { + id: 2, + catalog_name: catalog_name.clone(), + schema_name: schema_name.clone(), + table_name: "".to_string(), + desc: None, + schema: Arc::new(Schema::new(vec![])), + primary_key_indices: vec![], + create_if_not_exists: false, + table_options: Default::default(), + }, + ) + .await + .unwrap(); + + let reg_req = RegisterTableRequest { + catalog: catalog_name.clone(), + schema: schema_name.clone(), + table_name: " fail_table".to_string(), + table_id: 2, + table: table_to_register, + }; + // this register will fail since schema does not exist yet + assert_matches!( + catalog_manager + .register_table(reg_req.clone()) + .await + .unwrap_err(), + catalog::error::Error::SchemaNotFound { .. } + ); + + let new_catalog = catalog_manager + .catalog(&catalog_name) + .unwrap() + .expect("catalog should exist since it's already registered"); + let schema = Arc::new(RemoteSchemaProvider::new( + catalog_name.clone(), + schema_name.clone(), + node_id, + backend.clone(), + )); + + let prev = new_catalog + .register_schema(schema_name.clone(), schema.clone()) + .expect("Register schema should not fail"); + assert!(prev.is_none()); + assert_eq!(1, catalog_manager.register_table(reg_req).await.unwrap()); + + assert_eq!( + HashSet::from([schema_name.clone()]), + new_catalog.schema_names().unwrap().into_iter().collect() + ) + } +} diff --git a/src/common/catalog/src/error.rs b/src/common/catalog/src/error.rs index 36682b0ae6..0ce3a49f69 100644 --- a/src/common/catalog/src/error.rs +++ b/src/common/catalog/src/error.rs @@ -22,6 +22,9 @@ pub enum Error { backtrace: Backtrace, source: serde_json::error::Error, }, + + #[snafu(display("Failed to parse node id: {}", key))] + ParseNodeId { key: String, backtrace: Backtrace }, } impl ErrorExt for Error { @@ -30,6 +33,7 @@ impl ErrorExt for Error { Error::InvalidCatalog { .. } | Error::DeserializeCatalogEntryValue { .. } | Error::SerializeCatalogEntryValue { .. } => StatusCode::Unexpected, + Error::ParseNodeId { .. } => StatusCode::InvalidArguments, } } diff --git a/src/common/catalog/src/helper.rs b/src/common/catalog/src/helper.rs index a95ad70d79..dfbd5ef231 100644 --- a/src/common/catalog/src/helper.rs +++ b/src/common/catalog/src/helper.rs @@ -9,20 +9,18 @@ use table::metadata::{RawTableMeta, TableId, TableVersion}; use crate::consts::{CATALOG_KEY_PREFIX, SCHEMA_KEY_PREFIX, TABLE_KEY_PREFIX}; use crate::error::{ - DeserializeCatalogEntryValueSnafu, Error, InvalidCatalogSnafu, SerializeCatalogEntryValueSnafu, + DeserializeCatalogEntryValueSnafu, Error, InvalidCatalogSnafu, ParseNodeIdSnafu, + SerializeCatalogEntryValueSnafu, }; lazy_static! { - static ref CATALOG_KEY_PATTERN: Regex = Regex::new(&format!( - "^{}-([a-zA-Z_]+)-([a-zA-Z_]+)$", - CATALOG_KEY_PREFIX - )) - .unwrap(); + static ref CATALOG_KEY_PATTERN: Regex = + Regex::new(&format!("^{}-([a-zA-Z_]+)-([0-9]+)$", CATALOG_KEY_PREFIX)).unwrap(); } lazy_static! { static ref SCHEMA_KEY_PATTERN: Regex = Regex::new(&format!( - "^{}-([a-zA-Z_]+)-([a-zA-Z_]+)-([a-zA-Z_]+)$", + "^{}-([a-zA-Z_]+)-([a-zA-Z_]+)-([0-9]+)$", SCHEMA_KEY_PREFIX )) .unwrap(); @@ -30,7 +28,7 @@ lazy_static! { lazy_static! { static ref TABLE_KEY_PATTERN: Regex = Regex::new(&format!( - "^{}-([a-zA-Z_]+)-([a-zA-Z_]+)-([a-zA-Z_]+)-([0-9]+)-([a-zA-Z_]+)$", + "^{}-([a-zA-Z_]+)-([a-zA-Z_]+)-([a-zA-Z_]+)-([0-9]+)-([0-9]+)$", TABLE_KEY_PREFIX )) .unwrap(); @@ -58,7 +56,7 @@ pub struct TableKey { pub schema_name: String, pub table_name: String, pub version: TableVersion, - pub node_id: String, + pub node_id: u64, } impl Display for TableKey { @@ -73,7 +71,7 @@ impl Display for TableKey { f.write_str("-")?; f.serialize_u64(self.version)?; f.write_str("-")?; - f.write_str(&self.node_id) + f.serialize_u64(self.node_id) } } @@ -87,12 +85,15 @@ impl TableKey { let version = u64::from_str(&captures[4]).map_err(|_| InvalidCatalogSnafu { key }.build())?; + let node_id_str = captures[5].to_string(); + let node_id = u64::from_str(&node_id_str) + .map_err(|_| ParseNodeIdSnafu { key: node_id_str }.build())?; Ok(Self { catalog_name: captures[1].to_string(), schema_name: captures[2].to_string(), table_name: captures[3].to_string(), version, - node_id: captures[5].to_string(), + node_id, }) } } @@ -100,7 +101,8 @@ impl TableKey { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct TableValue { pub id: TableId, - pub node_id: String, + pub node_id: u64, + pub regions_ids: Vec, pub meta: RawTableMeta, } @@ -119,7 +121,7 @@ impl TableValue { pub struct CatalogKey { pub catalog_name: String, - pub node_id: String, + pub node_id: u64, } impl Display for CatalogKey { @@ -128,7 +130,7 @@ impl Display for CatalogKey { f.write_str("-")?; f.write_str(&self.catalog_name)?; f.write_str("-")?; - f.write_str(&self.node_id) + f.serialize_u64(self.node_id) } } @@ -139,9 +141,14 @@ impl CatalogKey { .captures(key) .context(InvalidCatalogSnafu { key })?; ensure!(captures.len() == 3, InvalidCatalogSnafu { key }); + + let node_id_str = captures[2].to_string(); + let node_id = u64::from_str(&node_id_str) + .map_err(|_| ParseNodeIdSnafu { key: node_id_str }.build())?; + Ok(Self { catalog_name: captures[1].to_string(), - node_id: captures[2].to_string(), + node_id, }) } } @@ -160,7 +167,7 @@ impl CatalogValue { pub struct SchemaKey { pub catalog_name: String, pub schema_name: String, - pub node_id: String, + pub node_id: u64, } impl Display for SchemaKey { @@ -171,7 +178,7 @@ impl Display for SchemaKey { f.write_str("-")?; f.write_str(&self.schema_name)?; f.write_str("-")?; - f.write_str(&self.node_id) + f.serialize_u64(self.node_id) } } @@ -183,10 +190,14 @@ impl SchemaKey { .context(InvalidCatalogSnafu { key })?; ensure!(captures.len() == 4, InvalidCatalogSnafu { key }); + let node_id_str = captures[3].to_string(); + let node_id = u64::from_str(&node_id_str) + .map_err(|_| ParseNodeIdSnafu { key: node_id_str }.build())?; + Ok(Self { catalog_name: captures[1].to_string(), schema_name: captures[2].to_string(), - node_id: captures[3].to_string(), + node_id, }) } } @@ -211,31 +222,31 @@ mod tests { #[test] fn test_parse_catalog_key() { - let key = "__c-C-N"; + let key = "__c-C-2"; let catalog_key = CatalogKey::parse(key).unwrap(); assert_eq!("C", catalog_key.catalog_name); - assert_eq!("N", catalog_key.node_id); + assert_eq!(2, catalog_key.node_id); assert_eq!(key, catalog_key.to_string()); } #[test] fn test_parse_schema_key() { - let key = "__s-C-S-N"; + let key = "__s-C-S-3"; let schema_key = SchemaKey::parse(key).unwrap(); assert_eq!("C", schema_key.catalog_name); assert_eq!("S", schema_key.schema_name); - assert_eq!("N", schema_key.node_id); + assert_eq!(3, schema_key.node_id); assert_eq!(key, schema_key.to_string()); } #[test] fn test_parse_table_key() { - let key = "__t-C-S-T-42-N"; + let key = "__t-C-S-T-42-1"; let entry = TableKey::parse(key).unwrap(); assert_eq!("C", entry.catalog_name); assert_eq!("S", entry.schema_name); assert_eq!("T", entry.table_name); - assert_eq!("N", entry.node_id); + assert_eq!(1, entry.node_id); assert_eq!(42, entry.version); assert_eq!(key, &entry.to_string()); } @@ -271,7 +282,8 @@ mod tests { let value = TableValue { id: 42, - node_id: "localhost".to_string(), + node_id: 32, + regions_ids: vec![1, 2, 3], meta, }; let serialized = serde_json::to_string(&value).unwrap(); diff --git a/src/meta-client/Cargo.toml b/src/meta-client/Cargo.toml index c3f51f32be..f976db4a51 100644 --- a/src/meta-client/Cargo.toml +++ b/src/meta-client/Cargo.toml @@ -18,7 +18,7 @@ tonic = "0.8" [dev-dependencies] futures = "0.3" -meta-srv = { path = "../meta-srv", features = ["mock"]} +meta-srv = { path = "../meta-srv", features = ["mock"] } tower = "0.4" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] }