diff --git a/src/common/runtime/src/global.rs b/src/common/runtime/src/global.rs index 51bad13107..bce164f0e0 100644 --- a/src/common/runtime/src/global.rs +++ b/src/common/runtime/src/global.rs @@ -23,7 +23,7 @@ use paste::paste; use crate::{Builder, JoinHandle, Runtime}; const READ_WORKERS: usize = 8; -const WRITE_WORKERS: usize = 8; +const WRITE_WORKERS: usize = 16; const BG_WORKERS: usize = 8; pub fn create_runtime(runtime_name: &str, thread_name: &str, worker_threads: usize) -> Runtime { diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 59e6a7b4b2..9655d6d6e5 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -24,6 +24,7 @@ pub mod standalone; use std::sync::Arc; +use api::v1::RowInsertRequests; use api::v1::meta::Role; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; @@ -75,6 +76,7 @@ use sql::statements::copy::CopyTable; use sql::statements::statement::Statement; use sqlparser::ast::ObjectName; pub use standalone::StandaloneDatanodeManager; +use tokio::sync::mpsc::UnboundedSender; use crate::error::{ self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, ParseSqlSnafu, @@ -119,6 +121,7 @@ pub struct Instance { inserter: InserterRef, deleter: DeleterRef, export_metrics_task: Option, + proxy_sender: Option> } impl Instance { diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index 15711f9a7b..b0cc03fcdd 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -15,6 +15,8 @@ use std::collections::HashMap; use std::sync::Arc; +use api::v1::greptime_database_client::GreptimeDatabaseClient; +use api::v1::GreptimeRequest; use catalog::kvbackend::KvBackendCatalogManager; use common_base::Plugins; use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator}; @@ -27,6 +29,7 @@ use operator::statement::StatementExecutor; use operator::table::TableMutationOperator; use partition::manager::PartitionRuleManager; use query::QueryEngineFactory; +use tokio::sync::mpsc::unbounded_channel; use crate::error::Result; use crate::heartbeat::HeartbeatTask; @@ -134,6 +137,20 @@ impl FrontendBuilder { plugins.insert::(statement_executor.clone()); + let addr = std::env::var("FLOW_ADDR").unwrap_or("http://[::1]:14514".to_string()); + let (send, mut recv) = unbounded_channel(); + tokio::spawn(async move { + let conn = tonic::transport::Endpoint::new(addr) + .unwrap() + .connect_lazy(); + let mut client = GreptimeDatabaseClient::new(conn); + let mut outer_request = GreptimeRequest::default(); + while let Some(request) = recv.recv().await { + outer_request.request = + Some(api::v1::greptime_request::Request::RowInserts(request)); + let _response = client.handle(outer_request.clone()).await.unwrap(); + } + }); Ok(Instance { catalog_manager, script_executor, @@ -145,6 +162,7 @@ impl FrontendBuilder { inserter, deleter, export_metrics_task: None, + proxy_sender: Some(send), }) } } diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 2ba67483d2..3601c7ed4a 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::time::Instant; + use api::v1::ddl_request::{Expr as DdlExpr, Expr}; use api::v1::greptime_request::Request; use api::v1::query_request::Query; @@ -20,6 +22,7 @@ use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use common_meta::table_name::TableName; use common_query::Output; +use common_telemetry::info; use query::parser::PromQuery; use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef}; use servers::query_handler::grpc::GrpcQueryHandler; @@ -189,6 +192,8 @@ impl Instance { requests: RowInsertRequests, ctx: QueryContextRef, ) -> Result { + self.proxy_sender.as_ref().map(|s|s.send(requests.clone())); + self.inserter .handle_row_inserts(requests, ctx, self.statement_executor.as_ref()) .await