From 5c19913a910b38d55c66d43609d77961b662f268 Mon Sep 17 00:00:00 2001 From: LFC Date: Thu, 10 Aug 2023 16:08:37 +0800 Subject: [PATCH] build: on windows (#2054) * build on windows * rebase develop * fix: resolve PR comments --- .config/nextest.toml | 1 + .github/workflows/develop.yml | 102 +++++++------- Cargo.lock | 84 ++++++++---- Cargo.toml | 1 - scripts/fetch-dashboard-assets.sh | 4 +- src/cmd/Cargo.toml | 8 +- src/cmd/src/bin/greptime.rs | 1 + src/common/datasource/src/file_format/csv.rs | 8 +- src/common/datasource/src/file_format/json.rs | 8 +- src/common/datasource/src/file_format/orc.rs | 8 +- .../datasource/src/file_format/parquet.rs | 6 +- .../datasource/src/file_format/tests.rs | 13 +- src/common/datasource/src/test_util.rs | 33 +++-- src/common/datasource/src/tests.rs | 42 ++---- src/common/datasource/src/util.rs | 11 ++ src/common/mem-prof/Cargo.toml | 6 +- src/common/mem-prof/src/error.rs | 39 ++---- src/common/mem-prof/src/jemalloc.rs | 76 +++++++++++ src/common/mem-prof/src/jemalloc/error.rs | 73 ++++++++++ src/common/mem-prof/src/lib.rs | 62 +-------- src/common/pprof/Cargo.toml | 16 --- src/common/test-util/src/lib.rs | 43 ++++++ src/common/time/src/datetime.rs | 12 +- src/common/time/src/time.rs | 7 +- src/common/time/src/timestamp.rs | 25 +--- src/common/time/src/timezone.rs | 9 +- src/common/time/src/util.rs | 32 +++++ src/log-store/Cargo.toml | 2 +- src/log-store/src/test_util/log_store_util.rs | 6 +- src/meta-srv/src/metasrv.rs | 2 +- src/mito2/src/test_util.rs | 18 ++- src/query/src/error.rs | 10 +- src/query/src/parser.rs | 57 +++++--- src/script/src/python/pyo3/vector_impl.rs | 2 +- src/servers/Cargo.toml | 12 +- src/servers/build.rs | 37 ++++- src/servers/src/error.rs | 9 +- src/servers/src/http/handler.rs | 8 +- src/servers/src/http/pprof.rs | 5 +- .../src/http}/pprof/README.md | 0 .../lib.rs => servers/src/http/pprof/nix.rs} | 0 src/servers/src/metrics.rs | 69 ++-------- src/servers/src/metrics/jemalloc.rs | 69 ++++++++++ src/servers/src/metrics/jemalloc/error.rs | 49 +++++++ src/servers/tests/mysql/mysql_server_test.rs | 3 +- src/storage/src/manifest/region.rs | 2 +- tests-integration/src/cluster.rs | 5 +- tests-integration/src/tests/instance_test.rs | 52 ++----- tests-integration/src/tests/test_util.rs | 20 ++- tests-integration/tests/http.rs | 24 +++- tests/runner/src/env.rs | 129 ++++++++++-------- tests/runner/src/main.rs | 7 +- 52 files changed, 815 insertions(+), 512 deletions(-) create mode 100644 src/common/mem-prof/src/jemalloc.rs create mode 100644 src/common/mem-prof/src/jemalloc/error.rs delete mode 100644 src/common/pprof/Cargo.toml rename src/{common => servers/src/http}/pprof/README.md (100%) rename src/{common/pprof/src/lib.rs => servers/src/http/pprof/nix.rs} (100%) create mode 100644 src/servers/src/metrics/jemalloc.rs create mode 100644 src/servers/src/metrics/jemalloc/error.rs diff --git a/.config/nextest.toml b/.config/nextest.toml index d5ac6dc652..1543a82394 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -1,2 +1,3 @@ [profile.default] slow-timeout = { period = "60s", terminate-after = 3, grace-period = "30s" } +retries = { backoff = "exponential", count = 3, delay = "10s", jitter = true } diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index f249087f41..ad624837cd 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -70,52 +70,13 @@ jobs: - name: Run taplo run: taplo format --check - # Use coverage to run test. 
- # test: - # name: Test Suite - # if: github.event.pull_request.draft == false - # runs-on: ubuntu-latest - # timeout-minutes: 60 - # steps: - # - uses: actions/checkout@v3 - # - name: Cache LLVM and Clang - # id: cache-llvm - # uses: actions/cache@v3 - # with: - # path: ./llvm - # key: llvm - # - uses: arduino/setup-protoc@v1 - # with: - # repo-token: ${{ secrets.GITHUB_TOKEN }} - # - uses: KyleMayes/install-llvm-action@v1 - # with: - # version: "14.0" - # cached: ${{ steps.cache-llvm.outputs.cache-hit }} - # - uses: dtolnay/rust-toolchain@master - # with: - # toolchain: ${{ env.RUST_TOOLCHAIN }} - # - name: Rust Cache - # uses: Swatinem/rust-cache@v2 - # - name: Cleanup disk - # uses: curoky/cleanup-disk-action@v2.0 - # with: - # retain: 'rust,llvm' - # - name: Install latest nextest release - # uses: taiki-e/install-action@nextest - # - name: Run tests - # run: cargo nextest run - # env: - # CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=lld" - # RUST_BACKTRACE: 1 - # GT_S3_BUCKET: ${{ secrets.S3_BUCKET }} - # GT_S3_ACCESS_KEY_ID: ${{ secrets.S3_ACCESS_KEY_ID }} - # GT_S3_ACCESS_KEY: ${{ secrets.S3_ACCESS_KEY }} - # UNITTEST_LOG_DIR: "__unittest_logs" - sqlness: name: Sqlness Test if: github.event.pull_request.draft == false - runs-on: ubuntu-latest-8-cores + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ ubuntu-latest-8-cores, windows-latest ] timeout-minutes: 60 steps: - uses: actions/checkout@v3 @@ -127,25 +88,14 @@ jobs: toolchain: ${{ env.RUST_TOOLCHAIN }} - name: Rust Cache uses: Swatinem/rust-cache@v2 - - name: Run etcd - run: | - ETCD_VER=v3.5.7 - DOWNLOAD_URL=https://github.com/etcd-io/etcd/releases/download - curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz - mkdir -p /tmp/etcd-download - tar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/etcd-download --strip-components=1 - rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz - - sudo cp -a /tmp/etcd-download/etcd* /usr/local/bin/ - nohup etcd >/tmp/etcd.log 2>&1 & - name: Run sqlness - run: cargo sqlness && ls /tmp + run: cargo sqlness - name: Upload sqlness logs if: always() uses: actions/upload-artifact@v3 with: name: sqlness-logs - path: /tmp/greptime-*.log + path: ${{ runner.temp }}/greptime-*.log retention-days: 3 fmt: @@ -234,3 +184,43 @@ jobs: flags: rust fail_ci_if_error: false verbose: true + + test-on-windows: + if: github.event.pull_request.draft == false + runs-on: windows-latest-8-cores + timeout-minutes: 60 + steps: + - run: git config --global core.autocrlf false + - uses: actions/checkout@v3 + - uses: arduino/setup-protoc@v1 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@master + with: + toolchain: ${{ env.RUST_TOOLCHAIN }} + components: llvm-tools-preview + - name: Rust Cache + uses: Swatinem/rust-cache@v2 + - name: Install Cargo Nextest + uses: taiki-e/install-action@nextest + - name: Install Python + uses: actions/setup-python@v4 + with: + python-version: '3.10' + - name: Install PyArrow Package + run: pip install pyarrow + - name: Install WSL distribution + uses: Vampire/setup-wsl@v2 + with: + distribution: Ubuntu-22.04 + - name: Running tests + run: cargo nextest run -F pyo3_backend,dashboard + env: + RUST_BACKTRACE: 1 + CARGO_INCREMENTAL: 0 + GT_S3_BUCKET: ${{ secrets.S3_BUCKET }} + GT_S3_ACCESS_KEY_ID: ${{ secrets.S3_ACCESS_KEY_ID }} + GT_S3_ACCESS_KEY: ${{ secrets.S3_ACCESS_KEY }} + GT_S3_REGION: ${{ secrets.S3_REGION }} + UNITTEST_LOG_DIR: "__unittest_logs" 
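The sqlness log path above moves from a hard-coded /tmp to ${{ runner.temp }} because Windows has no /tmp directory. The same portability concern applies to Rust code that writes test logs; a minimal sketch of the idea (the helper name is hypothetical, not part of this patch):

use std::path::PathBuf;

// `std::env::temp_dir()` returns an OS-appropriate temporary directory (`/tmp` or
// `$TMPDIR` on Unix, `%TEMP%` on Windows), the same idea as preferring
// `${{ runner.temp }}` over `/tmp` in the workflow.
fn sqlness_log_path(file_name: &str) -> PathBuf {
    std::env::temp_dir().join(file_name)
}

fn main() {
    println!("{}", sqlness_log_path("greptime-test.log").display());
}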
diff --git a/Cargo.lock b/Cargo.lock index 23a94b6f8d..f35dedf864 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1816,17 +1816,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "common-pprof" -version = "0.3.2" -dependencies = [ - "common-error", - "pprof", - "prost", - "snafu", - "tokio", -] - [[package]] name = "common-procedure" version = "0.3.2" @@ -4943,9 +4932,9 @@ checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" [[package]] name = "linux-raw-sys" -version = "0.4.3" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09fc20d2ca12cb9f044c93e3bd6d32d523e6e2ec3db4f7b2939cd99026ecd3f0" +checksum = "57bcfdad1b858c2db7c38303a6d2ad4dfaf5eb53dfeb0910128b2c26d6158503" [[package]] name = "lock_api" @@ -5845,13 +5834,13 @@ dependencies = [ [[package]] name = "num-derive" -version = "0.3.3" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d" +checksum = "9e6a0fd4f737c707bd9086cc16c925f294943eb62eb71499e9fd4cf71f8b9f4e" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.28", ] [[package]] @@ -7354,22 +7343,21 @@ dependencies = [ [[package]] name = "raft-engine" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67b66e735395b7ff12f3ebbb4794006aecb365c4c9a82141279b58b227ac3a8b" +source = "git+https://github.com/tikv/raft-engine.git?rev=2dcaf5beeea3d5de9ec9c7133a2451d00f508f52#2dcaf5beeea3d5de9ec9c7133a2451d00f508f52" dependencies = [ "byteorder", "crc32fast", "crossbeam", "fail", "fs2", - "hashbrown 0.12.3", + "hashbrown 0.14.0", "hex", "if_chain", "lazy_static", "libc", "log", "lz4-sys", - "nix 0.25.1", + "nix 0.26.2", "num-derive", "num-traits", "parking_lot 0.12.1", @@ -7377,10 +7365,11 @@ dependencies = [ "prometheus-static-metric", "protobuf", "rayon", + "rhai", "scopeguard", "serde", "serde_repr", - "strum 0.24.1", + "strum 0.25.0", "thiserror", ] @@ -7683,6 +7672,32 @@ dependencies = [ "bytemuck", ] +[[package]] +name = "rhai" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c2a11a646ef5d4e4a9d5cf80c7e4ecb20f9b1954292d5c5e6d6cbc8d33728ec" +dependencies = [ + "ahash 0.8.3", + "bitflags 1.3.2", + "instant", + "num-traits", + "rhai_codegen", + "smallvec", + "smartstring", +] + +[[package]] +name = "rhai_codegen" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db74e3fdd29d969a0ec1f8e79171a6f0f71d0429293656901db382d248c4c021" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "ring" version = "0.16.20" @@ -7970,7 +7985,7 @@ dependencies = [ "bitflags 2.3.3", "errno 0.3.2", "libc", - "linux-raw-sys 0.4.3", + "linux-raw-sys 0.4.5", "windows-sys 0.48.0", ] @@ -8643,9 +8658,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.179" +version = "1.0.180" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a5bf42b8d227d4abf38a1ddb08602e229108a517cd4e5bb28f9c7eaafdce5c0" +checksum = "0ea67f183f058fe88a4e3ec6e2788e003840893b91bac4559cabedd00863b3ed" dependencies = [ "serde_derive", ] @@ -8662,9 +8677,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.179" +version = "1.0.180" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"741e124f5485c7e60c03b043f79f320bff3527f4bbf12cf3831750dc46a0ec2c" +checksum = "24e744d7782b686ab3b73267ef05697159cc0e5abbed3f47f9933165e5219036" dependencies = [ "proc-macro2", "quote", @@ -8794,7 +8809,6 @@ dependencies = [ "common-grpc", "common-grpc-expr", "common-mem-prof", - "common-pprof", "common-query", "common-recordbatch", "common-runtime", @@ -8829,6 +8843,7 @@ dependencies = [ "pgwire", "pin-project", "postgres-types", + "pprof", "promql-parser", "prost", "query", @@ -9040,6 +9055,17 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" +[[package]] +name = "smartstring" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fb72c633efbaa2dd666986505016c32c3044395ceaf881518399d2f4127ee29" +dependencies = [ + "autocfg", + "static_assertions", + "version_check", +] + [[package]] name = "snafu" version = "0.7.5" @@ -9795,9 +9821,9 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "target-lexicon" -version = "0.12.10" +version = "0.12.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d2faeef5759ab89935255b1a4cd98e0baf99d1085e37d36599c625dac49ae8e" +checksum = "9d0e916b1148c8e263850e1ebcbd046f333e0683c724876bb0da63ea4373dc8a" [[package]] name = "temp-env" diff --git a/Cargo.toml b/Cargo.toml index 3ff5ee0cdb..e322f66182 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,6 @@ members = [ "src/common/meta", "src/common/procedure", "src/common/procedure-test", - "src/common/pprof", "src/common/query", "src/common/recordbatch", "src/common/runtime", diff --git a/scripts/fetch-dashboard-assets.sh b/scripts/fetch-dashboard-assets.sh index ac940bab93..0a405b93ba 100755 --- a/scripts/fetch-dashboard-assets.sh +++ b/scripts/fetch-dashboard-assets.sh @@ -2,14 +2,14 @@ # This script is used to download built dashboard assets from the "GreptimeTeam/dashboard" repository. 
-set -e +set -e -x declare -r SCRIPT_DIR=$(cd $(dirname ${0}) >/dev/null 2>&1 && pwd) declare -r ROOT_DIR=$(dirname ${SCRIPT_DIR}) declare -r STATIC_DIR="$ROOT_DIR/src/servers/dashboard" OUT_DIR="${1:-$SCRIPT_DIR}" -RELEASE_VERSION="$(cat $STATIC_DIR/VERSION)" +RELEASE_VERSION="$(cat $STATIC_DIR/VERSION | tr -d '\t\r\n ')" echo "Downloading assets to dir: $OUT_DIR" cd $OUT_DIR diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 720048bd0d..037e151820 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -54,15 +54,19 @@ session = { workspace = true } snafu.workspace = true substrait = { workspace = true } table = { workspace = true } -tikv-jemallocator = "0.5" tokio.workspace = true +[target.'cfg(not(windows))'.dependencies] +tikv-jemallocator = "0.5" + [dev-dependencies] common-test-util = { workspace = true } -rexpect = "0.5" serde.workspace = true temp-env = "0.3" toml.workspace = true +[target.'cfg(not(windows))'.dev-dependencies] +rexpect = "0.5" + [build-dependencies] common-version = { workspace = true } diff --git a/src/cmd/src/bin/greptime.rs b/src/cmd/src/bin/greptime.rs index 1478c4570e..1be0982eec 100644 --- a/src/cmd/src/bin/greptime.rs +++ b/src/cmd/src/bin/greptime.rs @@ -187,6 +187,7 @@ fn log_env_flags() { } } +#[cfg(not(windows))] #[global_allocator] static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; diff --git a/src/common/datasource/src/file_format/csv.rs b/src/common/datasource/src/file_format/csv.rs index 78058c251b..eebe86ac18 100644 --- a/src/common/datasource/src/file_format/csv.rs +++ b/src/common/datasource/src/file_format/csv.rs @@ -209,15 +209,19 @@ impl DfRecordBatchEncoder for csv::Writer { #[cfg(test)] mod tests { + use common_test_util::find_workspace_path; + use super::*; use crate::file_format::{ FileFormat, FORMAT_COMPRESSION_TYPE, FORMAT_DELIMITER, FORMAT_HAS_HEADER, FORMAT_SCHEMA_INFER_MAX_RECORD, }; - use crate::test_util::{self, format_schema, test_store}; + use crate::test_util::{format_schema, test_store}; fn test_data_root() -> String { - test_util::get_data_dir("tests/csv").display().to_string() + find_workspace_path("/src/common/datasource/tests/csv") + .display() + .to_string() } #[tokio::test] diff --git a/src/common/datasource/src/file_format/json.rs b/src/common/datasource/src/file_format/json.rs index 7e9777b02b..bcf162971e 100644 --- a/src/common/datasource/src/file_format/json.rs +++ b/src/common/datasource/src/file_format/json.rs @@ -167,12 +167,16 @@ impl DfRecordBatchEncoder for json::Writer { #[cfg(test)] mod tests { + use common_test_util::find_workspace_path; + use super::*; use crate::file_format::{FileFormat, FORMAT_COMPRESSION_TYPE, FORMAT_SCHEMA_INFER_MAX_RECORD}; - use crate::test_util::{self, format_schema, test_store}; + use crate::test_util::{format_schema, test_store}; fn test_data_root() -> String { - test_util::get_data_dir("tests/json").display().to_string() + find_workspace_path("/src/common/datasource/tests/json") + .display() + .to_string() } #[tokio::test] diff --git a/src/common/datasource/src/file_format/orc.rs b/src/common/datasource/src/file_format/orc.rs index 245e24e643..effbed98e9 100644 --- a/src/common/datasource/src/file_format/orc.rs +++ b/src/common/datasource/src/file_format/orc.rs @@ -188,12 +188,16 @@ impl FileOpener for OrcOpener { #[cfg(test)] mod tests { + use common_test_util::find_workspace_path; + use super::*; use crate::file_format::FileFormat; - use crate::test_util::{self, format_schema, test_store}; + use crate::test_util::{format_schema, test_store}; fn 
test_data_root() -> String { - test_util::get_data_dir("tests/orc").display().to_string() + find_workspace_path("/src/common/datasource/tests/orc") + .display() + .to_string() } #[tokio::test] diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs index d24101f203..5e76255caf 100644 --- a/src/common/datasource/src/file_format/parquet.rs +++ b/src/common/datasource/src/file_format/parquet.rs @@ -158,11 +158,13 @@ impl ArrowWriterCloser for ArrowWriter { #[cfg(test)] mod tests { + use common_test_util::find_workspace_path; + use super::*; - use crate::test_util::{self, format_schema, test_store}; + use crate::test_util::{format_schema, test_store}; fn test_data_root() -> String { - test_util::get_data_dir("tests/parquet") + find_workspace_path("/src/common/datasource/tests/parquet") .display() .to_string() } diff --git a/src/common/datasource/src/file_format/tests.rs b/src/common/datasource/src/file_format/tests.rs index e95b74bb65..be8650c9d5 100644 --- a/src/common/datasource/src/file_format/tests.rs +++ b/src/common/datasource/src/file_format/tests.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::vec; +use common_test_util::find_workspace_path; use datafusion::assert_batches_eq; use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStream, ParquetExec}; use datafusion::execution::context::TaskContext; @@ -71,7 +72,7 @@ async fn test_json_opener() { CompressionType::Uncompressed, ); - let path = &test_util::get_data_dir("tests/json/basic.json") + let path = &find_workspace_path("/src/common/datasource/tests/json/basic.json") .display() .to_string(); let tests = [ @@ -111,7 +112,7 @@ async fn test_csv_opener() { let store = test_store("/"); let schema = test_basic_schema(); - let path = &test_util::get_data_dir("tests/csv/basic.csv") + let path = &find_workspace_path("/src/common/datasource/tests/csv/basic.csv") .display() .to_string(); let csv_conf = CsvConfigBuilder::default() @@ -160,7 +161,7 @@ async fn test_parquet_exec() { let schema = test_basic_schema(); - let path = &test_util::get_data_dir("tests/parquet/basic.parquet") + let path = &find_workspace_path("/src/common/datasource/tests/parquet/basic.parquet") .display() .to_string(); let base_config = scan_config(schema.clone(), None, path); @@ -196,13 +197,15 @@ async fn test_parquet_exec() { #[tokio::test] async fn test_orc_opener() { - let root = test_util::get_data_dir("tests/orc").display().to_string(); + let root = find_workspace_path("/src/common/datasource/tests/orc") + .display() + .to_string(); let store = test_store(&root); let schema = OrcFormat.infer_schema(&store, "test.orc").await.unwrap(); let schema = Arc::new(schema); let orc_opener = OrcOpener::new(store.clone(), schema.clone(), None); - let path = &test_util::get_data_dir("/test.orc").display().to_string(); + let path = "test.orc"; let tests = [ Test { diff --git a/src/common/datasource/src/test_util.rs b/src/common/datasource/src/test_util.rs index 32be04deee..7e6abd2b9d 100644 --- a/src/common/datasource/src/test_util.rs +++ b/src/common/datasource/src/test_util.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::path::PathBuf;
 use std::sync::Arc;
 
 use arrow_schema::{DataType, Field, Schema, SchemaRef};
@@ -31,13 +30,6 @@ use crate::test_util;
 
 pub const TEST_BATCH_SIZE: usize = 100;
 
-pub fn get_data_dir(path: &str) -> PathBuf {
-    // https://doc.rust-lang.org/cargo/reference/environment-variables.html
-    let dir = env!("CARGO_MANIFEST_DIR");
-
-    PathBuf::from(dir).join(path)
-}
-
 pub fn format_schema(schema: Schema) -> Vec<String> {
     schema
         .fields()
@@ -78,6 +70,9 @@ pub fn test_basic_schema() -> SchemaRef {
 }
 
 pub fn scan_config(file_schema: SchemaRef, limit: Option<usize>, filename: &str) -> FileScanConfig {
+    // object_store only recognizes the Unix-style path, so make it happy.
+    let filename = &filename.replace('\\', "/");
+
     FileScanConfig {
         object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
         file_schema,
@@ -124,12 +119,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi
 
     let written = tmp_store.read(&output_path).await.unwrap();
     let origin = store.read(origin_path).await.unwrap();
-
-    // ignores `\n`
-    assert_eq!(
-        String::from_utf8_lossy(&written).trim_end_matches('\n'),
-        String::from_utf8_lossy(&origin).trim_end_matches('\n'),
-    )
+    assert_eq_lines(written, origin);
 }
 
 pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usize) -> usize) {
@@ -166,10 +156,19 @@ pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usiz
 
     let written = tmp_store.read(&output_path).await.unwrap();
     let origin = store.read(origin_path).await.unwrap();
+    assert_eq_lines(written, origin);
+}
 
-    // ignores `\n`
+// Ignore the CRLF difference across operating systems.
+fn assert_eq_lines(written: Vec<u8>, origin: Vec<u8>) {
     assert_eq!(
-        String::from_utf8_lossy(&written).trim_end_matches('\n'),
-        String::from_utf8_lossy(&origin).trim_end_matches('\n'),
+        String::from_utf8(written)
+            .unwrap()
+            .lines()
+            .collect::<Vec<&str>>(),
+        String::from_utf8(origin)
+            .unwrap()
+            .lines()
+            .collect::<Vec<&str>>(),
     )
 }
diff --git a/src/common/datasource/src/tests.rs b/src/common/datasource/src/tests.rs
index a1e70eb1ca..f6391028bf 100644
--- a/src/common/datasource/src/tests.rs
+++ b/src/common/datasource/src/tests.rs
@@ -12,50 +12,36 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
+use common_test_util::find_workspace_path;
+
 use crate::test_util;
 
 #[tokio::test]
 async fn test_stream_to_json() {
+    let origin_path = &find_workspace_path("/src/common/datasource/tests/json/basic.json")
+        .display()
+        .to_string();
+
     // A small threshold
     // Triggers the flush on each write
-    test_util::setup_stream_to_json_test(
-        &test_util::get_data_dir("tests/json/basic.json")
-            .display()
-            .to_string(),
-        |size| size / 2,
-    )
-    .await;
+    test_util::setup_stream_to_json_test(origin_path, |size| size / 2).await;
 
     // A large threshold
     // Only triggers the flush at the end
-    test_util::setup_stream_to_json_test(
-        &test_util::get_data_dir("tests/json/basic.json")
-            .display()
-            .to_string(),
-        |size| size * 2,
-    )
-    .await;
+    test_util::setup_stream_to_json_test(origin_path, |size| size * 2).await;
 }
 
 #[tokio::test]
 async fn test_stream_to_csv() {
+    let origin_path = &find_workspace_path("/src/common/datasource/tests/csv/basic.csv")
+        .display()
+        .to_string();
+
     // A small threshold
     // Triggers the flush on each write
-    test_util::setup_stream_to_csv_test(
-        &test_util::get_data_dir("tests/csv/basic.csv")
-            .display()
-            .to_string(),
-        |size| size / 2,
-    )
-    .await;
+    test_util::setup_stream_to_csv_test(origin_path, |size| size / 2).await;
 
     // A large threshold
     // Only triggers the flush at the end
-    test_util::setup_stream_to_csv_test(
-        &test_util::get_data_dir("tests/csv/basic.csv")
-            .display()
-            .to_string(),
-        |size| size * 2,
-    )
-    .await;
+    test_util::setup_stream_to_csv_test(origin_path, |size| size * 2).await;
 }
diff --git a/src/common/datasource/src/util.rs b/src/common/datasource/src/util.rs
index 870fe376ce..fc0340b148 100644
--- a/src/common/datasource/src/util.rs
+++ b/src/common/datasource/src/util.rs
@@ -66,6 +66,7 @@ mod tests {
         }
     }
 
+    #[cfg(not(windows))]
     #[test]
     fn test_parse_path_and_dir() {
         let parsed = Url::from_file_path("/to/path/file").unwrap();
@@ -75,6 +76,16 @@
         assert_eq!(parsed.path(), "/to/path/");
     }
 
+    #[cfg(windows)]
+    #[test]
+    fn test_parse_path_and_dir() {
+        let parsed = Url::from_file_path("C:\\to\\path\\file").unwrap();
+        assert_eq!(parsed.path(), "/C:/to/path/file");
+
+        let parsed = Url::from_directory_path("C:\\to\\path\\").unwrap();
+        assert_eq!(parsed.path(), "/C:/to/path/");
+    }
+
     #[test]
     fn test_find_dir_and_filename() {
         struct Test<'a> {
diff --git a/src/common/mem-prof/Cargo.toml b/src/common/mem-prof/Cargo.toml
index 413a3f3bac..211f941cab 100644
--- a/src/common/mem-prof/Cargo.toml
+++ b/src/common/mem-prof/Cargo.toml
@@ -8,9 +8,11 @@
 common-error = { workspace = true }
 snafu.workspace = true
 tempfile = "3.4"
-tikv-jemalloc-ctl = { version = "0.5", features = ["use_std"] }
 tokio.workspace = true
 
-[dependencies.tikv-jemalloc-sys]
+[target.'cfg(not(windows))'.dependencies]
+tikv-jemalloc-ctl = { version = "0.5", features = ["use_std"] }
+
+[target.'cfg(not(windows))'.dependencies.tikv-jemalloc-sys]
 features = ["stats", "profiling", "unprefixed_malloc_on_supported_platforms"]
 version = "0.5"
diff --git a/src/common/mem-prof/src/error.rs b/src/common/mem-prof/src/error.rs
index df158e8dcb..944ae25875 100644
--- a/src/common/mem-prof/src/error.rs
+++ b/src/common/mem-prof/src/error.rs
@@ -13,51 +13,28 @@
 // limitations under the License.
use std::any::Any; -use std::path::PathBuf; -use common_error::ext::ErrorExt; +use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; -use snafu::{Location, Snafu}; +use snafu::Snafu; pub type Result = std::result::Result; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { - #[snafu(display("Failed to read OPT_PROF, source: {}", source))] - ReadOptProf { source: tikv_jemalloc_ctl::Error }, + #[snafu(display("{source}"))] + Internal { source: BoxedError }, - #[snafu(display("Memory profiling is not enabled"))] - ProfilingNotEnabled, - - #[snafu(display("Failed to build temp file from given path: {:?}", path))] - BuildTempPath { path: PathBuf, location: Location }, - - #[snafu(display("Failed to open temp file: {}, source: {}", path, source))] - OpenTempFile { - path: String, - source: std::io::Error, - }, - - #[snafu(display( - "Failed to dump profiling data to temp file: {:?}, source: {}", - path, - source - ))] - DumpProfileData { - path: PathBuf, - source: tikv_jemalloc_ctl::Error, - }, + #[snafu(display("Memory profiling is not supported"))] + ProfilingNotSupported, } impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { - Error::ReadOptProf { .. } => StatusCode::Internal, - Error::ProfilingNotEnabled => StatusCode::InvalidArguments, - Error::BuildTempPath { .. } => StatusCode::Internal, - Error::OpenTempFile { .. } => StatusCode::StorageUnavailable, - Error::DumpProfileData { .. } => StatusCode::StorageUnavailable, + Error::Internal { source } => source.status_code(), + Error::ProfilingNotSupported => StatusCode::Unsupported, } } diff --git a/src/common/mem-prof/src/jemalloc.rs b/src/common/mem-prof/src/jemalloc.rs new file mode 100644 index 0000000000..a369f7b784 --- /dev/null +++ b/src/common/mem-prof/src/jemalloc.rs @@ -0,0 +1,76 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod error; + +use std::ffi::{c_char, CString}; +use std::path::PathBuf; + +use error::{ + BuildTempPathSnafu, DumpProfileDataSnafu, OpenTempFileSnafu, ProfilingNotEnabledSnafu, + ReadOptProfSnafu, +}; +use snafu::{ensure, ResultExt}; +use tokio::io::AsyncReadExt; + +use crate::error::Result; + +const PROF_DUMP: &[u8] = b"prof.dump\0"; +const OPT_PROF: &[u8] = b"opt.prof\0"; + +pub async fn dump_profile() -> Result> { + ensure!(is_prof_enabled()?, ProfilingNotEnabledSnafu); + let tmp_path = tempfile::tempdir().map_err(|_| { + BuildTempPathSnafu { + path: std::env::temp_dir(), + } + .build() + })?; + + let mut path_buf = PathBuf::from(tmp_path.path()); + path_buf.push("greptimedb.hprof"); + + let path = path_buf + .to_str() + .ok_or_else(|| BuildTempPathSnafu { path: &path_buf }.build())? + .to_string(); + + let mut bytes = CString::new(path.as_str()) + .map_err(|_| BuildTempPathSnafu { path: &path_buf }.build())? + .into_bytes_with_nul(); + + { + // #safety: we always expect a valid temp file path to write profiling data to. 
+ let ptr = bytes.as_mut_ptr() as *mut c_char; + unsafe { + tikv_jemalloc_ctl::raw::write(PROF_DUMP, ptr) + .context(DumpProfileDataSnafu { path: path_buf })? + } + } + + let mut f = tokio::fs::File::open(path.as_str()) + .await + .context(OpenTempFileSnafu { path: &path })?; + let mut buf = vec![]; + let _ = f + .read_to_end(&mut buf) + .await + .context(OpenTempFileSnafu { path })?; + Ok(buf) +} + +fn is_prof_enabled() -> Result { + // safety: OPT_PROF variable, if present, is always a boolean value. + Ok(unsafe { tikv_jemalloc_ctl::raw::read::(OPT_PROF).context(ReadOptProfSnafu)? }) +} diff --git a/src/common/mem-prof/src/jemalloc/error.rs b/src/common/mem-prof/src/jemalloc/error.rs new file mode 100644 index 0000000000..e85f0a4d6a --- /dev/null +++ b/src/common/mem-prof/src/jemalloc/error.rs @@ -0,0 +1,73 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::path::PathBuf; + +use common_error::ext::{BoxedError, ErrorExt}; +use common_error::status_code::StatusCode; +use snafu::{Location, Snafu}; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Failed to read OPT_PROF, source: {}", source))] + ReadOptProf { source: tikv_jemalloc_ctl::Error }, + + #[snafu(display("Memory profiling is not enabled"))] + ProfilingNotEnabled, + + #[snafu(display("Failed to build temp file from given path: {:?}", path))] + BuildTempPath { path: PathBuf, location: Location }, + + #[snafu(display("Failed to open temp file: {}, source: {}", path, source))] + OpenTempFile { + path: String, + source: std::io::Error, + }, + + #[snafu(display( + "Failed to dump profiling data to temp file: {:?}, source: {}", + path, + source + ))] + DumpProfileData { + path: PathBuf, + source: tikv_jemalloc_ctl::Error, + }, +} + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::ReadOptProf { .. } => StatusCode::Internal, + Error::ProfilingNotEnabled => StatusCode::InvalidArguments, + Error::BuildTempPath { .. } => StatusCode::Internal, + Error::OpenTempFile { .. } => StatusCode::StorageUnavailable, + Error::DumpProfileData { .. 
} => StatusCode::StorageUnavailable, + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl From for crate::error::Error { + fn from(e: Error) -> Self { + Self::Internal { + source: BoxedError::new(e), + } + } +} diff --git a/src/common/mem-prof/src/lib.rs b/src/common/mem-prof/src/lib.rs index de982ec876..6f08d71ee3 100644 --- a/src/common/mem-prof/src/lib.rs +++ b/src/common/mem-prof/src/lib.rs @@ -14,62 +14,12 @@ pub mod error; -use std::ffi::{c_char, CString}; -use std::path::PathBuf; - -use snafu::{ensure, ResultExt}; -use tokio::io::AsyncReadExt; - -use crate::error::{ - BuildTempPathSnafu, DumpProfileDataSnafu, OpenTempFileSnafu, ProfilingNotEnabledSnafu, - ReadOptProfSnafu, -}; - -const PROF_DUMP: &[u8] = b"prof.dump\0"; -const OPT_PROF: &[u8] = b"opt.prof\0"; +#[cfg(not(windows))] +mod jemalloc; +#[cfg(not(windows))] +pub use jemalloc::dump_profile; +#[cfg(windows)] pub async fn dump_profile() -> error::Result> { - ensure!(is_prof_enabled()?, ProfilingNotEnabledSnafu); - let tmp_path = tempfile::tempdir().map_err(|_| { - BuildTempPathSnafu { - path: std::env::temp_dir(), - } - .build() - })?; - - let mut path_buf = PathBuf::from(tmp_path.path()); - path_buf.push("greptimedb.hprof"); - - let path = path_buf - .to_str() - .ok_or_else(|| BuildTempPathSnafu { path: &path_buf }.build())? - .to_string(); - - let mut bytes = CString::new(path.as_str()) - .map_err(|_| BuildTempPathSnafu { path: &path_buf }.build())? - .into_bytes_with_nul(); - - { - // #safety: we always expect a valid temp file path to write profiling data to. - let ptr = bytes.as_mut_ptr() as *mut c_char; - unsafe { - tikv_jemalloc_ctl::raw::write(PROF_DUMP, ptr) - .context(DumpProfileDataSnafu { path: path_buf })? - } - } - - let mut f = tokio::fs::File::open(path.as_str()) - .await - .context(OpenTempFileSnafu { path: &path })?; - let mut buf = vec![]; - let _ = f - .read_to_end(&mut buf) - .await - .context(OpenTempFileSnafu { path })?; - Ok(buf) -} - -fn is_prof_enabled() -> error::Result { - // safety: OPT_PROF variable, if present, is always a boolean value. - Ok(unsafe { tikv_jemalloc_ctl::raw::read::(OPT_PROF).context(ReadOptProfSnafu)? }) + error::ProfilingNotSupportedSnafu.fail() } diff --git a/src/common/pprof/Cargo.toml b/src/common/pprof/Cargo.toml deleted file mode 100644 index 74a2b848fd..0000000000 --- a/src/common/pprof/Cargo.toml +++ /dev/null @@ -1,16 +0,0 @@ -[package] -name = "common-pprof" -version.workspace = true -edition.workspace = true -license.workspace = true - -[dependencies] -common-error = { workspace = true } -pprof = { version = "0.11", features = [ - "flamegraph", - "prost-codec", - "protobuf", -] } -prost.workspace = true -snafu.workspace = true -tokio.workspace = true diff --git a/src/common/test-util/src/lib.rs b/src/common/test-util/src/lib.rs index 90d7529d12..ef6ff46968 100644 --- a/src/common/test-util/src/lib.rs +++ b/src/common/test-util/src/lib.rs @@ -12,5 +12,48 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(lazy_cell)] + +use std::path::{Path, PathBuf}; +use std::process::Command; +use std::sync::LazyLock; + pub mod ports; pub mod temp_dir; + +// Rust is working on an env possibly named `CARGO_WORKSPACE_DIR` to find the root path to the +// workspace, see https://github.com/rust-lang/cargo/issues/3946. +// Until then, use this verbose way. 
+static WORKSPACE_ROOT: LazyLock<PathBuf> = LazyLock::new(|| {
+    let output = Command::new(env!("CARGO"))
+        .args(["locate-project", "--workspace", "--message-format=plain"])
+        .output()
+        .unwrap()
+        .stdout;
+    let cargo_path = Path::new(std::str::from_utf8(&output).unwrap().trim());
+    cargo_path.parent().unwrap().to_path_buf()
+});
+
+/// Find the absolute path to a file or a directory in the workspace.
+/// The input `path` should be the relative path of the file or directory from the workspace root.
+///
+/// For example, if the greptimedb project is placed under directory "/foo/bar/greptimedb/",
+/// and this function is invoked with path = "/src/common/test-util/src/lib.rs", you will get the
+/// absolute path to this file.
+///
+/// The return value is a [PathBuf], to adapt to the Windows file system's path style.
+/// The input argument, however, is Unix style, for the caller's convenience.
+pub fn find_workspace_path(path: &str) -> PathBuf {
+    let mut buf = WORKSPACE_ROOT.clone();
+
+    // Manually "canonicalize" the path to avoid the annoying Windows-specific "\\?" prefix.
+    path.split('/').for_each(|x| {
+        if x == ".." {
+            buf.pop();
+        } else if x != "." {
+            buf = buf.join(x);
+        }
+    });
+
+    buf
+}
diff --git a/src/common/time/src/datetime.rs b/src/common/time/src/datetime.rs
index 0e51aa5c8e..426d3f1069 100644
--- a/src/common/time/src/datetime.rs
+++ b/src/common/time/src/datetime.rs
@@ -15,10 +15,11 @@
 use std::fmt::{Display, Formatter};
 use std::str::FromStr;
 
-use chrono::{Local, LocalResult, NaiveDateTime, TimeZone};
+use chrono::{LocalResult, NaiveDateTime};
 use serde::{Deserialize, Serialize};
 
 use crate::error::{Error, InvalidDateStrSnafu, Result};
+use crate::util::{format_utc_datetime, local_datetime_to_utc};
 
 const DATETIME_FORMAT: &str = "%F %T";
 const DATETIME_FORMAT_WITH_TZ: &str = "%F %T%z";
@@ -35,9 +36,7 @@ impl Display for DateTime {
         write!(
             f,
             "{}",
-            Local {}
-                .from_utc_datetime(&abs_time)
-                .format(DATETIME_FORMAT_WITH_TZ)
+            format_utc_datetime(&abs_time, DATETIME_FORMAT_WITH_TZ)
         )
     } else {
         write!(f, "DateTime({})", self.0)
@@ -56,13 +55,12 @@ impl FromStr for DateTime {
 
     fn from_str(s: &str) -> Result<Self> {
         let s = s.trim();
-        let local = Local {};
         let timestamp = if let Ok(d) = NaiveDateTime::parse_from_str(s, DATETIME_FORMAT) {
-            match local.from_local_datetime(&d) {
+            match local_datetime_to_utc(&d) {
                 LocalResult::None => {
                     return InvalidDateStrSnafu { raw: s }.fail();
                 }
-                LocalResult::Single(d) | LocalResult::Ambiguous(d, _) => d.naive_utc().timestamp(),
+                LocalResult::Single(d) | LocalResult::Ambiguous(d, _) => d.timestamp(),
             }
         } else if let Ok(v) = chrono::DateTime::parse_from_str(s, DATETIME_FORMAT_WITH_TZ) {
             v.timestamp()
diff --git a/src/common/time/src/time.rs b/src/common/time/src/time.rs
index 4ee3278c16..7400e22c0f 100644
--- a/src/common/time/src/time.rs
+++ b/src/common/time/src/time.rs
@@ -15,12 +15,12 @@
 use std::cmp::Ordering;
 use std::hash::{Hash, Hasher};
 
-use chrono::offset::Local;
 use chrono::{NaiveDateTime, NaiveTime, TimeZone as ChronoTimeZone, Utc};
 use serde::{Deserialize, Serialize};
 
 use crate::timestamp::TimeUnit;
 use crate::timezone::TimeZone;
+use crate::util::format_utc_datetime;
 
 /// Time value, represents the elapsed time since midnight in the unit of `TimeUnit`.
#[derive(Debug, Clone, Default, Copy, Serialize, Deserialize)] @@ -119,10 +119,7 @@ impl Time { Some(TimeZone::Named(tz)) => { format!("{}", tz.from_utc_datetime(&datetime).format(pattern)) } - None => { - let local = Local {}; - format!("{}", local.from_utc_datetime(&datetime).format(pattern)) - } + None => format_utc_datetime(&datetime, pattern), } } else { format!("[Time{}: {}]", self.unit, self.value) diff --git a/src/common/time/src/timestamp.rs b/src/common/time/src/timestamp.rs index c7c64ead15..a7fd00a2da 100644 --- a/src/common/time/src/timestamp.rs +++ b/src/common/time/src/timestamp.rs @@ -20,7 +20,6 @@ use std::str::FromStr; use std::time::Duration; use arrow::datatypes::TimeUnit as ArrowTimeUnit; -use chrono::offset::Local; use chrono::{DateTime, LocalResult, NaiveDateTime, TimeZone as ChronoTimeZone, Utc}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -28,7 +27,7 @@ use snafu::{OptionExt, ResultExt}; use crate::error; use crate::error::{ArithmeticOverflowSnafu, Error, ParseTimestampSnafu, TimestampOverflowSnafu}; use crate::timezone::TimeZone; -use crate::util::div_ceil; +use crate::util::{div_ceil, format_utc_datetime, local_datetime_to_utc}; #[derive(Debug, Clone, Default, Copy, Serialize, Deserialize)] pub struct Timestamp { @@ -195,10 +194,7 @@ impl Timestamp { Some(TimeZone::Named(tz)) => { format!("{}", tz.from_utc_datetime(&v).format(pattern)) } - None => { - let local = Local {}; - format!("{}", local.from_utc_datetime(&v).format(pattern)) - } + None => format_utc_datetime(&v, pattern), } } else { format!("[Timestamp{}: {}]", self.unit, self.value) @@ -265,18 +261,11 @@ fn naive_datetime_to_timestamp( s: &str, datetime: NaiveDateTime, ) -> crate::error::Result { - let l = Local {}; - - match l.from_local_datetime(&datetime) { + match local_datetime_to_utc(&datetime) { LocalResult::None => ParseTimestampSnafu { raw: s }.fail(), - LocalResult::Single(local_datetime) => Ok(Timestamp::new( - local_datetime.with_timezone(&Utc).timestamp_nanos(), - TimeUnit::Nanosecond, - )), - LocalResult::Ambiguous(local_datetime, _) => Ok(Timestamp::new( - local_datetime.with_timezone(&Utc).timestamp_nanos(), - TimeUnit::Nanosecond, - )), + LocalResult::Single(utc) | LocalResult::Ambiguous(utc, _) => { + Ok(Timestamp::new(utc.timestamp_nanos(), TimeUnit::Nanosecond)) + } } } @@ -421,7 +410,7 @@ impl Hash for Timestamp { mod tests { use std::collections::hash_map::DefaultHasher; - use chrono::Offset; + use chrono::{Local, Offset}; use rand::Rng; use serde_json::Value; diff --git a/src/common/time/src/timezone.rs b/src/common/time/src/timezone.rs index 6feba8b251..87415d57d6 100644 --- a/src/common/time/src/timezone.rs +++ b/src/common/time/src/timezone.rs @@ -15,13 +15,14 @@ use std::fmt::Display; use std::str::FromStr; -use chrono::{FixedOffset, Local}; +use chrono::{FixedOffset, Local, Offset}; use chrono_tz::Tz; use snafu::{OptionExt, ResultExt}; use crate::error::{ InvalidTimeZoneOffsetSnafu, ParseOffsetStrSnafu, ParseTimeZoneNameSnafu, Result, }; +use crate::util::find_tz_from_env; #[derive(Debug, Clone, PartialEq, Eq)] pub enum TimeZone { @@ -87,7 +88,11 @@ impl Display for TimeZone { #[inline] pub fn system_time_zone_name() -> String { - Local::now().offset().to_string() + if let Some(tz) = find_tz_from_env() { + Local::now().with_timezone(&tz).offset().fix().to_string() + } else { + Local::now().offset().to_string() + } } #[cfg(test)] diff --git a/src/common/time/src/util.rs b/src/common/time/src/util.rs index f2d1feeee1..8d1f870cb4 100644 --- 
a/src/common/time/src/util.rs
+++ b/src/common/time/src/util.rs
@@ -12,6 +12,38 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::str::FromStr;
+
+use chrono::offset::Local;
+use chrono::{LocalResult, NaiveDateTime, TimeZone};
+use chrono_tz::Tz;
+
+pub fn format_utc_datetime(utc: &NaiveDateTime, pattern: &str) -> String {
+    if let Some(tz) = find_tz_from_env() {
+        format!("{}", tz.from_utc_datetime(utc).format(pattern))
+    } else {
+        format!("{}", Local.from_utc_datetime(utc).format(pattern))
+    }
+}
+
+pub fn local_datetime_to_utc(local: &NaiveDateTime) -> LocalResult<NaiveDateTime> {
+    if let Some(tz) = find_tz_from_env() {
+        tz.from_local_datetime(local).map(|x| x.naive_utc())
+    } else {
+        Local.from_local_datetime(local).map(|x| x.naive_utc())
+    }
+}
+
+pub fn find_tz_from_env() -> Option<Tz> {
+    // Windows does not support the "TZ" env variable, which the `Local` timezone relies on
+    // under Unix. However, we are used to setting the "TZ" env as the default timezone without
+    // actually providing a timezone argument (especially in tests). Since that is very
+    // convenient, we decided to make it work under Windows as well.
+    std::env::var("TZ")
+        .ok()
+        .and_then(|tz| Tz::from_str(&tz).ok())
+}
+
 /// Returns the time duration since UNIX_EPOCH in milliseconds.
 pub fn current_time_millis() -> i64 {
     chrono::Utc::now().timestamp_millis()
diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml
index 574faffea7..340d201eab 100644
--- a/src/log-store/Cargo.toml
+++ b/src/log-store/Cargo.toml
@@ -22,7 +22,7 @@
 common-telemetry = { workspace = true }
 futures-util.workspace = true
 futures.workspace = true
 protobuf = { version = "2", features = ["bytes"] }
-raft-engine = "0.3"
+raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "2dcaf5beeea3d5de9ec9c7133a2451d00f508f52" }
 snafu = { version = "0.7", features = ["backtraces"] }
 store-api = { workspace = true }
 tokio-util.workspace = true
diff --git a/src/log-store/src/test_util/log_store_util.rs b/src/log-store/src/test_util/log_store_util.rs
index c1ec201f1d..e97b9cb1ce 100644
--- a/src/log-store/src/test_util/log_store_util.rs
+++ b/src/log-store/src/test_util/log_store_util.rs
@@ -12,14 +12,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::path::Path;
+
 use crate::raft_engine::log_store::RaftEngineLogStore;
 use crate::LogConfig;
 
 /// Create a write log for the provided path, used for test.
-pub async fn create_tmp_local_file_log_store(path: &str) -> RaftEngineLogStore {
+pub async fn create_tmp_local_file_log_store<P: AsRef<Path>>(path: P) -> RaftEngineLogStore {
     let cfg = LogConfig {
         file_size: 128 * 1024,
-        log_file_dir: path.to_string(),
+        log_file_dir: path.as_ref().display().to_string(),
         ..Default::default()
     };
     RaftEngineLogStore::try_new(cfg).await.unwrap()
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index 4097887f00..6e6a6961b3 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -88,7 +88,7 @@ impl MetaSrvOptions {
 // Options for datanode.
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
 pub struct DatanodeOptions {
-    client_options: DatanodeClientOptions,
+    pub client_options: DatanodeClientOptions,
 }
 
 // Options for datanode client.
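The `find_tz_from_env` fallback in src/common/time/src/util.rs above exists because chrono's `Local` honors "TZ" on Unix but cannot see it on Windows. A small illustrative program, not part of this patch (it assumes only the `chrono` and `chrono-tz` crates already used here), showing the behavior the fallback restores:

use std::str::FromStr;

use chrono::{NaiveDate, TimeZone};
use chrono_tz::Tz;

fn main() {
    // Simulate the test setup: "TZ" is set explicitly, as the tests do.
    std::env::set_var("TZ", "Asia/Shanghai");

    let utc = NaiveDate::from_ymd_opt(2023, 8, 10)
        .unwrap()
        .and_hms_opt(8, 0, 0)
        .unwrap();

    // Resolving the zone through chrono-tz applies the +08:00 offset on every OS,
    // whereas `chrono::Local` would silently ignore "TZ" on Windows.
    let tz = Tz::from_str(&std::env::var("TZ").unwrap()).unwrap();
    // Prints "2023-08-10 16:00:00".
    println!("{}", tz.from_utc_datetime(&utc).format("%F %T"));
}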
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index 3c56c50a9f..68b411c7ae 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -23,7 +23,6 @@
 use datatypes::schema::ColumnSchema;
 use log_store::raft_engine::log_store::RaftEngineLogStore;
 use log_store::test_util::log_store_util;
 use object_store::services::Fs;
-use object_store::util::join_dir;
 use object_store::ObjectStore;
 use store_api::storage::RegionId;
@@ -79,9 +78,9 @@ impl TestEnv {
     }
 
     async fn create_log_and_object_store(&self) -> (RaftEngineLogStore, ObjectStore) {
-        let data_home = self.data_home.path().to_str().unwrap();
-        let wal_path = join_dir(data_home, "wal");
-        let data_path = join_dir(data_home, "data");
+        let data_home = self.data_home.path();
+        let wal_path = data_home.join("wal");
+        let data_path = data_home.join("data").as_path().display().to_string();
 
         let log_store = log_store_util::create_tmp_local_file_log_store(&wal_path).await;
         let mut builder = Fs::default();
@@ -99,13 +98,20 @@ impl TestEnv {
         checkpoint_interval: u64,
         initial_metadata: Option,
     ) -> Result> {
-        let data_home = self.data_home.path().to_str().unwrap();
-        let manifest_dir = join_dir(data_home, "manifest");
+        let data_home = self.data_home.path();
+        let manifest_dir = data_home.join("manifest").as_path().display().to_string();
 
         let mut builder = Fs::default();
         builder.root(&manifest_dir);
         let object_store = ObjectStore::new(builder).unwrap().finish();
 
+        // The "manifest_dir" here should be the relative path from the `object_store`'s root.
+        // Otherwise OpenDal's list operation would fail with a "StripPrefixError". This is
+        // because the `object_store`'s root path is canonicalized, and under Windows,
+        // canonicalizing a path prepends "\\?\" to it. That would make the previously working
+        // list on an absolute path fail on Windows.
+        let manifest_dir = "/".to_string();
+
         let manifest_opts = RegionManifestOptions {
             manifest_dir,
             object_store,
diff --git a/src/query/src/error.rs b/src/query/src/error.rs
index 2d38ba15e4..e3402c8b5b 100644
--- a/src/query/src/error.rs
+++ b/src/query/src/error.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::any::Any;
+use std::time::Duration;
 
 use common_error::ext::{BoxedError, ErrorExt};
 use common_error::status_code::StatusCode;
@@ -234,6 +235,12 @@ pub enum Error {
         location
     ))]
     TimeIndexNotFound { table: String, location: Location },
+
+    #[snafu(display("Failed to add duration '{:?}' to SystemTime, overflowed", duration))]
+    AddSystemTimeOverflow {
+        duration: Duration,
+        location: Location,
+    },
 }
 
 impl ErrorExt for Error {
@@ -253,7 +260,8 @@ impl ErrorExt for Error {
             | ParseFloat { .. }
             | MissingRequiredField { .. }
             | BuildRegex { .. }
-            | ConvertSchema { .. } => StatusCode::InvalidArguments,
+            | ConvertSchema { .. }
+            | AddSystemTimeOverflow { .. } => StatusCode::InvalidArguments,
 
             BuildBackend { .. } | ListObjects { .. } => StatusCode::StorageUnavailable,
             EncodeSubstraitLogicalPlan { source, ..
} => source.status_code(), diff --git a/src/query/src/parser.rs b/src/query/src/parser.rs index 15d7ad6ff0..a30f68392d 100644 --- a/src/query/src/parser.rs +++ b/src/query/src/parser.rs @@ -24,14 +24,14 @@ use common_telemetry::timer; use promql_parser::parser::ast::{Extension as NodeExtension, ExtensionExpr}; use promql_parser::parser::Expr::Extension; use promql_parser::parser::{EvalStmt, Expr, ValueType}; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use sql::dialect::GreptimeDbDialect; use sql::parser::ParserContext; use sql::statements::statement::Statement; use crate::error::{ - MultipleStatementsSnafu, ParseFloatSnafu, ParseTimestampSnafu, QueryParseSnafu, Result, - UnimplementedSnafu, + AddSystemTimeOverflowSnafu, MultipleStatementsSnafu, ParseFloatSnafu, ParseTimestampSnafu, + QueryParseSnafu, Result, UnimplementedSnafu, }; use crate::metrics::{METRIC_PARSE_PROMQL_ELAPSED, METRIC_PARSE_SQL_ELAPSED}; @@ -177,24 +177,17 @@ impl QueryLanguageParser { } // try float format - timestamp + let secs = timestamp .parse::() .context(ParseFloatSnafu { raw: timestamp }) - .map(|float| { - let duration = Duration::from_secs_f64(float); - SystemTime::UNIX_EPOCH - .checked_add(duration) - .unwrap_or(max_system_timestamp()) - }) // also report rfc3339 error if float parsing fails - .map_err(|_| rfc3339_result.unwrap_err()) - } -} + .map_err(|_| rfc3339_result.unwrap_err())?; -fn max_system_timestamp() -> SystemTime { - SystemTime::UNIX_EPOCH - .checked_add(Duration::from_secs(std::i64::MAX as u64)) - .unwrap() + let duration = Duration::from_secs_f64(secs); + SystemTime::UNIX_EPOCH + .checked_add(duration) + .context(AddSystemTimeOverflowSnafu { duration }) + } } macro_rules! define_node_ast_extension { @@ -291,11 +284,6 @@ mod test { ), ("0.000", SystemTime::UNIX_EPOCH), ("00", SystemTime::UNIX_EPOCH), - ( - // i64::MAX + 1 - "9223372036854775808.000", - max_system_timestamp(), - ), ( "2015-07-01T20:10:51.781Z", SystemTime::UNIX_EPOCH @@ -320,6 +308,14 @@ mod test { // assert difference < 0.1 second assert!(result.abs_diff(expected) < 100); } + + // i64::MAX + 1 + let timestamp = "9223372036854775808.000"; + let result = QueryLanguageParser::parse_promql_timestamp(timestamp); + assert_eq!( + result.unwrap_err().to_string(), + "Failed to add duration '9223372036854775808s' to SystemTime, overflowed" + ); } #[test] @@ -331,6 +327,7 @@ mod test { step: "1d".to_string(), }; + #[cfg(not(windows))] let expected = String::from( "\ Promql(EvalStmt { \ @@ -346,6 +343,22 @@ mod test { })", ); + // Windows has different debug output for SystemTime. 
+        #[cfg(windows)]
+        let expected = String::from(
+            "\
+        Promql(EvalStmt { \
+            expr: VectorSelector(VectorSelector { \
+                name: Some(\"http_request\"), \
+                matchers: Matchers { matchers: [] }, \
+                offset: None, at: None }), \
+            start: SystemTime { intervals: 132892460400000000 }, \
+            end: SystemTime { intervals: 133207820400000000 }, \
+            interval: 86400s, \
+            lookback_delta: 300s \
+        })",
+        );
+
         let result = QueryLanguageParser::parse_promql(&promql).unwrap();
         assert_eq!(format!("{result:?}"), expected);
     }
diff --git a/src/script/src/python/pyo3/vector_impl.rs b/src/script/src/python/pyo3/vector_impl.rs
index 86feefc111..d50694a88b 100644
--- a/src/script/src/python/pyo3/vector_impl.rs
+++ b/src/script/src/python/pyo3/vector_impl.rs
@@ -334,7 +334,7 @@ impl PyVector {
             let ret = Self::from(ret).into_py(py);
             Ok(ret)
         } else if let Ok(slice) = needle.downcast::<PySlice>(py) {
-            let indices = slice.indices(self.len() as i64)?;
+            let indices = slice.indices(self.len() as _)?;
             let (start, stop, step, _slicelength) = (
                 indices.start,
                 indices.stop,
diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml
index 331e526a95..47f48811bf 100644
--- a/src/servers/Cargo.toml
+++ b/src/servers/Cargo.toml
@@ -7,7 +7,7 @@ license.workspace = true
 [features]
 dashboard = []
 mem-prof = ["dep:common-mem-prof"]
-pprof = ["dep:common-pprof"]
+pprof = ["dep:pprof"]
 
 [dependencies]
 aide = { version = "0.9", features = ["axum"] }
@@ -26,7 +26,6 @@
 common-error = { workspace = true }
 common-grpc = { workspace = true }
 common-grpc-expr = { workspace = true }
 common-mem-prof = { workspace = true, optional = true }
-common-pprof = { workspace = true, optional = true }
 common-query = { workspace = true }
 common-recordbatch = { workspace = true }
 common-runtime = { workspace = true }
@@ -61,6 +60,11 @@
 parking_lot = "0.12"
 pgwire = "0.16"
 pin-project = "1.0"
 postgres-types = { version = "0.2", features = ["with-chrono-0_4"] }
+pprof = { version = "0.11", features = [
+    "flamegraph",
+    "prost-codec",
+    "protobuf",
+], optional = true }
 promql-parser = "0.1.1"
 prost.workspace = true
 query = { workspace = true }
@@ -80,7 +84,6 @@
 snap = "1"
 sql = { workspace = true }
 strum = { version = "0.24", features = ["derive"] }
 table = { workspace = true }
-tikv-jemalloc-ctl = { version = "0.5", features = ["use_std"] }
 tokio-rustls = "0.24"
 tokio-stream = { version = "0.1", features = ["net"] }
 tokio.workspace = true
@@ -89,6 +92,9 @@
 tonic.workspace = true
 tower = { version = "0.4", features = ["full"] }
 tower-http = { version = "0.3", features = ["full"] }
 
+[target.'cfg(not(windows))'.dependencies]
+tikv-jemalloc-ctl = { version = "0.5", features = ["use_std"] }
+
 [dev-dependencies]
 axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" }
 catalog = { workspace = true, features = ["testing"] }
diff --git a/src/servers/build.rs b/src/servers/build.rs
index f06b772d36..7d6b8aa2de 100644
--- a/src/servers/build.rs
+++ b/src/servers/build.rs
@@ -28,12 +28,41 @@ fn fetch_dashboard_assets() {
     let message = "Failed to fetch dashboard assets";
     let help = r#"
-You can manually execute './scripts/fetch-dashboard-assets.sh' to see why,
+You can manually execute "fetch-dashboard-assets.sh" to see why,
 or, if it's a network error, just try again or enable/disable some proxy."#;
+
+    // It's very unlikely to fail to get the current dir here; see `current_dir`'s docs.
+ let mut dir = std::env::current_dir().unwrap(); + dir.pop(); + dir.pop(); + dir.push("scripts"); + let out_dir = std::env::var("OUT_DIR").unwrap(); - let output = Command::new("./fetch-dashboard-assets.sh") - .arg(&out_dir) - .current_dir("../../scripts") + + #[cfg(windows)] + let (program, args) = ( + "bash", + [ + format!( + "/mnt/{}/fetch-dashboard-assets.sh", + dir.display() + .to_string() + .to_lowercase() + .replace(':', "") + .replace('\\', "/") + ), + format!( + "/mnt/{}", + out_dir.to_lowercase().replace(':', "").replace('\\', "/") + ), + ], + ); + #[cfg(not(windows))] + let (program, args) = ("./fetch-dashboard-assets.sh", [out_dir]); + + let output = Command::new(program) + .args(args) + .current_dir(dir) .stdout(Stdio::piped()) .spawn() .and_then(|p| p.wait_with_output()); diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index ff85e70f96..130359fc1e 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -297,11 +297,8 @@ pub enum Error { #[snafu(display("Failed to dump pprof data, source: {}", source))] DumpPprof { source: common_pprof::Error }, - #[snafu(display("Failed to update jemalloc metrics, source: {source}, location: {location}"))] - UpdateJemallocMetrics { - source: tikv_jemalloc_ctl::Error, - location: Location, - }, + #[snafu(display("{source}"))] + Metrics { source: BoxedError }, #[snafu(display("DataFrame operation error, source: {source}, location: {location}"))] DataFrame { @@ -418,7 +415,7 @@ impl ErrorExt for Error { #[cfg(feature = "pprof")] DumpPprof { source, .. } => source.status_code(), - UpdateJemallocMetrics { .. } => StatusCode::Internal, + Metrics { source } => source.status_code(), ConvertScalarValue { source, .. } => source.status_code(), } diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 7fdfbb1af3..96f98833b4 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -21,14 +21,13 @@ use axum::extract::{Json, Query, State}; use axum::response::{IntoResponse, Response}; use axum::{Extension, Form}; use common_error::status_code::StatusCode; -use common_telemetry::{error, timer}; +use common_telemetry::timer; use query::parser::PromQuery; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use session::context::UserInfo; use crate::http::{ApiState, GreptimeOptionsConfigState, JsonResponse}; -use crate::metrics::JEMALLOC_COLLECTOR; use crate::metrics_handler::MetricsHandler; #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] @@ -141,9 +140,10 @@ pub async fn metrics( #[cfg(feature = "metrics-process")] crate::metrics::PROCESS_COLLECTOR.collect(); - if let Some(c) = JEMALLOC_COLLECTOR.as_ref() { + #[cfg(not(windows))] + if let Some(c) = crate::metrics::jemalloc::JEMALLOC_COLLECTOR.as_ref() { if let Err(e) = c.update() { - error!(e; "Failed to update jemalloc metrics"); + common_telemetry::error!(e; "Failed to update jemalloc metrics"); } } state.render() diff --git a/src/servers/src/http/pprof.rs b/src/servers/src/http/pprof.rs index 1a5f651078..3e8ed55ac0 100644 --- a/src/servers/src/http/pprof.rs +++ b/src/servers/src/http/pprof.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+#[cfg(feature = "pprof")]
+mod nix;
+
 #[cfg(feature = "pprof")]
 pub mod handler {
     use std::num::NonZeroI32;
@@ -20,13 +23,13 @@ pub mod handler {
     use axum::extract::Query;
     use axum::http::StatusCode;
     use axum::response::IntoResponse;
-    use common_pprof::Profiling;
     use common_telemetry::logging;
     use schemars::JsonSchema;
     use serde::{Deserialize, Serialize};
     use snafu::ResultExt;
 
     use crate::error::{DumpPprofSnafu, Result};
+    use crate::http::pprof::nix::Profiling;
 
     /// Output format.
     #[derive(Debug, Serialize, Deserialize, JsonSchema)]
diff --git a/src/common/pprof/README.md b/src/servers/src/http/pprof/README.md
similarity index 100%
rename from src/common/pprof/README.md
rename to src/servers/src/http/pprof/README.md
diff --git a/src/common/pprof/src/lib.rs b/src/servers/src/http/pprof/nix.rs
similarity index 100%
rename from src/common/pprof/src/lib.rs
rename to src/servers/src/http/pprof/nix.rs
diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs
index 1c857f9a0a..d9e708cfcf 100644
--- a/src/servers/src/metrics.rs
+++ b/src/servers/src/metrics.rs
@@ -12,22 +12,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#[cfg(not(windows))]
+pub(crate) mod jemalloc;
+
 use std::task::{Context, Poll};
 use std::time::Instant;
 
-use common_telemetry::error;
 use hyper::Body;
-use metrics::gauge;
-use once_cell::sync::Lazy;
-use snafu::ResultExt;
-use tikv_jemalloc_ctl::stats::{allocated_mib, resident_mib};
-use tikv_jemalloc_ctl::{epoch, epoch_mib, stats};
 use tonic::body::BoxBody;
 use tower::{Layer, Service};
 
-use crate::error;
-use crate::error::UpdateJemallocMetricsSnafu;
-
 pub(crate) const METRIC_DB_LABEL: &str = "db";
 pub(crate) const METRIC_CODE_LABEL: &str = "code";
 pub(crate) const METRIC_TYPE_LABEL: &str = "type";
@@ -80,60 +74,15 @@ pub(crate) const METRIC_GRPC_REQUESTS_TOTAL: &str = "servers.grpc_requests_total
 pub(crate) const METRIC_GRPC_REQUESTS_ELAPSED: &str = "servers.grpc_requests_elapsed";
 pub(crate) const METRIC_METHOD_LABEL: &str = "method";
 pub(crate) const METRIC_PATH_LABEL: &str = "path";
-pub(crate) const METRIC_JEMALLOC_RESIDENT: &str = "sys.jemalloc.resident";
-pub(crate) const METRIC_JEMALLOC_ALLOCATED: &str = "sys.jemalloc.allocated";
 
 /// Prometheus style process metrics collector.
 #[cfg(feature = "metrics-process")]
-pub(crate) static PROCESS_COLLECTOR: Lazy<metrics_process::Collector> = Lazy::new(|| {
-    let collector = metrics_process::Collector::default();
-    // Describe collector.
-    collector.describe();
-    collector
-});
-
-pub(crate) static JEMALLOC_COLLECTOR: Lazy<Option<JemallocCollector>> = Lazy::new(|| {
-    let collector = JemallocCollector::try_new()
-        .map_err(|e| {
-            error!(e; "Failed to retrieve jemalloc metrics");
-            e
-        })
-        .ok();
-    collector.map(|c| {
-        if let Err(e) = c.update() {
-            error!(e; "Failed to update jemalloc metrics");
-        };
-        c
-    })
-});
-
-pub(crate) struct JemallocCollector {
-    epoch: epoch_mib,
-    allocated: allocated_mib,
-    resident: resident_mib,
-}
-
-impl JemallocCollector {
-    pub(crate) fn try_new() -> error::Result<Self> {
-        let e = epoch::mib().context(UpdateJemallocMetricsSnafu)?;
-        let allocated = stats::allocated::mib().context(UpdateJemallocMetricsSnafu)?;
-        let resident = stats::resident::mib().context(UpdateJemallocMetricsSnafu)?;
-        Ok(Self {
-            epoch: e,
-            allocated,
-            resident,
-        })
-    }
-
-    pub(crate) fn update(&self) -> error::Result<()> {
-        let _ = self.epoch.advance().context(UpdateJemallocMetricsSnafu)?;
-        let allocated = self.allocated.read().context(UpdateJemallocMetricsSnafu)?;
-        let resident = self.resident.read().context(UpdateJemallocMetricsSnafu)?;
-        gauge!(METRIC_JEMALLOC_ALLOCATED, allocated as f64);
-        gauge!(METRIC_JEMALLOC_RESIDENT, resident as f64);
-        Ok(())
-    }
-}
+pub(crate) static PROCESS_COLLECTOR: once_cell::sync::Lazy<metrics_process::Collector> =
+    once_cell::sync::Lazy::new(|| {
+        let collector = metrics_process::Collector::default();
+        collector.describe();
+        collector
+    });
 
 // Based on https://github.com/hyperium/tonic/blob/master/examples/src/tower/server.rs
 // See https://github.com/hyperium/tonic/issues/242
diff --git a/src/servers/src/metrics/jemalloc.rs b/src/servers/src/metrics/jemalloc.rs
new file mode 100644
index 0000000000..d54eddafde
--- /dev/null
+++ b/src/servers/src/metrics/jemalloc.rs
@@ -0,0 +1,69 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
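+
+//! Report jemalloc allocator statistics ("sys.jemalloc.allocated" and
+//! "sys.jemalloc.resident") as gauges. Only compiled on non-Windows targets,
+//! where the tikv-jemalloc-ctl dependency is available (see Cargo.toml).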
+
+mod error;
+
+use common_telemetry::error;
+use error::UpdateJemallocMetricsSnafu;
+use metrics::gauge;
+use once_cell::sync::Lazy;
+use snafu::ResultExt;
+use tikv_jemalloc_ctl::stats::{allocated_mib, resident_mib};
+use tikv_jemalloc_ctl::{epoch, epoch_mib, stats};
+
+pub(crate) const METRIC_JEMALLOC_RESIDENT: &str = "sys.jemalloc.resident";
+pub(crate) const METRIC_JEMALLOC_ALLOCATED: &str = "sys.jemalloc.allocated";
+
+pub(crate) static JEMALLOC_COLLECTOR: Lazy<Option<JemallocCollector>> = Lazy::new(|| {
+    let collector = JemallocCollector::try_new()
+        .map_err(|e| {
+            error!(e; "Failed to retrieve jemalloc metrics");
+            e
+        })
+        .ok();
+    collector.map(|c| {
+        if let Err(e) = c.update() {
+            error!(e; "Failed to update jemalloc metrics");
+        };
+        c
+    })
+});
+
+pub(crate) struct JemallocCollector {
+    epoch: epoch_mib,
+    allocated: allocated_mib,
+    resident: resident_mib,
+}
+
+impl JemallocCollector {
+    pub(crate) fn try_new() -> crate::error::Result<Self> {
+        let e = epoch::mib().context(UpdateJemallocMetricsSnafu)?;
+        let allocated = stats::allocated::mib().context(UpdateJemallocMetricsSnafu)?;
+        let resident = stats::resident::mib().context(UpdateJemallocMetricsSnafu)?;
+        Ok(Self {
+            epoch: e,
+            allocated,
+            resident,
+        })
+    }
+
+    pub(crate) fn update(&self) -> crate::error::Result<()> {
+        let _ = self.epoch.advance().context(UpdateJemallocMetricsSnafu)?;
+        let allocated = self.allocated.read().context(UpdateJemallocMetricsSnafu)?;
+        let resident = self.resident.read().context(UpdateJemallocMetricsSnafu)?;
+        gauge!(METRIC_JEMALLOC_ALLOCATED, allocated as f64);
+        gauge!(METRIC_JEMALLOC_RESIDENT, resident as f64);
+        Ok(())
+    }
+}
diff --git a/src/servers/src/metrics/jemalloc/error.rs b/src/servers/src/metrics/jemalloc/error.rs
new file mode 100644
index 0000000000..4daa93184e
--- /dev/null
+++ b/src/servers/src/metrics/jemalloc/error.rs
@@ -0,0 +1,49 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+
+use common_error::ext::{BoxedError, ErrorExt};
+use common_error::status_code::StatusCode;
+use snafu::{Location, Snafu};
+
+#[derive(Debug, Snafu)]
+#[snafu(visibility(pub))]
+pub enum Error {
+    #[snafu(display("Failed to update jemalloc metrics, source: {source}, location: {location}"))]
+    UpdateJemallocMetrics {
+        source: tikv_jemalloc_ctl::Error,
+        location: Location,
+    },
+}
+
+impl ErrorExt for Error {
+    fn status_code(&self) -> StatusCode {
+        match self {
+            Error::UpdateJemallocMetrics { .. } => StatusCode::Internal,
+        }
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
+impl From<Error> for crate::error::Error {
+    fn from(e: Error) -> Self {
+        Self::Metrics {
+            source: BoxedError::new(e),
+        }
+    }
+}
diff --git a/src/servers/tests/mysql/mysql_server_test.rs b/src/servers/tests/mysql/mysql_server_test.rs
index 6bdef94c52..fd3c203fa2 100644
--- a/src/servers/tests/mysql/mysql_server_test.rs
+++ b/src/servers/tests/mysql/mysql_server_test.rs
@@ -199,8 +199,7 @@ async fn test_shutdown_mysql_server() -> Result<()> {
     for handle in join_handles.iter_mut() {
         let result = handle.await.unwrap();
         assert!(result.is_err());
-        let error = result.unwrap_err().to_string();
-        assert!(error.contains("Connection refused") || error.contains("Connection reset by peer"));
+        assert!(result.unwrap_err().is_fatal());
     }
     Ok(())
 }
diff --git a/src/storage/src/manifest/region.rs b/src/storage/src/manifest/region.rs
index cacee02201..ca5017ba61 100644
--- a/src/storage/src/manifest/region.rs
+++ b/src/storage/src/manifest/region.rs
@@ -378,7 +378,7 @@ mod tests {
         assert_eq!(expected, actions);
     }
 
-    #[tokio::test]
+    #[tokio::test(flavor = "multi_thread")]
     async fn test_fs_region_manifest_checkpoint_compress() {
         let duration = Duration::from_millis(50);
         let manifest = new_fs_manifest(true, Some(duration)).await;
diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs
index d25ca64340..683ed91055 100644
--- a/tests-integration/src/cluster.rs
+++ b/tests-integration/src/cluster.rs
@@ -20,7 +20,7 @@ use api::v1::meta::Role;
 use client::client_manager::DatanodeClients;
 use client::Client;
 use common_base::Plugins;
-use common_grpc::channel_manager::ChannelManager;
+use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
 use common_meta::peer::Peer;
 use common_meta::DatanodeId;
 use common_runtime::Builder as RuntimeBuilder;
@@ -88,7 +88,8 @@ impl GreptimeDbClusterBuilder {
     pub async fn build(self) -> GreptimeDbCluster {
         let datanodes = self.datanodes.unwrap_or(4);
 
-        let datanode_clients = Arc::new(DatanodeClients::default());
+        let channel_config = ChannelConfig::new().timeout(Duration::from_secs(20));
+        let datanode_clients = Arc::new(DatanodeClients::new(channel_config));
 
         let meta_srv = self.build_metasrv(datanode_clients.clone()).await;
 
diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs
index c0fef772ee..e451cdb1c8 100644
--- a/tests-integration/src/tests/instance_test.rs
+++ b/tests-integration/src/tests/instance_test.rs
@@ -29,8 +29,8 @@ use session::context::{QueryContext, QueryContextRef};
 
 use crate::test_util::check_output_stream;
 use crate::tests::test_util::{
-    both_instances_cases, check_unordered_output_stream, distributed, get_data_dir, standalone,
-    standalone_instance_case, MockInstance,
+    both_instances_cases, check_unordered_output_stream, distributed, find_testing_resource,
+    standalone, standalone_instance_case, MockInstance,
 };
 
 #[apply(both_instances_cases)]
@@ -518,12 +518,7 @@ async fn test_execute_external_create_without_ts_type(instance: Arc<dyn MockInstance>) {
     let instance = instance.frontend();
 
     let format = "parquet";
-    let location = get_data_dir("../tests/data/parquet/various_type.parquet")
-        .canonicalize()
-        .unwrap()
-        .display()
-        .to_string();
-
+    let location = find_testing_resource("/tests/data/parquet/various_type.parquet");
     let table_name = "various_type_parquet";
 
     let output = execute_sql(
@@ -588,12 +583,7 @@ async fn test_execute_query_external_table_parquet(instance: Arc<dyn MockInstance>) {
     let instance = instance.frontend();
     let format = "orc";
-    let location = get_data_dir("../src/common/datasource/tests/orc/test.orc")
-        .canonicalize()
-        .unwrap()
-        .display()
-        .to_string();
-
+    let location = find_testing_resource("/src/common/datasource/tests/orc/test.orc");
     let table_name = "various_type_orc";
 
     let output = execute_sql(
@@ -668,12 +658,7 @@ async fn test_execute_query_external_table_orc(instance: Arc<dyn MockInstance>)
 async fn test_execute_query_external_table_csv(instance: Arc<dyn MockInstance>) {
     let instance = instance.frontend();
     let format = "csv";
-    let location = get_data_dir("../tests/data/csv/various_type.csv")
-        .canonicalize()
-        .unwrap()
-        .display()
-        .to_string();
-
+    let location = find_testing_resource("/tests/data/csv/various_type.csv");
     let table_name = "various_type_csv";
 
     let output = execute_sql(
@@ -719,12 +704,7 @@ async fn test_execute_query_external_table_csv(instance: Arc<dyn MockInstance>)
 async fn test_execute_query_external_table_json(instance: Arc<dyn MockInstance>) {
     let instance = instance.frontend();
     let format = "json";
-    let location = get_data_dir("../tests/data/json/various_type.json")
-        .canonicalize()
-        .unwrap()
-        .display()
-        .to_string();
-
+    let location = find_testing_resource("/tests/data/json/various_type.json");
     let table_name = "various_type_json";
 
     let output = execute_sql(
@@ -776,12 +756,7 @@ async fn test_execute_query_external_table_json(instance: Arc<dyn MockInstance>)
 async fn test_execute_query_external_table_json_with_schame(instance: Arc<dyn MockInstance>) {
     let instance = instance.frontend();
     let format = "json";
-    let location = get_data_dir("../tests/data/json/various_type.json")
-        .canonicalize()
-        .unwrap()
-        .display()
-        .to_string();
-
+    let location = find_testing_resource("/tests/data/json/various_type.json");
     let table_name = "various_type_json_with_schema";
 
     let output = execute_sql(
@@ -1339,11 +1314,7 @@ async fn test_execute_copy_from_orc(instance: Arc<dyn MockInstance>) {
     )
     .await, Output::AffectedRows(0)));
 
-    let filepath = get_data_dir("../src/common/datasource/tests/orc/test.orc")
-        .canonicalize()
-        .unwrap()
-        .display()
-        .to_string();
+    let filepath = find_testing_resource("/src/common/datasource/tests/orc/test.orc");
 
     let output = execute_sql(
         &instance,
@@ -1376,11 +1347,8 @@ async fn test_cast_type_issue_1594(instance: Arc<dyn MockInstance>) {
         "create table tsbs_cpu(hostname STRING, environment STRING, usage_user DOUBLE, usage_system DOUBLE, usage_idle DOUBLE, usage_nice DOUBLE, usage_iowait DOUBLE, usage_irq DOUBLE, usage_softirq DOUBLE, usage_steal DOUBLE, usage_guest DOUBLE, usage_guest_nice DOUBLE, ts TIMESTAMP TIME INDEX, PRIMARY KEY(hostname));",
     )
     .await, Output::AffectedRows(0)));
-    let filepath = get_data_dir("../src/common/datasource/tests/csv/type_cast.csv")
-        .canonicalize()
-        .unwrap()
-        .display()
-        .to_string();
+
+    let filepath = find_testing_resource("/src/common/datasource/tests/csv/type_cast.csv");
 
     let output = execute_sql(
         &instance,
diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs
index 51121f11dc..2d02f4a47a 100644
--- a/tests-integration/src/tests/test_util.rs
+++ b/tests-integration/src/tests/test_util.rs
@@ -12,11 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::path::PathBuf;
 use std::sync::Arc;
 
 use common_query::Output;
 use common_recordbatch::util;
+use common_test_util::find_workspace_path;
 use frontend::instance::Instance;
 use rstest_reuse::{self, template};
 
@@ -110,8 +110,20 @@ pub(crate) async fn check_unordered_output_stream(output: Output, expected: &str
     assert_eq!(pretty_print, expected);
 }
 
-pub fn get_data_dir(path: &str) -> PathBuf {
-    let dir = env!("CARGO_MANIFEST_DIR");
+/// Find the testing file resource under workspace root to be used in object store.
+pub fn find_testing_resource(path: &str) -> String {
+    let p = find_workspace_path(path).display().to_string();
 
-    PathBuf::from(dir).join(path)
+    #[cfg(windows)]
+    let p = {
+        // We need a Unix-style path even on Windows, because the path is used in the
+        // object-store and must be delimited with '/'. Inside the object-store, it is
+        // converted back to the path the file system needs in the end.
+        let p = p.replace('\\', "/");
+
+        // Prepend a '/' to indicate it's a file system path when parsed as an
+        // object-store URL on Windows.
+        format!("/{p}")
+    };
+
+    p
 }
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index c40e4d793f..d408137b75 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::BTreeMap;
 use std::sync::Arc;
 
 use axum::http::StatusCode;
@@ -411,13 +412,22 @@ pub async fn test_prom_http_api(store_type: StorageType) {
     assert_eq!(res.status(), StatusCode::OK);
     let body = serde_json::from_str::(&res.text().await).unwrap();
     assert_eq!(body.status, "success");
-    assert_eq!(
-        body.data,
-        serde_json::from_value::<PrometheusResponse>(json!(
-            [{"__name__" : "demo","ts":"1970-01-01 00:00:00+0000","cpu":"1.1","host":"host1","memory":"2.2"}]
-        ))
-        .unwrap()
-    );
+
+    let PrometheusResponse::Series(mut series) = body.data else {
+        unreachable!()
+    };
+    let actual = series
+        .remove(0)
+        .into_iter()
+        .collect::<BTreeMap<String, String>>();
+    let expected = BTreeMap::from([
+        ("__name__".to_string(), "demo".to_string()),
+        ("ts".to_string(), "1970-01-01 00:00:00+0000".to_string()),
+        ("cpu".to_string(), "1.1".to_string()),
+        ("host".to_string(), "host1".to_string()),
+        ("memory".to_string(), "2.2".to_string()),
+    ]);
+    assert_eq!(actual, expected);
 
     let res = client
         .post("/api/v1/series?match[]=up&match[]=down")
diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs
index de81777829..084b8c4128 100644
--- a/tests/runner/src/env.rs
+++ b/tests/runner/src/env.rs
@@ -36,13 +36,11 @@ use crate::util;
 
 const METASRV_ADDR: &str = "127.0.0.1:3002";
 const SERVER_ADDR: &str = "127.0.0.1:4001";
-const STANDALONE_LOG_FILE: &str = "/tmp/greptime-sqlness-standalone.log";
-const METASRV_LOG_FILE: &str = "/tmp/greptime-sqlness-metasrv.log";
-const FRONTEND_LOG_FILE: &str = "/tmp/greptime-sqlness-frontend.log";
-
 const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info";
 
-pub struct Env {}
+pub struct Env {
+    data_home: PathBuf,
+}
 
 #[allow(clippy::print_stdout)]
 #[async_trait]
@@ -51,8 +49,8 @@ impl EnvController for Env {
 
     async fn start(&self, mode: &str, _config: Option<&Path>) -> Self::DB {
         match mode {
-            "standalone" => Self::start_standalone().await,
-            "distributed" => Self::start_distributed().await,
+            "standalone" => self.start_standalone().await,
+            "distributed" => self.start_distributed().await,
             _ => panic!("Unexpected mode: {mode}"),
         }
     }
@@ -65,14 
+63,16 @@ impl EnvController for Env { #[allow(clippy::print_stdout)] impl Env { - pub async fn start_standalone() -> GreptimeDB { + pub fn new(data_home: PathBuf) -> Self { + Self { data_home } + } + + async fn start_standalone(&self) -> GreptimeDB { Self::build_db().await; let db_ctx = GreptimeDBContext::new(); - let server_process = Self::start_server("standalone", &db_ctx, true).await; - - println!("Started, going to test. Log will be write to {STANDALONE_LOG_FILE}"); + let server_process = self.start_server("standalone", &db_ctx, true).await; let client = Client::with_urls(vec![SERVER_ADDR]); let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); @@ -84,22 +84,23 @@ impl Env { client: TokioMutex::new(db), ctx: db_ctx, is_standalone: true, + env: Env::new(self.data_home.clone()), } } - pub async fn start_distributed() -> GreptimeDB { + async fn start_distributed(&self) -> GreptimeDB { Self::build_db().await; let db_ctx = GreptimeDBContext::new(); // start a distributed GreptimeDB - let meta_server = Env::start_server("metasrv", &db_ctx, true).await; + let meta_server = self.start_server("metasrv", &db_ctx, true).await; - let datanode_1 = Env::start_server("datanode", &db_ctx, true).await; - let datanode_2 = Env::start_server("datanode", &db_ctx, true).await; - let datanode_3 = Env::start_server("datanode", &db_ctx, true).await; + let datanode_1 = self.start_server("datanode", &db_ctx, true).await; + let datanode_2 = self.start_server("datanode", &db_ctx, true).await; + let datanode_3 = self.start_server("datanode", &db_ctx, true).await; - let frontend = Env::start_server("frontend", &db_ctx, true).await; + let frontend = self.start_server("frontend", &db_ctx, true).await; let client = Client::with_urls(vec![SERVER_ADDR]); let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); @@ -111,6 +112,7 @@ impl Env { client: TokioMutex::new(db), ctx: db_ctx, is_standalone: false, + env: Env::new(self.data_home.clone()), } } @@ -120,6 +122,7 @@ impl Env { } async fn start_server( + &self, subcommand: &str, db_ctx: &GreptimeDBContext, truncate_log: bool, @@ -127,17 +130,15 @@ impl Env { let log_file_name = match subcommand { "datanode" => { db_ctx.incr_datanode_id(); - - format!( - "/tmp/greptime-sqlness-datanode-{}.log", - db_ctx.datanode_id() - ) + format!("greptime-sqlness-datanode-{}.log", db_ctx.datanode_id()) } - "frontend" => FRONTEND_LOG_FILE.to_string(), - "metasrv" => METASRV_LOG_FILE.to_string(), - "standalone" => STANDALONE_LOG_FILE.to_string(), + "frontend" => "greptime-sqlness-frontend.log".to_string(), + "metasrv" => "greptime-sqlness-metasrv.log".to_string(), + "standalone" => "greptime-sqlness-standalone.log".to_string(), _ => panic!("Unexpected subcommand: {subcommand}"), }; + let log_file_name = self.data_home.join(log_file_name).display().to_string(); + let log_file = OpenOptions::new() .create(true) .write(true) @@ -146,15 +147,15 @@ impl Env { .unwrap(); let (args, check_ip_addr) = match subcommand { - "datanode" => Self::datanode_start_args(db_ctx), + "datanode" => self.datanode_start_args(db_ctx), "standalone" => { let args = vec![ DEFAULT_LOG_LEVEL.to_string(), subcommand.to_string(), "start".to_string(), "-c".to_string(), - Self::generate_config_file(subcommand, db_ctx), - "--http-addr=0.0.0.0:5001".to_string(), + self.generate_config_file(subcommand, db_ctx), + "--http-addr=127.0.0.1:5001".to_string(), ]; (args, SERVER_ADDR.to_string()) } @@ -163,8 +164,8 @@ impl Env { DEFAULT_LOG_LEVEL.to_string(), subcommand.to_string(), "start".to_string(), 
-                    "--metasrv-addr=0.0.0.0:3002".to_string(),
-                    "--http-addr=0.0.0.0:5003".to_string(),
+                    "--metasrv-addr=127.0.0.1:3002".to_string(),
+                    "--http-addr=127.0.0.1:5003".to_string(),
                 ];
                 (args, SERVER_ADDR.to_string())
             }
@@ -174,7 +175,7 @@ impl Env {
                     subcommand.to_string(),
                     "start".to_string(),
                     "--use-memory-store".to_string(),
-                    "--http-addr=0.0.0.0:5001".to_string(),
+                    "--http-addr=127.0.0.1:5001".to_string(),
                     "--disable-region-failover".to_string(),
                 ];
                 (args, METASRV_ADDR.to_string())
             }
@@ -189,7 +190,12 @@ impl Env {
             );
         }
 
-        let mut process = Command::new("./greptime")
+        #[cfg(not(windows))]
+        let program = "./greptime";
+        #[cfg(windows)]
+        let program = "greptime.exe";
+
+        let mut process = Command::new(program)
            .current_dir(util::get_binary_dir("debug"))
            .args(args)
            .stdout(log_file)
@@ -206,32 +212,31 @@ impl Env {
         process
     }
 
-    fn datanode_start_args(db_ctx: &GreptimeDBContext) -> (Vec<String>, String) {
+    fn datanode_start_args(&self, db_ctx: &GreptimeDBContext) -> (Vec<String>, String) {
         let id = db_ctx.datanode_id();
 
+        let data_home = self
+            .data_home
+            .join(format!("greptimedb_datanode_{}_{id}", db_ctx.time));
+        let wal_dir = data_home.join("wal").display().to_string();
+
         let subcommand = "datanode";
         let mut args = vec![
             DEFAULT_LOG_LEVEL.to_string(),
             subcommand.to_string(),
             "start".to_string(),
         ];
-        args.push(format!("--rpc-addr=0.0.0.0:410{id}"));
-        args.push(format!("--http-addr=0.0.0.0:430{id}"));
-        args.push(format!(
-            "--data-home=/tmp/greptimedb_datanode_{}",
-            db_ctx.time
-        ));
-        args.push(format!(
-            "--wal-dir=/tmp/greptimedb_datanode_{}_{id}/wal",
-            db_ctx.time
-        ));
+        args.push(format!("--rpc-addr=127.0.0.1:410{id}"));
+        args.push(format!("--http-addr=127.0.0.1:430{id}"));
+        args.push(format!("--data-home={}", data_home.display()));
+        args.push(format!("--wal-dir={wal_dir}"));
         args.push(format!("--node-id={id}"));
-        args.push("--metasrv-addr=0.0.0.0:3002".to_string());
-        (args, format!("0.0.0.0:410{id}"))
+        args.push("--metasrv-addr=127.0.0.1:3002".to_string());
+        (args, format!("127.0.0.1:410{id}"))
     }
 
     /// stop and restart the server process
-    async fn restart_server(db: &GreptimeDB) {
+    async fn restart_server(&self, db: &GreptimeDB) {
         {
             let mut server_processes = db.server_processes.lock().unwrap();
             for server_process in server_processes.iter_mut() {
@@ -241,14 +246,14 @@ impl Env {
 
         // check if the server is distributed or standalone
         let new_server_processes = if db.is_standalone {
-            let new_server_process = Env::start_server("standalone", &db.ctx, false).await;
+            let new_server_process = self.start_server("standalone", &db.ctx, false).await;
 
             vec![new_server_process]
         } else {
            db.ctx.reset_datanode_id();
 
            let mut processes = vec![];
            for _ in 0..3 {
-                let new_server_process = Env::start_server("datanode", &db.ctx, false).await;
+                let new_server_process = self.start_server("datanode", &db.ctx, false).await;
                processes.push(new_server_process);
            }
            processes
@@ -259,11 +264,13 @@ impl Env {
     }
 
-    /// Generate config file to `/tmp/{subcommand}-{current_time}.toml`
-    fn generate_config_file(subcommand: &str, db_ctx: &GreptimeDBContext) -> String {
+    /// Generate the config file under the runner's data home, as `{data_home}/{subcommand}-{current_time}.toml`.
+    fn generate_config_file(&self, subcommand: &str, db_ctx: &GreptimeDBContext) -> String {
         let mut tt = TinyTemplate::new();
 
         let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
-        path.push(format!("../conf/{subcommand}-test.toml.template"));
+        path.pop();
+        path.push("conf");
+        path.push(format!("{subcommand}-test.toml.template"));
         let template = std::fs::read_to_string(path).unwrap();
         tt.add_template(subcommand, &template).unwrap();
 
@@ -274,15 +281,24 @@ impl Env {
             procedure_dir: String,
         }
 
-        let greptimedb_dir = format!("/tmp/greptimedb-{subcommand}-{}", db_ctx.time);
+        let data_home = self
+            .data_home
+            .join(format!("greptimedb-{subcommand}-{}", db_ctx.time));
+        std::fs::create_dir_all(data_home.as_path()).unwrap();
+
+        let wal_dir = data_home.join("wal").display().to_string();
+        let procedure_dir = data_home.join("procedure").display().to_string();
 
         let ctx = Context {
-            wal_dir: format!("{greptimedb_dir}/wal/"),
-            data_home: format!("{greptimedb_dir}/"),
-            procedure_dir: format!("{greptimedb_dir}/procedure/"),
+            wal_dir,
+            data_home: data_home.display().to_string(),
+            procedure_dir,
         };
         let rendered = tt.render(subcommand, &ctx).unwrap();
 
-        let conf_file = format!("/tmp/{subcommand}-{}.toml", db_ctx.time);
+        let conf_file = data_home
+            .join(format!("{subcommand}-{}.toml", db_ctx.time))
+            .display()
+            .to_string();
         println!("Generating {subcommand} config file in {conf_file}, full content:\n{rendered}");
         std::fs::write(&conf_file, rendered).unwrap();
 
@@ -313,13 +329,14 @@ pub struct GreptimeDB {
     client: TokioMutex<DB>,
     ctx: GreptimeDBContext,
     is_standalone: bool,
+    env: Env,
 }
 
 #[async_trait]
 impl Database for GreptimeDB {
     async fn query(&self, ctx: QueryContext, query: String) -> Box<dyn Display> {
         if ctx.context.contains_key("restart") {
-            Env::restart_server(self).await;
+            self.env.restart_server(self).await;
         }
 
         let mut client = self.client.lock().await;
diff --git a/tests/runner/src/main.rs b/tests/runner/src/main.rs
index 835ecfe4dc..1d7e15d925 100644
--- a/tests/runner/src/main.rs
+++ b/tests/runner/src/main.rs
@@ -27,6 +27,11 @@ async fn main() {
         "".to_string()
     };
 
+    #[cfg(windows)]
+    let data_home = std::env::temp_dir();
+    #[cfg(not(windows))]
+    let data_home = std::path::PathBuf::from("/tmp");
+
     let config = ConfigBuilder::default()
         .case_dir(util::get_case_dir())
         .fail_fast(true)
@@ -34,6 +39,6 @@ async fn main() {
         .follow_links(true)
         .build()
         .unwrap();
-    let runner = Runner::new(config, Env {});
+    let runner = Runner::new(config, Env::new(data_home));
     runner.run().await.unwrap();
 }
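
--
A note on the WSL path mapping used by the build.rs change above: the dashboard
script is run through WSL's bash, so a Windows path such as C:\repo\scripts has
to be rewritten as /mnt/c/repo/scripts first. Below is a minimal standalone
sketch of that transformation; `to_wsl_path` is a name made up for illustration,
the build script inlines the same steps with format!:

fn to_wsl_path(windows_path: &str) -> String {
    // Lowercase (WSL mounts drives as /mnt/c, /mnt/d, ...), drop the drive
    // colon, and flip backslashes to forward slashes. Note this lowercases
    // the entire path, exactly as the build script does.
    format!(
        "/mnt/{}",
        windows_path.to_lowercase().replace(':', "").replace('\\', "/")
    )
}

fn main() {
    assert_eq!(to_wsl_path(r"C:\repo\scripts"), "/mnt/c/repo/scripts");
}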