feat: adds common-telemetry crate and adds logs/metrics to query engine (#25)

* feat: adds common-telemetry crate and logging mod

* refactor: common telemetry

* feat: adds metric mod and exports metrics by http

* feat: adds metrics to query engine and datanodes

* refactor: rename metrics

* refactor: rename ElapsedTimer struct and elapsed_timer macro

* refactor: log error when fail to new data node.

* fix: use backtrace crate instead of unstable feature

* feat: add must_use attr to Timer

* fix: only reserve Debug attribute for Timer
This commit is contained in:
dennis zhuang
2022-05-12 17:42:48 +08:00
committed by GitHub
parent 641f4b39bd
commit 93cbdbee9a
21 changed files with 1037 additions and 35 deletions

6
.gitignore vendored
View File

@@ -14,4 +14,8 @@ debug/
*.pdb
# JetBrains IDE config directory
.idea/
.idea/
# Logs
**/__unittest_logs
logs/

597
Cargo.lock generated
View File

@@ -52,6 +52,21 @@ dependencies = [
"alloc-no-stdlib",
]
[[package]]
name = "ansi_term"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2"
dependencies = [
"winapi",
]
[[package]]
name = "anyhow"
version = "1.0.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc"
[[package]]
name = "array-init-cursor"
version = "0.2.0"
@@ -158,6 +173,15 @@ dependencies = [
"syn",
]
[[package]]
name = "atomic-shim"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67cd4b51d303cf3501c301e8125df442128d3c6d7c69f71b27833d253de47e77"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "atty"
version = "0.2.14"
@@ -414,7 +438,7 @@ dependencies = [
"num-integer",
"num-traits",
"serde",
"time",
"time 0.1.43",
"winapi",
]
@@ -462,6 +486,7 @@ name = "cmd"
version = "0.1.0"
dependencies = [
"clap",
"common-telemetry",
"datanode",
"tokio",
]
@@ -510,6 +535,63 @@ dependencies = [
"tokio",
]
[[package]]
name = "common-telemetry"
version = "0.1.0"
dependencies = [
"backtrace",
"console-subscriber",
"metrics",
"metrics-exporter-prometheus",
"once_cell",
"opentelemetry",
"opentelemetry-jaeger",
"parking_lot 0.12.0",
"tracing",
"tracing-appender",
"tracing-bunyan-formatter",
"tracing-futures",
"tracing-log",
"tracing-opentelemetry",
"tracing-subscriber",
]
[[package]]
name = "console-api"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24cb05777feccbb2642d4f2df44d0505601a2cd88ca517d8c913f263a5a8dc8b"
dependencies = [
"prost",
"prost-types",
"tonic",
"tracing-core",
]
[[package]]
name = "console-subscriber"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f21a16ee925aa9d2bad2e296beffd6c5b1bfaad50af509d305b8e7f23af20fb"
dependencies = [
"console-api",
"crossbeam-channel",
"crossbeam-utils",
"futures",
"hdrhistogram",
"humantime",
"prost-types",
"serde",
"serde_json",
"thread_local",
"tokio",
"tokio-stream",
"tonic",
"tracing",
"tracing-core",
"tracing-subscriber",
]
[[package]]
name = "constant_time_eq"
version = "0.1.5"
@@ -560,6 +642,20 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1145cf131a2c6ba0615079ab6a638f7e1973ac9c2634fcbeaaad6114246efe8c"
dependencies = [
"autocfg",
"cfg-if",
"crossbeam-utils",
"lazy_static",
"memoffset",
"scopeguard",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.8"
@@ -627,7 +723,7 @@ dependencies = [
"log",
"num_cpus",
"ordered-float 2.10.0",
"parking_lot",
"parking_lot 0.12.0",
"parquet2",
"paste",
"pin-project-lite",
@@ -694,7 +790,9 @@ dependencies = [
"axum-test-helper",
"common-error",
"common-recordbatch",
"common-telemetry",
"hyper",
"metrics",
"query",
"serde",
"serde_json",
@@ -765,6 +863,12 @@ dependencies = [
"instant",
]
[[package]]
name = "fixedbitset"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "279fb028e20b3c4c320317955b77c5e0c9701f05a1d309905d6fc702cdc5053e"
[[package]]
name = "flate2"
version = "1.0.23"
@@ -907,6 +1011,16 @@ dependencies = [
"version_check",
]
[[package]]
name = "gethostname"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1ebd34e35c46e00bb73e81363248d627782724609fe1b6396f553f68fe3862e"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "getrandom"
version = "0.2.6"
@@ -954,6 +1068,9 @@ name = "hashbrown"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
dependencies = [
"ahash",
]
[[package]]
name = "hashbrown"
@@ -1042,6 +1159,12 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
version = "0.14.18"
@@ -1066,6 +1189,18 @@ dependencies = [
"want",
]
[[package]]
name = "hyper-timeout"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
dependencies = [
"hyper",
"pin-project-lite",
"tokio",
"tokio-io-timeout",
]
[[package]]
name = "hyper-tls"
version = "0.5.0"
@@ -1296,6 +1431,24 @@ dependencies = [
"libc",
]
[[package]]
name = "mach"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa"
dependencies = [
"libc",
]
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata",
]
[[package]]
name = "matches"
version = "0.1.9"
@@ -1323,6 +1476,67 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "memoffset"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce"
dependencies = [
"autocfg",
]
[[package]]
name = "metrics"
version = "0.18.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e52eb6380b6d2a10eb3434aec0885374490f5b82c8aaf5cd487a183c98be834"
dependencies = [
"ahash",
"metrics-macros",
]
[[package]]
name = "metrics-exporter-prometheus"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b93b470b04c005178058e18ac8bb2eb3fda562cf87af5ea05ba8d44190d458c"
dependencies = [
"indexmap",
"metrics",
"metrics-util",
"parking_lot 0.11.2",
"quanta",
"thiserror",
]
[[package]]
name = "metrics-macros"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49e30813093f757be5cf21e50389a24dc7dbb22c49f23b7e8f51d69b508a5ffa"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "metrics-util"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65a9e83b833e1d2e07010a386b197c13aa199bbd0fca5cf69bfa147972db890a"
dependencies = [
"atomic-shim",
"crossbeam-epoch",
"crossbeam-utils",
"hashbrown 0.11.2",
"metrics",
"num_cpus",
"parking_lot 0.11.2",
"quanta",
"sketches-ddsketch",
]
[[package]]
name = "mime"
version = "0.3.16"
@@ -1443,6 +1657,15 @@ dependencies = [
"libc",
]
[[package]]
name = "num_threads"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44"
dependencies = [
"libc",
]
[[package]]
name = "object"
version = "0.28.3"
@@ -1507,6 +1730,51 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "opentelemetry"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6105e89802af13fdf48c49d7646d3b533a70e536d818aae7e78ba0433d01acb8"
dependencies = [
"async-trait",
"crossbeam-channel",
"futures-channel",
"futures-executor",
"futures-util",
"js-sys",
"lazy_static",
"percent-encoding",
"pin-project",
"rand",
"thiserror",
"tokio",
"tokio-stream",
]
[[package]]
name = "opentelemetry-jaeger"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8c0b12cd9e3f9b35b52f6e0dac66866c519b26f424f4bbf96e3fe8bfbdc5229"
dependencies = [
"async-trait",
"lazy_static",
"opentelemetry",
"opentelemetry-semantic-conventions",
"thiserror",
"thrift",
"tokio",
]
[[package]]
name = "opentelemetry-semantic-conventions"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "985cc35d832d412224b2cffe2f9194b1b89b6aa5d0bef76d080dce09d90e62bd"
dependencies = [
"opentelemetry",
]
[[package]]
name = "ordered-float"
version = "1.1.1"
@@ -1531,6 +1799,17 @@ version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64"
[[package]]
name = "parking_lot"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core 0.8.5",
]
[[package]]
name = "parking_lot"
version = "0.12.0"
@@ -1538,7 +1817,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58"
dependencies = [
"lock_api",
"parking_lot_core",
"parking_lot_core 0.9.3",
]
[[package]]
name = "parking_lot_core"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216"
dependencies = [
"cfg-if",
"instant",
"libc",
"redox_syscall",
"smallvec",
"winapi",
]
[[package]]
@@ -1547,10 +1840,13 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929"
dependencies = [
"backtrace",
"cfg-if",
"libc",
"petgraph",
"redox_syscall",
"smallvec",
"thread-id",
"windows-sys",
]
@@ -1597,6 +1893,16 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "petgraph"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a13a2fa9d0b63e5f22328828741e523766fff0ee9e779316902290dff3f824f"
dependencies = [
"fixedbitset",
"indexmap",
]
[[package]]
name = "pin-project"
version = "1.0.10"
@@ -1683,6 +1989,55 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "prost"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc03e116981ff7d8da8e5c220e374587b98d294af7ba7dd7fda761158f00086f"
dependencies = [
"bytes",
"prost-derive",
]
[[package]]
name = "prost-derive"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b670f45da57fb8542ebdbb6105a925fe571b67f9e7ed9f47a06a84e72b4e7cc"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "prost-types"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d0a014229361011dc8e69c8a1ec6c2e8d0f2af7c91e3ea3f5b2170298461e68"
dependencies = [
"bytes",
"prost",
]
[[package]]
name = "quanta"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20afe714292d5e879d8b12740aa223c6a88f118af41870e8b6196e39a02238a8"
dependencies = [
"crossbeam-utils",
"libc",
"mach",
"once_cell",
"raw-cpuid",
"wasi 0.10.2+wasi-snapshot-preview1",
"web-sys",
"winapi",
]
[[package]]
name = "query"
version = "0.1.0"
@@ -1691,10 +2046,12 @@ dependencies = [
"async-trait",
"common-error",
"common-recordbatch",
"common-telemetry",
"datafusion",
"datatypes",
"futures",
"futures-util",
"metrics",
"snafu",
"sql",
"table",
@@ -1741,6 +2098,15 @@ dependencies = [
"getrandom",
]
[[package]]
name = "raw-cpuid"
version = "10.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "738bc47119e3eeccc7e94c4a506901aea5e7b4944ecd0829cbebf4af04ceda12"
dependencies = [
"bitflags",
]
[[package]]
name = "redox_syscall"
version = "0.2.13"
@@ -1766,6 +2132,9 @@ name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax",
]
[[package]]
name = "regex-syntax"
@@ -1932,6 +2301,15 @@ dependencies = [
"digest",
]
[[package]]
name = "sharded-slab"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31"
dependencies = [
"lazy_static",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
@@ -1947,6 +2325,12 @@ version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a"
[[package]]
name = "sketches-ddsketch"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76a77a8fd93886010f05e7ea0720e569d6d16c65329dbe3ec033bbbccccb017b"
[[package]]
name = "slab"
version = "0.4.6"
@@ -2138,6 +2522,68 @@ version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb"
[[package]]
name = "thiserror"
version = "1.0.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd829fe32373d27f76265620b5309d0340cb8550f523c1dda251d6298069069a"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0396bc89e626244658bef819e22d0cc459e795a5ebe878e6ec336d1674a8d79a"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "thread-id"
version = "4.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fdfe0627923f7411a43ec9ec9c39c3a9b4151be313e0922042581fb6c9b717f"
dependencies = [
"libc",
"redox_syscall",
"winapi",
]
[[package]]
name = "thread_local"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180"
dependencies = [
"once_cell",
]
[[package]]
name = "threadpool"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
dependencies = [
"num_cpus",
]
[[package]]
name = "thrift"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b82ca8f46f95b3ce96081fe3dd89160fdea970c254bb72925255d1b62aae692e"
dependencies = [
"byteorder",
"integer-encoding",
"log",
"ordered-float 1.1.1",
"threadpool",
]
[[package]]
name = "time"
version = "0.1.43"
@@ -2148,6 +2594,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "time"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2702e08a7a860f005826c6815dcac101b19b5eb330c27fe4a5928fec1d20ddd"
dependencies = [
"itoa 1.0.1",
"libc",
"num_threads",
]
[[package]]
name = "tinyvec"
version = "1.6.0"
@@ -2175,14 +2632,25 @@ dependencies = [
"mio",
"num_cpus",
"once_cell",
"parking_lot",
"parking_lot 0.12.0",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"tracing",
"winapi",
]
[[package]]
name = "tokio-io-timeout"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf"
dependencies = [
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-macros"
version = "1.7.0"
@@ -2243,6 +2711,38 @@ dependencies = [
"tracing",
]
[[package]]
name = "tonic"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5be9d60db39854b30b835107500cf0aca0b0d14d6e1c3de124217c23a29c2ddb"
dependencies = [
"async-stream",
"async-trait",
"axum",
"base64",
"bytes",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost",
"prost-derive",
"tokio",
"tokio-stream",
"tokio-util 0.7.1",
"tower",
"tower-layer",
"tower-service",
"tracing",
"tracing-futures",
]
[[package]]
name = "tower"
version = "0.4.12"
@@ -2319,6 +2819,17 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-appender"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09d48f71a791638519505cefafe162606f706c25592e4bde4d97600c0195312e"
dependencies = [
"crossbeam-channel",
"time 0.3.9",
"tracing-subscriber",
]
[[package]]
name = "tracing-attributes"
version = "0.1.21"
@@ -2330,6 +2841,23 @@ dependencies = [
"syn",
]
[[package]]
name = "tracing-bunyan-formatter"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd99ff040622c69c0fc4bd3ea5fe16630ce46400a79bd41339391b2d416ea24c"
dependencies = [
"gethostname",
"log",
"serde",
"serde_json",
"time 0.3.9",
"tracing",
"tracing-core",
"tracing-log",
"tracing-subscriber",
]
[[package]]
name = "tracing-core"
version = "0.1.26"
@@ -2337,6 +2865,61 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f"
dependencies = [
"lazy_static",
"valuable",
]
[[package]]
name = "tracing-futures"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
dependencies = [
"futures",
"futures-task",
"pin-project",
"tracing",
]
[[package]]
name = "tracing-log"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922"
dependencies = [
"lazy_static",
"log",
"tracing-core",
]
[[package]]
name = "tracing-opentelemetry"
version = "0.17.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f9378e96a9361190ae297e7f3a8ff644aacd2897f244b1ff81f381669196fa6"
dependencies = [
"opentelemetry",
"tracing",
"tracing-core",
"tracing-log",
"tracing-subscriber",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bc28f93baff38037f64e6f43d34cfa1605f27a49c34e8a04c5e78b0babf2596"
dependencies = [
"ansi_term",
"lazy_static",
"matchers",
"regex",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]
[[package]]
@@ -2414,6 +2997,12 @@ dependencies = [
"getrandom",
]
[[package]]
name = "valuable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcpkg"
version = "0.2.15"

View File

@@ -2,6 +2,7 @@
members = [
"src/common/base",
"src/common/error",
"src/common/telemetry",
"src/common/query",
"src/common/recordbatch",
"src/cmd",

View File

@@ -9,5 +9,6 @@ path = "src/bin/greptime.rs"
[dependencies]
clap = { version = "3.1", features = ["derive"] }
common-telemetry = { path = "../common/telemetry" , features = ["deadlock_detection"]}
datanode = { path = "../datanode" }
tokio = { version = "1.18.0", features = ["full"] }

View File

@@ -1,20 +1,33 @@
use clap::Parser;
use cmd::opts::{GrepTimeOpts, NodeType};
use common_telemetry::{self, logging::error};
use datanode::DataNode;
async fn datanode_main(_opts: &GrepTimeOpts) {
let data_node = DataNode::new().unwrap();
match DataNode::new() {
Ok(data_node) => {
if let Err(e) = data_node.start().await {
error!("Fail to start data node, error: {:?}", e);
}
}
if let Err(e) = data_node.start().await {
println!("Fail to start data node, error: {:?}", e);
Err(e) => error!("Fail to new data node, error: {:?}", e),
}
}
#[tokio::main]
async fn main() {
let opts = GrepTimeOpts::parse();
let node_type = opts.node_type;
// TODO(dennis): 1. adds ip/port to app
// 2. config log dir
let app = format!("{node_type:?}-node").to_lowercase();
match opts.node_type {
common_telemetry::set_panic_hook();
common_telemetry::init_default_metrics_recorder();
let _guard = common_telemetry::init_global_logging(&app, "logs", "info", false);
match node_type {
NodeType::Data => datanode_main(&opts).await,
}
}

View File

@@ -0,0 +1,27 @@
[package]
name = "common-telemetry"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
console = ["console-subscriber"]
deadlock_detection=["parking_lot"]
[dependencies]
backtrace = "0.3"
console-subscriber = { version = "0.1", optional = true }
metrics = "0.18"
metrics-exporter-prometheus = { version = "0.9", default-features = false }
once_cell = "1.10"
opentelemetry = { version = "0.17", default-features = false, features = ["trace", "rt-tokio"] }
opentelemetry-jaeger = { version = "0.16", features = ["rt-tokio"] }
parking_lot = { version = "0.12", features = ["deadlock_detection"], optional = true }
tracing = "0.1"
tracing-appender = "0.2"
tracing-bunyan-formatter = "0.3"
tracing-futures = { version = "0.2", features = ["futures-03"] }
tracing-log = "0.1"
tracing-opentelemetry = "0.17"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View File

@@ -0,0 +1,12 @@
pub mod logging;
pub mod metric;
mod panic_hook;
pub use logging::init_default_ut_logging;
pub use logging::init_global_logging;
pub use metric::init_default_metrics_recorder;
pub use panic_hook::set_panic_hook;
pub use tracing;
pub use tracing_appender;
pub use tracing_futures;
pub use tracing_subscriber;

View File

@@ -0,0 +1,107 @@
//! Logging utilities, inspired by databend.
use std::env;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::Once;
use once_cell::sync::Lazy;
use opentelemetry::global;
use opentelemetry::sdk::propagation::TraceContextPropagator;
pub use tracing::{debug, error, info, span, warn, Level};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_appender::rolling::RollingFileAppender;
use tracing_appender::rolling::Rotation;
use tracing_bunyan_formatter::BunyanFormattingLayer;
use tracing_bunyan_formatter::JsonStorageLayer;
use tracing_log::LogTracer;
use tracing_subscriber::filter;
use tracing_subscriber::fmt::Layer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::Registry;
/// Initializes logging for unit tests.
///
/// The first call installs the global logger through [`init_global_logging`],
/// using `unittest` as the application name, `__unittest_logs` as the log
/// directory and `DEBUG` as the level, with Jaeger tracing disabled.
/// Subsequent calls do nothing because the setup is guarded by a [`Once`].
///
/// The worker guards returned by the setup are stored in
/// `GLOBAL_UT_LOG_GUARD` so that they are not dropped when this function returns.
pub fn init_default_ut_logging() {
    static INIT: Once = Once::new();

    INIT.call_once(|| {
        let mut slot = GLOBAL_UT_LOG_GUARD.lock().unwrap();
        let guards = init_global_logging("unittest", "__unittest_logs", "DEBUG", false);
        *slot = Some(guards);
    });
}
/// Storage for the worker guards created by `init_default_ut_logging`.
/// Starts out as `None` and is filled in once the unit-test logger is installed.
static GLOBAL_UT_LOG_GUARD: Lazy<Arc<Mutex<Option<Vec<WorkerGuard>>>>> =
Lazy::new(|| Arc::new(Mutex::new(None)));
/// Installs the process-wide `tracing` subscriber and returns the appender guards.
///
/// The subscriber is composed of:
/// - a non-blocking stdout layer,
/// - a Bunyan-formatted file layer backed by an hourly rolling appender that
///   writes into `dir` using `app_name` as the file name prefix,
/// - a Jaeger layer named after `app_name` when `enable_jaeger_tracing` is `true`.
///
/// The default level is taken from the `RUST_LOG` environment variable when it
/// holds a plain level (for example `info`). If the variable is unset, or holds
/// anything that is not a plain level (for example `my_crate=debug`), `level`
/// is used instead; in the latter case a notice is printed to stderr.
/// The `hyper`, `tower`, `datafusion` and `reqwest` targets are always set to
/// `WARN`.
///
/// The caller should keep the returned guards alive for as long as logging is
/// needed (see `tracing_appender::non_blocking::WorkerGuard`).
///
/// # Panics
///
/// Panics if the `log` compatibility tracer cannot be initialized, if `level`
/// has to be used and is not a valid level string, if the Jaeger pipeline
/// cannot be installed, or if a global subscriber cannot be set.
pub fn init_global_logging(
    app_name: &str,
    dir: &str,
    level: &str,
    enable_jaeger_tracing: bool,
) -> Vec<WorkerGuard> {
    let mut guards = vec![];

    // Enable log compatible layer to convert log record to tracing span.
    LogTracer::init().expect("log tracer must be valid");

    // Stdout layer.
    let (stdout_writer, stdout_guard) = tracing_appender::non_blocking(std::io::stdout());
    let stdout_logging_layer = Layer::new().with_writer(stdout_writer);
    guards.push(stdout_guard);

    // JSON log layer.
    let rolling_appender = RollingFileAppender::new(Rotation::HOURLY, dir, app_name);
    let (rolling_writer, rolling_writer_guard) = tracing_appender::non_blocking(rolling_appender);
    let file_logging_layer = BunyanFormattingLayer::new(app_name.to_string(), rolling_writer);
    guards.push(rolling_writer_guard);

    // Use env RUST_LOG to initialize the default level if it holds a plain level.
    // RUST_LOG commonly carries full directives (e.g. `info,hyper=warn`) which are
    // not a `LevelFilter`; those must not abort the process, so fall back to the
    // specified level instead.
    let env_level = env::var(EnvFilter::DEFAULT_ENV).ok().and_then(|value| {
        let parsed = value.parse::<filter::LevelFilter>().ok();
        if parsed.is_none() {
            // The subscriber is not installed yet, so stderr is the only channel available.
            eprintln!(
                "{}={:?} is not a plain log level, falling back to {:?}",
                EnvFilter::DEFAULT_ENV,
                value,
                level
            );
        }
        parsed
    });
    let default_level = env_level.unwrap_or_else(|| {
        level
            .parse::<filter::LevelFilter>()
            .expect("error parsing level string")
    });

    let filter = filter::Targets::new()
        // Only enable WARN and ERROR for 3rd-party crates
        // TODO(dennis): configure them?
        .with_target("hyper", Level::WARN)
        .with_target("tower", Level::WARN)
        .with_target("datafusion", Level::WARN)
        .with_target("reqwest", Level::WARN)
        .with_default(default_level);

    let subscriber = Registry::default()
        .with(filter)
        .with(JsonStorageLayer)
        .with(stdout_logging_layer)
        .with(file_logging_layer);

    // Must enable 'tokio_unstable' cfg, https://github.com/tokio-rs/console
    #[cfg(feature = "console")]
    let subscriber = subscriber.with(console_subscriber::spawn());

    // The two branches build subscribers of different types, hence the repeated
    // `set_global_default` call.
    if enable_jaeger_tracing {
        // Jaeger layer.
        global::set_text_map_propagator(TraceContextPropagator::new());
        let tracer = opentelemetry_jaeger::new_pipeline()
            .with_service_name(app_name)
            .install_batch(opentelemetry::runtime::Tokio)
            .expect("install");
        let jaeger_layer = Some(tracing_opentelemetry::layer().with_tracer(tracer));
        let subscriber = subscriber.with(jaeger_layer);
        tracing::subscriber::set_global_default(subscriber)
            .expect("error setting global tracing subscriber");
    } else {
        tracing::subscriber::set_global_default(subscriber)
            .expect("error setting global tracing subscriber");
    }

    guards
}

View File

@@ -0,0 +1,91 @@
// Metric utilities, inspired by databend.
use std::sync::{Arc, Once, RwLock};
use std::time::{Duration, Instant};
use metrics::histogram;
use metrics_exporter_prometheus::PrometheusBuilder;
pub use metrics_exporter_prometheus::PrometheusHandle;
use once_cell::sync::Lazy;
use crate::logging;
/// Handle of the installed Prometheus recorder.
/// `None` until `init_prometheus_recorder` stores a handle; read by `try_handle`.
static PROMETHEUS_HANDLE: Lazy<Arc<RwLock<Option<PrometheusHandle>>>> =
Lazy::new(|| Arc::new(RwLock::new(None)));
/// Installs the default (Prometheus) metrics recorder.
///
/// The installation is guarded by a [`Once`], so only the first call does any work.
pub fn init_default_metrics_recorder() {
    static INIT: Once = Once::new();

    INIT.call_once(init_prometheus_recorder);
}
/// Builds a Prometheus recorder, stores its handle in `PROMETHEUS_HANDLE` and
/// installs the recorder as the global `metrics` recorder.
///
/// Any previously installed recorder is cleared first. A failed installation
/// is only reported as a warning.
fn init_prometheus_recorder() {
    let prometheus = PrometheusBuilder::new().build_recorder();

    // The write guard lives until the end of the function, so the handle is
    // published and the recorder installed under the same lock.
    let mut handle_slot = PROMETHEUS_HANDLE.write().unwrap();
    *handle_slot = Some(prometheus.handle());

    metrics::clear_recorder();
    if let Err(err) = metrics::set_boxed_recorder(Box::new(prometheus)) {
        logging::warn!("Install prometheus recorder failed, cause: {}", err);
    }
}
/// Returns a clone of the stored Prometheus handle, or `None` when no handle
/// has been stored in `PROMETHEUS_HANDLE` yet.
pub fn try_handle() -> Option<PrometheusHandle> {
    let slot = PROMETHEUS_HANDLE.read().unwrap();
    (*slot).clone()
}
/// A stopwatch tied to a metric name.
///
/// The timer starts counting when it is created; [`Timer::elapsed`] reports
/// how much time has passed since then.
#[must_use = "Timer should be kept in a variable otherwise it cannot observe duration"]
#[derive(Debug)]
pub struct Timer {
    /// Instant at which the timer was created.
    start: Instant,
    /// Name of the metric associated with this timer.
    name: &'static str,
}

impl Timer {
    /// Starts a new timer for the metric called `name`.
    pub fn new(name: &'static str) -> Self {
        let start = Instant::now();
        Timer { start, name }
    }

    /// Returns the time elapsed since the timer was created.
    pub fn elapsed(&self) -> Duration {
        Instant::now().duration_since(self.start)
    }
}
impl Drop for Timer {
    /// Records the time elapsed since creation into the histogram named `self.name`.
    fn drop(&mut self) {
        let elapsed = self.start.elapsed();
        histogram!(self.name, elapsed);
    }
}
/// Creates a `Timer` for the given metric name.
///
/// Expands to `$crate::metric::Timer::new(name)`. Bind the result to a named
/// variable (e.g. `let _timer = timer!(NAME);`) so it lives until the end of
/// the scope being measured.
#[macro_export]
macro_rules! timer {
    ($metric_name:expr) => {
        $crate::metric::Timer::new($metric_name)
    };
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A timer's metric shows up in the rendered output only after the timer is dropped.
    #[test]
    fn test_elapsed_timer() {
        init_default_metrics_recorder();

        // Dropping the timer records the histogram for metric "a".
        drop(Timer::new("test_elapsed_timer_a"));

        let handle = try_handle().unwrap();
        let rendered = handle.render();
        assert!(rendered.contains("test_elapsed_timer_a"));
        assert!(!rendered.contains("test_elapsed_timer_b"));

        // `let _ =` drops the timer right away, which records metric "b".
        let _ = timer!("test_elapsed_timer_b");

        let rendered = handle.render();
        assert!(rendered.contains("test_elapsed_timer_a"));
        assert!(rendered.contains("test_elapsed_timer_b"));
    }
}

View File

@@ -0,0 +1,49 @@
use std::panic;
#[cfg(feature = "deadlock_detection")]
use std::time::Duration;
use backtrace::Backtrace;
pub fn set_panic_hook() {
// Set a panic hook that records the panic as a `tracing` event at the
// `ERROR` verbosity level.
//
// If we are currently in a span when the panic occurred, the logged event
// will include the current span, allowing the context in which the panic
// occurred to be recorded.
let default_hook = panic::take_hook();
panic::set_hook(Box::new(move |panic| {
let backtrace = Backtrace::new();
let backtrace = format!("{:?}", backtrace);
if let Some(location) = panic.location() {
tracing::error!(
message = %panic,
backtrace = %backtrace,
panic.file = location.file(),
panic.line = location.line(),
panic.column = location.column(),
);
} else {
tracing::error!(message = %panic, backtrace = %backtrace);
}
default_hook(panic);
}));
#[cfg(feature = "deadlock_detection")]
std::thread::spawn(move || loop {
std::thread::sleep(Duration::from_secs(5));
let deadlocks = parking_lot::deadlock::check_deadlock();
if deadlocks.is_empty() {
continue;
}
tracing::info!("{} deadlocks detected", deadlocks.len());
for (i, threads) in deadlocks.iter().enumerate() {
tracing::info!("Deadlock #{}", i);
for t in threads {
tracing::info!("Thread Id {:#?}", t.thread_id());
tracing::info!("{:#?}", t.backtrace());
}
}
});
}

View File

@@ -10,7 +10,9 @@ axum = "0.5"
axum-macros = "0.2"
common-error = { path = "../common/error" }
common-recordbatch = { path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }
hyper = { version = "0.14", features = ["full"] }
metrics = "0.18"
query = { path = "../query" }
serde = "1.0"
serde_json = "1.0"

View File

@@ -2,6 +2,7 @@ pub mod catalog;
pub mod datanode;
pub mod error;
pub mod instance;
mod metric;
pub mod server;
pub use crate::datanode::DataNode;

View File

@@ -0,0 +1,3 @@
//! datanode metrics
/// Metric name used by the timer in the HTTP `sql` handler.
pub const METRIC_HANDLE_SQL_ELAPSED: &str = "datanode.handle_sql_elapsed";

View File

@@ -11,6 +11,7 @@ use axum::{
BoxError, Extension, Router,
};
use common_recordbatch::{util, RecordBatch};
use common_telemetry::logging::info;
use query::Output;
use serde::Serialize;
use snafu::ResultExt;
@@ -31,6 +32,14 @@ pub enum JsonOutput {
Rows(Vec<RecordBatch>),
}
/// Http response
#[derive(Serialize, Debug)]
pub enum HttpResponse {
Json(JsonResponse),
Text(String),
}
/// Json response
#[derive(Serialize, Debug)]
pub struct JsonResponse {
success: bool,
@@ -40,9 +49,12 @@ pub struct JsonResponse {
output: Option<JsonOutput>,
}
impl IntoResponse for JsonResponse {
impl IntoResponse for HttpResponse {
fn into_response(self) -> Response {
Json(self).into_response()
match self {
HttpResponse::Json(json) => Json(json).into_response(),
HttpResponse::Text(text) => text.into_response(),
}
}
}
@@ -91,22 +103,26 @@ impl HttpServer {
}
pub fn make_app(&self) -> Router {
Router::new().route("/sql", get(handler::sql)).layer(
ServiceBuilder::new()
.layer(HandleErrorLayer::new(handle_error))
.layer(TraceLayer::new_for_http())
.layer(Extension(self.instance.clone()))
// TODO configure timeout
.layer(TimeoutLayer::new(Duration::from_secs(30))),
)
Router::new()
// handlers
.route("/sql", get(handler::sql))
.route("/metrics", get(handler::metrics))
// middlewares
.layer(
ServiceBuilder::new()
.layer(HandleErrorLayer::new(handle_error))
.layer(TraceLayer::new_for_http())
.layer(Extension(self.instance.clone()))
// TODO configure timeout
.layer(TimeoutLayer::new(Duration::from_secs(30))),
)
}
pub async fn start(&self) -> Result<()> {
let app = self.make_app();
let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
// TODO(dennis): log
println!("Datanode HTTP server is listening on {}", addr);
info!("Datanode HTTP server is listening on {}", addr);
let server = axum::Server::bind(&addr).serve(app.into_make_service());
let graceful = server.with_graceful_shutdown(shutdown_signal());

View File

@@ -3,20 +3,38 @@
use std::collections::HashMap;
use axum::extract::{Extension, Query};
use common_telemetry::{metric, timer};
use crate::instance::InstanceRef;
use crate::server::http::JsonResponse;
use crate::metric::METRIC_HANDLE_SQL_ELAPSED;
use crate::server::http::{HttpResponse, JsonResponse};
/// Handler to execute sql
///
/// Reads the `sql` query parameter and passes it to `instance.execute_sql`,
/// returning the outcome as `HttpResponse::Json`. When the parameter is absent
/// the JSON response carries the error "sql parameter is required.".
///
/// A `_timer` guard is created from `METRIC_HANDLE_SQL_ELAPSED` at the top of
/// the handler; presumably it records the elapsed time when dropped — confirm
/// against the `timer!` macro.
#[axum_macros::debug_handler]
pub async fn sql(
Extension(instance): Extension<InstanceRef>,
Query(params): Query<HashMap<String, String>>,
) -> JsonResponse {
) -> HttpResponse {
let _timer = timer!(METRIC_HANDLE_SQL_ELAPSED);
if let Some(sql) = params.get("sql") {
JsonResponse::from_output(instance.execute_sql(sql).await).await
HttpResponse::Json(JsonResponse::from_output(instance.execute_sql(sql).await).await)
} else {
JsonResponse::with_error(Some("sql parameter is required.".to_string()))
HttpResponse::Json(JsonResponse::with_error(Some(
"sql parameter is required.".to_string(),
)))
}
}
/// Handler to export metrics
///
/// Responds with the text rendered by the handle from `metric::try_handle()`.
/// When no handle is available, the body is a fixed notice instead.
#[axum_macros::debug_handler]
pub async fn metrics(
    Extension(_instance): Extension<InstanceRef>,
    Query(_params): Query<HashMap<String, String>>,
) -> HttpResponse {
    // Resolve the body first so the response is constructed in one place.
    let body = match metric::try_handle() {
        Some(handle) => handle.render(),
        None => "Prometheus handle not initialized.".to_string(),
    };
    HttpResponse::Text(body)
}
@@ -24,6 +42,7 @@ pub async fn sql(
mod tests {
use std::sync::Arc;
use metrics::counter;
use query::catalog::memory;
use super::*;
@@ -50,9 +69,14 @@ mod tests {
let extension = create_extension();
let json = sql(extension, Query(HashMap::default())).await;
assert!(!json.success);
assert_eq!(Some("sql parameter is required.".to_string()), json.error);
assert!(json.output.is_none());
match json {
HttpResponse::Json(json) => {
assert!(!json.success);
assert_eq!(Some("sql parameter is required.".to_string()), json.error);
assert!(json.output.is_none());
}
_ => unreachable!(),
}
}
#[tokio::test]
@@ -61,15 +85,37 @@ mod tests {
let extension = create_extension();
let json = sql(extension, query).await;
assert!(json.success);
assert!(json.error.is_none());
assert!(json.output.is_some());
match json.output.unwrap() {
JsonOutput::Rows(rows) => {
assert_eq!(1, rows.len());
match json {
HttpResponse::Json(json) => {
assert!(json.success);
assert!(json.error.is_none());
assert!(json.output.is_some());
match json.output.unwrap() {
JsonOutput::Rows(rows) => {
assert_eq!(1, rows.len());
}
_ => unreachable!(),
}
}
_ => unreachable!(),
}
}
#[tokio::test]
async fn test_metrics() {
    metric::init_default_metrics_recorder();

    // Emit one counter sample so the rendered output has something to show.
    counter!("test_metrics", 1);

    let params = create_params();
    let instance = create_extension();

    // The metrics handler must answer with the text variant.
    let rendered = match metrics(instance, params).await {
        HttpResponse::Text(rendered) => rendered,
        HttpResponse::Json(_) => unreachable!(),
    };
    assert!(rendered.contains("test_metrics counter"));
}
}

View File

@@ -17,6 +17,7 @@ fn make_test_app() -> Router {
#[tokio::test]
async fn test_sql_api() {
common_telemetry::init_default_ut_logging();
let app = make_test_app();
let client = TestClient::new(app);
let res = client.get("/sql").send().await;
@@ -41,3 +42,24 @@ async fn test_sql_api() {
r#"{"success":true,"output":{"Rows":[{"schema":{"fields":[{"name":"number","data_type":"UInt32","is_nullable":false,"metadata":{}}],"metadata":{}},"columns":[{"UInt32":[0,1,2,3,4,5,6,7,8,9]}]}]}}"#
);
}
#[tokio::test]
async fn test_metrics_api() {
    common_telemetry::init_default_ut_logging();
    common_telemetry::init_default_metrics_recorder();
    let client = TestClient::new(make_test_app());

    // Issue one query first so the SQL handler has run at least once.
    let sql_res = client
        .get("/sql?sql=select * from numbers limit 10")
        .send()
        .await;
    assert_eq!(sql_res.status(), StatusCode::OK);

    // The exported metrics text must mention the SQL handler's metric.
    let metrics_res = client.get("/metrics").send().await;
    assert_eq!(metrics_res.status(), StatusCode::OK);
    let exported = metrics_res.text().await;
    assert!(exported.contains("datanode_handle_sql_elapsed"));
}

View File

@@ -12,10 +12,12 @@ features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc
async-trait = "0.1"
common-error = { path = "../common/error" }
common-recordbatch = {path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]}
datatypes = {path = "../datatypes" }
futures = "0.3"
futures-util = "0.3.21"
futures-util = "0.3"
metrics = "0.18"
snafu = { version = "0.7", features = ["backtraces"] }
table = { path = "../table" }
tokio = "1.0"

View File

@@ -8,10 +8,12 @@ mod planner;
use std::sync::Arc;
use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
use common_telemetry::timer;
use snafu::{OptionExt, ResultExt};
use sql::{dialect::GenericDialect, parser::ParserContext};
pub use crate::datafusion::catalog_adapter::DfCatalogListAdapter;
use crate::metric;
use crate::query_engine::{QueryContext, QueryEngineState};
use crate::{
catalog::CatalogListRef,
@@ -46,6 +48,7 @@ impl QueryEngine for DatafusionQueryEngine {
}
fn sql_to_plan(&self, sql: &str) -> Result<LogicalPlan> {
let _timer = timer!(metric::METRIC_PARSE_SQL_ELAPSED);
let context_provider = DfContextProviderAdapter::new(self.state.catalog_list());
let planner = DfPlanner::new(&context_provider);
let mut statement = ParserContext::create_with_dialect(sql, &GenericDialect {})
@@ -73,6 +76,7 @@ impl LogicalOptimizer for DatafusionQueryEngine {
_ctx: &mut QueryContext,
plan: &LogicalPlan,
) -> Result<LogicalPlan> {
let _timer = timer!(metric::METRIC_OPTIMIZE_LOGICAL_ELAPSED);
match plan {
LogicalPlan::DfPlan(df_plan) => {
let optimized_plan =
@@ -96,6 +100,7 @@ impl PhysicalPlanner for DatafusionQueryEngine {
_ctx: &mut QueryContext,
logical_plan: &LogicalPlan,
) -> Result<Arc<dyn PhysicalPlan>> {
let _timer = timer!(metric::METRIC_CREATE_PHYSICAL_ELAPSED);
match logical_plan {
LogicalPlan::DfPlan(df_plan) => {
let physical_plan = self
@@ -122,6 +127,7 @@ impl PhysicalOptimizer for DatafusionQueryEngine {
_ctx: &mut QueryContext,
plan: Arc<dyn PhysicalPlan>,
) -> Result<Arc<dyn PhysicalPlan>> {
let _timer = timer!(metric::METRIC_OPTIMIZE_PHYSICAL_ELAPSED);
let config = &self.state.df_context().state.lock().config;
let optimizers = &config.physical_optimizers;
@@ -150,6 +156,7 @@ impl QueryExecutor for DatafusionQueryEngine {
ctx: &QueryContext,
plan: &Arc<dyn PhysicalPlan>,
) -> Result<SendableRecordBatchStream> {
let _timer = timer!(metric::METRIC_EXEC_PLAN_ELAPSED);
match plan.output_partitioning().partition_count() {
0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
1 => Ok(plan.execute(&ctx.state().runtime(), 0).await?),

View File

@@ -4,6 +4,7 @@ mod datafusion;
pub mod error;
pub mod executor;
pub mod logical_optimizer;
mod metric;
pub mod physical_optimizer;
pub mod physical_planner;
pub mod plan;

7
src/query/src/metric.rs Normal file
View File

@@ -0,0 +1,7 @@
//! query engine metrics
//!
//! Metric names passed to the `timer!` macro by `DatafusionQueryEngine`.
/// Passed to `timer!` at the start of `sql_to_plan`.
pub static METRIC_PARSE_SQL_ELAPSED: &str = "query.parse_sql_elapsed";
/// Passed to `timer!` in the `LogicalOptimizer` impl for `DatafusionQueryEngine`.
pub static METRIC_OPTIMIZE_LOGICAL_ELAPSED: &str = "query.optimize_logicalplan_elapsed";
/// Passed to `timer!` in the `PhysicalOptimizer` impl for `DatafusionQueryEngine`.
pub static METRIC_OPTIMIZE_PHYSICAL_ELAPSED: &str = "query.optimize_physicalplan_elapsed";
/// Passed to `timer!` in the `PhysicalPlanner` impl for `DatafusionQueryEngine`.
pub static METRIC_CREATE_PHYSICAL_ELAPSED: &str = "query.create_physicalplan_elapsed";
/// Passed to `timer!` in the `QueryExecutor` impl for `DatafusionQueryEngine`.
pub static METRIC_EXEC_PLAN_ELAPSED: &str = "query.execute_plan_elapsed";

View File

@@ -14,6 +14,7 @@ use table::table::numbers::NumbersTable;
#[tokio::test]
async fn test_datafusion_query_engine() -> Result<()> {
common_telemetry::init_default_ut_logging();
let catalog_list = memory::new_memory_catalog_list()?;
let factory = QueryEngineFactory::new(catalog_list);
let engine = factory.query_engine();