refactor: move Database to client crate behind testing feature (#4059)

* refactor: move Database to client crate behind testing feature

Signed-off-by: tison <wander4096@gmail.com>

* partial move

Signed-off-by: tison <wander4096@gmail.com>

* catch up more

Signed-off-by: tison <wander4096@gmail.com>

* fix imports

Signed-off-by: tison <wander4096@gmail.com>

* finish

Signed-off-by: tison <wander4096@gmail.com>

* tidy

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
This commit is contained in:
tison
2024-05-28 11:21:43 +08:00
committed by GitHub
parent 097f62f459
commit 9dd6e033a7
17 changed files with 167 additions and 114 deletions

3
Cargo.lock generated
View File

@@ -904,7 +904,6 @@ dependencies = [
"rskafka",
"serde",
"store-api",
"tests-integration",
"tokio",
"toml 0.8.12",
"uuid",
@@ -9631,7 +9630,6 @@ dependencies = [
"strum 0.25.0",
"table",
"tempfile",
"tests-integration",
"tikv-jemalloc-ctl",
"tokio",
"tokio-postgres",
@@ -9996,7 +9994,6 @@ dependencies = [
"serde_json",
"sqlness",
"tempfile",
"tests-integration",
"tinytemplate",
"tokio",
]

View File

@@ -233,8 +233,6 @@ sql = { path = "src/sql" }
store-api = { path = "src/store-api" }
substrait = { path = "src/common/substrait" }
table = { path = "src/table" }
# TODO some code depends on this
tests-integration = { path = "tests-integration" }
[workspace.dependencies.meter-macros]
git = "https://github.com/GreptimeTeam/greptime-meter.git"

View File

@@ -12,7 +12,7 @@ api.workspace = true
arrow.workspace = true
chrono.workspace = true
clap.workspace = true
client.workspace = true
client = { workspace = true, features = ["testing"] }
common-base.workspace = true
common-telemetry.workspace = true
common-wal.workspace = true
@@ -33,8 +33,6 @@ rand.workspace = true
rskafka.workspace = true
serde.workspace = true
store-api.workspace = true
# TODO depend `Database` client
tests-integration.workspace = true
tokio.workspace = true
toml.workspace = true
uuid.workspace = true

View File

@@ -23,8 +23,6 @@ use api::v1::{
};
use arrow_flight::Ticket;
use async_stream::stream;
use client::error::{ConvertFlightDataSnafu, Error, IllegalFlightMessagesSnafu, ServerSnafu};
use client::{from_grpc_response, Client, Result};
use common_error::ext::{BoxedError, ErrorExt};
use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_query::Output;
@@ -37,7 +35,8 @@ use prost::Message;
use snafu::{ensure, ResultExt};
use tonic::transport::Channel;
pub const DEFAULT_LOOKBACK_STRING: &str = "5m";
use crate::error::{ConvertFlightDataSnafu, Error, IllegalFlightMessagesSnafu, ServerSnafu};
use crate::{from_grpc_response, Client, Result};
#[derive(Clone, Debug, Default)]
pub struct Database {
@@ -105,10 +104,18 @@ impl Database {
self.catalog = catalog.into();
}
pub fn catalog(&self) -> &String {
&self.catalog
}
pub fn set_schema(&mut self, schema: impl Into<String>) {
self.schema = schema.into();
}
pub fn schema(&self) -> &String {
&self.schema
}
pub fn set_timezone(&mut self, timezone: impl Into<String>) {
self.timezone = timezone.into();
}
@@ -156,6 +163,13 @@ impl Database {
.await
}
pub async fn logical_plan(&self, logical_plan: Vec<u8>) -> Result<Output> {
self.do_get(Request::Query(QueryRequest {
query: Some(Query::LogicalPlan(logical_plan)),
}))
.await
}
pub async fn create(&self, expr: CreateTableExpr) -> Result<Output> {
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(expr)),
@@ -269,17 +283,12 @@ struct FlightContext {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use api::v1::auth_header::AuthScheme;
use api::v1::{AuthHeader, Basic};
use clap::Parser;
use client::Client;
use cmd::error::Result as CmdResult;
use cmd::options::GlobalOptions;
use cmd::{cli, standalone, App};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_telemetry::logging::LoggingOptions;
use super::{Database, FlightContext};
use super::*;
#[test]
fn test_flight_ctx() {
@@ -295,76 +304,11 @@ mod tests {
auth_scheme: Some(basic),
});
assert!(matches!(
assert_matches!(
ctx.auth_header,
Some(AuthHeader {
auth_scheme: Some(AuthScheme::Basic(_)),
})
))
}
#[tokio::test(flavor = "multi_thread")]
async fn test_export_create_table_with_quoted_names() -> CmdResult<()> {
let output_dir = tempfile::tempdir().unwrap();
let standalone = standalone::Command::parse_from([
"standalone",
"start",
"--data-home",
&*output_dir.path().to_string_lossy(),
]);
let standalone_opts = standalone.load_options(&GlobalOptions::default()).unwrap();
let mut instance = standalone.build(standalone_opts).await?;
instance.start().await?;
let client = Client::with_urls(["127.0.0.1:4001"]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
database
.sql(r#"CREATE DATABASE "cli.export.create_table";"#)
.await
.unwrap();
database
.sql(
r#"CREATE TABLE "cli.export.create_table"."a.b.c"(
ts TIMESTAMP,
TIME INDEX (ts)
) engine=mito;
"#,
)
.await
.unwrap();
let output_dir = tempfile::tempdir().unwrap();
let cli = cli::Command::parse_from([
"cli",
"export",
"--addr",
"127.0.0.1:4000",
"--output-dir",
&*output_dir.path().to_string_lossy(),
"--target",
"create-table",
]);
let mut cli_app = cli.build(LoggingOptions::default()).await?;
cli_app.start().await?;
instance.stop().await?;
let output_file = output_dir
.path()
.join("greptime-cli.export.create_table.sql");
let res = std::fs::read_to_string(output_file).unwrap();
let expect = r#"CREATE TABLE IF NOT EXISTS "a.b.c" (
"ts" TIMESTAMP(3) NOT NULL,
TIME INDEX ("ts")
)
ENGINE=mito
;
"#;
assert_eq!(res.trim(), expect.trim());
Ok(())
)
}
}

View File

@@ -12,8 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(assert_matches)]
mod client;
pub mod client_manager;
#[cfg(feature = "testing")]
mod database;
pub mod error;
pub mod load_balance;
mod metrics;
@@ -29,6 +33,8 @@ pub use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use snafu::OptionExt;
pub use self::client::Client;
#[cfg(feature = "testing")]
pub use self::database::Database;
pub use self::error::{Error, Result};
use crate::error::{IllegalDatabaseResponseSnafu, ServerSnafu};

View File

@@ -80,6 +80,7 @@ tracing-appender = "0.2"
tikv-jemallocator = "0.5"
[dev-dependencies]
client = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
serde.workspace = true
temp-env = "0.3"

View File

@@ -22,8 +22,8 @@ mod helper;
// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
#[allow(unused)]
// mod repl;
// TODO(weny): Removes it
mod repl;
// TODO(tisonkun): migrate deprecated methods
#[allow(deprecated)]
mod upgrade;
@@ -31,8 +31,8 @@ use async_trait::async_trait;
use bench::BenchTableMetadataCommand;
use clap::Parser;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
pub use repl::Repl;
use tracing_appender::non_blocking::WorkerGuard;
// pub use repl::Repl;
use upgrade::UpgradeCommand;
use self::export::ExportCommand;

View File

@@ -434,3 +434,80 @@ fn split_database(database: &str) -> Result<(String, Option<String>)> {
Ok((catalog.to_string(), Some(schema.to_string())))
}
}
#[cfg(test)]
mod tests {
use clap::Parser;
use client::{Client, Database};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_telemetry::logging::LoggingOptions;
use crate::error::Result as CmdResult;
use crate::options::GlobalOptions;
use crate::{cli, standalone, App};
#[tokio::test(flavor = "multi_thread")]
async fn test_export_create_table_with_quoted_names() -> CmdResult<()> {
let output_dir = tempfile::tempdir().unwrap();
let standalone = standalone::Command::parse_from([
"standalone",
"start",
"--data-home",
&*output_dir.path().to_string_lossy(),
]);
let standalone_opts = standalone.load_options(&GlobalOptions::default()).unwrap();
let mut instance = standalone.build(standalone_opts).await?;
instance.start().await?;
let client = Client::with_urls(["127.0.0.1:4001"]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
database
.sql(r#"CREATE DATABASE "cli.export.create_table";"#)
.await
.unwrap();
database
.sql(
r#"CREATE TABLE "cli.export.create_table"."a.b.c"(
ts TIMESTAMP,
TIME INDEX (ts)
) engine=mito;
"#,
)
.await
.unwrap();
let output_dir = tempfile::tempdir().unwrap();
let cli = cli::Command::parse_from([
"cli",
"export",
"--addr",
"127.0.0.1:4000",
"--output-dir",
&*output_dir.path().to_string_lossy(),
"--target",
"create-table",
]);
let mut cli_app = cli.build(LoggingOptions::default()).await?;
cli_app.start().await?;
instance.stop().await?;
let output_file = output_dir
.path()
.join("greptime-cli.export.create_table.sql");
let res = std::fs::read_to_string(output_file).unwrap();
let expect = r#"CREATE TABLE IF NOT EXISTS "a.b.c" (
"ts" TIMESTAMP(3) NOT NULL,
TIME INDEX ("ts")
)
ENGINE=mito
;
"#;
assert_eq!(res.trim(), expect.trim());
Ok(())
}
}

View File

@@ -16,14 +16,18 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use catalog::kvbackend::{
CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager,
use cache::{
build_fundamental_cache_registry, with_default_composite_cache_registry, TABLE_CACHE_NAME,
TABLE_ROUTE_CACHE_NAME,
};
use client::{Client, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use catalog::kvbackend::{
CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend,
};
use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_base::Plugins;
use common_config::Mode;
use common_error::ext::ErrorExt;
use common_meta::cache_invalidator::MultiCacheInvalidator;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::debug;
@@ -38,12 +42,13 @@ use query::QueryEngine;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use session::context::QueryContext;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use crate::cli::cmd::ReplCommand;
use crate::cli::helper::RustylineHelper;
use crate::cli::AttachCommand;
use crate::error;
use crate::error::{
CollectRecordBatchesSnafu, ParseSqlSnafu, PlanStatementSnafu, PrettyPrintRecordBatchesSnafu,
ReadlineSnafu, ReplCreationSnafu, RequestDatabaseSnafu, Result, StartMetaClientSnafu,
@@ -257,19 +262,42 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
let cached_meta_backend =
Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build());
let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::with_invalidators(vec![
cached_meta_backend.clone(),
]));
let catalog_list = KvBackendCatalogManager::new(
let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
CacheRegistryBuilder::default()
.add_cache(cached_meta_backend.clone())
.build(),
);
let fundamental_cache_registry =
build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
let layered_cache_registry = Arc::new(
with_default_composite_cache_registry(
layered_cache_builder.add_cache_registry(fundamental_cache_registry),
)
.context(error::BuildCacheRegistrySnafu)?
.build(),
);
let table_cache = layered_cache_registry
.get()
.context(error::CacheRequiredSnafu {
name: TABLE_CACHE_NAME,
})?;
let table_route_cache = layered_cache_registry
.get()
.context(error::CacheRequiredSnafu {
name: TABLE_ROUTE_CACHE_NAME,
})?;
let catalog_manager = KvBackendCatalogManager::new(
Mode::Distributed,
Some(meta_client.clone()),
cached_meta_backend.clone(),
multi_cache_invalidator,
table_cache,
table_route_cache,
)
.await;
let plugins: Plugins = Default::default();
let state = Arc::new(QueryEngineState::new(
catalog_list,
catalog_manager,
None,
None,
None,

View File

@@ -163,6 +163,15 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to request database, sql: {sql}"))]
RequestDatabase {
sql: String,
#[snafu(source)]
source: client::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to collect RecordBatches"))]
CollectRecordBatches {
#[snafu(implicit)]
@@ -354,6 +363,7 @@ impl ErrorExt for Error {
Error::ReplCreation { .. } | Error::Readline { .. } | Error::HttpQuerySql { .. } => {
StatusCode::Internal
}
Error::RequestDatabase { source, .. } => source.status_code(),
Error::CollectRecordBatches { source, .. }
| Error::PrettyPrintRecordBatches { source, .. } => source.status_code(),
Error::StartMetaClient { source, .. } => source.status_code(),

View File

@@ -111,7 +111,7 @@ tikv-jemalloc-ctl = { version = "0.5", features = ["use_std"] }
[dev-dependencies]
auth = { workspace = true, features = ["testing"] }
catalog = { workspace = true, features = ["testing"] }
client.workspace = true
client = { workspace = true, features = ["testing"] }
common-base.workspace = true
common-test-util.workspace = true
criterion = "0.4"
@@ -125,8 +125,6 @@ serde_json.workspace = true
session = { workspace = true, features = ["testing"] }
table.workspace = true
tempfile = "3.0.0"
# TODO depend `Database` client
tests-integration.workspace = true
tokio-postgres = "0.7"
tokio-postgres-rustls = "0.11"
tokio-test = "0.4"

View File

@@ -21,7 +21,7 @@ use arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
use async_trait::async_trait;
use auth::tests::MockUserProvider;
use auth::UserProviderRef;
use client::{Client, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_runtime::{Builder as RuntimeBuilder, Runtime};
use servers::error::{Result, StartGrpcSnafu, TcpBindSnafu};
use servers::grpc::flight::FlightCraftWrapper;
@@ -31,7 +31,6 @@ use servers::server::Server;
use snafu::ResultExt;
use table::test_util::MemTable;
use table::TableRef;
use tests_integration::database::Database;
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::codec::CompressionEncoding;

View File

@@ -15,7 +15,6 @@
#![feature(assert_matches)]
pub mod cluster;
pub mod database;
mod grpc;
mod influxdb;
mod instance;

View File

@@ -21,6 +21,7 @@ use std::time::Duration;
use auth::UserProviderRef;
use axum::Router;
use catalog::kvbackend::KvBackendCatalogManager;
use client::Database;
use common_base::secrets::ExposeSecret;
use common_config::Configurable;
use common_meta::key::catalog_name::CatalogNameKey;
@@ -56,7 +57,6 @@ use servers::tls::ReloadableTlsServerConfig;
use servers::Mode;
use session::context::QueryContext;
use crate::database::Database;
use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder};
pub const PEER_PLACEHOLDER_ADDR: &str = "127.0.0.1:3001";

View File

@@ -22,7 +22,7 @@ use api::v1::{
PromqlRequest, RequestHeader, SemanticType,
};
use auth::user_provider_from_option;
use client::{Client, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::consts::MITO_ENGINE;
use common_grpc::channel_manager::ClientTlsOption;
use common_query::Output;
@@ -36,7 +36,6 @@ use servers::http::prometheus::{
};
use servers::server::Server;
use servers::tls::{TlsMode, TlsOption};
use tests_integration::database::Database;
use tests_integration::test_util::{
setup_grpc_server, setup_grpc_server_with, setup_grpc_server_with_user_provider, StorageType,
};

View File

@@ -10,7 +10,7 @@ workspace = true
[dependencies]
async-trait = "0.1"
clap.workspace = true
client.workspace = true
client = { workspace = true, features = ["testing"] }
common-error.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
@@ -19,7 +19,5 @@ serde.workspace = true
serde_json.workspace = true
sqlness = { version = "0.5" }
tempfile.workspace = true
# TODO depend `Database` client
tests-integration.workspace = true
tinytemplate = "1.2"
tokio.workspace = true

View File

@@ -24,13 +24,14 @@ use std::time::Duration;
use async_trait::async_trait;
use client::error::ServerSnafu;
use client::{Client, Error as ClientError, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use client::{
Client, Database as DB, Error as ClientError, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
};
use common_error::ext::ErrorExt;
use common_query::{Output, OutputData};
use common_recordbatch::RecordBatches;
use serde::Serialize;
use sqlness::{Database, EnvController, QueryContext};
use tests_integration::database::Database as DB;
use tinytemplate::TinyTemplate;
use tokio::sync::Mutex as TokioMutex;