poc/create-alter-for-metrics:

### Add Object Store Integration and Access Layer Factory

 - **Cargo.lock, Cargo.toml**: Added `object-store` as a dependency to integrate object storage capabilities.
 - **frontend.rs, instance.rs, builder.rs, server.rs**: Introduced `ObjectStoreConfig` and `AccessLayerFactory` to manage object storage configurations and access layers.
 - **access_layer.rs**: Made `AccessLayerFactory` clonable and its constructor public to facilitate object store access layer creation.
 - **prom_store.rs**: Updated `PromBulkState` to include `AccessLayerFactory` and modified `remote_write` to utilize the access layer for processing requests.
 - **lib.rs**: Made `access_layer` module public to expose access layer functionalities.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2025-06-27 08:50:25 +00:00
parent 3d81a17360
commit ecbf372de3
10 changed files with 41 additions and 5 deletions

1
Cargo.lock generated
View File

@@ -4738,6 +4738,7 @@ dependencies = [
"log-store",
"meta-client",
"num_cpus",
"object-store",
"opentelemetry-proto 0.27.0",
"operator",
"otel-arrow-rust",

View File

@@ -49,6 +49,7 @@ log-query.workspace = true
log-store.workspace = true
meta-client.workspace = true
num_cpus.workspace = true
object-store.workspace = true
opentelemetry-proto.workspace = true
operator.workspace = true
otel-arrow-rust.workspace = true

View File

@@ -19,6 +19,7 @@ use common_config::config::Configurable;
use common_options::datanode::DatanodeClientOptions;
use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
use meta_client::MetaClientOptions;
use object_store::config::ObjectStoreConfig;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize};
use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
@@ -62,6 +63,7 @@ pub struct FrontendOptions {
pub query: QueryOptions,
pub max_in_flight_write_bytes: Option<ReadableSize>,
pub slow_query: Option<SlowQueryOptions>,
pub store: ObjectStoreConfig,
}
impl Default for FrontendOptions {
@@ -88,6 +90,7 @@ impl Default for FrontendOptions {
query: QueryOptions::default(),
max_in_flight_write_bytes: None,
slow_query: Some(SlowQueryOptions::default()),
store: ObjectStoreConfig::default(),
}
}
}

View File

@@ -61,6 +61,7 @@ use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
use query::query_engine::DescribeResult;
use query::QueryEngineRef;
use servers::access_layer::AccessLayerFactory;
use servers::error as server_error;
use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
use servers::interceptor::{
@@ -103,6 +104,7 @@ pub struct Instance {
slow_query_recorder: Option<SlowQueryRecorder>,
limiter: Option<LimiterRef>,
process_manager: ProcessManagerRef,
access_layer_factory: AccessLayerFactory,
}
impl Instance {
@@ -181,6 +183,10 @@ impl Instance {
pub fn node_manager(&self) -> &NodeManagerRef {
self.inserter.node_manager()
}
pub fn access_layer_factory(&self) -> &AccessLayerFactory {
&self.access_layer_factory
}
}
fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {

View File

@@ -37,6 +37,7 @@ use partition::manager::PartitionRuleManager;
use pipeline::pipeline_operator::PipelineOperator;
use query::region_query::RegionQueryHandlerFactoryRef;
use query::QueryEngineFactory;
use servers::access_layer::AccessLayerFactory;
use snafu::OptionExt;
use crate::error::{self, Result};
@@ -219,6 +220,7 @@ impl FrontendBuilder {
Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes()))
});
let access_layer_factory = AccessLayerFactory::new(&self.options.store).await.unwrap();
Ok(Instance {
catalog_manager: self.catalog_manager,
pipeline_operator,
@@ -231,6 +233,7 @@ impl FrontendBuilder {
slow_query_recorder,
limiter,
process_manager,
access_layer_factory,
})
}
}

View File

@@ -101,6 +101,7 @@ where
schema_helper: self.instance.create_schema_helper(),
partition_manager: self.instance.partition_manager().clone(),
node_manager: self.instance.node_manager().clone(),
access_layer_factory: self.instance.access_layer_factory().clone(),
})
} else {
None

View File

@@ -36,6 +36,7 @@ chrono.workspace = true
common-base.workspace = true
common-catalog.workspace = true
common-config.workspace = true
common-datasource.workspace = true
common-error.workspace = true
common-frontend.workspace = true
common-grpc.workspace = true
@@ -77,15 +78,14 @@ loki-proto.workspace = true
metric-engine.workspace = true
mime_guess = "2.0"
mito-codec.workspace = true
mito2.workspace = true
notify.workspace = true
object-pool = "0.5"
object-store.workspace = true
common-datasource.workspace = true
mito2.workspace = true
parquet.workspace = true
once_cell.workspace = true
openmetrics-parser = "0.4"
operator.workspace = true
parquet.workspace = true
partition.workspace = true
simd-json.workspace = true
socket2 = "0.5"

View File

@@ -37,12 +37,13 @@ use crate::error;
type AsyncParquetWriter = AsyncArrowWriter<AsyncWriter>;
#[derive(Clone)]
pub struct AccessLayerFactory {
object_store: ObjectStore,
}
impl AccessLayerFactory {
async fn new(config: &ObjectStoreConfig) -> error::Result<AccessLayerFactory> {
pub async fn new(config: &ObjectStoreConfig) -> error::Result<AccessLayerFactory> {
let object_store = object_store::factory::new_raw_object_store(config, "")
.await
.context(error::ObjectStoreSnafu)?;

View File

@@ -62,6 +62,7 @@ pub struct PromBulkState {
pub schema_helper: SchemaHelper,
pub partition_manager: PartitionRuleManagerRef,
pub node_manager: NodeManagerRef,
pub access_layer_factory: AccessLayerFactory,
}
#[derive(Clone)]
@@ -146,6 +147,25 @@ pub async fn remote_write(
processor.set_pipeline(pipeline_handler, query_ctx.clone(), pipeline_def);
}
if let Some(state) = state.bulk_state {
let context = PromBulkContext {
schema_helper: state.schema_helper,
query_ctx: query_ctx.clone(),
partition_manager: state.partition_manager,
node_manager: state.node_manager,
access_layer_factory: state.access_layer_factory,
};
decode_remote_write_request_to_batch(
is_zstd,
body,
prom_validation_mode,
&mut processor,
context,
)
.await?;
return Ok((StatusCode::NO_CONTENT, write_cost_header_map(0)).into_response());
}
let req =
decode_remote_write_request(is_zstd, body, prom_validation_mode, &mut processor).await?;

View File

@@ -21,7 +21,7 @@
use datafusion_expr::LogicalPlan;
use datatypes::schema::Schema;
mod access_layer;
pub mod access_layer;
pub mod addrs;
#[allow(dead_code)]
mod batch_builder;