diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index ec118f942c..484529cd57 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -20,7 +20,7 @@ use std::sync::{Arc, Weak}; use common_catalog::consts::INFORMATION_SCHEMA_NAME; use common_error::ext::BoxedError; -use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream}; +use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; use datatypes::schema::SchemaRef; use futures_util::StreamExt; use snafu::ResultExt; @@ -171,7 +171,7 @@ impl DataSource for InformationTableDataSource { None => batch, }); - let stream = RecordBatchStreamAdaptor { + let stream = RecordBatchStreamWrapper { schema: projected_schema, stream: Box::pin(stream), output_ordering: None, diff --git a/src/client/src/database.rs b/src/client/src/database.rs index ebbb4fa60a..4060cc4797 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -27,7 +27,7 @@ use common_error::ext::{BoxedError, ErrorExt}; use common_grpc::flight::{FlightDecoder, FlightMessage}; use common_query::Output; use common_recordbatch::error::ExternalSnafu; -use common_recordbatch::RecordBatchStreamAdaptor; +use common_recordbatch::RecordBatchStreamWrapper; use common_telemetry::logging; use common_telemetry::tracing_context::W3cTrace; use futures_util::StreamExt; @@ -315,7 +315,7 @@ impl Database { yield Ok(record_batch); } })); - let record_batch_stream = RecordBatchStreamAdaptor { + let record_batch_stream = RecordBatchStreamWrapper { schema, stream, output_ordering: None, diff --git a/src/client/src/region.rs b/src/client/src/region.rs index 95bef40b2a..3967c23ed0 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -23,7 +23,7 @@ use common_grpc::flight::{FlightDecoder, FlightMessage}; use common_meta::datanode_manager::{AffectedRows, Datanode}; use common_meta::error::{self as meta_error, Result as MetaResult}; use common_recordbatch::error::ExternalSnafu; -use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream}; +use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; use common_telemetry::error; use prost::Message; use snafu::{location, Location, OptionExt, ResultExt}; @@ -136,7 +136,7 @@ impl RegionRequester { yield Ok(record_batch); } })); - let record_batch_stream = RecordBatchStreamAdaptor { + let record_batch_stream = RecordBatchStreamWrapper { schema, stream, output_ordering: None, diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index 0e34dc4cff..1b0d33f915 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -202,16 +202,16 @@ impl Stream for SimpleRecordBatchStream { } /// Adapt a [Stream] of [RecordBatch] to a [RecordBatchStream]. -pub struct RecordBatchStreamAdaptor { +pub struct RecordBatchStreamWrapper { pub schema: SchemaRef, pub stream: S, pub output_ordering: Option>, } -impl RecordBatchStreamAdaptor { - /// Creates a RecordBatchStreamAdaptor without output ordering requirement. - pub fn new(schema: SchemaRef, stream: S) -> RecordBatchStreamAdaptor { - RecordBatchStreamAdaptor { +impl RecordBatchStreamWrapper { + /// Creates a [RecordBatchStreamWrapper] without output ordering requirement. + pub fn new(schema: SchemaRef, stream: S) -> RecordBatchStreamWrapper { + RecordBatchStreamWrapper { schema, stream, output_ordering: None, @@ -220,7 +220,7 @@ impl RecordBatchStreamAdaptor { } impl> + Unpin> RecordBatchStream - for RecordBatchStreamAdaptor + for RecordBatchStreamWrapper { fn schema(&self) -> SchemaRef { self.schema.clone() @@ -231,7 +231,7 @@ impl> + Unpin> RecordBatchStream } } -impl> + Unpin> Stream for RecordBatchStreamAdaptor { +impl> + Unpin> Stream for RecordBatchStreamWrapper { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 5831a9ab32..712bbce0b6 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -25,8 +25,8 @@ use servers::metrics_handler::MetricsHandler; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; use servers::opentsdb::OpentsdbServer; use servers::postgres::PostgresServer; -use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; -use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor; +use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter; +use servers::query_handler::sql::ServerSqlQueryHandlerAdapter; use servers::server::{Server, ServerHandler, ServerHandlers}; use snafu::ResultExt; @@ -70,7 +70,7 @@ impl Services { }; let grpc_server = GrpcServer::new( Some(grpc_config), - Some(ServerGrpcQueryHandlerAdaptor::arc(instance.clone())), + Some(ServerGrpcQueryHandlerAdapter::arc(instance.clone())), Some(instance.clone()), None, None, @@ -88,8 +88,8 @@ impl Services { let mut http_server_builder = HttpServerBuilder::new(http_options.clone()); let _ = http_server_builder - .with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(instance.clone())) - .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(instance.clone())); + .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(instance.clone())) + .with_grpc_handler(ServerGrpcQueryHandlerAdapter::arc(instance.clone())); if let Some(user_provider) = user_provider.clone() { let _ = http_server_builder.with_user_provider(user_provider); @@ -137,7 +137,7 @@ impl Services { let mysql_server = MysqlServer::create_server( mysql_io_runtime, Arc::new(MysqlSpawnRef::new( - ServerSqlQueryHandlerAdaptor::arc(instance.clone()), + ServerSqlQueryHandlerAdapter::arc(instance.clone()), user_provider.clone(), )), Arc::new(MysqlSpawnConfig::new( @@ -167,7 +167,7 @@ impl Services { ); let pg_server = Box::new(PostgresServer::new( - ServerSqlQueryHandlerAdaptor::arc(instance.clone()), + ServerSqlQueryHandlerAdapter::arc(instance.clone()), opts.tls.clone(), pg_io_runtime, user_provider.clone(), diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 31956a0c2d..f963568cad 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -20,7 +20,7 @@ use std::time::{Duration, Instant}; use async_stream::try_stream; use common_error::ext::BoxedError; use common_recordbatch::error::ExternalSnafu; -use common_recordbatch::{RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream}; +use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream}; use common_telemetry::{debug, error}; use common_time::range::TimestampRange; use snafu::ResultExt; @@ -164,7 +164,7 @@ impl SeqScan { // Update metrics. READ_STAGE_ELAPSED.with_label_values(&["total"]).observe(metrics.scan_cost.as_secs_f64()); }; - let stream = Box::pin(RecordBatchStreamAdaptor::new( + let stream = Box::pin(RecordBatchStreamWrapper::new( self.mapper.output_schema(), Box::pin(stream), )); diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 845e7b813d..80f1492d45 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -25,7 +25,7 @@ use common_query::physical_plan::TaskContext; use common_recordbatch::adapter::DfRecordBatchStreamAdapter; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{ - DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream, + DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream, }; use common_telemetry::tracing; use common_telemetry::tracing_context::TracingContext; @@ -217,7 +217,7 @@ impl MergeScanExec { } })); - Ok(Box::pin(RecordBatchStreamAdaptor { + Ok(Box::pin(RecordBatchStreamWrapper { schema: self.schema.clone(), stream, output_ordering: None, diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 9fdbad5237..864ed652ca 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -802,8 +802,8 @@ mod test { use super::*; use crate::error::Error; - use crate::query_handler::grpc::{GrpcQueryHandler, ServerGrpcQueryHandlerAdaptor}; - use crate::query_handler::sql::{ServerSqlQueryHandlerAdaptor, SqlQueryHandler}; + use crate::query_handler::grpc::{GrpcQueryHandler, ServerGrpcQueryHandlerAdapter}; + use crate::query_handler::sql::{ServerSqlQueryHandlerAdapter, SqlQueryHandler}; struct DummyInstance { _tx: mpsc::Sender<(String, Vec)>, @@ -869,8 +869,8 @@ mod test { fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { let instance = Arc::new(DummyInstance { _tx: tx }); - let sql_instance = ServerSqlQueryHandlerAdaptor::arc(instance.clone()); - let grpc_instance = ServerGrpcQueryHandlerAdaptor::arc(instance); + let sql_instance = ServerSqlQueryHandlerAdapter::arc(instance.clone()); + let grpc_instance = ServerGrpcQueryHandlerAdapter::arc(instance); let server = HttpServerBuilder::new(HttpOptions::default()) .with_sql_handler(sql_instance) .with_grpc_handler(grpc_instance) diff --git a/src/servers/src/query_handler/grpc.rs b/src/servers/src/query_handler/grpc.rs index ce99c31dde..01464012d6 100644 --- a/src/servers/src/query_handler/grpc.rs +++ b/src/servers/src/query_handler/grpc.rs @@ -37,16 +37,16 @@ pub trait GrpcQueryHandler { ) -> std::result::Result; } -pub struct ServerGrpcQueryHandlerAdaptor(GrpcQueryHandlerRef); +pub struct ServerGrpcQueryHandlerAdapter(GrpcQueryHandlerRef); -impl ServerGrpcQueryHandlerAdaptor { +impl ServerGrpcQueryHandlerAdapter { pub fn arc(handler: GrpcQueryHandlerRef) -> Arc { Arc::new(Self(handler)) } } #[async_trait] -impl GrpcQueryHandler for ServerGrpcQueryHandlerAdaptor +impl GrpcQueryHandler for ServerGrpcQueryHandlerAdapter where E: ErrorExt + Send + Sync + 'static, { diff --git a/src/servers/src/query_handler/sql.rs b/src/servers/src/query_handler/sql.rs index cc788ad272..79e63f86e7 100644 --- a/src/servers/src/query_handler/sql.rs +++ b/src/servers/src/query_handler/sql.rs @@ -64,16 +64,16 @@ pub trait SqlQueryHandler { ) -> std::result::Result; } -pub struct ServerSqlQueryHandlerAdaptor(SqlQueryHandlerRef); +pub struct ServerSqlQueryHandlerAdapter(SqlQueryHandlerRef); -impl ServerSqlQueryHandlerAdaptor { +impl ServerSqlQueryHandlerAdapter { pub fn arc(handler: SqlQueryHandlerRef) -> Arc { Arc::new(Self(handler)) } } #[async_trait] -impl SqlQueryHandler for ServerSqlQueryHandlerAdaptor +impl SqlQueryHandler for ServerSqlQueryHandlerAdapter where E: ErrorExt + Send + Sync + 'static, { diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index ad69c40ada..6bb91b89eb 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -45,8 +45,8 @@ use servers::http::{HttpOptions, HttpServerBuilder}; use servers::metrics_handler::MetricsHandler; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; use servers::postgres::PostgresServer; -use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; -use servers::query_handler::sql::{ServerSqlQueryHandlerAdaptor, SqlQueryHandler}; +use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter; +use servers::query_handler::sql::{ServerSqlQueryHandlerAdapter, SqlQueryHandler}; use servers::server::Server; use servers::Mode; use session::context::QueryContext; @@ -378,8 +378,8 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router ..Default::default() }; let http_server = HttpServerBuilder::new(http_opts) - .with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(instance.instance.clone())) - .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc( + .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(instance.instance.clone())) + .with_grpc_handler(ServerGrpcQueryHandlerAdapter::arc( instance.instance.clone(), )) .with_metrics_handler(MetricsHandler) @@ -412,8 +412,8 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider( let mut http_server = HttpServerBuilder::new(http_opts); http_server - .with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(instance.instance.clone())) - .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc( + .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(instance.instance.clone())) + .with_grpc_handler(ServerGrpcQueryHandlerAdapter::arc( instance.instance.clone(), )) .with_script_handler(instance.instance.clone()) @@ -449,8 +449,8 @@ pub async fn setup_test_prom_app_with_frontend( }; let frontend_ref = instance.instance.clone(); let http_server = HttpServerBuilder::new(http_opts) - .with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone())) - .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref.clone())) + .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone())) + .with_grpc_handler(ServerGrpcQueryHandlerAdapter::arc(frontend_ref.clone())) .with_script_handler(frontend_ref.clone()) .with_prom_handler(frontend_ref.clone()) .with_prometheus_handler(frontend_ref) @@ -493,14 +493,14 @@ pub async fn setup_grpc_server_with( let fe_instance_ref = instance.instance.clone(); let flight_handler = Arc::new(GreptimeRequestHandler::new( - ServerGrpcQueryHandlerAdaptor::arc(fe_instance_ref.clone()), + ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone()), user_provider.clone(), runtime.clone(), )); let fe_grpc_server = Arc::new(GrpcServer::new( grpc_config, - Some(ServerGrpcQueryHandlerAdaptor::arc(fe_instance_ref.clone())), + Some(ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone())), Some(fe_instance_ref.clone()), Some(flight_handler), None, @@ -563,7 +563,7 @@ pub async fn setup_mysql_server_with_user_provider( let fe_mysql_server = Arc::new(MysqlServer::create_server( runtime, Arc::new(MysqlSpawnRef::new( - ServerSqlQueryHandlerAdaptor::arc(fe_instance_ref), + ServerSqlQueryHandlerAdapter::arc(fe_instance_ref), user_provider, )), Arc::new(MysqlSpawnConfig::new( @@ -615,7 +615,7 @@ pub async fn setup_pg_server_with_user_provider( ..Default::default() }; let fe_pg_server = Arc::new(Box::new(PostgresServer::new( - ServerSqlQueryHandlerAdaptor::arc(fe_instance_ref), + ServerSqlQueryHandlerAdapter::arc(fe_instance_ref), opts.tls.clone(), runtime, user_provider,