Compare commits

...

8 Commits

Author SHA1 Message Date
liyang
01fdbf3626 chore: upgrade 0.4.2 (#2644) 2023-10-24 12:21:58 +08:00
Lei, HUANG
97897aaf9b fix: predicate shall use real schema to create physical exprs (#2642)
* fix: prune predicate shall use real schema to create physical exprs

* refactor: remove redundant results

* fix: unit tests

* test: add more sqlness cases

* test: add more sqlness cases

* fix: sqlness orderby

* chore: update log

* fix: cache physical expr in memtable iter

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
2023-10-24 03:41:25 +00:00
Wei
1fc42a681f refactor: create_or_open always set writable (#2641)
feat: set opened region writable
2023-10-23 10:32:51 +00:00
Wei
fbc8f56eaa feat: lookup manifest file size (#2590)
* feat: get manifest file size

* feat: manifest size statistics

* refactor: manifest map key

* chore: comment and unit test

* chore: remove no-use function

* chore: change style

* Apply suggestions from code review

Co-authored-by: Yingwen <realevenyag@gmail.com>

* chore: cr comment

* chore: cr comment

* chore: cr comment

* chore: cr comment

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
2023-10-23 08:59:00 +00:00
yuanbohan
44280f7c9d feat(otlp): initial OTLP trace support (#2627)
* feat: otlp tracing framework via http

* feat: otlp trace transformer plugin

* feat: successfully write traces into db

* chore: plugin to parse request

* test: helper functions

* feat: parse_request_to_spans function

* chore: remove implicit call to parse in TraceParser

* chore: fix clippy

* chore: add TODO marker for span fields

* refactor TraceParser trait

* refactor TraceParser trait

* table_name method in OTLP TraceParser trait

* fix: approximate row, column count

* chore: function signature without row

* chore: avoid a clone by moving span.kind up

* docs for parse and to_grpc_insert_requests

---------

Co-authored-by: fys <fengys1996@gmail.com>
Co-authored-by: fys <40801205+fengys1996@users.noreply.github.com>
2023-10-23 06:37:43 +00:00
Ning Sun
0fbde48655 feat: hide internal error and unknown error message from end user (#2544)
* feat: use fixed error message for unknown error

* feat: return fixed message for internal error as well

* chore: include status code in error message

* test: update tests for asserts of error message

* feat: change status code of some datafusion error

* fix: make CollectRecordbatch an query error

* test: update sqlness results
2023-10-23 03:07:35 +00:00
Niwaka
9dcfd28f61 feat: impl ObjectStoreManager for custom_storage (#2621)
* feat: impl ObjectStoreManager for custom_storage

* fix: rename object_store_manager to manager

* fix: rename global to default

* chore: add document for ObjectStoreManager

* refactor: simplify default_object_store

* fix: address review
2023-10-23 03:00:29 +00:00
Yingwen
82dbc3e1ae feat(mito): Ports InMemoryRowGroup from parquet crate (#2633)
* feat: ports InMemoryRowGroup from parquet

* chore: pub InMemoryRowGroup

* style: allow some clippy lints
2023-10-23 02:22:19 +00:00
46 changed files with 2295 additions and 854 deletions

119
Cargo.lock generated
View File

@@ -204,7 +204,7 @@ checksum = "8f1f8f5a6f3d50d89e3797d7593a50f96bb2aaa20ca0cc7be1fb673232c91d72"
[[package]]
name = "api"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"common-base",
"common-error",
@@ -666,7 +666,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"api",
"async-trait",
@@ -839,7 +839,7 @@ dependencies = [
[[package]]
name = "benchmarks"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"arrow",
"chrono",
@@ -1222,7 +1222,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"api",
"arc-swap",
@@ -1506,7 +1506,7 @@ checksum = "cd7cc57abe963c6d3b9d8be5b06ba7c8957a930305ca90304f24ef040aa6f961"
[[package]]
name = "client"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"api",
"arrow-flight",
@@ -1536,7 +1536,7 @@ dependencies = [
"rand",
"session",
"snafu",
"substrait 0.4.1",
"substrait 0.4.2",
"substrait 0.7.5",
"tokio",
"tokio-stream",
@@ -1573,7 +1573,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"anymap",
"async-trait",
@@ -1621,7 +1621,7 @@ dependencies = [
"servers",
"session",
"snafu",
"substrait 0.4.1",
"substrait 0.4.2",
"table",
"temp-env",
"tikv-jemallocator",
@@ -1654,7 +1654,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"anymap",
"bitvec",
@@ -1669,7 +1669,7 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"chrono",
"common-error",
@@ -1682,7 +1682,7 @@ dependencies = [
[[package]]
name = "common-config"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"common-base",
"humantime-serde",
@@ -1691,7 +1691,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"arrow",
"arrow-schema",
@@ -1720,7 +1720,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"snafu",
"strum 0.25.0",
@@ -1728,7 +1728,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"arc-swap",
"chrono-tz 0.6.3",
@@ -1751,7 +1751,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"async-trait",
"common-error",
@@ -1770,7 +1770,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"api",
"arrow-flight",
@@ -1800,7 +1800,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"api",
"async-trait",
@@ -1819,7 +1819,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"arc-swap",
"backtrace",
@@ -1836,7 +1836,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"common-error",
"common-macro",
@@ -1849,7 +1849,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"api",
"arrow-flight",
@@ -1887,7 +1887,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"async-stream",
"async-trait",
@@ -1911,7 +1911,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"async-trait",
"common-procedure",
@@ -1919,7 +1919,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"api",
"async-trait",
@@ -1942,7 +1942,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"common-error",
"common-macro",
@@ -1959,7 +1959,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"async-trait",
"common-error",
@@ -1976,7 +1976,7 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"backtrace",
"common-error",
@@ -2003,7 +2003,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"once_cell",
"rand",
@@ -2012,7 +2012,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"arrow",
"chrono",
@@ -2027,7 +2027,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"build-data",
]
@@ -2665,7 +2665,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"api",
"arrow-flight",
@@ -2724,7 +2724,7 @@ dependencies = [
"sql",
"storage",
"store-api",
"substrait 0.4.1",
"substrait 0.4.2",
"table",
"tokio",
"tokio-stream",
@@ -2738,7 +2738,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"arrow",
"arrow-array",
@@ -3201,7 +3201,7 @@ dependencies = [
[[package]]
name = "file-engine"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"api",
"async-trait",
@@ -3311,7 +3311,7 @@ dependencies = [
[[package]]
name = "frontend"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"api",
"arc-swap",
@@ -3375,7 +3375,7 @@ dependencies = [
"storage",
"store-api",
"strfmt",
"substrait 0.4.1",
"substrait 0.4.2",
"table",
"tokio",
"toml 0.7.6",
@@ -5006,7 +5006,7 @@ checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "log-store"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"async-stream",
"async-trait",
@@ -5276,7 +5276,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"api",
"async-trait",
@@ -5306,7 +5306,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"anymap",
"api",
@@ -5498,7 +5498,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"anymap",
"api",
@@ -5960,11 +5960,13 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"common-error",
"common-macro",
"common-runtime",
"common-telemetry",
"common-test-util",
@@ -5973,6 +5975,7 @@ dependencies = [
"metrics",
"moka",
"opendal",
"snafu",
"tokio",
"uuid",
]
@@ -6184,7 +6187,7 @@ dependencies = [
[[package]]
name = "operator"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"api",
"async-compat",
@@ -6229,7 +6232,7 @@ dependencies = [
"sqlparser 0.34.0",
"storage",
"store-api",
"substrait 0.4.1",
"substrait 0.4.2",
"table",
"tokio",
"tonic 0.9.2",
@@ -6449,7 +6452,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"api",
"async-trait",
@@ -6775,7 +6778,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"auth",
"common-base",
@@ -7025,7 +7028,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"async-recursion",
"async-trait",
@@ -7288,7 +7291,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"ahash 0.8.3",
"api",
@@ -7345,7 +7348,7 @@ dependencies = [
"stats-cli",
"store-api",
"streaming-stats",
"substrait 0.4.1",
"substrait 0.4.2",
"table",
"tokio",
"tokio-stream",
@@ -8544,7 +8547,7 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "script"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"api",
"arc-swap",
@@ -8824,7 +8827,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"aide",
"api",
@@ -8918,7 +8921,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"api",
"arc-swap",
@@ -9196,7 +9199,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"api",
"common-base",
@@ -9247,7 +9250,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"async-trait",
"clap 4.4.1",
@@ -9453,7 +9456,7 @@ dependencies = [
[[package]]
name = "storage"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"api",
"arc-swap",
@@ -9507,7 +9510,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"api",
"aquamarine",
@@ -9645,7 +9648,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"async-recursion",
"async-trait",
@@ -9803,7 +9806,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"anymap",
"async-trait",
@@ -9909,7 +9912,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.4.1"
version = "0.4.2"
dependencies = [
"api",
"async-trait",
@@ -9962,7 +9965,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.4.1",
"substrait 0.4.2",
"table",
"tempfile",
"tokio",

View File

@@ -55,7 +55,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.4.1"
version = "0.4.2"
edition = "2021"
license = "Apache-2.0"
@@ -87,7 +87,7 @@ meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev =
metrics = "0.20"
moka = "0.12"
once_cell = "1.18"
opentelemetry-proto = { version = "0.2", features = ["gen-tonic", "metrics"] }
opentelemetry-proto = { version = "0.2", features = ["gen-tonic", "metrics", "traces"] }
parquet = "43.0"
paste = "1.0"
prost = "0.11"

View File

@@ -39,17 +39,25 @@ pub trait ErrorExt: StackError {
where
Self: Sized,
{
let error = self.last();
if let Some(external_error) = error.source() {
let external_root = external_error.sources().last().unwrap();
if error.to_string().is_empty() {
format!("{external_root}")
} else {
format!("{error}: {external_root}")
match self.status_code() {
StatusCode::Unknown | StatusCode::Unexpected | StatusCode::Internal => {
// masks internal error from end user
format!("Internal error: {}", self.status_code() as u32)
}
_ => {
let error = self.last();
if let Some(external_error) = error.source() {
let external_root = external_error.sources().last().unwrap();
if error.to_string().is_empty() {
format!("{external_root}")
} else {
format!("{error}: {external_root}")
}
} else {
format!("{error}")
}
}
} else {
format!("{error}")
}
}
}
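For illustration, here is a minimal, self-contained sketch of the masking rule shown above. The enum variants, their numeric values, and the function name are placeholders, not GreptimeDB's real StatusCode definitions; the actual logic lives in ErrorExt::output_msg.

#[derive(Clone, Copy)]
enum StatusCode {
    // Placeholder variants; the real codes and numeric values live in common-error.
    Unknown,
    Unexpected,
    Internal,
    InvalidArguments,
}

fn user_facing_message(code: StatusCode, detail: &str) -> String {
    match code {
        // Internal-ish failures: hide the detail and expose only a numeric code.
        StatusCode::Unknown | StatusCode::Unexpected | StatusCode::Internal => {
            format!("Internal error: {}", code as u32)
        }
        // Everything else keeps the detailed message chain.
        _ => detail.to_string(),
    }
}

fn main() {
    // Internal errors are masked; only the code number reaches the user.
    println!("{}", user_facing_message(StatusCode::Internal, "secret io failure"));
    // User-facing errors keep their detail.
    println!("{}", user_facing_message(StatusCode::InvalidArguments, "bad column name"));
}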

View File

@@ -1047,6 +1047,6 @@ mod tests {
// Run the runner and execute the procedure.
runner.run().await;
let err = meta.state().error().unwrap().output_msg();
assert!(err.contains("subprocedure failed"), "{err}");
assert!(err.contains("Internal error"), "{err}");
}
}

View File

@@ -19,14 +19,18 @@ use metrics::counter;
use opentelemetry_proto::tonic::collector::metrics::v1::{
ExportMetricsServiceRequest, ExportMetricsServiceResponse,
};
use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::otlp;
use servers::otlp::plugin::TraceParserRef;
use servers::query_handler::OpenTelemetryProtocolHandler;
use session::context::QueryContextRef;
use snafu::ResultExt;
use crate::instance::Instance;
use crate::metrics::OTLP_METRICS_ROWS;
use crate::metrics::{OTLP_METRICS_ROWS, OTLP_TRACES_ROWS};
#[async_trait]
impl OpenTelemetryProtocolHandler for Instance {
@@ -40,7 +44,7 @@ impl OpenTelemetryProtocolHandler for Instance {
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::Otlp)
.context(AuthSnafu)?;
let (requests, rows) = otlp::to_grpc_insert_requests(request)?;
let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request)?;
let _ = self
.handle_row_inserts(requests, ctx)
.await
@@ -55,4 +59,40 @@ impl OpenTelemetryProtocolHandler for Instance {
};
Ok(resp)
}
async fn traces(
&self,
request: ExportTraceServiceRequest,
ctx: QueryContextRef,
) -> ServerResult<ExportTraceServiceResponse> {
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::Otlp)
.context(AuthSnafu)?;
let (table_name, spans) = match self.plugins.get::<TraceParserRef>() {
Some(parser) => (parser.table_name(), parser.parse(request)),
None => (
otlp::trace::TRACE_TABLE_NAME.to_string(),
otlp::trace::parse(request),
),
};
let (requests, rows) = otlp::trace::to_grpc_insert_requests(table_name, spans)?;
let _ = self
.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
counter!(OTLP_TRACES_ROWS, rows as u64);
let resp = ExportTraceServiceResponse {
// TODO(fys): add support for partial_success in future patch
partial_success: None,
};
Ok(resp)
}
}

View File

@@ -22,3 +22,4 @@ pub(crate) const METRIC_RUN_SCRIPT_ELAPSED: &str = "frontend.run_script_elapsed"
pub const PROM_STORE_REMOTE_WRITE_SAMPLES: &str = "frontend.prometheus.remote_write.samples";
pub const OTLP_METRICS_ROWS: &str = "frontend.otlp.metrics.rows";
pub const OTLP_TRACES_ROWS: &str = "frontend.otlp.traces.rows";

View File

@@ -15,7 +15,7 @@
use std::time::Duration;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::region_request::{RegionCloseRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::config::MitoConfig;
@@ -55,6 +55,37 @@ async fn test_engine_create_existing_region() {
.unwrap();
}
#[tokio::test]
async fn test_engine_create_close_create_region() {
// This test exercises the create_or_open code path.
let mut env = TestEnv::with_prefix("create-close-create");
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let builder = CreateRequestBuilder::new();
// Create a region with id 1.
engine
.handle_request(region_id, RegionRequest::Create(builder.build()))
.await
.unwrap();
// Close the region.
engine
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();
// Create the same region id again.
engine
.handle_request(region_id, RegionRequest::Create(builder.build()))
.await
.unwrap();
assert!(engine.is_region_exists(region_id));
let region = engine.get_region(region_id).unwrap();
assert!(region.is_writable());
}
#[tokio::test]
async fn test_engine_create_with_different_id() {
let mut env = TestEnv::new();

View File

@@ -17,7 +17,7 @@ use common_query::logical_plan::DfExpr;
use common_query::prelude::Expr;
use common_recordbatch::RecordBatches;
use datafusion_common::ScalarValue;
use datafusion_expr::lit;
use datafusion_expr::{col, lit};
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
@@ -46,7 +46,7 @@ async fn check_prune_row_groups(expr: DfExpr, expected: &str) {
region_id,
Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 10),
rows: build_rows(0, 15),
},
)
.await;
@@ -76,6 +76,16 @@ async fn test_read_parquet_stats() {
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 10 | 10.0 | 1970-01-01T00:00:10 |
| 11 | 11.0 | 1970-01-01T00:00:11 |
| 12 | 12.0 | 1970-01-01T00:00:12 |
| 13 | 13.0 | 1970-01-01T00:00:13 |
| 14 | 14.0 | 1970-01-01T00:00:14 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
| 3 | 3.0 | 1970-01-01T00:00:03 |
| 4 | 4.0 | 1970-01-01T00:00:04 |
| 5 | 5.0 | 1970-01-01T00:00:05 |
| 6 | 6.0 | 1970-01-01T00:00:06 |
| 7 | 7.0 | 1970-01-01T00:00:07 |
@@ -84,7 +94,11 @@ async fn test_read_parquet_stats() {
+-------+---------+---------------------+",
)
.await;
}
#[tokio::test]
async fn test_prune_tag() {
// prune result: only row groups 1 & 2
check_prune_row_groups(
datafusion_expr::col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string())))),
"\
@@ -100,3 +114,25 @@ async fn test_read_parquet_stats() {
)
.await;
}
#[tokio::test]
async fn test_prune_tag_and_field() {
common_telemetry::init_default_ut_logging();
// prune result: only row group 1
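// Note: pruning works at row-group granularity, so rows 8 and 9 still appear in the
// expected output below even though they do not satisfy field_0 < 8.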
check_prune_row_groups(
col("tag_0")
.gt(lit(ScalarValue::Utf8(Some("4".to_string()))))
.and(col("field_0").lt(lit(8.0))),
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 5 | 5.0 | 1970-01-01T00:00:05 |
| 6 | 6.0 | 1970-01-01T00:00:06 |
| 7 | 7.0 | 1970-01-01T00:00:07 |
| 8 | 8.0 | 1970-01-01T00:00:08 |
| 9 | 9.0 | 1970-01-01T00:00:09 |
+-------+---------+---------------------+",
)
.await;
}

View File

@@ -154,6 +154,12 @@ impl RegionManifestManager {
let inner = self.inner.read().await;
inner.store.clone()
}
/// Returns total manifest size.
pub async fn manifest_size(&self) -> u64 {
let inner = self.inner.read().await;
inner.total_manifest_size()
}
}
#[cfg(test)]
@@ -186,7 +192,7 @@ impl RegionManifestManagerInner {
/// Creates a new manifest.
async fn new(metadata: RegionMetadataRef, options: RegionManifestOptions) -> Result<Self> {
// construct storage
let store = ManifestObjectStore::new(
let mut store = ManifestObjectStore::new(
&options.manifest_dir,
options.object_store.clone(),
options.compress_type,
@@ -232,7 +238,7 @@ impl RegionManifestManagerInner {
/// Returns `Ok(None)` if no such manifest.
async fn open(options: RegionManifestOptions) -> Result<Option<Self>> {
// construct storage
let store = ManifestObjectStore::new(
let mut store = ManifestObjectStore::new(
&options.manifest_dir,
options.object_store.clone(),
options.compress_type,
@@ -240,8 +246,9 @@ impl RegionManifestManagerInner {
// recover from storage
// construct manifest builder
// calculate the manifest size from the latest checkpoint
let mut version = MIN_VERSION;
let checkpoint = Self::last_checkpoint(&store).await?;
let checkpoint = Self::last_checkpoint(&mut store).await?;
let last_checkpoint_version = checkpoint
.as_ref()
.map(|checkpoint| checkpoint.last_version)
@@ -265,6 +272,8 @@ impl RegionManifestManagerInner {
let mut action_iter = store.scan(version, MAX_VERSION).await?;
while let Some((manifest_version, raw_action_list)) = action_iter.next_log().await? {
let action_list = RegionMetaActionList::decode(&raw_action_list)?;
// set manifest size after last checkpoint
store.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
for action in action_list.actions {
match action {
RegionMetaAction::Change(action) => {
@@ -312,6 +321,7 @@ impl RegionManifestManagerInner {
Ok(())
}
/// Update the manifest. Return the current manifest version number.
async fn update(&mut self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
let version = self.increase_version();
self.store.save(version, &action_list.encode()?).await?;
@@ -343,6 +353,11 @@ impl RegionManifestManagerInner {
Ok(version)
}
/// Returns total manifest size.
pub(crate) fn total_manifest_size(&self) -> u64 {
self.store.total_manifest_size()
}
}
impl RegionManifestManagerInner {
@@ -369,8 +384,8 @@ impl RegionManifestManagerInner {
}
/// Make a new checkpoint. Return the fresh one if there are some actions to compact.
async fn do_checkpoint(&self) -> Result<Option<RegionCheckpoint>> {
let last_checkpoint = Self::last_checkpoint(&self.store).await?;
async fn do_checkpoint(&mut self) -> Result<Option<RegionCheckpoint>> {
let last_checkpoint = Self::last_checkpoint(&mut self.store).await?;
let current_version = self.last_version;
let (start_version, mut manifest_builder) = if let Some(checkpoint) = last_checkpoint {
@@ -441,7 +456,7 @@ impl RegionManifestManagerInner {
/// Fetch the last [RegionCheckpoint] from storage.
pub(crate) async fn last_checkpoint(
store: &ManifestObjectStore,
store: &mut ManifestObjectStore,
) -> Result<Option<RegionCheckpoint>> {
let last_checkpoint = store.load_last_checkpoint().await?;
@@ -456,14 +471,16 @@ impl RegionManifestManagerInner {
#[cfg(test)]
mod test {
use api::v1::SemanticType;
use common_datasource::compression::CompressionType;
use common_test_util::temp_dir::create_temp_dir;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use super::*;
use crate::manifest::action::RegionChange;
use crate::manifest::action::{RegionChange, RegionEdit};
use crate::manifest::tests::utils::basic_region_metadata;
use crate::test_util::TestEnv;
@@ -546,4 +563,95 @@ mod test {
.unwrap();
manager.validate_manifest(&new_metadata, 1).await;
}
/// Test-only helper; see `wal_dir_usage` in src/store-api/src/logstore.rs.
async fn manifest_dir_usage(path: &str) -> u64 {
let mut size = 0;
let mut read_dir = tokio::fs::read_dir(path).await.unwrap();
while let Ok(dir_entry) = read_dir.next_entry().await {
let Some(entry) = dir_entry else {
break;
};
if entry.file_type().await.unwrap().is_file() {
let file_name = entry.file_name().into_string().unwrap();
if file_name.contains(".checkpoint") || file_name.contains(".json") {
let file_size = entry.metadata().await.unwrap().len() as usize;
debug!("File: {file_name:?}, size: {file_size}");
size += file_size;
}
}
}
size as u64
}
#[tokio::test]
async fn test_manifest_size() {
let metadata = Arc::new(basic_region_metadata());
let data_home = create_temp_dir("");
let data_home_path = data_home.path().to_str().unwrap().to_string();
let env = TestEnv::with_data_home(data_home);
let manifest_dir = format!("{}/manifest", data_home_path);
let manager = env
.create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
.await
.unwrap()
.unwrap();
let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
new_metadata_builder.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
semantic_type: SemanticType::Field,
column_id: 252,
});
let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
let action_list =
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
metadata: new_metadata.clone(),
}));
let current_version = manager.update(action_list).await.unwrap();
assert_eq!(current_version, 1);
manager.validate_manifest(&new_metadata, 1).await;
// get manifest size
let manifest_size = manager.manifest_size().await;
assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
// apply 10 no-op edit actions to trigger a checkpoint
for _ in 0..10 {
manager
.update(RegionMetaActionList::new(vec![RegionMetaAction::Edit(
RegionEdit {
files_to_add: vec![],
files_to_remove: vec![],
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
},
)]))
.await
.unwrap();
}
// check manifest size again
let manifest_size = manager.manifest_size().await;
assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
// Reopen the manager; the size is recalculated starting from the latest checkpoint file.
manager.stop().await.unwrap();
let manager = env
.create_manifest_manager(CompressionType::Uncompressed, 10, None)
.await
.unwrap()
.unwrap();
manager.validate_manifest(&new_metadata, 11).await;
// get manifest size again
let manifest_size = manager.manifest_size().await;
assert_eq!(manifest_size, 1312);
}
}

View File

@@ -129,11 +129,22 @@ impl ObjectStoreLogIterator {
}
}
/// Key to identify a manifest file.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
enum FileKey {
/// A delta file (`.json`).
Delta(ManifestVersion),
/// A checkpoint file (`.checkpoint`).
Checkpoint(ManifestVersion),
}
#[derive(Clone, Debug)]
pub struct ManifestObjectStore {
object_store: ObjectStore,
compress_type: CompressionType,
path: String,
/// Stores the size of each manifest file.
manifest_size_map: HashMap<FileKey, u64>,
}
impl ManifestObjectStore {
@@ -142,6 +153,7 @@ impl ManifestObjectStore {
object_store,
compress_type,
path: util::normalize_dir(path),
manifest_size_map: HashMap::new(),
}
}
@@ -184,6 +196,7 @@ impl ManifestObjectStore {
.context(OpenDalSnafu)
}
/// Scans the manifest files in the range `[start, end)` and returns an iterator over them.
pub async fn scan(
&self,
start: ManifestVersion,
@@ -212,8 +225,12 @@ impl ManifestObjectStore {
})
}
/// Deletes manifest files whose version is less than `end`.
/// If `keep_last_checkpoint` is true, the last checkpoint file is kept.
/// Returns the number of deleted files.
pub async fn delete_until(
&self,
&mut self,
end: ManifestVersion,
keep_last_checkpoint: bool,
) -> Result<usize> {
@@ -248,7 +265,7 @@ impl ManifestObjectStore {
} else {
None
};
let paths: Vec<_> = entries
let del_entries: Vec<_> = entries
.iter()
.filter(|(_e, is_checkpoint, version)| {
if let Some(max_version) = checkpoint_version {
@@ -264,12 +281,15 @@ impl ManifestObjectStore {
true
}
})
.map(|e| e.0.path().to_string())
.collect();
let paths = del_entries
.iter()
.map(|(e, _, _)| e.path().to_string())
.collect::<Vec<_>>();
let ret = paths.len();
debug!(
"Deleting {} logs from manifest storage path {} until {}, checkpoint: {:?}, paths: {:?}",
"Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}",
ret,
self.path,
end,
@@ -282,10 +302,21 @@ impl ManifestObjectStore {
.await
.context(OpenDalSnafu)?;
// delete manifest sizes
for (_, is_checkpoint, version) in &del_entries {
if *is_checkpoint {
self.manifest_size_map
.remove(&FileKey::Checkpoint(*version));
} else {
self.manifest_size_map.remove(&FileKey::Delta(*version));
}
}
Ok(ret)
}
pub async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
/// Save the delta manifest file.
pub async fn save(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
let path = self.delta_file_path(version);
debug!("Save log to manifest storage, version: {}", version);
let data = self
@@ -296,13 +327,17 @@ impl ManifestObjectStore {
compress_type: self.compress_type,
path: &path,
})?;
let delta_size = data.len();
self.object_store
.write(&path, data)
.await
.context(OpenDalSnafu)
.context(OpenDalSnafu)?;
self.set_delta_file_size(version, delta_size as u64);
Ok(())
}
pub async fn save_checkpoint(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
/// Save the checkpoint manifest file.
pub async fn save_checkpoint(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
let path = self.checkpoint_file_path(version);
let data = self
.compress_type
@@ -312,10 +347,12 @@ impl ManifestObjectStore {
compress_type: self.compress_type,
path: &path,
})?;
let checkpoint_size = data.len();
self.object_store
.write(&path, data)
.await
.context(OpenDalSnafu)?;
self.set_checkpoint_file_size(version, checkpoint_size as u64);
// The last checkpoint file only contains the size and version, so it is tiny and we don't compress it.
let last_checkpoint_path = self.last_checkpoint_path();
@@ -342,7 +379,7 @@ impl ManifestObjectStore {
}
pub async fn load_checkpoint(
&self,
&mut self,
version: ManifestVersion,
) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
let path = self.checkpoint_file_path(version);
@@ -351,12 +388,15 @@ impl ManifestObjectStore {
let checkpoint_data =
match self.object_store.read(&path).await {
Ok(checkpoint) => {
let checkpoint_size = checkpoint.len();
let decompress_data = self.compress_type.decode(checkpoint).await.context(
DecompressObjectSnafu {
compress_type: self.compress_type,
path,
},
)?;
// set the checkpoint size
self.set_checkpoint_file_size(version, checkpoint_size as u64);
Ok(Some(decompress_data))
}
Err(e) => {
@@ -373,6 +413,7 @@ impl ManifestObjectStore {
);
match self.object_store.read(&fall_back_path).await {
Ok(checkpoint) => {
let checkpoint_size = checkpoint.len();
let decompress_data = FALL_BACK_COMPRESS_TYPE
.decode(checkpoint)
.await
@@ -380,6 +421,7 @@ impl ManifestObjectStore {
compress_type: FALL_BACK_COMPRESS_TYPE,
path,
})?;
self.set_checkpoint_file_size(version, checkpoint_size as u64);
Ok(Some(decompress_data))
}
Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
@@ -398,7 +440,7 @@ impl ManifestObjectStore {
/// Load the latest checkpoint.
/// Return manifest version and the raw [RegionCheckpoint](crate::manifest::action::RegionCheckpoint) content if any
pub async fn load_last_checkpoint(&self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
let last_checkpoint_path = self.last_checkpoint_path();
let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
Ok(data) => data,
@@ -424,6 +466,22 @@ impl ManifestObjectStore {
pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
self.object_store.read(path).await.context(OpenDalSnafu)
}
/// Computes the total size (in bytes) recorded in the manifest size map.
pub(crate) fn total_manifest_size(&self) -> u64 {
self.manifest_size_map.values().sum()
}
/// Set the size of the delta file by delta version.
pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
self.manifest_size_map.insert(FileKey::Delta(version), size);
}
/// Set the size of the checkpoint file by checkpoint version.
pub(crate) fn set_checkpoint_file_size(&mut self, version: ManifestVersion, size: u64) {
self.manifest_size_map
.insert(FileKey::Checkpoint(version), size);
}
}
#[derive(Serialize, Deserialize, Debug)]
@@ -489,7 +547,7 @@ mod tests {
test_manifest_log_store_case(log_store).await;
}
async fn test_manifest_log_store_case(log_store: ManifestObjectStore) {
async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) {
for v in 0..5 {
log_store
.save(v, format!("hello, {v}").as_bytes())
@@ -600,4 +658,92 @@ mod tests {
let mut it = log_store.scan(0, 10).await.unwrap();
assert!(it.next_log().await.unwrap().is_none());
}
#[tokio::test]
async fn test_file_version() {
let version = file_version("00000000000000000007.checkpoint");
assert_eq!(version, 7);
let name = delta_file(version);
assert_eq!(name, "00000000000000000007.json");
let name = checkpoint_file(version);
assert_eq!(name, "00000000000000000007.checkpoint");
}
#[tokio::test]
async fn test_uncompressed_manifest_files_size() {
let mut log_store = new_test_manifest_store();
// write 5 uncompressed manifest files, 8 bytes each
log_store.compress_type = CompressionType::Uncompressed;
for v in 0..5 {
log_store
.save(v, format!("hello, {v}").as_bytes())
.await
.unwrap();
}
// write 1 uncompressed checkpoint file, 23 bytes
log_store
.save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
.await
.unwrap();
// manifest files size
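// 5 delta files * 8 bytes ("hello, N") + a 23-byte checkpoint ("checkpoint_uncompressed") = 63 bytes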
assert_eq!(log_store.total_manifest_size(), 63);
// delete 3 manifest files
assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);
// manifest files size after delete
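// the delta files for versions 0, 1 and 2 were removed (3 * 8 = 24 bytes), so 63 - 24 = 39 bytes remain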
assert_eq!(log_store.total_manifest_size(), 39);
// delete all manifest files
assert_eq!(
log_store
.delete_until(ManifestVersion::MAX, false)
.await
.unwrap(),
3
);
assert_eq!(log_store.total_manifest_size(), 0);
}
#[tokio::test]
async fn test_compressed_manifest_files_size() {
let mut log_store = new_test_manifest_store();
// Test with compressed manifest files
log_store.compress_type = CompressionType::Gzip;
// write 5 manifest files
for v in 0..5 {
log_store
.save(v, format!("hello, {v}").as_bytes())
.await
.unwrap();
}
log_store
.save_checkpoint(5, "checkpoint_compressed".as_bytes())
.await
.unwrap();
// manifest files size
assert_eq!(log_store.total_manifest_size(), 181);
// delete 3 manifest files
assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);
// manifest files size after delete
assert_eq!(log_store.total_manifest_size(), 97);
// delete all manifest files
assert_eq!(
log_store
.delete_until(ManifestVersion::MAX, false)
.await
.unwrap(),
3
);
assert_eq!(log_store.total_manifest_size(), 0);
}
}

View File

@@ -202,7 +202,7 @@ async fn generate_checkpoint_with_compression_types(
manager.update(action).await.unwrap();
}
RegionManifestManagerInner::last_checkpoint(&manager.store().await)
RegionManifestManagerInner::last_checkpoint(&mut manager.store().await)
.await
.unwrap()
.unwrap()

View File

@@ -20,8 +20,11 @@ use std::sync::{Arc, RwLock};
use api::v1::OpType;
use common_telemetry::debug;
use datafusion::physical_plan::PhysicalExpr;
use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::array::{ArrayRef, BooleanArray};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector, VectorRef};
@@ -300,12 +303,16 @@ impl SeriesSet {
let (primary_key_builders, primary_key_schema) =
primary_key_builders(&self.region_metadata, 1);
let physical_exprs: Vec<_> = predicate
.and_then(|p| p.to_physical_exprs(&primary_key_schema).ok())
.unwrap_or_default();
Iter {
metadata: self.region_metadata.clone(),
series: self.series.clone(),
projection,
last_key: None,
predicate,
predicate: physical_exprs,
pk_schema: primary_key_schema,
primary_key_builders,
codec: self.codec.clone(),
@@ -341,7 +348,7 @@ struct Iter {
series: Arc<SeriesRwLockMap>,
projection: HashSet<ColumnId>,
last_key: Option<Vec<u8>>,
predicate: Option<Predicate>,
predicate: Vec<Arc<dyn PhysicalExpr>>,
pk_schema: arrow::datatypes::SchemaRef,
primary_key_builders: Vec<Box<dyn MutableVector>>,
codec: Arc<McmpRowCodec>,
@@ -362,18 +369,18 @@ impl Iterator for Iter {
// TODO(hl): maybe yield more than one time series to amortize range overhead.
for (primary_key, series) in range {
let mut series = series.write().unwrap();
if let Some(predicate) = &self.predicate {
if !prune_primary_key(
if !self.predicate.is_empty()
&& !prune_primary_key(
&self.codec,
primary_key.as_slice(),
&mut series,
&mut self.primary_key_builders,
self.pk_schema.clone(),
predicate,
) {
// read next series
continue;
}
&self.predicate,
)
{
// read next series
continue;
}
self.last_key = Some(primary_key.clone());
@@ -392,7 +399,7 @@ fn prune_primary_key(
series: &mut Series,
builders: &mut Vec<Box<dyn MutableVector>>,
pk_schema: arrow::datatypes::SchemaRef,
predicate: &Predicate,
predicate: &[Arc<dyn PhysicalExpr>],
) -> bool {
// no primary key, we simply return true.
if pk_schema.fields().is_empty() {
@@ -400,20 +407,52 @@ fn prune_primary_key(
}
if let Some(rb) = series.pk_cache.as_ref() {
let res = predicate.prune_primary_key(rb).unwrap_or(true);
let res = prune_inner(predicate, rb).unwrap_or(true);
debug!("Prune primary key: {:?}, res: {:?}", rb, res);
res
} else {
let Ok(rb) = pk_to_record_batch(codec, pk, builders, pk_schema) else {
return true;
};
let res = predicate.prune_primary_key(&rb).unwrap_or(true);
let res = prune_inner(predicate, &rb).unwrap_or(true);
debug!("Prune primary key: {:?}, res: {:?}", rb, res);
series.update_pk_cache(rb);
res
}
}
fn prune_inner(predicates: &[Arc<dyn PhysicalExpr>], primary_key: &RecordBatch) -> Result<bool> {
for expr in predicates {
// evaluate every filter against primary key
let Ok(eva) = expr.evaluate(primary_key) else {
continue;
};
let result = match eva {
ColumnarValue::Array(array) => {
let predicate_array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
predicate_array
.into_iter()
.map(|x| x.unwrap_or(true))
.next()
.unwrap_or(true)
}
// result is a scalar boolean
ColumnarValue::Scalar(ScalarValue::Boolean(v)) => v.unwrap_or(true),
_ => {
unreachable!("Unexpected primary key record batch evaluation result: {:?}, primary key: {:?}", eva, primary_key);
}
};
debug!(
"Evaluate primary key {:?} against filter: {:?}, result: {:?}",
primary_key, expr, result
);
if !result {
return Ok(false);
}
}
Ok(true)
}
fn pk_to_record_batch(
codec: &Arc<McmpRowCodec>,
bytes: &[u8],
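As an aside, the conservative semantics of prune_inner above (a predicate that cannot be evaluated, or that evaluates to NULL, must keep the series) can be captured in a tiny standalone sketch. The function name and the Option<bool> modelling are illustrative assumptions, not the real types.

// Hypothetical sketch: each element models one predicate evaluated against the
// primary-key batch; None stands for "could not evaluate / NULL", mirroring unwrap_or(true).
fn keep_series(predicate_results: &[Option<bool>]) -> bool {
    // Unknown results count as a match so we never prune a series we might still need.
    predicate_results.iter().all(|r| r.unwrap_or(true))
}

fn main() {
    assert!(keep_series(&[Some(true), None]));          // unknown => keep
    assert!(!keep_series(&[Some(true), Some(false)]));  // any definite false => prune
    assert!(keep_series(&[]));                          // no predicates => keep everything
    println!("pruning semantics ok");
}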

View File

@@ -17,13 +17,12 @@
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use common_time::range::TimestampRange;
use snafu::ResultExt;
use store_api::storage::ScanRequest;
use table::predicate::{Predicate, TimeRangePredicateBuilder};
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::error::{BuildPredicateSnafu, Result};
use crate::error::Result;
use crate::read::projection::ProjectionMapper;
use crate::read::seq_scan::SeqScan;
use crate::region::version::VersionRef;
@@ -173,11 +172,7 @@ impl ScanRegion {
total_ssts
);
let predicate = Predicate::try_new(
self.request.filters.clone(),
self.version.metadata.schema.clone(),
)
.context(BuildPredicateSnafu)?;
let predicate = Predicate::new(self.request.filters.clone());
let mapper = match &self.request.projection {
Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
None => ProjectionMapper::all(&self.version.metadata)?,

View File

@@ -119,7 +119,8 @@ impl RegionOpener {
&expect.column_metadatas,
&expect.primary_key,
)?;
// To keep consistency with the create behavior, set the opened region writable.
region.set_writable(true);
return Ok(region);
}
Ok(None) => {

View File

@@ -16,6 +16,7 @@
mod format;
pub mod reader;
pub mod row_group;
mod stats;
pub mod writer;

View File

@@ -188,8 +188,9 @@ impl ParquetReaderBuilder {
&read_format,
column_ids,
);
let pruned_row_groups = predicate
.prune_with_stats(&stats)
.prune_with_stats(&stats, read_format.metadata().schema.arrow_schema())
.into_iter()
.enumerate()
.filter_map(|(idx, valid)| if valid { Some(idx) } else { None })

View File

@@ -0,0 +1,230 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Ports private structs from [parquet crate](https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/arrow/async_reader/mod.rs#L644-L650).
use std::sync::Arc;
use bytes::{Buf, Bytes};
use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::ProjectionMask;
use parquet::column::page::{PageIterator, PageReader};
use parquet::errors::{ParquetError, Result};
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::reader::{ChunkReader, Length};
use parquet::file::serialized_reader::SerializedPageReader;
use parquet::format::PageLocation;
/// An in-memory collection of column chunks
pub struct InMemoryRowGroup<'a> {
metadata: &'a RowGroupMetaData,
page_locations: Option<&'a [Vec<PageLocation>]>,
column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
row_count: usize,
}
impl<'a> InMemoryRowGroup<'a> {
/// Fetches the necessary column data into memory
// TODO(yingwen): Fix clippy warnings.
#[allow(clippy::filter_map_bool_then)]
#[allow(clippy::useless_conversion)]
pub async fn fetch<T: AsyncFileReader + Send>(
&mut self,
input: &mut T,
projection: &ProjectionMask,
selection: Option<&RowSelection>,
) -> Result<()> {
if let Some((selection, page_locations)) = selection.zip(self.page_locations) {
// If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
// `RowSelection`
let mut page_start_offsets: Vec<Vec<usize>> = vec![];
let fetch_ranges = self
.column_chunks
.iter()
.zip(self.metadata.columns())
.enumerate()
.filter_map(|(idx, (chunk, chunk_meta))| {
(chunk.is_none() && projection.leaf_included(idx)).then(|| {
// If the first page does not start at the beginning of the column,
// then we need to also fetch a dictionary page.
let mut ranges = vec![];
let (start, _len) = chunk_meta.byte_range();
match page_locations[idx].first() {
Some(first) if first.offset as u64 != start => {
ranges.push(start as usize..first.offset as usize);
}
_ => (),
}
ranges.extend(selection.scan_ranges(&page_locations[idx]));
page_start_offsets.push(ranges.iter().map(|range| range.start).collect());
ranges
})
})
.flatten()
.collect();
let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
let mut page_start_offsets = page_start_offsets.into_iter();
for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
if chunk.is_some() || !projection.leaf_included(idx) {
continue;
}
if let Some(offsets) = page_start_offsets.next() {
let mut chunks = Vec::with_capacity(offsets.len());
for _ in 0..offsets.len() {
chunks.push(chunk_data.next().unwrap());
}
*chunk = Some(Arc::new(ColumnChunkData::Sparse {
length: self.metadata.column(idx).byte_range().1 as usize,
data: offsets.into_iter().zip(chunks.into_iter()).collect(),
}))
}
}
} else {
let fetch_ranges = self
.column_chunks
.iter()
.enumerate()
.filter_map(|(idx, chunk)| {
(chunk.is_none() && projection.leaf_included(idx)).then(|| {
let column = self.metadata.column(idx);
let (start, length) = column.byte_range();
start as usize..(start + length) as usize
})
})
.collect();
let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
if chunk.is_some() || !projection.leaf_included(idx) {
continue;
}
if let Some(data) = chunk_data.next() {
*chunk = Some(Arc::new(ColumnChunkData::Dense {
offset: self.metadata.column(idx).byte_range().0 as usize,
data,
}));
}
}
}
Ok(())
}
}
impl<'a> RowGroups for InMemoryRowGroup<'a> {
fn num_rows(&self) -> usize {
self.row_count
}
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
match &self.column_chunks[i] {
None => Err(ParquetError::General(format!(
"Invalid column index {i}, column was not fetched"
))),
Some(data) => {
let page_locations = self.page_locations.map(|index| index[i].clone());
let page_reader: Box<dyn PageReader> = Box::new(SerializedPageReader::new(
data.clone(),
self.metadata.column(i),
self.row_count,
page_locations,
)?);
Ok(Box::new(ColumnChunkIterator {
reader: Some(Ok(page_reader)),
}))
}
}
}
}
/// An in-memory column chunk
#[derive(Clone)]
enum ColumnChunkData {
/// Column chunk data representing only a subset of data pages
Sparse {
/// Length of the full column chunk
length: usize,
/// Set of data pages included in this sparse chunk. Each element is a tuple
/// of (page offset, page data)
data: Vec<(usize, Bytes)>,
},
/// Full column chunk and its offset
Dense { offset: usize, data: Bytes },
}
impl ColumnChunkData {
fn get(&self, start: u64) -> Result<Bytes> {
match &self {
ColumnChunkData::Sparse { data, .. } => data
.binary_search_by_key(&start, |(offset, _)| *offset as u64)
.map(|idx| data[idx].1.clone())
.map_err(|_| {
ParquetError::General(format!(
"Invalid offset in sparse column chunk data: {start}"
))
}),
ColumnChunkData::Dense { offset, data } => {
let start = start as usize - *offset;
Ok(data.slice(start..))
}
}
}
}
impl Length for ColumnChunkData {
fn len(&self) -> u64 {
match &self {
ColumnChunkData::Sparse { length, .. } => *length as u64,
ColumnChunkData::Dense { data, .. } => data.len() as u64,
}
}
}
impl ChunkReader for ColumnChunkData {
type T = bytes::buf::Reader<Bytes>;
fn get_read(&self, start: u64) -> Result<Self::T> {
Ok(self.get(start)?.reader())
}
fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
Ok(self.get(start)?.slice(..length))
}
}
/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
struct ColumnChunkIterator {
reader: Option<Result<Box<dyn PageReader>>>,
}
impl Iterator for ColumnChunkIterator {
type Item = Result<Box<dyn PageReader>>;
fn next(&mut self) -> Option<Self::Item> {
self.reader.take()
}
}
impl PageIterator for ColumnChunkIterator {}
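For intuition, here is a small standalone sketch of the sparse-chunk lookup performed by ColumnChunkData::get above: fetched pages are kept as (offset, bytes) pairs, so only exact page-start offsets resolve, and anything else fails, just like the General error in the real code. The tuple-slice representation below is an illustrative stand-in for the actual struct.

// Hypothetical sketch of sparse page lookup by exact start offset.
fn sparse_get<'a>(pages: &'a [(u64, &'a [u8])], start: u64) -> Option<&'a [u8]> {
    pages
        .binary_search_by_key(&start, |(offset, _)| *offset)
        .ok()
        .map(|idx| pages[idx].1)
}

fn main() {
    // Two fetched pages starting at byte offsets 100 and 250 of the column chunk.
    let pages = [(100u64, &b"page-a"[..]), (250, &b"page-b"[..])];
    assert_eq!(sparse_get(&pages, 250), Some(&b"page-b"[..]));
    // 200 is not a page start, so the lookup fails.
    assert_eq!(sparse_get(&pages, 200), None);
    println!("sparse lookup ok");
}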

View File

@@ -99,6 +99,15 @@ impl TestEnv {
}
}
/// Returns a new env with specific `data_home` for test.
pub fn with_data_home(data_home: TempDir) -> TestEnv {
TestEnv {
data_home,
logstore: None,
object_store: None,
}
}
pub fn get_logstore(&self) -> Option<Arc<RaftEngineLogStore>> {
self.logstore.clone()
}

View File

@@ -7,6 +7,8 @@ license.workspace = true
[dependencies]
async-trait = "0.1"
bytes = "1.4"
common-error.workspace = true
common-macro.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
futures.workspace = true
@@ -17,6 +19,7 @@ opendal = { version = "0.40", features = [
"layers-tracing",
"layers-metrics",
] }
snafu.workspace = true
uuid.workspace = true
[dev-dependencies]

View File

@@ -0,0 +1,45 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Default storage not found: {}", default_object_store))]
DefaultStorageNotFound {
location: Location,
default_object_store: String,
},
}
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::DefaultStorageNotFound { .. } => StatusCode::InvalidArguments,
}
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -19,7 +19,9 @@ pub use opendal::{
Operator as ObjectStore, Reader, Result, Writer,
};
pub mod error;
pub mod layers;
pub mod manager;
mod metrics;
pub mod test_util;
pub mod util;

View File

@@ -0,0 +1,107 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use snafu::OptionExt;
use crate::error::{DefaultStorageNotFoundSnafu, Result};
use crate::ObjectStore;
/// Manages multiple object stores so that users can configure a storage for each table.
/// This struct always has one default object store and may have zero or more custom object stores.
pub struct ObjectStoreManager {
stores: HashMap<String, ObjectStore>,
default_object_store: ObjectStore,
}
impl ObjectStoreManager {
/// Creates a new manager with specific object stores. Returns an error if `stores` doesn't contain the default object store.
pub fn try_new(
stores: HashMap<String, ObjectStore>,
default_object_store: &str,
) -> Result<Self> {
let default_object_store = stores
.get(default_object_store)
.context(DefaultStorageNotFoundSnafu {
default_object_store,
})?
.clone();
Ok(ObjectStoreManager {
stores,
default_object_store,
})
}
pub fn find(&self, name: &str) -> Option<&ObjectStore> {
self.stores.get(name)
}
pub fn default_object_store(&self) -> &ObjectStore {
&self.default_object_store
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use super::ObjectStoreManager;
use crate::error::Error;
use crate::services::Fs as Builder;
use crate::ObjectStore;
fn new_object_store(dir: &TempDir) -> ObjectStore {
let store_dir = dir.path().to_str().unwrap();
let mut builder = Builder::default();
let _ = builder.root(store_dir);
ObjectStore::new(builder).unwrap().finish()
}
#[test]
fn test_new_returns_err_when_global_store_not_exist() {
let dir = create_temp_dir("new");
let object_store = new_object_store(&dir);
let stores: HashMap<String, ObjectStore> = vec![
("File".to_string(), object_store.clone()),
("S3".to_string(), object_store.clone()),
]
.into_iter()
.collect();
assert!(matches!(
ObjectStoreManager::try_new(stores, "Gcs"),
Err(Error::DefaultStorageNotFound { .. })
));
}
#[test]
fn test_new_returns_ok() {
let dir = create_temp_dir("new");
let object_store = new_object_store(&dir);
let stores: HashMap<String, ObjectStore> = vec![
("File".to_string(), object_store.clone()),
("S3".to_string(), object_store.clone()),
]
.into_iter()
.collect();
let object_store_manager = ObjectStoreManager::try_new(stores, "File").unwrap();
assert_eq!(object_store_manager.stores.len(), 2);
assert!(object_store_manager.find("File").is_some());
assert!(object_store_manager.find("S3").is_some());
assert!(object_store_manager.find("Gcs").is_none());
}
}
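A plausible consumer of this manager is table-level storage selection. The sketch below is an assumption for illustration: the resolve helper and the Option<&str> storage name are not part of the API shown above; it only combines the find and default_object_store methods that are.

use object_store::manager::ObjectStoreManager;
use object_store::ObjectStore;

// Hypothetical helper: use the named store when one is configured, otherwise fall back to the default.
fn resolve(manager: &ObjectStoreManager, storage_name: Option<&str>) -> ObjectStore {
    storage_name
        .and_then(|name| manager.find(name))
        .unwrap_or_else(|| manager.default_object_store())
        .clone()
}

Returning an owned handle is cheap here because the underlying OpenDAL Operator is clonable by design.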

View File

@@ -316,9 +316,13 @@ impl ErrorExt for Error {
ParseSql { source, .. } => source.status_code(),
CreateRecordBatch { source, .. } => source.status_code(),
QueryExecution { source, .. } | QueryPlan { source, .. } => source.status_code(),
DataFusion { .. } | MissingTimestampColumn { .. } | RoutePartition { .. } => {
StatusCode::Internal
}
DataFusion { error, .. } => match error {
DataFusionError::Internal(_) => StatusCode::Internal,
DataFusionError::NotImplemented(_) => StatusCode::Unsupported,
DataFusionError::Plan(_) => StatusCode::PlanQuery,
_ => StatusCode::EngineExecuteQuery,
},
MissingTimestampColumn { .. } | RoutePartition { .. } => StatusCode::EngineExecuteQuery,
Sql { source, .. } => source.status_code(),
PlanSql { .. } => StatusCode::PlanQuery,
ConvertSqlType { source, .. } | ConvertSqlValue { source, .. } => source.status_code(),

View File

@@ -392,7 +392,6 @@ impl ErrorExt for Error {
Internal { .. }
| InternalIo { .. }
| TokioIo { .. }
| CollectRecordbatch { .. }
| StartHttp { .. }
| StartGrpc { .. }
| AlreadyStarted { .. }
@@ -403,6 +402,8 @@ impl ErrorExt for Error {
| GrpcReflectionService { .. }
| BuildHttpResponse { .. } => StatusCode::Internal,
CollectRecordbatch { .. } => StatusCode::EngineExecuteQuery,
InsertScript { source, .. }
| ExecuteScript { source, .. }
| ExecuteQuery { source, .. }

View File

@@ -660,6 +660,7 @@ impl HttpServer {
fn route_otlp<S>(&self, otlp_handler: OpenTelemetryProtocolHandlerRef) -> Router<S> {
Router::new()
.route("/v1/metrics", routing::post(otlp::metrics))
.route("/v1/traces", routing::post(otlp::traces))
.with_state(otlp_handler)
}

View File

@@ -21,6 +21,9 @@ use hyper::Body;
use opentelemetry_proto::tonic::collector::metrics::v1::{
ExportMetricsServiceRequest, ExportMetricsServiceResponse,
};
use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use prost::Message;
use session::context::QueryContextRef;
use snafu::prelude::*;
@@ -33,16 +36,19 @@ pub async fn metrics(
State(handler): State<OpenTelemetryProtocolHandlerRef>,
Extension(query_ctx): Extension<QueryContextRef>,
RawBody(body): RawBody,
) -> Result<OtlpResponse> {
) -> Result<OtlpMetricsResponse> {
let _timer = timer!(
crate::metrics::METRIC_HTTP_OPENTELEMETRY_ELAPSED,
crate::metrics::METRIC_HTTP_OPENTELEMETRY_METRICS_ELAPSED,
&[(crate::metrics::METRIC_DB_LABEL, query_ctx.get_db_string())]
);
let request = parse_body(body).await?;
handler.metrics(request, query_ctx).await.map(OtlpResponse)
let request = parse_metrics_body(body).await?;
handler
.metrics(request, query_ctx)
.await
.map(OtlpMetricsResponse)
}
async fn parse_body(body: Body) -> Result<ExportMetricsServiceRequest> {
async fn parse_metrics_body(body: Body) -> Result<ExportMetricsServiceRequest> {
hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)
@@ -51,9 +57,47 @@ async fn parse_body(body: Body) -> Result<ExportMetricsServiceRequest> {
})
}
pub struct OtlpResponse(ExportMetricsServiceResponse);
pub struct OtlpMetricsResponse(ExportMetricsServiceResponse);
impl IntoResponse for OtlpResponse {
impl IntoResponse for OtlpMetricsResponse {
fn into_response(self) -> axum::response::Response {
(
[(header::CONTENT_TYPE, "application/x-protobuf")],
self.0.encode_to_vec(),
)
.into_response()
}
}
#[axum_macros::debug_handler]
pub async fn traces(
State(handler): State<OpenTelemetryProtocolHandlerRef>,
Extension(query_ctx): Extension<QueryContextRef>,
RawBody(body): RawBody,
) -> Result<OtlpTracesResponse> {
let _timer = timer!(
crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED,
&[(crate::metrics::METRIC_DB_LABEL, query_ctx.get_db_string())]
);
let request = parse_traces_body(body).await?;
handler
.traces(request, query_ctx)
.await
.map(OtlpTracesResponse)
}
async fn parse_traces_body(body: Body) -> Result<ExportTraceServiceRequest> {
hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)
.and_then(|buf| {
ExportTraceServiceRequest::decode(&buf[..]).context(error::DecodeOtlpRequestSnafu)
})
}
pub struct OtlpTracesResponse(ExportTraceServiceResponse);
impl IntoResponse for OtlpTracesResponse {
fn into_response(self) -> axum::response::Response {
(
[(header::CONTENT_TYPE, "application/x-protobuf")],

View File

@@ -37,7 +37,10 @@ pub(crate) const METRIC_HTTP_INFLUXDB_WRITE_ELAPSED: &str = "servers.http_influx
pub(crate) const METRIC_HTTP_PROM_STORE_WRITE_ELAPSED: &str =
"servers.http_prometheus_write_elapsed";
pub(crate) const METRIC_HTTP_PROM_STORE_READ_ELAPSED: &str = "servers.http_prometheus_read_elapsed";
pub(crate) const METRIC_HTTP_OPENTELEMETRY_ELAPSED: &str = "servers.http_otlp_elapsed";
pub(crate) const METRIC_HTTP_OPENTELEMETRY_METRICS_ELAPSED: &str =
"servers.http_otlp_metrics_elapsed";
pub(crate) const METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED: &str =
"servers.http_otlp_traces_elapsed";
pub(crate) const METRIC_TCP_OPENTSDB_LINE_WRITE_ELAPSED: &str =
"servers.opentsdb_line_write_elapsed";
pub(crate) const METRIC_HTTP_PROMQL_INSTANT_QUERY_ELAPSED: &str =

View File

@@ -169,7 +169,7 @@ mod tests {
.await
.unwrap();
let resp = client.read_line().await.unwrap();
assert_eq!(resp, Some("Internal error: expected".to_string()));
assert_eq!(resp, Some("Internal error: 1003".to_string()));
client.write_line("get".to_string()).await.unwrap();
let resp = client.read_line().await.unwrap();

View File

@@ -12,649 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::{RowInsertRequests, Value};
use common_grpc::writer::Precision;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::common::v1::{any_value, KeyValue};
use opentelemetry_proto::tonic::metrics::v1::{metric, number_data_point, *};
use crate::error::Result;
use crate::row_writer::{self, MultiTableData, TableData};
pub mod metrics;
pub mod plugin;
pub mod trace;
const GREPTIME_TIMESTAMP: &str = "greptime_timestamp";
const GREPTIME_VALUE: &str = "greptime_value";
const GREPTIME_COUNT: &str = "greptime_count";
/// the default column count for table writer
const APPROXIMATE_COLUMN_COUNT: usize = 8;
/// Normalize otlp instrumentation, metric and attribute names
///
/// <https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#instrument-name-syntax>
/// - since the names are case-insensitive, we transform them to lowercase for
/// better sql usability
/// - replace `.` and `-` with `_`
fn normalize_otlp_name(name: &str) -> String {
name.to_lowercase().replace(|c| c == '.' || c == '-', "_")
}
/// Convert OpenTelemetry metrics to GreptimeDB insert requests
///
/// See
/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/metrics/v1/metrics.proto#L162>
/// for data structure of OTLP metrics.
///
/// Returns `RowInsertRequests` and the total number of rows to ingest
pub fn to_grpc_insert_requests(
request: ExportMetricsServiceRequest,
) -> Result<(RowInsertRequests, usize)> {
let mut table_writer = MultiTableData::default();
for resource in &request.resource_metrics {
let resource_attrs = resource.resource.as_ref().map(|r| &r.attributes);
for scope in &resource.scope_metrics {
let scope_attrs = scope.scope.as_ref().map(|s| &s.attributes);
for metric in &scope.metrics {
encode_metrics(&mut table_writer, metric, resource_attrs, scope_attrs)?;
}
}
}
Ok(table_writer.into_row_insert_requests())
}
fn encode_metrics(
table_writer: &mut MultiTableData,
metric: &Metric,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
let name = &metric.name;
// note that we don't store description or unit; we might want to deal with
// these fields in the future.
if let Some(data) = &metric.data {
match data {
metric::Data::Gauge(gauge) => {
encode_gauge(table_writer, name, gauge, resource_attrs, scope_attrs)?;
}
metric::Data::Sum(sum) => {
encode_sum(table_writer, name, sum, resource_attrs, scope_attrs)?;
}
metric::Data::Summary(summary) => {
encode_summary(table_writer, name, summary, resource_attrs, scope_attrs)?;
}
metric::Data::Histogram(hist) => {
encode_histogram(table_writer, name, hist, resource_attrs, scope_attrs)?;
}
// TODO(sunng87) leave ExponentialHistogram for next release
metric::Data::ExponentialHistogram(_hist) => {}
}
}
Ok(())
}
fn write_attributes(
writer: &mut TableData,
row: &mut Vec<Value>,
attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
if let Some(attrs) = attrs {
let table_tags = attrs.iter().filter_map(|attr| {
if let Some(val) = attr.value.as_ref().and_then(|v| v.value.as_ref()) {
let key = normalize_otlp_name(&attr.key);
match val {
any_value::Value::StringValue(s) => Some((key, s.to_string())),
any_value::Value::IntValue(v) => Some((key, v.to_string())),
any_value::Value::DoubleValue(v) => Some((key, v.to_string())),
_ => None, // TODO(sunng87): allow different type of values
}
} else {
None
}
});
row_writer::write_tags(writer, table_tags, row)?;
}
Ok(())
}
fn write_timestamp(table: &mut TableData, row: &mut Vec<Value>, time_nano: i64) -> Result<()> {
row_writer::write_ts_precision(
table,
GREPTIME_TIMESTAMP,
Some(time_nano),
Precision::Nanosecond,
row,
)
}
fn write_data_point_value(
table: &mut TableData,
row: &mut Vec<Value>,
field: &str,
value: &Option<number_data_point::Value>,
) -> Result<()> {
match value {
Some(number_data_point::Value::AsInt(val)) => {
// we coerce all values to f64
row_writer::write_f64(table, field, *val as f64, row)?;
}
Some(number_data_point::Value::AsDouble(val)) => {
row_writer::write_f64(table, field, *val, row)?;
}
_ => {}
}
Ok(())
}
fn write_tags_and_timestamp(
table: &mut TableData,
row: &mut Vec<Value>,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
data_point_attrs: Option<&Vec<KeyValue>>,
timestamp_nanos: i64,
) -> Result<()> {
write_attributes(table, row, resource_attrs)?;
write_attributes(table, row, scope_attrs)?;
write_attributes(table, row, data_point_attrs)?;
write_timestamp(table, row, timestamp_nanos)?;
Ok(())
}
/// encode this gauge metric
///
/// note that there can be multiple data points in the request; they are going to be
/// stored as multiple rows
fn encode_gauge(
table_writer: &mut MultiTableData,
name: &str,
gauge: &Gauge,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
let table = table_writer.get_or_default_table_data(
&normalize_otlp_name(name),
APPROXIMATE_COLUMN_COUNT,
gauge.data_points.len(),
);
for data_point in &gauge.data_points {
let mut row = table.alloc_one_row();
write_tags_and_timestamp(
table,
&mut row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
table.add_row(row);
}
Ok(())
}
/// encode this sum metric
///
/// `aggregation_temporality` and `monotonic` are ignored for now
fn encode_sum(
table_writer: &mut MultiTableData,
name: &str,
sum: &Sum,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
let table = table_writer.get_or_default_table_data(
&normalize_otlp_name(name),
APPROXIMATE_COLUMN_COUNT,
sum.data_points.len(),
);
for data_point in &sum.data_points {
let mut row = table.alloc_one_row();
write_tags_and_timestamp(
table,
&mut row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
table.add_row(row);
}
Ok(())
}
const HISTOGRAM_LE_COLUMN: &str = "le";
/// Encode histogram data. This function returns 3 insert requests for 3 tables.
///
/// The implementation follows the Prometheus histogram table format:
///
/// - A `%metric%_bucket` table with a `le` tag that stores the bucket upper
///   limit, and `greptime_value` for the cumulative bucket count
/// - A `%metric%_sum` table storing the sum of samples
/// - A `%metric%_count` table storing the count of samples.
///
/// Thanks to this Prometheus compatibility, we hope to be able to use
/// Prometheus quantile functions on these tables.
fn encode_histogram(
table_writer: &mut MultiTableData,
name: &str,
hist: &Histogram,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
let normalized_name = normalize_otlp_name(name);
let bucket_table_name = format!("{}_bucket", normalized_name);
let sum_table_name = format!("{}_sum", normalized_name);
let count_table_name = format!("{}_count", normalized_name);
let data_points_len = hist.data_points.len();
// Note that the row and columns number here is approximate
let mut bucket_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len * 3);
let mut sum_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
let mut count_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
for data_point in &hist.data_points {
let mut accumulated_count = 0;
for (idx, count) in data_point.bucket_counts.iter().enumerate() {
let mut bucket_row = bucket_table.alloc_one_row();
write_tags_and_timestamp(
&mut bucket_table,
&mut bucket_row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
if let Some(upper_bounds) = data_point.explicit_bounds.get(idx) {
row_writer::write_tag(
&mut bucket_table,
HISTOGRAM_LE_COLUMN,
upper_bounds,
&mut bucket_row,
)?;
} else if idx == data_point.explicit_bounds.len() {
// The last bucket
row_writer::write_tag(
&mut bucket_table,
HISTOGRAM_LE_COLUMN,
f64::INFINITY,
&mut bucket_row,
)?;
}
accumulated_count += count;
row_writer::write_f64(
&mut bucket_table,
GREPTIME_VALUE,
accumulated_count as f64,
&mut bucket_row,
)?;
bucket_table.add_row(bucket_row);
}
if let Some(sum) = data_point.sum {
let mut sum_row = sum_table.alloc_one_row();
write_tags_and_timestamp(
&mut sum_table,
&mut sum_row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
row_writer::write_f64(&mut sum_table, GREPTIME_VALUE, sum, &mut sum_row)?;
sum_table.add_row(sum_row);
}
let mut count_row = count_table.alloc_one_row();
write_tags_and_timestamp(
&mut count_table,
&mut count_row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
row_writer::write_f64(
&mut count_table,
GREPTIME_VALUE,
data_point.count as f64,
&mut count_row,
)?;
count_table.add_row(count_row);
}
table_writer.add_table_data(bucket_table_name, bucket_table);
table_writer.add_table_data(sum_table_name, sum_table);
table_writer.add_table_data(count_table_name, count_table);
Ok(())
}
#[allow(dead_code)]
fn encode_exponential_histogram(_name: &str, _hist: &ExponentialHistogram) -> Result<()> {
// TODO(sunng87): implement this using a prometheus compatible way
Ok(())
}
fn encode_summary(
table_writer: &mut MultiTableData,
name: &str,
summary: &Summary,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
let table = table_writer.get_or_default_table_data(
&normalize_otlp_name(name),
APPROXIMATE_COLUMN_COUNT,
summary.data_points.len(),
);
for data_point in &summary.data_points {
let mut row = table.alloc_one_row();
write_tags_and_timestamp(
table,
&mut row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
for quantile in &data_point.quantile_values {
row_writer::write_f64(
table,
&format!("greptime_p{:02}", quantile.quantile * 100f64),
quantile.value,
&mut row,
)?;
}
row_writer::write_f64(table, GREPTIME_COUNT, data_point.count as f64, &mut row)?;
table.add_row(row);
}
Ok(())
}
#[cfg(test)]
mod tests {
use opentelemetry_proto::tonic::common::v1::any_value::Value as Val;
use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};
use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value;
use opentelemetry_proto::tonic::metrics::v1::summary_data_point::ValueAtQuantile;
use opentelemetry_proto::tonic::metrics::v1::{HistogramDataPoint, NumberDataPoint};
use super::*;
#[test]
fn test_normalize_otlp_name() {
assert_eq!(normalize_otlp_name("jvm.memory.free"), "jvm_memory_free");
assert_eq!(normalize_otlp_name("jvm-memory-free"), "jvm_memory_free");
assert_eq!(normalize_otlp_name("jvm_memory_free"), "jvm_memory_free");
assert_eq!(normalize_otlp_name("JVM_MEMORY_FREE"), "jvm_memory_free");
assert_eq!(normalize_otlp_name("JVM_memory_FREE"), "jvm_memory_free");
}
fn keyvalue(key: &str, value: &str) -> KeyValue {
KeyValue {
key: key.into(),
value: Some(AnyValue {
value: Some(Val::StringValue(value.into())),
}),
}
}
#[test]
fn test_encode_gauge() {
let mut tables = MultiTableData::default();
let data_points = vec![
NumberDataPoint {
attributes: vec![keyvalue("host", "testsevrer")],
time_unix_nano: 100,
value: Some(Value::AsInt(100)),
..Default::default()
},
NumberDataPoint {
attributes: vec![keyvalue("host", "testserver")],
time_unix_nano: 105,
value: Some(Value::AsInt(105)),
..Default::default()
},
];
let gauge = Gauge { data_points };
encode_gauge(
&mut tables,
"datamon",
&gauge,
Some(&vec![keyvalue("resource", "app")]),
Some(&vec![keyvalue("scope", "otel")]),
)
.unwrap();
let table = tables.get_or_default_table_data("datamon", 0, 0);
assert_eq!(table.num_rows(), 2);
assert_eq!(table.num_columns(), 5);
assert_eq!(
table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"greptime_value"
]
);
}
#[test]
fn test_encode_sum() {
let mut tables = MultiTableData::default();
let data_points = vec![
NumberDataPoint {
attributes: vec![keyvalue("host", "testserver")],
time_unix_nano: 100,
value: Some(Value::AsInt(100)),
..Default::default()
},
NumberDataPoint {
attributes: vec![keyvalue("host", "testserver")],
time_unix_nano: 105,
value: Some(Value::AsInt(0)),
..Default::default()
},
];
let sum = Sum {
data_points,
..Default::default()
};
encode_sum(
&mut tables,
"datamon",
&sum,
Some(&vec![keyvalue("resource", "app")]),
Some(&vec![keyvalue("scope", "otel")]),
)
.unwrap();
let table = tables.get_or_default_table_data("datamon", 0, 0);
assert_eq!(table.num_rows(), 2);
assert_eq!(table.num_columns(), 5);
assert_eq!(
table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"greptime_value"
]
);
}
#[test]
fn test_encode_summary() {
let mut tables = MultiTableData::default();
let data_points = vec![SummaryDataPoint {
attributes: vec![keyvalue("host", "testserver")],
time_unix_nano: 100,
count: 25,
sum: 5400.0,
quantile_values: vec![
ValueAtQuantile {
quantile: 0.90,
value: 1000.0,
},
ValueAtQuantile {
quantile: 0.95,
value: 3030.0,
},
],
..Default::default()
}];
let summary = Summary { data_points };
encode_summary(
&mut tables,
"datamon",
&summary,
Some(&vec![keyvalue("resource", "app")]),
Some(&vec![keyvalue("scope", "otel")]),
)
.unwrap();
let table = tables.get_or_default_table_data("datamon", 0, 0);
assert_eq!(table.num_rows(), 1);
assert_eq!(table.num_columns(), 7);
assert_eq!(
table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"greptime_p90",
"greptime_p95",
"greptime_count"
]
);
}
#[test]
fn test_encode_histogram() {
let mut tables = MultiTableData::default();
let data_points = vec![HistogramDataPoint {
attributes: vec![keyvalue("host", "testserver")],
time_unix_nano: 100,
start_time_unix_nano: 23,
count: 25,
sum: Some(100.),
max: Some(200.),
min: Some(0.03),
bucket_counts: vec![2, 4, 6, 9, 4],
explicit_bounds: vec![0.1, 1., 10., 100.],
..Default::default()
}];
let histogram = Histogram {
data_points,
aggregation_temporality: AggregationTemporality::Delta.into(),
};
encode_histogram(
&mut tables,
"histo",
&histogram,
Some(&vec![keyvalue("resource", "app")]),
Some(&vec![keyvalue("scope", "otel")]),
)
.unwrap();
assert_eq!(3, tables.num_tables());
// bucket table
let bucket_table = tables.get_or_default_table_data("histo_bucket", 0, 0);
assert_eq!(bucket_table.num_rows(), 5);
assert_eq!(bucket_table.num_columns(), 6);
assert_eq!(
bucket_table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"le",
"greptime_value",
]
);
let sum_table = tables.get_or_default_table_data("histo_sum", 0, 0);
assert_eq!(sum_table.num_rows(), 1);
assert_eq!(sum_table.num_columns(), 5);
assert_eq!(
sum_table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"greptime_value",
]
);
let count_table = tables.get_or_default_table_data("histo_count", 0, 0);
assert_eq!(count_table.num_rows(), 1);
assert_eq!(count_table.num_columns(), 5);
assert_eq!(
count_table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"greptime_value",
]
);
}
}

View File

@@ -0,0 +1,658 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::{RowInsertRequests, Value};
use common_grpc::writer::Precision;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::common::v1::{any_value, KeyValue};
use opentelemetry_proto::tonic::metrics::v1::{metric, number_data_point, *};
use super::{GREPTIME_COUNT, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use crate::error::Result;
use crate::row_writer::{self, MultiTableData, TableData};
/// the default column count for table writer
const APPROXIMATE_COLUMN_COUNT: usize = 8;
/// Normalize otlp instrumentation, metric and attribute names
///
/// <https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#instrument-name-syntax>
/// - since the names are case-insensitive, we transform them to lowercase for
/// better sql usability
/// - replace `.` and `-` with `_`
fn normalize_otlp_name(name: &str) -> String {
name.to_lowercase().replace(|c| c == '.' || c == '-', "_")
}
/// Convert OpenTelemetry metrics to GreptimeDB insert requests
///
/// See
/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/metrics/v1/metrics.proto>
/// for data structure of OTLP metrics.
///
/// Returns `RowInsertRequests` and the total number of rows to ingest
pub fn to_grpc_insert_requests(
request: ExportMetricsServiceRequest,
) -> Result<(RowInsertRequests, usize)> {
let mut table_writer = MultiTableData::default();
for resource in &request.resource_metrics {
let resource_attrs = resource.resource.as_ref().map(|r| &r.attributes);
for scope in &resource.scope_metrics {
let scope_attrs = scope.scope.as_ref().map(|s| &s.attributes);
for metric in &scope.metrics {
encode_metrics(&mut table_writer, metric, resource_attrs, scope_attrs)?;
}
}
}
Ok(table_writer.into_row_insert_requests())
}
fn encode_metrics(
table_writer: &mut MultiTableData,
metric: &Metric,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
let name = &metric.name;
// note that we don't store description or unit; we might want to deal with
// these fields in the future.
if let Some(data) = &metric.data {
match data {
metric::Data::Gauge(gauge) => {
encode_gauge(table_writer, name, gauge, resource_attrs, scope_attrs)?;
}
metric::Data::Sum(sum) => {
encode_sum(table_writer, name, sum, resource_attrs, scope_attrs)?;
}
metric::Data::Summary(summary) => {
encode_summary(table_writer, name, summary, resource_attrs, scope_attrs)?;
}
metric::Data::Histogram(hist) => {
encode_histogram(table_writer, name, hist, resource_attrs, scope_attrs)?;
}
// TODO(sunng87) leave ExponentialHistogram for next release
metric::Data::ExponentialHistogram(_hist) => {}
}
}
Ok(())
}
fn write_attributes(
writer: &mut TableData,
row: &mut Vec<Value>,
attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
if let Some(attrs) = attrs {
let table_tags = attrs.iter().filter_map(|attr| {
if let Some(val) = attr.value.as_ref().and_then(|v| v.value.as_ref()) {
let key = normalize_otlp_name(&attr.key);
match val {
any_value::Value::StringValue(s) => Some((key, s.to_string())),
any_value::Value::IntValue(v) => Some((key, v.to_string())),
any_value::Value::DoubleValue(v) => Some((key, v.to_string())),
_ => None, // TODO(sunng87): allow different type of values
}
} else {
None
}
});
row_writer::write_tags(writer, table_tags, row)?;
}
Ok(())
}
fn write_timestamp(table: &mut TableData, row: &mut Vec<Value>, time_nano: i64) -> Result<()> {
row_writer::write_ts_precision(
table,
GREPTIME_TIMESTAMP,
Some(time_nano),
Precision::Nanosecond,
row,
)
}
fn write_data_point_value(
table: &mut TableData,
row: &mut Vec<Value>,
field: &str,
value: &Option<number_data_point::Value>,
) -> Result<()> {
match value {
Some(number_data_point::Value::AsInt(val)) => {
// we coerce all values to f64
row_writer::write_f64(table, field, *val as f64, row)?;
}
Some(number_data_point::Value::AsDouble(val)) => {
row_writer::write_f64(table, field, *val, row)?;
}
_ => {}
}
Ok(())
}
fn write_tags_and_timestamp(
table: &mut TableData,
row: &mut Vec<Value>,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
data_point_attrs: Option<&Vec<KeyValue>>,
timestamp_nanos: i64,
) -> Result<()> {
write_attributes(table, row, resource_attrs)?;
write_attributes(table, row, scope_attrs)?;
write_attributes(table, row, data_point_attrs)?;
write_timestamp(table, row, timestamp_nanos)?;
Ok(())
}
/// encode this gauge metric
///
/// note that there can be multiple data points in the request; they are going to be
/// stored as multiple rows
fn encode_gauge(
table_writer: &mut MultiTableData,
name: &str,
gauge: &Gauge,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
let table = table_writer.get_or_default_table_data(
&normalize_otlp_name(name),
APPROXIMATE_COLUMN_COUNT,
gauge.data_points.len(),
);
for data_point in &gauge.data_points {
let mut row = table.alloc_one_row();
write_tags_and_timestamp(
table,
&mut row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
table.add_row(row);
}
Ok(())
}
/// encode this sum metric
///
/// `aggregation_temporality` and `monotonic` are ignored for now
fn encode_sum(
table_writer: &mut MultiTableData,
name: &str,
sum: &Sum,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
let table = table_writer.get_or_default_table_data(
&normalize_otlp_name(name),
APPROXIMATE_COLUMN_COUNT,
sum.data_points.len(),
);
for data_point in &sum.data_points {
let mut row = table.alloc_one_row();
write_tags_and_timestamp(
table,
&mut row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
table.add_row(row);
}
Ok(())
}
const HISTOGRAM_LE_COLUMN: &str = "le";
/// Encode histogram data. This function returns 3 insert requests for 3 tables.
///
/// The implementation follows the Prometheus histogram table format:
///
/// - A `%metric%_bucket` table with a `le` tag that stores the bucket upper
///   limit, and `greptime_value` for the cumulative bucket count
/// - A `%metric%_sum` table storing the sum of samples
/// - A `%metric%_count` table storing the count of samples.
///
/// Thanks to this Prometheus compatibility, we hope to be able to use
/// Prometheus quantile functions on these tables.
fn encode_histogram(
table_writer: &mut MultiTableData,
name: &str,
hist: &Histogram,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
let normalized_name = normalize_otlp_name(name);
let bucket_table_name = format!("{}_bucket", normalized_name);
let sum_table_name = format!("{}_sum", normalized_name);
let count_table_name = format!("{}_count", normalized_name);
let data_points_len = hist.data_points.len();
// Note that the row and columns number here is approximate
let mut bucket_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len * 3);
let mut sum_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
let mut count_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
for data_point in &hist.data_points {
let mut accumulated_count = 0;
for (idx, count) in data_point.bucket_counts.iter().enumerate() {
let mut bucket_row = bucket_table.alloc_one_row();
write_tags_and_timestamp(
&mut bucket_table,
&mut bucket_row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
if let Some(upper_bounds) = data_point.explicit_bounds.get(idx) {
row_writer::write_tag(
&mut bucket_table,
HISTOGRAM_LE_COLUMN,
upper_bounds,
&mut bucket_row,
)?;
} else if idx == data_point.explicit_bounds.len() {
// The last bucket
row_writer::write_tag(
&mut bucket_table,
HISTOGRAM_LE_COLUMN,
f64::INFINITY,
&mut bucket_row,
)?;
}
accumulated_count += count;
row_writer::write_f64(
&mut bucket_table,
GREPTIME_VALUE,
accumulated_count as f64,
&mut bucket_row,
)?;
bucket_table.add_row(bucket_row);
}
if let Some(sum) = data_point.sum {
let mut sum_row = sum_table.alloc_one_row();
write_tags_and_timestamp(
&mut sum_table,
&mut sum_row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
row_writer::write_f64(&mut sum_table, GREPTIME_VALUE, sum, &mut sum_row)?;
sum_table.add_row(sum_row);
}
let mut count_row = count_table.alloc_one_row();
write_tags_and_timestamp(
&mut count_table,
&mut count_row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
row_writer::write_f64(
&mut count_table,
GREPTIME_VALUE,
data_point.count as f64,
&mut count_row,
)?;
count_table.add_row(count_row);
}
table_writer.add_table_data(bucket_table_name, bucket_table);
table_writer.add_table_data(sum_table_name, sum_table);
table_writer.add_table_data(count_table_name, count_table);
Ok(())
}
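To make the cumulative `le` layout above concrete, here is a standalone sketch (not part of this diff) using plain Rust and no GreptimeDB types; the sample numbers deliberately mirror test_encode_histogram below.

// Sketch: how per-bucket counts become cumulative `le` rows, following the
// accumulation loop in `encode_histogram` above.
fn cumulative_le_rows(bucket_counts: &[u64], explicit_bounds: &[f64]) -> Vec<(f64, f64)> {
    let mut accumulated = 0u64;
    bucket_counts
        .iter()
        .enumerate()
        .map(|(idx, count)| {
            accumulated += count;
            // The bucket past the last explicit bound is the `+Inf` bucket.
            let le = explicit_bounds.get(idx).copied().unwrap_or(f64::INFINITY);
            (le, accumulated as f64)
        })
        .collect()
}

// With bucket_counts [2, 4, 6, 9, 4] and explicit_bounds [0.1, 1.0, 10.0, 100.0],
// this yields (0.1, 2), (1.0, 6), (10.0, 12), (100.0, 21), (+Inf, 25): five
// `%metric%_bucket` rows per data point, matching the unit test.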
#[allow(dead_code)]
fn encode_exponential_histogram(_name: &str, _hist: &ExponentialHistogram) -> Result<()> {
// TODO(sunng87): implement this using a prometheus compatible way
Ok(())
}
fn encode_summary(
table_writer: &mut MultiTableData,
name: &str,
summary: &Summary,
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
let table = table_writer.get_or_default_table_data(
&normalize_otlp_name(name),
APPROXIMATE_COLUMN_COUNT,
summary.data_points.len(),
);
for data_point in &summary.data_points {
let mut row = table.alloc_one_row();
write_tags_and_timestamp(
table,
&mut row,
resource_attrs,
scope_attrs,
Some(data_point.attributes.as_ref()),
data_point.time_unix_nano as i64,
)?;
for quantile in &data_point.quantile_values {
row_writer::write_f64(
table,
&format!("greptime_p{:02}", quantile.quantile * 100f64),
quantile.value,
&mut row,
)?;
}
row_writer::write_f64(table, GREPTIME_COUNT, data_point.count as f64, &mut row)?;
table.add_row(row);
}
Ok(())
}
#[cfg(test)]
mod tests {
use opentelemetry_proto::tonic::common::v1::any_value::Value as Val;
use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};
use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value;
use opentelemetry_proto::tonic::metrics::v1::summary_data_point::ValueAtQuantile;
use opentelemetry_proto::tonic::metrics::v1::{HistogramDataPoint, NumberDataPoint};
use super::*;
#[test]
fn test_normalize_otlp_name() {
assert_eq!(normalize_otlp_name("jvm.memory.free"), "jvm_memory_free");
assert_eq!(normalize_otlp_name("jvm-memory-free"), "jvm_memory_free");
assert_eq!(normalize_otlp_name("jvm_memory_free"), "jvm_memory_free");
assert_eq!(normalize_otlp_name("JVM_MEMORY_FREE"), "jvm_memory_free");
assert_eq!(normalize_otlp_name("JVM_memory_FREE"), "jvm_memory_free");
}
fn keyvalue(key: &str, value: &str) -> KeyValue {
KeyValue {
key: key.into(),
value: Some(AnyValue {
value: Some(Val::StringValue(value.into())),
}),
}
}
#[test]
fn test_encode_gauge() {
let mut tables = MultiTableData::default();
let data_points = vec![
NumberDataPoint {
attributes: vec![keyvalue("host", "testsevrer")],
time_unix_nano: 100,
value: Some(Value::AsInt(100)),
..Default::default()
},
NumberDataPoint {
attributes: vec![keyvalue("host", "testserver")],
time_unix_nano: 105,
value: Some(Value::AsInt(105)),
..Default::default()
},
];
let gauge = Gauge { data_points };
encode_gauge(
&mut tables,
"datamon",
&gauge,
Some(&vec![keyvalue("resource", "app")]),
Some(&vec![keyvalue("scope", "otel")]),
)
.unwrap();
let table = tables.get_or_default_table_data("datamon", 0, 0);
assert_eq!(table.num_rows(), 2);
assert_eq!(table.num_columns(), 5);
assert_eq!(
table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"greptime_value"
]
);
}
#[test]
fn test_encode_sum() {
let mut tables = MultiTableData::default();
let data_points = vec![
NumberDataPoint {
attributes: vec![keyvalue("host", "testserver")],
time_unix_nano: 100,
value: Some(Value::AsInt(100)),
..Default::default()
},
NumberDataPoint {
attributes: vec![keyvalue("host", "testserver")],
time_unix_nano: 105,
value: Some(Value::AsInt(0)),
..Default::default()
},
];
let sum = Sum {
data_points,
..Default::default()
};
encode_sum(
&mut tables,
"datamon",
&sum,
Some(&vec![keyvalue("resource", "app")]),
Some(&vec![keyvalue("scope", "otel")]),
)
.unwrap();
let table = tables.get_or_default_table_data("datamon", 0, 0);
assert_eq!(table.num_rows(), 2);
assert_eq!(table.num_columns(), 5);
assert_eq!(
table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"greptime_value"
]
);
}
#[test]
fn test_encode_summary() {
let mut tables = MultiTableData::default();
let data_points = vec![SummaryDataPoint {
attributes: vec![keyvalue("host", "testserver")],
time_unix_nano: 100,
count: 25,
sum: 5400.0,
quantile_values: vec![
ValueAtQuantile {
quantile: 0.90,
value: 1000.0,
},
ValueAtQuantile {
quantile: 0.95,
value: 3030.0,
},
],
..Default::default()
}];
let summary = Summary { data_points };
encode_summary(
&mut tables,
"datamon",
&summary,
Some(&vec![keyvalue("resource", "app")]),
Some(&vec![keyvalue("scope", "otel")]),
)
.unwrap();
let table = tables.get_or_default_table_data("datamon", 0, 0);
assert_eq!(table.num_rows(), 1);
assert_eq!(table.num_columns(), 7);
assert_eq!(
table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"greptime_p90",
"greptime_p95",
"greptime_count"
]
);
}
#[test]
fn test_encode_histogram() {
let mut tables = MultiTableData::default();
let data_points = vec![HistogramDataPoint {
attributes: vec![keyvalue("host", "testserver")],
time_unix_nano: 100,
start_time_unix_nano: 23,
count: 25,
sum: Some(100.),
max: Some(200.),
min: Some(0.03),
bucket_counts: vec![2, 4, 6, 9, 4],
explicit_bounds: vec![0.1, 1., 10., 100.],
..Default::default()
}];
let histogram = Histogram {
data_points,
aggregation_temporality: AggregationTemporality::Delta.into(),
};
encode_histogram(
&mut tables,
"histo",
&histogram,
Some(&vec![keyvalue("resource", "app")]),
Some(&vec![keyvalue("scope", "otel")]),
)
.unwrap();
assert_eq!(3, tables.num_tables());
// bucket table
let bucket_table = tables.get_or_default_table_data("histo_bucket", 0, 0);
assert_eq!(bucket_table.num_rows(), 5);
assert_eq!(bucket_table.num_columns(), 6);
assert_eq!(
bucket_table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"le",
"greptime_value",
]
);
let sum_table = tables.get_or_default_table_data("histo_sum", 0, 0);
assert_eq!(sum_table.num_rows(), 1);
assert_eq!(sum_table.num_columns(), 5);
assert_eq!(
sum_table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"greptime_value",
]
);
let count_table = tables.get_or_default_table_data("histo_count", 0, 0);
assert_eq!(count_table.num_rows(), 1);
assert_eq!(count_table.num_columns(), 5);
assert_eq!(
count_table
.columns()
.iter()
.map(|c| &c.column_name)
.collect::<Vec<&String>>(),
vec![
"resource",
"scope",
"host",
"greptime_timestamp",
"greptime_value",
]
);
}
}

View File

@@ -0,0 +1,28 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use super::trace::TraceSpans;
/// A transformer that parses an `ExportTraceServiceRequest` according to custom logic, for example:
/// - uplifting some fields from Attributes (Map type) into dedicated columns
pub trait TraceParser: Send + Sync {
fn parse(&self, request: ExportTraceServiceRequest) -> TraceSpans;
fn table_name(&self) -> String;
}
pub type TraceParserRef = Arc<dyn TraceParser>;
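As an illustration only (not part of this diff), a minimal parser could delegate to the default parse helper and TRACE_TABLE_NAME constant from the sibling trace module added in this PR; DefaultTraceParser is a hypothetical name.

// Hypothetical no-op plugin: reuse the default span parsing and table name.
// A real plugin would uplift selected attributes into `uplifted_fields` here.
struct DefaultTraceParser;

impl TraceParser for DefaultTraceParser {
    fn parse(&self, request: ExportTraceServiceRequest) -> TraceSpans {
        super::trace::parse(request)
    }

    fn table_name(&self) -> String {
        super::trace::TRACE_TABLE_NAME.to_string()
    }
}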

View File

@@ -0,0 +1,411 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, RowInsertRequests};
use common_grpc::writer::Precision;
use common_time::time::Time;
use itertools::Itertools;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
use opentelemetry_proto::tonic::common::v1::{
AnyValue, ArrayValue, InstrumentationScope, KeyValue, KeyValueList,
};
use opentelemetry_proto::tonic::trace::v1::span::{Event, Link};
use opentelemetry_proto::tonic::trace::v1::{Span, Status};
use serde_json::json;
use super::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use crate::error::Result;
use crate::row_writer::{self, MultiTableData, TableData};
const APPROXIMATE_COLUMN_COUNT: usize = 24;
pub const TRACE_TABLE_NAME: &str = "traces_preview_v01";
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct TraceSpan {
// the following are tags
pub trace_id: String,
pub span_id: String,
pub parent_span_id: String,
// the following are fields
pub resource_attributes: String, // TODO(yuanbohan): Map in the future
pub scope_name: String,
pub scope_version: String,
pub scope_attributes: String, // TODO(yuanbohan): Map in the future
pub trace_state: String,
pub span_name: String,
pub span_kind: String,
pub span_status_code: String,
pub span_status_message: String,
pub span_attributes: String, // TODO(yuanbohan): Map in the future
pub span_events: String, // TODO(yuanbohan): List in the future
pub span_links: String, // TODO(yuanbohan): List in the future
pub start_in_nanosecond: u64, // this is also the Timestamp Index
pub end_in_nanosecond: u64,
pub uplifted_fields: Vec<(String, ColumnDataType, ValueData)>,
}
pub type TraceSpans = Vec<TraceSpan>;
/// Convert `TraceSpans` to GreptimeDB row insert requests.
/// Returns `RowInsertRequests` and the total number of rows to ingest.
pub fn to_grpc_insert_requests(
table_name: String,
spans: TraceSpans,
) -> Result<(RowInsertRequests, usize)> {
let mut multi_table_writer = MultiTableData::default();
let one_table_writer = multi_table_writer.get_or_default_table_data(
table_name,
APPROXIMATE_COLUMN_COUNT,
spans.len(),
);
for span in spans {
write_span_to_row(one_table_writer, span)?;
}
Ok(multi_table_writer.into_row_insert_requests())
}
pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
let mut row = writer.alloc_one_row();
{
// tags
let iter = vec![
("trace_id", span.trace_id),
("span_id", span.span_id),
("parent_span_id", span.parent_span_id),
]
.into_iter()
.map(|(col, val)| (col.to_string(), val));
row_writer::write_tags(writer, iter, &mut row)?;
}
{
// fields
let str_fields_iter = vec![
("resource_attributes", span.resource_attributes),
("scope_name", span.scope_name),
("scope_version", span.scope_version),
("scope_attributes", span.scope_attributes),
("trace_state", span.trace_state),
("span_name", span.span_name),
("span_kind", span.span_kind),
("span_status_code", span.span_status_code),
("span_status_message", span.span_status_message),
("span_attributes", span.span_attributes),
("span_events", span.span_events),
("span_links", span.span_links),
]
.into_iter()
.map(|(col, val)| {
(
col.into(),
ColumnDataType::String,
ValueData::StringValue(val),
)
});
let time_fields_iter = vec![
("start", span.start_in_nanosecond),
("end", span.end_in_nanosecond),
]
.into_iter()
.map(|(col, val)| {
(
col.into(),
ColumnDataType::TimestampNanosecond,
ValueData::TimestampNanosecondValue(val as i64),
)
});
row_writer::write_fields(writer, str_fields_iter, &mut row)?;
row_writer::write_fields(writer, time_fields_iter, &mut row)?;
row_writer::write_fields(writer, span.uplifted_fields.into_iter(), &mut row)?;
}
row_writer::write_f64(
writer,
GREPTIME_VALUE,
(span.end_in_nanosecond - span.start_in_nanosecond) as f64 / 1_000_000.0, // duration in milliseconds
&mut row,
)?;
row_writer::write_ts_precision(
writer,
GREPTIME_TIMESTAMP,
Some(span.start_in_nanosecond as i64),
Precision::Nanosecond,
&mut row,
)?;
writer.add_row(row);
Ok(())
}
pub fn parse_span(
resource_attrs: &[KeyValue],
scope: &InstrumentationScope,
span: Span,
) -> TraceSpan {
let (span_status_code, span_status_message) = status_to_string(&span.status);
let span_kind = span.kind().as_str_name().into();
TraceSpan {
trace_id: bytes_to_hex_string(&span.trace_id),
span_id: bytes_to_hex_string(&span.span_id),
parent_span_id: bytes_to_hex_string(&span.parent_span_id),
resource_attributes: vec_kv_to_string(resource_attrs),
trace_state: span.trace_state,
scope_name: scope.name.clone(),
scope_version: scope.version.clone(),
scope_attributes: vec_kv_to_string(&scope.attributes),
span_name: span.name,
span_kind,
span_status_code,
span_status_message,
span_attributes: vec_kv_to_string(&span.attributes),
span_events: events_to_string(&span.events),
span_links: links_to_string(&span.links),
start_in_nanosecond: span.start_time_unix_nano,
end_in_nanosecond: span.end_time_unix_nano,
uplifted_fields: vec![],
}
}
/// Convert OpenTelemetry traces to `TraceSpans`
///
/// See
/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto>
/// for data structure of OTLP traces.
pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans {
let mut spans = vec![];
for resource_spans in request.resource_spans {
let resource_attrs = resource_spans
.resource
.map(|r| r.attributes)
.unwrap_or_default();
for scope_spans in resource_spans.scope_spans {
let scope = scope_spans.scope.unwrap_or_default();
for span in scope_spans.spans {
spans.push(parse_span(&resource_attrs, &scope, span));
}
}
}
spans
}
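Combined with `to_grpc_insert_requests` above, a rough end-to-end sketch (not part of this diff; traces_to_inserts is an illustrative name):

// Parse an OTLP export request into spans, then build row inserts for the
// default trace table; this mirrors what the HTTP handler path ends up doing.
fn traces_to_inserts(request: ExportTraceServiceRequest) -> Result<(RowInsertRequests, usize)> {
    let spans = parse(request);
    to_grpc_insert_requests(TRACE_TABLE_NAME.to_string(), spans)
}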
pub fn bytes_to_hex_string(bs: &[u8]) -> String {
bs.iter().map(|b| format!("{:02x}", b)).join("")
}
pub fn arr_vals_to_string(arr: &ArrayValue) -> String {
let vs: Vec<String> = arr
.values
.iter()
.filter_map(|val| any_value_to_string(val.clone()))
.collect();
serde_json::to_string(&vs).unwrap_or_else(|_| "[]".into())
}
pub fn vec_kv_to_string(vec: &[KeyValue]) -> String {
let vs: HashMap<String, String> = vec
.iter()
.map(|kv| {
let val = kv
.value
.clone()
.and_then(any_value_to_string)
.unwrap_or_default();
(kv.key.clone(), val)
})
.collect();
serde_json::to_string(&vs).unwrap_or_else(|_| "{}".into())
}
pub fn kvlist_to_string(kvlist: &KeyValueList) -> String {
vec_kv_to_string(&kvlist.values)
}
pub fn any_value_to_string(val: AnyValue) -> Option<String> {
val.value.map(|value| match value {
OtlpValue::StringValue(s) => s,
OtlpValue::BoolValue(b) => b.to_string(),
OtlpValue::IntValue(i) => i.to_string(),
OtlpValue::DoubleValue(d) => d.to_string(),
OtlpValue::ArrayValue(arr) => arr_vals_to_string(&arr),
OtlpValue::KvlistValue(kv) => kvlist_to_string(&kv),
OtlpValue::BytesValue(bs) => bytes_to_hex_string(&bs),
})
}
pub fn event_to_string(event: &Event) -> String {
json!({
"name": event.name,
"time": Time::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(),
"attrs": vec_kv_to_string(&event.attributes),
})
.to_string()
}
pub fn events_to_string(events: &[Event]) -> String {
let v: Vec<String> = events.iter().map(event_to_string).collect();
serde_json::to_string(&v).unwrap_or_else(|_| "[]".into())
}
pub fn link_to_string(link: &Link) -> String {
json!({
"trace_id": link.trace_id,
"span_id": link.span_id,
"trace_state": link.trace_state,
"attributes": vec_kv_to_string(&link.attributes),
})
.to_string()
}
pub fn links_to_string(links: &[Link]) -> String {
let v: Vec<String> = links.iter().map(link_to_string).collect();
serde_json::to_string(&v).unwrap_or_else(|_| "[]".into())
}
pub fn status_to_string(status: &Option<Status>) -> (String, String) {
match status {
Some(status) => (status.code().as_str_name().into(), status.message.clone()),
None => ("".into(), "".into()),
}
}
#[cfg(test)]
mod tests {
use common_time::time::Time;
use opentelemetry_proto::tonic::common::v1::{
any_value, AnyValue, ArrayValue, KeyValue, KeyValueList,
};
use opentelemetry_proto::tonic::trace::v1::span::Event;
use opentelemetry_proto::tonic::trace::v1::Status;
use serde_json::json;
use crate::otlp::trace::{
arr_vals_to_string, bytes_to_hex_string, event_to_string, kvlist_to_string,
status_to_string, vec_kv_to_string,
};
#[test]
fn test_bytes_to_hex_string() {
assert_eq!(
"24fe79948641b110a29bc27859307e8d",
bytes_to_hex_string(&[
36, 254, 121, 148, 134, 65, 177, 16, 162, 155, 194, 120, 89, 48, 126, 141,
])
);
assert_eq!(
"baffeedd7b8debc0",
bytes_to_hex_string(&[186, 255, 238, 221, 123, 141, 235, 192,])
);
}
#[test]
fn test_arr_vals_to_string() {
assert_eq!("[]", arr_vals_to_string(&ArrayValue { values: vec![] }));
let arr = ArrayValue {
values: vec![
AnyValue {
value: Some(any_value::Value::StringValue("string_value".into())),
},
AnyValue {
value: Some(any_value::Value::BoolValue(true)),
},
AnyValue {
value: Some(any_value::Value::IntValue(1)),
},
AnyValue {
value: Some(any_value::Value::DoubleValue(1.2)),
},
],
};
let expect = json!(["string_value", "true", "1", "1.2"]).to_string();
assert_eq!(expect, arr_vals_to_string(&arr));
}
#[test]
fn test_kv_list_to_string() {
let kvlist = KeyValueList {
values: vec![KeyValue {
key: "str_key".into(),
value: Some(AnyValue {
value: Some(any_value::Value::StringValue("val1".into())),
}),
}],
};
let expect = json!({
"str_key": "val1",
})
.to_string();
assert_eq!(expect, kvlist_to_string(&kvlist))
}
#[test]
fn test_event_to_string() {
let attributes = vec![KeyValue {
key: "str_key".into(),
value: Some(AnyValue {
value: Some(any_value::Value::StringValue("val1".into())),
}),
}];
let event = Event {
time_unix_nano: 1697620662450128000_u64,
name: "event_name".into(),
attributes,
dropped_attributes_count: 0,
};
let event_string = event_to_string(&event);
let expect = json!({
"name": event.name,
"time": Time::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(),
"attrs": vec_kv_to_string(&event.attributes),
});
assert_eq!(
expect,
serde_json::from_str::<serde_json::value::Value>(event_string.as_str()).unwrap()
);
}
#[test]
fn test_status_to_string() {
let message = String::from("status message");
let status = Status {
code: 1,
message: message.clone(),
};
assert_eq!(
("STATUS_CODE_OK".into(), message),
status_to_string(&Some(status)),
);
}
}

View File

@@ -34,6 +34,9 @@ use common_query::Output;
use opentelemetry_proto::tonic::collector::metrics::v1::{
ExportMetricsServiceRequest, ExportMetricsServiceResponse,
};
use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use session::context::QueryContextRef;
use crate::error::Result;
@@ -101,4 +104,11 @@ pub trait OpenTelemetryProtocolHandler {
request: ExportMetricsServiceRequest,
ctx: QueryContextRef,
) -> Result<ExportMetricsServiceResponse>;
/// Handling opentelemetry traces request
async fn traces(
&self,
request: ExportTraceServiceRequest,
ctx: QueryContextRef,
) -> Result<ExportTraceServiceResponse>;
}

View File

@@ -164,10 +164,7 @@ async fn test_opentsdb_put() {
.send()
.await;
assert_eq!(result.status(), 500);
assert_eq!(
result.text().await,
"{\"error\":\"Internal error: Internal error: expected\"}"
);
assert_eq!(result.text().await, "{\"error\":\"Internal error: 1003\"}");
let mut metrics = vec![];
while let Ok(s) = rx.try_recv() {
@@ -206,7 +203,7 @@ async fn test_opentsdb_debug_put() {
.send()
.await;
assert_eq!(result.status(), 200);
assert_eq!(result.text().await, "{\"success\":0,\"failed\":1,\"errors\":[{\"datapoint\":{\"metric\":\"should_failed\",\"timestamp\":1000,\"value\":1.0,\"tags\":{\"host\":\"web01\"}},\"error\":\"Internal error: expected\"}]}");
assert_eq!(result.text().await, "{\"success\":0,\"failed\":1,\"errors\":[{\"datapoint\":{\"metric\":\"should_failed\",\"timestamp\":1000,\"value\":1.0,\"tags\":{\"host\":\"web01\"}},\"error\":\"Internal error: 1003\"}]}");
// multiple data point summary debug put
let result = client
@@ -231,7 +228,7 @@ async fn test_opentsdb_debug_put() {
.send()
.await;
assert_eq!(result.status(), 200);
assert_eq!(result.text().await, "{\"success\":1,\"failed\":1,\"errors\":[{\"datapoint\":{\"metric\":\"should_failed\",\"timestamp\":1000,\"value\":1.0,\"tags\":{\"host\":\"web01\"}},\"error\":\"Internal error: expected\"}]}");
assert_eq!(result.text().await, "{\"success\":1,\"failed\":1,\"errors\":[{\"datapoint\":{\"metric\":\"should_failed\",\"timestamp\":1000,\"value\":1.0,\"tags\":{\"host\":\"web01\"}},\"error\":\"Internal error: 1003\"}]}");
let mut metrics = vec![];
while let Ok(s) = rx.try_recv() {

View File

@@ -245,11 +245,7 @@ impl ChunkReaderBuilder {
reader_builder = reader_builder.push_batch_iter(iter);
}
let predicate = Predicate::try_new(
self.filters.clone(),
self.schema.store_schema().schema().clone(),
)
.context(error::BuildPredicateSnafu)?;
let predicate = Predicate::new(self.filters.clone());
let read_opts = ReadOptions {
batch_size: self.iter_ctx.batch_size,

View File

@@ -277,7 +277,10 @@ impl ParquetReader {
let pruned_row_groups = self
.predicate
.prune_row_groups(builder.metadata().row_groups())
.prune_row_groups(
builder.metadata().row_groups(),
store_schema.schema().clone(),
)
.into_iter()
.enumerate()
.filter_map(|(idx, valid)| if valid { Some(idx) } else { None })
@@ -549,12 +552,11 @@ mod tests {
let operator = create_object_store(dir.path().to_str().unwrap());
let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap());
let user_schema = projected_schema.projected_user_schema().clone();
let reader = ParquetReader::new(
sst_file_handle,
operator,
projected_schema,
Predicate::empty(user_schema),
Predicate::empty(),
TimestampRange::min_to_max(),
);
@@ -636,12 +638,11 @@ mod tests {
let operator = create_object_store(dir.path().to_str().unwrap());
let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap());
let user_schema = projected_schema.projected_user_schema().clone();
let reader = ParquetReader::new(
file_handle,
operator,
projected_schema,
Predicate::empty(user_schema),
Predicate::empty(),
TimestampRange::min_to_max(),
);
@@ -665,14 +666,8 @@ mod tests {
range: TimestampRange,
expect: Vec<i64>,
) {
let store_schema = schema.schema_to_read().clone();
let reader = ParquetReader::new(
file_handle,
object_store,
schema,
Predicate::empty(store_schema.schema().clone()),
range,
);
let reader =
ParquetReader::new(file_handle, object_store, schema, Predicate::empty(), range);
let mut stream = reader.chunk_stream().await.unwrap();
let result = stream.next_batch().await;

View File

@@ -29,9 +29,11 @@ use datatypes::prelude::ConcreteDataType;
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
use parquet::arrow::ProjectionMask;
use parquet::schema::types::SchemaDescriptor;
use snafu::ResultExt;
use table::predicate::Predicate;
use crate::error;
use crate::error::BuildPredicateSnafu;
use crate::schema::StoreSchema;
/// Builds row filters according to predicates.
@@ -80,7 +82,11 @@ pub(crate) fn build_row_filter(
Box::new(PlainTimestampRowFilter::new(time_range, ts_col_projection)) as _
};
let mut predicates = vec![time_range_row_filter];
if let Ok(datafusion_filters) = predicate_to_row_filter(predicate, projection_mask) {
if let Ok(datafusion_filters) = predicate_to_row_filter(
predicate,
projection_mask,
store_schema.schema().arrow_schema(),
) {
predicates.extend(datafusion_filters);
}
let filter = RowFilter::new(predicates);
@@ -90,9 +96,13 @@ pub(crate) fn build_row_filter(
fn predicate_to_row_filter(
predicate: &Predicate,
projection_mask: ProjectionMask,
schema: &arrow::datatypes::SchemaRef,
) -> error::Result<Vec<Box<dyn ArrowPredicate>>> {
let mut datafusion_predicates = Vec::with_capacity(predicate.exprs().len());
for expr in predicate.exprs() {
let physical_exprs = predicate
.to_physical_exprs(schema)
.context(BuildPredicateSnafu)?;
let mut datafusion_predicates = Vec::with_capacity(physical_exprs.len());
for expr in &physical_exprs {
datafusion_predicates.push(Box::new(DatafusionArrowPredicate {
projection_mask: projection_mask.clone(),
physical_expr: expr.clone(),

View File

@@ -27,6 +27,7 @@ use datafusion_expr::expr::InList;
use datafusion_expr::{Between, BinaryExpr, ColumnarValue, Operator};
use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use datatypes::arrow;
use datatypes::arrow::array::BooleanArray;
use datatypes::schema::SchemaRef;
use datatypes::value::scalar_value_to_timestamp;
@@ -39,19 +40,24 @@ mod stats;
#[derive(Clone)]
pub struct Predicate {
/// The schema of the table that the expressions being applied.
schema: SchemaRef,
/// Physical expressions of this predicate.
exprs: Vec<Arc<dyn PhysicalExpr>>,
/// logical exprs
exprs: Vec<Expr>,
}
impl Predicate {
/// Creates a new `Predicate` by converting logical exprs to physical exprs that can be
/// evaluated against record batches.
/// Returns error when failed to convert exprs.
pub fn try_new(exprs: Vec<Expr>, schema: SchemaRef) -> error::Result<Self> {
let arrow_schema = schema.arrow_schema();
let df_schema = arrow_schema
pub fn new(exprs: Vec<Expr>) -> Self {
Self { exprs }
}
/// Builds physical exprs according to provided schema.
pub fn to_physical_exprs(
&self,
schema: &arrow::datatypes::SchemaRef,
) -> error::Result<Vec<Arc<dyn PhysicalExpr>>> {
let df_schema = schema
.clone()
.to_dfschema_ref()
.context(error::DatafusionSnafu)?;
@@ -61,47 +67,38 @@ impl Predicate {
// registering variables.
let execution_props = &ExecutionProps::new();
let physical_exprs = exprs
self.exprs
.iter()
.map(|expr| {
create_physical_expr(
expr.df_expr(),
df_schema.as_ref(),
arrow_schema.as_ref(),
execution_props,
)
create_physical_expr(expr.df_expr(), df_schema.as_ref(), schema, execution_props)
})
.collect::<Result<_, _>>()
.context(error::DatafusionSnafu)?;
Ok(Self {
schema,
exprs: physical_exprs,
})
}
#[inline]
pub fn exprs(&self) -> &[Arc<dyn PhysicalExpr>] {
&self.exprs
.context(error::DatafusionSnafu)
}
/// Builds an empty predicate from given schema.
pub fn empty(schema: SchemaRef) -> Self {
Self {
schema,
exprs: vec![],
}
pub fn empty() -> Self {
Self { exprs: vec![] }
}
/// Evaluates the predicate against row group metadata.
/// Returns a vector of boolean values, among which `false` means the row group can be skipped.
pub fn prune_row_groups(&self, row_groups: &[RowGroupMetaData]) -> Vec<bool> {
pub fn prune_row_groups(
&self,
row_groups: &[RowGroupMetaData],
schema: SchemaRef,
) -> Vec<bool> {
let mut res = vec![true; row_groups.len()];
let arrow_schema = self.schema.arrow_schema();
for expr in &self.exprs {
let Ok(physical_exprs) = self.to_physical_exprs(schema.arrow_schema()) else {
return res;
};
let arrow_schema = schema.arrow_schema();
for expr in &physical_exprs {
match PruningPredicate::try_new(expr.clone(), arrow_schema.clone()) {
Ok(p) => {
let stat = RowGroupPruningStatistics::new(row_groups, &self.schema);
let stat = RowGroupPruningStatistics::new(row_groups, &schema);
match p.prune(&stat) {
Ok(r) => {
for (curr_val, res) in r.into_iter().zip(res.iter_mut()) {
@@ -123,7 +120,9 @@ impl Predicate {
/// Prunes primary keys
pub fn prune_primary_key(&self, primary_key: &RecordBatch) -> error::Result<bool> {
for expr in &self.exprs {
let pk_schema = primary_key.schema();
let physical_exprs = self.to_physical_exprs(&pk_schema)?;
for expr in &physical_exprs {
// evaluate every filter against primary key
let Ok(eva) = expr.evaluate(primary_key) else {
continue;
@@ -156,11 +155,22 @@ impl Predicate {
/// Evaluates the predicate against the `stats`.
/// Returns a vector of boolean values, among which `false` means the row group can be skipped.
pub fn prune_with_stats<S: PruningStatistics>(&self, stats: &S) -> Vec<bool> {
pub fn prune_with_stats<S: PruningStatistics>(
&self,
stats: &S,
schema: &arrow::datatypes::SchemaRef,
) -> Vec<bool> {
let mut res = vec![true; stats.num_containers()];
let arrow_schema = self.schema.arrow_schema();
for expr in &self.exprs {
match PruningPredicate::try_new(expr.clone(), arrow_schema.clone()) {
let physical_exprs = match self.to_physical_exprs(schema) {
Ok(expr) => expr,
Err(e) => {
warn!(e; "Failed to build physical expr from predicates: {:?}", &self.exprs);
return res;
}
};
for expr in &physical_exprs {
match PruningPredicate::try_new(expr.clone(), schema.clone()) {
Ok(p) => match p.prune(stats) {
Ok(r) => {
for (curr_val, res) in r.into_iter().zip(res.iter_mut()) {
@@ -641,7 +651,7 @@ mod tests {
let dir = create_temp_dir("prune_parquet");
let (path, schema) = gen_test_parquet_file(&dir, array_cnt).await;
let schema = Arc::new(datatypes::schema::Schema::try_from(schema).unwrap());
let arrow_predicate = Predicate::try_new(filters, schema.clone()).unwrap();
let arrow_predicate = Predicate::new(filters);
let builder = ParquetRecordBatchStreamBuilder::new(
tokio::fs::OpenOptions::new()
.read(true)
@@ -653,7 +663,7 @@ mod tests {
.unwrap();
let metadata = builder.metadata().clone();
let row_groups = metadata.row_groups();
let res = arrow_predicate.prune_row_groups(row_groups);
let res = arrow_predicate.prune_row_groups(row_groups, schema);
assert_eq!(expect, res);
}
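To summarize the API change in this hunk, a small usage sketch (not part of the diff; prune_with_real_schema is an illustrative name) showing that callers now pass the real schema at prune time instead of baking one into the predicate:

use datatypes::schema::SchemaRef;
use parquet::file::metadata::RowGroupMetaData;
use table::predicate::Predicate;

// The predicate keeps only logical exprs; a concrete schema is supplied when
// pruning, so the same predicate can be evaluated against the real file schema.
fn prune_with_real_schema(
    predicate: &Predicate,
    row_groups: &[RowGroupMetaData],
    schema: SchemaRef,
) -> Vec<bool> {
    // `false` in the result means the corresponding row group can be skipped.
    predicate.prune_row_groups(row_groups, schema)
}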

View File

@@ -237,7 +237,7 @@ SELECT i FROM (SELECT * FROM integers i1 UNION SELECT * FROM integers i2) a WHER
-- SELECT * FROM (SELECT i1.i AS a, i2.i AS b, row_number() OVER (ORDER BY i1.i, i2.i) FROM integers i1, integers i2 WHERE i1.i IS NOT NULL AND i2.i IS NOT NULL) a1 WHERE a=b ORDER BY 1;
SELECT * FROM (SELECT 0=1 AS cond FROM integers i1, integers i2) a1 WHERE cond ORDER BY 1;
Error: 1003(Internal), Invalid argument error: must either specify a row count or at least one column
Error: 3001(EngineExecuteQuery), Invalid argument error: must either specify a row count or at least one column
SELECT * FROM (SELECT 0=1 AS cond FROM integers i1, integers i2 GROUP BY 1) a1 WHERE cond ORDER BY 1;

View File

@@ -13,7 +13,7 @@ Error: 3000(PlanQuery), Error during planning: Order by column out of bounds, sp
-- Not work in greptimedb
SELECT a FROM test ORDER BY 'hello', a;
Error: 1003(Internal), Error during planning: Sort operation is not applicable to scalar value hello
Error: 3001(EngineExecuteQuery), Error during planning: Sort operation is not applicable to scalar value hello
-- Ambiguous reference in union alias, give and error in duckdb, but works in greptimedb
SELECT a AS k, b FROM test UNION SELECT a, b AS k FROM test ORDER BY k;
@@ -54,7 +54,7 @@ Error: 3000(PlanQuery), Error during planning: Order by column out of bounds, sp
SELECT a % 2, b FROM test UNION SELECT a % 2 AS k, b FROM test ORDER BY -1;
Error: 1003(Internal), Error during planning: Sort operation is not applicable to scalar value -1
Error: 3001(EngineExecuteQuery), Error during planning: Sort operation is not applicable to scalar value -1
SELECT a % 2, b FROM test UNION SELECT a % 2 AS k FROM test ORDER BY -1;

View File

@@ -37,7 +37,7 @@ Error: 2000(InvalidSyntax), sql parser error: Illegal Range select, no RANGE key
SELECT min(val) RANGE '10s', max(val) FROM host ALIGN '5s';
Error: 1003(Internal), No field named "MAX(host.val)". Valid fields are "MIN(host.val) RANGE 10s FILL NULL", host.ts, host.host.
Error: 3001(EngineExecuteQuery), No field named "MAX(host.val)". Valid fields are "MIN(host.val) RANGE 10s FILL NULL", host.ts, host.host.
SELECT min(val) * 2 RANGE '10s' FROM host ALIGN '5s';
@@ -50,12 +50,12 @@ Error: 2000(InvalidSyntax), sql parser error: Can't use the RANGE keyword in Exp
-- 2.2 no align param
SELECT min(val) RANGE '5s' FROM host;
- Error: 1003(Internal), Error during planning: Missing argument in range select query
+ Error: 3000(PlanQuery), Error during planning: Missing argument in range select query
-- 2.3 type mismatch
SELECT covar(ceil(val), floor(val)) RANGE '20s' FROM host ALIGN '10s';
- Error: 1003(Internal), Internal error: Unsupported data type Int64 for function ceil. This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
+ Error: 3001(EngineExecuteQuery), Internal error: Unsupported data type Int64 for function ceil. This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
-- 2.4 nest query
SELECT min(max(val) RANGE '20s') RANGE '20s' FROM host ALIGN '10s';
@@ -70,11 +70,11 @@ Error: 2000(InvalidSyntax), Range Query: Window functions is not allowed in Rang
-- 2.6 invalid fill
SELECT min(val) RANGE '5s', min(val) RANGE '5s' FILL NULL FROM host ALIGN '5s';
- Error: 1003(Internal), Schema contains duplicate unqualified field name "MIN(host.val) RANGE 5s FILL NULL"
+ Error: 3001(EngineExecuteQuery), Schema contains duplicate unqualified field name "MIN(host.val) RANGE 5s FILL NULL"
SELECT min(val) RANGE '5s' FROM host ALIGN '5s' FILL 3.0;
- Error: 1003(Internal), Error during planning: 3.0 is not a valid fill option, fail to convert to a const value. { Arrow error: Cast error: Cannot cast string '3.0' to value of Int64 type }
+ Error: 3000(PlanQuery), Error during planning: 3.0 is not a valid fill option, fail to convert to a const value. { Arrow error: Cast error: Cannot cast string '3.0' to value of Int64 type }
DROP TABLE host;

View File

@@ -0,0 +1,83 @@
create table demo(ts timestamp time index, `value` double, host string,idc string, collector string, primary key(host, idc, collector));
Affected Rows: 0
insert into demo values(1,2,'test1', 'idc1', 'disk') ,(2,3,'test2', 'idc1', 'disk'), (3,4,'test3', 'idc2','memory');
Affected Rows: 3
select * from demo where host='test1';
+-------------------------+-------+-------+------+-----------+
| ts | value | host | idc | collector |
+-------------------------+-------+-------+------+-----------+
| 1970-01-01T00:00:00.001 | 2.0 | test1 | idc1 | disk |
+-------------------------+-------+-------+------+-----------+
select * from demo where host='test2';
+-------------------------+-------+-------+------+-----------+
| ts | value | host | idc | collector |
+-------------------------+-------+-------+------+-----------+
| 1970-01-01T00:00:00.002 | 3.0 | test2 | idc1 | disk |
+-------------------------+-------+-------+------+-----------+
select * from demo where host='test3';
+-------------------------+-------+-------+------+-----------+
| ts | value | host | idc | collector |
+-------------------------+-------+-------+------+-----------+
| 1970-01-01T00:00:00.003 | 4.0 | test3 | idc2 | memory |
+-------------------------+-------+-------+------+-----------+
select * from demo where host='test2' and idc='idc1';
+-------------------------+-------+-------+------+-----------+
| ts | value | host | idc | collector |
+-------------------------+-------+-------+------+-----------+
| 1970-01-01T00:00:00.002 | 3.0 | test2 | idc1 | disk |
+-------------------------+-------+-------+------+-----------+
select * from demo where host='test2' and idc='idc1' and collector='disk';
+-------------------------+-------+-------+------+-----------+
| ts | value | host | idc | collector |
+-------------------------+-------+-------+------+-----------+
| 1970-01-01T00:00:00.002 | 3.0 | test2 | idc1 | disk |
+-------------------------+-------+-------+------+-----------+
select * from demo where host='test2' and idc='idc2';
++
++
select * from demo where host='test3' and idc>'idc1';
+-------------------------+-------+-------+------+-----------+
| ts | value | host | idc | collector |
+-------------------------+-------+-------+------+-----------+
| 1970-01-01T00:00:00.003 | 4.0 | test3 | idc2 | memory |
+-------------------------+-------+-------+------+-----------+
select * from demo where idc='idc1' order by ts;
+-------------------------+-------+-------+------+-----------+
| ts | value | host | idc | collector |
+-------------------------+-------+-------+------+-----------+
| 1970-01-01T00:00:00.001 | 2.0 | test1 | idc1 | disk |
| 1970-01-01T00:00:00.002 | 3.0 | test2 | idc1 | disk |
+-------------------------+-------+-------+------+-----------+
select * from demo where collector='disk' order by ts;
+-------------------------+-------+-------+------+-----------+
| ts | value | host | idc | collector |
+-------------------------+-------+-------+------+-----------+
| 1970-01-01T00:00:00.001 | 2.0 | test1 | idc1 | disk |
| 1970-01-01T00:00:00.002 | 3.0 | test2 | idc1 | disk |
+-------------------------+-------+-------+------+-----------+
drop table demo;
Affected Rows: 0

View File

@@ -0,0 +1,23 @@
create table demo(ts timestamp time index, `value` double, host string,idc string, collector string, primary key(host, idc, collector));
insert into demo values(1,2,'test1', 'idc1', 'disk') ,(2,3,'test2', 'idc1', 'disk'), (3,4,'test3', 'idc2','memory');
select * from demo where host='test1';
select * from demo where host='test2';
select * from demo where host='test3';
select * from demo where host='test2' and idc='idc1';
select * from demo where host='test2' and idc='idc1' and collector='disk';
select * from demo where host='test2' and idc='idc2';
select * from demo where host='test3' and idc>'idc1';
select * from demo where idc='idc1' order by ts;
select * from demo where collector='disk' order by ts;
drop table demo;

View File

@@ -249,11 +249,11 @@ SELECT TIMESTAMP '1992-09-20 11:30:00.123456' - interval_value as new_value from
-- Interval type does not support aggregation functions.
SELECT MIN(interval_value) from intervals;
- Error: 1003(Internal), Internal error: Min/Max accumulator not implemented for type Interval(MonthDayNano). This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
+ Error: 3001(EngineExecuteQuery), Internal error: Min/Max accumulator not implemented for type Interval(MonthDayNano). This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
SELECT MAX(interval_value) from intervals;
- Error: 1003(Internal), Internal error: Min/Max accumulator not implemented for type Interval(MonthDayNano). This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
+ Error: 3001(EngineExecuteQuery), Internal error: Min/Max accumulator not implemented for type Interval(MonthDayNano). This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
SELECT SUM(interval_value) from intervals;

View File

@@ -83,19 +83,19 @@ Error: 3000(PlanQuery), Error during planning: The function Avg does not support
SELECT t+t FROM timestamp;
- Error: 1003(Internal), Cast error: Cannot perform arithmetic operation between array of type Timestamp(Millisecond, None) and array of type Timestamp(Millisecond, None)
+ Error: 3001(EngineExecuteQuery), Cast error: Cannot perform arithmetic operation between array of type Timestamp(Millisecond, None) and array of type Timestamp(Millisecond, None)
SELECT t*t FROM timestamp;
- Error: 1003(Internal), Invalid argument error: column types must match schema types, expected Interval(DayTime) but found Timestamp(Millisecond, None) at column index 0
+ Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected Interval(DayTime) but found Timestamp(Millisecond, None) at column index 0
SELECT t/t FROM timestamp;
- Error: 1003(Internal), Invalid argument error: column types must match schema types, expected Interval(DayTime) but found Timestamp(Millisecond, None) at column index 0
+ Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected Interval(DayTime) but found Timestamp(Millisecond, None) at column index 0
SELECT t%t FROM timestamp;
- Error: 1003(Internal), Invalid argument error: column types must match schema types, expected Interval(DayTime) but found Timestamp(Millisecond, None) at column index 0
+ Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected Interval(DayTime) but found Timestamp(Millisecond, None) at column index 0
-- TODO(dennis): It can't run on distributed mode, uncomment it when the issue is fixed: https://github.com/GreptimeTeam/greptimedb/issues/2071 --
-- SELECT t-t FROM timestamp; --

View File

@@ -23,7 +23,7 @@ SELECT CAST(sec AS VARCHAR), CAST(msec AS VARCHAR), CAST(micros AS VARCHAR), CAS
SELECT EXTRACT(MICROSECONDS FROM sec), EXTRACT(MICROSECONDS FROM msec), EXTRACT(MICROSECONDS FROM micros), EXTRACT(MICROSECONDS FROM nanos) FROM ts_precision;
- Error: 1003(Internal), Execution error: Date part 'MICROSECONDS' not supported
+ Error: 3001(EngineExecuteQuery), Execution error: Date part 'MICROSECONDS' not supported
-- we only support precisions 0, 3, 6, and 9
-- any other precision is rounded up (e.g. 1/2 -> 3, 4/5 -> 6, 7/8 -> 9)