Compare commits

...

1 Commits

Author SHA1 Message Date
Discord9
4b2f274971 feat: resend row insert 2023-12-28 11:13:01 +08:00
4 changed files with 27 additions and 1 deletions

View File

@@ -23,7 +23,7 @@ use paste::paste;
use crate::{Builder, JoinHandle, Runtime};
const READ_WORKERS: usize = 8;
const WRITE_WORKERS: usize = 8;
const WRITE_WORKERS: usize = 16;
const BG_WORKERS: usize = 8;
pub fn create_runtime(runtime_name: &str, thread_name: &str, worker_threads: usize) -> Runtime {

View File

@@ -24,6 +24,7 @@ pub mod standalone;
use std::sync::Arc;
use api::v1::RowInsertRequests;
use api::v1::meta::Role;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
@@ -75,6 +76,7 @@ use sql::statements::copy::CopyTable;
use sql::statements::statement::Statement;
use sqlparser::ast::ObjectName;
pub use standalone::StandaloneDatanodeManager;
use tokio::sync::mpsc::UnboundedSender;
use crate::error::{
self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, ParseSqlSnafu,
@@ -119,6 +121,7 @@ pub struct Instance {
inserter: InserterRef,
deleter: DeleterRef,
export_metrics_task: Option<ExportMetricsTask>,
proxy_sender: Option<UnboundedSender<RowInsertRequests>>
}
impl Instance {

View File

@@ -15,6 +15,8 @@
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::greptime_database_client::GreptimeDatabaseClient;
use api::v1::GreptimeRequest;
use catalog::kvbackend::KvBackendCatalogManager;
use common_base::Plugins;
use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
@@ -27,6 +29,7 @@ use operator::statement::StatementExecutor;
use operator::table::TableMutationOperator;
use partition::manager::PartitionRuleManager;
use query::QueryEngineFactory;
use tokio::sync::mpsc::unbounded_channel;
use crate::error::Result;
use crate::heartbeat::HeartbeatTask;
@@ -134,6 +137,20 @@ impl FrontendBuilder {
plugins.insert::<StatementExecutorRef>(statement_executor.clone());
let addr = std::env::var("FLOW_ADDR").unwrap_or("http://[::1]:14514".to_string());
let (send, mut recv) = unbounded_channel();
tokio::spawn(async move {
let conn = tonic::transport::Endpoint::new(addr)
.unwrap()
.connect_lazy();
let mut client = GreptimeDatabaseClient::new(conn);
let mut outer_request = GreptimeRequest::default();
while let Some(request) = recv.recv().await {
outer_request.request =
Some(api::v1::greptime_request::Request::RowInserts(request));
let _response = client.handle(outer_request.clone()).await.unwrap();
}
});
Ok(Instance {
catalog_manager,
script_executor,
@@ -145,6 +162,7 @@ impl FrontendBuilder {
inserter,
deleter,
export_metrics_task: None,
proxy_sender: Some(send),
})
}
}

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Instant;
use api::v1::ddl_request::{Expr as DdlExpr, Expr};
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
@@ -20,6 +22,7 @@ use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::info;
use query::parser::PromQuery;
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
use servers::query_handler::grpc::GrpcQueryHandler;
@@ -189,6 +192,8 @@ impl Instance {
requests: RowInsertRequests,
ctx: QueryContextRef,
) -> Result<Output> {
self.proxy_sender.as_ref().map(|s|s.send(requests.clone()));
self.inserter
.handle_row_inserts(requests, ctx, self.statement_executor.as_ref())
.await