build: on windows (#2054)

* build on windows

* rebase develop

* fix: resolve PR comments
This commit is contained in:
LFC
2023-08-10 16:08:37 +08:00
committed by GitHub
parent 587a24e7fb
commit 5c19913a91
52 changed files with 815 additions and 512 deletions

View File

@@ -1,2 +1,3 @@
[profile.default]
slow-timeout = { period = "60s", terminate-after = 3, grace-period = "30s" }
retries = { backoff = "exponential", count = 3, delay = "10s", jitter = true }

View File

@@ -70,52 +70,13 @@ jobs:
- name: Run taplo
run: taplo format --check
# Use coverage to run test.
# test:
# name: Test Suite
# if: github.event.pull_request.draft == false
# runs-on: ubuntu-latest
# timeout-minutes: 60
# steps:
# - uses: actions/checkout@v3
# - name: Cache LLVM and Clang
# id: cache-llvm
# uses: actions/cache@v3
# with:
# path: ./llvm
# key: llvm
# - uses: arduino/setup-protoc@v1
# with:
# repo-token: ${{ secrets.GITHUB_TOKEN }}
# - uses: KyleMayes/install-llvm-action@v1
# with:
# version: "14.0"
# cached: ${{ steps.cache-llvm.outputs.cache-hit }}
# - uses: dtolnay/rust-toolchain@master
# with:
# toolchain: ${{ env.RUST_TOOLCHAIN }}
# - name: Rust Cache
# uses: Swatinem/rust-cache@v2
# - name: Cleanup disk
# uses: curoky/cleanup-disk-action@v2.0
# with:
# retain: 'rust,llvm'
# - name: Install latest nextest release
# uses: taiki-e/install-action@nextest
# - name: Run tests
# run: cargo nextest run
# env:
# CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=lld"
# RUST_BACKTRACE: 1
# GT_S3_BUCKET: ${{ secrets.S3_BUCKET }}
# GT_S3_ACCESS_KEY_ID: ${{ secrets.S3_ACCESS_KEY_ID }}
# GT_S3_ACCESS_KEY: ${{ secrets.S3_ACCESS_KEY }}
# UNITTEST_LOG_DIR: "__unittest_logs"
sqlness:
name: Sqlness Test
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest-8-cores
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ ubuntu-latest-8-cores, windows-latest ]
timeout-minutes: 60
steps:
- uses: actions/checkout@v3
@@ -127,25 +88,14 @@ jobs:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- name: Rust Cache
uses: Swatinem/rust-cache@v2
- name: Run etcd
run: |
ETCD_VER=v3.5.7
DOWNLOAD_URL=https://github.com/etcd-io/etcd/releases/download
curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
mkdir -p /tmp/etcd-download
tar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/etcd-download --strip-components=1
rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
sudo cp -a /tmp/etcd-download/etcd* /usr/local/bin/
nohup etcd >/tmp/etcd.log 2>&1 &
- name: Run sqlness
run: cargo sqlness && ls /tmp
run: cargo sqlness
- name: Upload sqlness logs
if: always()
uses: actions/upload-artifact@v3
with:
name: sqlness-logs
path: /tmp/greptime-*.log
path: ${{ runner.temp }}/greptime-*.log
retention-days: 3
fmt:
@@ -234,3 +184,43 @@ jobs:
flags: rust
fail_ci_if_error: false
verbose: true
test-on-windows:
if: github.event.pull_request.draft == false
runs-on: windows-latest-8-cores
timeout-minutes: 60
steps:
- run: git config --global core.autocrlf false
- uses: actions/checkout@v3
- uses: arduino/setup-protoc@v1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Install Rust toolchain
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
components: llvm-tools-preview
- name: Rust Cache
uses: Swatinem/rust-cache@v2
- name: Install Cargo Nextest
uses: taiki-e/install-action@nextest
- name: Install Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install PyArrow Package
run: pip install pyarrow
- name: Install WSL distribution
uses: Vampire/setup-wsl@v2
with:
distribution: Ubuntu-22.04
- name: Running tests
run: cargo nextest run -F pyo3_backend,dashboard
env:
RUST_BACKTRACE: 1
CARGO_INCREMENTAL: 0
GT_S3_BUCKET: ${{ secrets.S3_BUCKET }}
GT_S3_ACCESS_KEY_ID: ${{ secrets.S3_ACCESS_KEY_ID }}
GT_S3_ACCESS_KEY: ${{ secrets.S3_ACCESS_KEY }}
GT_S3_REGION: ${{ secrets.S3_REGION }}
UNITTEST_LOG_DIR: "__unittest_logs"

84
Cargo.lock generated
View File

@@ -1816,17 +1816,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "common-pprof"
version = "0.3.2"
dependencies = [
"common-error",
"pprof",
"prost",
"snafu",
"tokio",
]
[[package]]
name = "common-procedure"
version = "0.3.2"
@@ -4943,9 +4932,9 @@ checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519"
[[package]]
name = "linux-raw-sys"
version = "0.4.3"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09fc20d2ca12cb9f044c93e3bd6d32d523e6e2ec3db4f7b2939cd99026ecd3f0"
checksum = "57bcfdad1b858c2db7c38303a6d2ad4dfaf5eb53dfeb0910128b2c26d6158503"
[[package]]
name = "lock_api"
@@ -5845,13 +5834,13 @@ dependencies = [
[[package]]
name = "num-derive"
version = "0.3.3"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d"
checksum = "9e6a0fd4f737c707bd9086cc16c925f294943eb62eb71499e9fd4cf71f8b9f4e"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
"syn 2.0.28",
]
[[package]]
@@ -7354,22 +7343,21 @@ dependencies = [
[[package]]
name = "raft-engine"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67b66e735395b7ff12f3ebbb4794006aecb365c4c9a82141279b58b227ac3a8b"
source = "git+https://github.com/tikv/raft-engine.git?rev=2dcaf5beeea3d5de9ec9c7133a2451d00f508f52#2dcaf5beeea3d5de9ec9c7133a2451d00f508f52"
dependencies = [
"byteorder",
"crc32fast",
"crossbeam",
"fail",
"fs2",
"hashbrown 0.12.3",
"hashbrown 0.14.0",
"hex",
"if_chain",
"lazy_static",
"libc",
"log",
"lz4-sys",
"nix 0.25.1",
"nix 0.26.2",
"num-derive",
"num-traits",
"parking_lot 0.12.1",
@@ -7377,10 +7365,11 @@ dependencies = [
"prometheus-static-metric",
"protobuf",
"rayon",
"rhai",
"scopeguard",
"serde",
"serde_repr",
"strum 0.24.1",
"strum 0.25.0",
"thiserror",
]
@@ -7683,6 +7672,32 @@ dependencies = [
"bytemuck",
]
[[package]]
name = "rhai"
version = "1.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c2a11a646ef5d4e4a9d5cf80c7e4ecb20f9b1954292d5c5e6d6cbc8d33728ec"
dependencies = [
"ahash 0.8.3",
"bitflags 1.3.2",
"instant",
"num-traits",
"rhai_codegen",
"smallvec",
"smartstring",
]
[[package]]
name = "rhai_codegen"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db74e3fdd29d969a0ec1f8e79171a6f0f71d0429293656901db382d248c4c021"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "ring"
version = "0.16.20"
@@ -7970,7 +7985,7 @@ dependencies = [
"bitflags 2.3.3",
"errno 0.3.2",
"libc",
"linux-raw-sys 0.4.3",
"linux-raw-sys 0.4.5",
"windows-sys 0.48.0",
]
@@ -8643,9 +8658,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4"
[[package]]
name = "serde"
version = "1.0.179"
version = "1.0.180"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a5bf42b8d227d4abf38a1ddb08602e229108a517cd4e5bb28f9c7eaafdce5c0"
checksum = "0ea67f183f058fe88a4e3ec6e2788e003840893b91bac4559cabedd00863b3ed"
dependencies = [
"serde_derive",
]
@@ -8662,9 +8677,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.179"
version = "1.0.180"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "741e124f5485c7e60c03b043f79f320bff3527f4bbf12cf3831750dc46a0ec2c"
checksum = "24e744d7782b686ab3b73267ef05697159cc0e5abbed3f47f9933165e5219036"
dependencies = [
"proc-macro2",
"quote",
@@ -8794,7 +8809,6 @@ dependencies = [
"common-grpc",
"common-grpc-expr",
"common-mem-prof",
"common-pprof",
"common-query",
"common-recordbatch",
"common-runtime",
@@ -8829,6 +8843,7 @@ dependencies = [
"pgwire",
"pin-project",
"postgres-types",
"pprof",
"promql-parser",
"prost",
"query",
@@ -9040,6 +9055,17 @@ version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9"
[[package]]
name = "smartstring"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fb72c633efbaa2dd666986505016c32c3044395ceaf881518399d2f4127ee29"
dependencies = [
"autocfg",
"static_assertions",
"version_check",
]
[[package]]
name = "snafu"
version = "0.7.5"
@@ -9795,9 +9821,9 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]]
name = "target-lexicon"
version = "0.12.10"
version = "0.12.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d2faeef5759ab89935255b1a4cd98e0baf99d1085e37d36599c625dac49ae8e"
checksum = "9d0e916b1148c8e263850e1ebcbd046f333e0683c724876bb0da63ea4373dc8a"
[[package]]
name = "temp-env"

View File

@@ -18,7 +18,6 @@ members = [
"src/common/meta",
"src/common/procedure",
"src/common/procedure-test",
"src/common/pprof",
"src/common/query",
"src/common/recordbatch",
"src/common/runtime",

View File

@@ -2,14 +2,14 @@
# This script is used to download built dashboard assets from the "GreptimeTeam/dashboard" repository.
set -e
set -e -x
declare -r SCRIPT_DIR=$(cd $(dirname ${0}) >/dev/null 2>&1 && pwd)
declare -r ROOT_DIR=$(dirname ${SCRIPT_DIR})
declare -r STATIC_DIR="$ROOT_DIR/src/servers/dashboard"
OUT_DIR="${1:-$SCRIPT_DIR}"
RELEASE_VERSION="$(cat $STATIC_DIR/VERSION)"
RELEASE_VERSION="$(cat $STATIC_DIR/VERSION | tr -d '\t\r\n ')"
echo "Downloading assets to dir: $OUT_DIR"
cd $OUT_DIR

View File

@@ -54,15 +54,19 @@ session = { workspace = true }
snafu.workspace = true
substrait = { workspace = true }
table = { workspace = true }
tikv-jemallocator = "0.5"
tokio.workspace = true
[target.'cfg(not(windows))'.dependencies]
tikv-jemallocator = "0.5"
[dev-dependencies]
common-test-util = { workspace = true }
rexpect = "0.5"
serde.workspace = true
temp-env = "0.3"
toml.workspace = true
[target.'cfg(not(windows))'.dev-dependencies]
rexpect = "0.5"
[build-dependencies]
common-version = { workspace = true }

View File

@@ -187,6 +187,7 @@ fn log_env_flags() {
}
}
#[cfg(not(windows))]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

View File

@@ -209,15 +209,19 @@ impl DfRecordBatchEncoder for csv::Writer<SharedBuffer> {
#[cfg(test)]
mod tests {
use common_test_util::find_workspace_path;
use super::*;
use crate::file_format::{
FileFormat, FORMAT_COMPRESSION_TYPE, FORMAT_DELIMITER, FORMAT_HAS_HEADER,
FORMAT_SCHEMA_INFER_MAX_RECORD,
};
use crate::test_util::{self, format_schema, test_store};
use crate::test_util::{format_schema, test_store};
fn test_data_root() -> String {
test_util::get_data_dir("tests/csv").display().to_string()
find_workspace_path("/src/common/datasource/tests/csv")
.display()
.to_string()
}
#[tokio::test]

View File

@@ -167,12 +167,16 @@ impl DfRecordBatchEncoder for json::Writer<SharedBuffer, LineDelimited> {
#[cfg(test)]
mod tests {
use common_test_util::find_workspace_path;
use super::*;
use crate::file_format::{FileFormat, FORMAT_COMPRESSION_TYPE, FORMAT_SCHEMA_INFER_MAX_RECORD};
use crate::test_util::{self, format_schema, test_store};
use crate::test_util::{format_schema, test_store};
fn test_data_root() -> String {
test_util::get_data_dir("tests/json").display().to_string()
find_workspace_path("/src/common/datasource/tests/json")
.display()
.to_string()
}
#[tokio::test]

View File

@@ -188,12 +188,16 @@ impl FileOpener for OrcOpener {
#[cfg(test)]
mod tests {
use common_test_util::find_workspace_path;
use super::*;
use crate::file_format::FileFormat;
use crate::test_util::{self, format_schema, test_store};
use crate::test_util::{format_schema, test_store};
fn test_data_root() -> String {
test_util::get_data_dir("tests/orc").display().to_string()
find_workspace_path("/src/common/datasource/tests/orc")
.display()
.to_string()
}
#[tokio::test]

View File

@@ -158,11 +158,13 @@ impl ArrowWriterCloser for ArrowWriter<SharedBuffer> {
#[cfg(test)]
mod tests {
use common_test_util::find_workspace_path;
use super::*;
use crate::test_util::{self, format_schema, test_store};
use crate::test_util::{format_schema, test_store};
fn test_data_root() -> String {
test_util::get_data_dir("tests/parquet")
find_workspace_path("/src/common/datasource/tests/parquet")
.display()
.to_string()
}

View File

@@ -17,6 +17,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::vec;
use common_test_util::find_workspace_path;
use datafusion::assert_batches_eq;
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStream, ParquetExec};
use datafusion::execution::context::TaskContext;
@@ -71,7 +72,7 @@ async fn test_json_opener() {
CompressionType::Uncompressed,
);
let path = &test_util::get_data_dir("tests/json/basic.json")
let path = &find_workspace_path("/src/common/datasource/tests/json/basic.json")
.display()
.to_string();
let tests = [
@@ -111,7 +112,7 @@ async fn test_csv_opener() {
let store = test_store("/");
let schema = test_basic_schema();
let path = &test_util::get_data_dir("tests/csv/basic.csv")
let path = &find_workspace_path("/src/common/datasource/tests/csv/basic.csv")
.display()
.to_string();
let csv_conf = CsvConfigBuilder::default()
@@ -160,7 +161,7 @@ async fn test_parquet_exec() {
let schema = test_basic_schema();
let path = &test_util::get_data_dir("tests/parquet/basic.parquet")
let path = &find_workspace_path("/src/common/datasource/tests/parquet/basic.parquet")
.display()
.to_string();
let base_config = scan_config(schema.clone(), None, path);
@@ -196,13 +197,15 @@ async fn test_parquet_exec() {
#[tokio::test]
async fn test_orc_opener() {
let root = test_util::get_data_dir("tests/orc").display().to_string();
let root = find_workspace_path("/src/common/datasource/tests/orc")
.display()
.to_string();
let store = test_store(&root);
let schema = OrcFormat.infer_schema(&store, "test.orc").await.unwrap();
let schema = Arc::new(schema);
let orc_opener = OrcOpener::new(store.clone(), schema.clone(), None);
let path = &test_util::get_data_dir("/test.orc").display().to_string();
let path = "test.orc";
let tests = [
Test {

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::PathBuf;
use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema, SchemaRef};
@@ -31,13 +30,6 @@ use crate::test_util;
pub const TEST_BATCH_SIZE: usize = 100;
pub fn get_data_dir(path: &str) -> PathBuf {
// https://doc.rust-lang.org/cargo/reference/environment-variables.html
let dir = env!("CARGO_MANIFEST_DIR");
PathBuf::from(dir).join(path)
}
pub fn format_schema(schema: Schema) -> Vec<String> {
schema
.fields()
@@ -78,6 +70,9 @@ pub fn test_basic_schema() -> SchemaRef {
}
pub fn scan_config(file_schema: SchemaRef, limit: Option<usize>, filename: &str) -> FileScanConfig {
// object_store only recognize the Unix style path, so make it happy.
let filename = &filename.replace('\\', "/");
FileScanConfig {
object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
file_schema,
@@ -124,12 +119,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi
let written = tmp_store.read(&output_path).await.unwrap();
let origin = store.read(origin_path).await.unwrap();
// ignores `\n`
assert_eq!(
String::from_utf8_lossy(&written).trim_end_matches('\n'),
String::from_utf8_lossy(&origin).trim_end_matches('\n'),
)
assert_eq_lines(written, origin);
}
pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usize) -> usize) {
@@ -166,10 +156,19 @@ pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usiz
let written = tmp_store.read(&output_path).await.unwrap();
let origin = store.read(origin_path).await.unwrap();
assert_eq_lines(written, origin);
}
// ignores `\n`
// Ignore the CRLF difference across operating systems.
fn assert_eq_lines(written: Vec<u8>, origin: Vec<u8>) {
assert_eq!(
String::from_utf8_lossy(&written).trim_end_matches('\n'),
String::from_utf8_lossy(&origin).trim_end_matches('\n'),
String::from_utf8(written)
.unwrap()
.lines()
.collect::<Vec<_>>(),
String::from_utf8(origin)
.unwrap()
.lines()
.collect::<Vec<_>>(),
)
}

View File

@@ -12,50 +12,36 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_test_util::find_workspace_path;
use crate::test_util;
#[tokio::test]
async fn test_stream_to_json() {
let origin_path = &find_workspace_path("/src/common/datasource/tests/json/basic.json")
.display()
.to_string();
// A small threshold
// Triggers the flush each writes
test_util::setup_stream_to_json_test(
&test_util::get_data_dir("tests/json/basic.json")
.display()
.to_string(),
|size| size / 2,
)
.await;
test_util::setup_stream_to_json_test(origin_path, |size| size / 2).await;
// A large threshold
// Only triggers the flush at last
test_util::setup_stream_to_json_test(
&test_util::get_data_dir("tests/json/basic.json")
.display()
.to_string(),
|size| size * 2,
)
.await;
test_util::setup_stream_to_json_test(origin_path, |size| size * 2).await;
}
#[tokio::test]
async fn test_stream_to_csv() {
let origin_path = &find_workspace_path("/src/common/datasource/tests/csv/basic.csv")
.display()
.to_string();
// A small threshold
// Triggers the flush each writes
test_util::setup_stream_to_csv_test(
&test_util::get_data_dir("tests/csv/basic.csv")
.display()
.to_string(),
|size| size / 2,
)
.await;
test_util::setup_stream_to_csv_test(origin_path, |size| size / 2).await;
// A large threshold
// Only triggers the flush at last
test_util::setup_stream_to_csv_test(
&test_util::get_data_dir("tests/csv/basic.csv")
.display()
.to_string(),
|size| size * 2,
)
.await;
test_util::setup_stream_to_csv_test(origin_path, |size| size * 2).await;
}

View File

@@ -66,6 +66,7 @@ mod tests {
}
}
#[cfg(not(windows))]
#[test]
fn test_parse_path_and_dir() {
let parsed = Url::from_file_path("/to/path/file").unwrap();
@@ -75,6 +76,16 @@ mod tests {
assert_eq!(parsed.path(), "/to/path/");
}
#[cfg(windows)]
#[test]
fn test_parse_path_and_dir() {
let parsed = Url::from_file_path("C:\\to\\path\\file").unwrap();
assert_eq!(parsed.path(), "/C:/to/path/file");
let parsed = Url::from_directory_path("C:\\to\\path\\").unwrap();
assert_eq!(parsed.path(), "/C:/to/path/");
}
#[test]
fn test_find_dir_and_filename() {
struct Test<'a> {

View File

@@ -8,9 +8,11 @@ license.workspace = true
common-error = { workspace = true }
snafu.workspace = true
tempfile = "3.4"
tikv-jemalloc-ctl = { version = "0.5", features = ["use_std"] }
tokio.workspace = true
[dependencies.tikv-jemalloc-sys]
[target.'cfg(not(windows))'.dependencies]
tikv-jemalloc-ctl = { version = "0.5", features = ["use_std"] }
[target.'cfg(not(windows))'.dependencies.tikv-jemalloc-sys]
features = ["stats", "profiling", "unprefixed_malloc_on_supported_platforms"]
version = "0.5"

View File

@@ -13,51 +13,28 @@
// limitations under the License.
use std::any::Any;
use std::path::PathBuf;
use common_error::ext::ErrorExt;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use snafu::{Location, Snafu};
use snafu::Snafu;
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to read OPT_PROF, source: {}", source))]
ReadOptProf { source: tikv_jemalloc_ctl::Error },
#[snafu(display("{source}"))]
Internal { source: BoxedError },
#[snafu(display("Memory profiling is not enabled"))]
ProfilingNotEnabled,
#[snafu(display("Failed to build temp file from given path: {:?}", path))]
BuildTempPath { path: PathBuf, location: Location },
#[snafu(display("Failed to open temp file: {}, source: {}", path, source))]
OpenTempFile {
path: String,
source: std::io::Error,
},
#[snafu(display(
"Failed to dump profiling data to temp file: {:?}, source: {}",
path,
source
))]
DumpProfileData {
path: PathBuf,
source: tikv_jemalloc_ctl::Error,
},
#[snafu(display("Memory profiling is not supported"))]
ProfilingNotSupported,
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::ReadOptProf { .. } => StatusCode::Internal,
Error::ProfilingNotEnabled => StatusCode::InvalidArguments,
Error::BuildTempPath { .. } => StatusCode::Internal,
Error::OpenTempFile { .. } => StatusCode::StorageUnavailable,
Error::DumpProfileData { .. } => StatusCode::StorageUnavailable,
Error::Internal { source } => source.status_code(),
Error::ProfilingNotSupported => StatusCode::Unsupported,
}
}

View File

@@ -0,0 +1,76 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod error;
use std::ffi::{c_char, CString};
use std::path::PathBuf;
use error::{
BuildTempPathSnafu, DumpProfileDataSnafu, OpenTempFileSnafu, ProfilingNotEnabledSnafu,
ReadOptProfSnafu,
};
use snafu::{ensure, ResultExt};
use tokio::io::AsyncReadExt;
use crate::error::Result;
const PROF_DUMP: &[u8] = b"prof.dump\0";
const OPT_PROF: &[u8] = b"opt.prof\0";
pub async fn dump_profile() -> Result<Vec<u8>> {
ensure!(is_prof_enabled()?, ProfilingNotEnabledSnafu);
let tmp_path = tempfile::tempdir().map_err(|_| {
BuildTempPathSnafu {
path: std::env::temp_dir(),
}
.build()
})?;
let mut path_buf = PathBuf::from(tmp_path.path());
path_buf.push("greptimedb.hprof");
let path = path_buf
.to_str()
.ok_or_else(|| BuildTempPathSnafu { path: &path_buf }.build())?
.to_string();
let mut bytes = CString::new(path.as_str())
.map_err(|_| BuildTempPathSnafu { path: &path_buf }.build())?
.into_bytes_with_nul();
{
// #safety: we always expect a valid temp file path to write profiling data to.
let ptr = bytes.as_mut_ptr() as *mut c_char;
unsafe {
tikv_jemalloc_ctl::raw::write(PROF_DUMP, ptr)
.context(DumpProfileDataSnafu { path: path_buf })?
}
}
let mut f = tokio::fs::File::open(path.as_str())
.await
.context(OpenTempFileSnafu { path: &path })?;
let mut buf = vec![];
let _ = f
.read_to_end(&mut buf)
.await
.context(OpenTempFileSnafu { path })?;
Ok(buf)
}
fn is_prof_enabled() -> Result<bool> {
// safety: OPT_PROF variable, if present, is always a boolean value.
Ok(unsafe { tikv_jemalloc_ctl::raw::read::<bool>(OPT_PROF).context(ReadOptProfSnafu)? })
}

View File

@@ -0,0 +1,73 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::path::PathBuf;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use snafu::{Location, Snafu};
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to read OPT_PROF, source: {}", source))]
ReadOptProf { source: tikv_jemalloc_ctl::Error },
#[snafu(display("Memory profiling is not enabled"))]
ProfilingNotEnabled,
#[snafu(display("Failed to build temp file from given path: {:?}", path))]
BuildTempPath { path: PathBuf, location: Location },
#[snafu(display("Failed to open temp file: {}, source: {}", path, source))]
OpenTempFile {
path: String,
source: std::io::Error,
},
#[snafu(display(
"Failed to dump profiling data to temp file: {:?}, source: {}",
path,
source
))]
DumpProfileData {
path: PathBuf,
source: tikv_jemalloc_ctl::Error,
},
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::ReadOptProf { .. } => StatusCode::Internal,
Error::ProfilingNotEnabled => StatusCode::InvalidArguments,
Error::BuildTempPath { .. } => StatusCode::Internal,
Error::OpenTempFile { .. } => StatusCode::StorageUnavailable,
Error::DumpProfileData { .. } => StatusCode::StorageUnavailable,
}
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl From<Error> for crate::error::Error {
fn from(e: Error) -> Self {
Self::Internal {
source: BoxedError::new(e),
}
}
}

View File

@@ -14,62 +14,12 @@
pub mod error;
use std::ffi::{c_char, CString};
use std::path::PathBuf;
use snafu::{ensure, ResultExt};
use tokio::io::AsyncReadExt;
use crate::error::{
BuildTempPathSnafu, DumpProfileDataSnafu, OpenTempFileSnafu, ProfilingNotEnabledSnafu,
ReadOptProfSnafu,
};
const PROF_DUMP: &[u8] = b"prof.dump\0";
const OPT_PROF: &[u8] = b"opt.prof\0";
#[cfg(not(windows))]
mod jemalloc;
#[cfg(not(windows))]
pub use jemalloc::dump_profile;
#[cfg(windows)]
pub async fn dump_profile() -> error::Result<Vec<u8>> {
ensure!(is_prof_enabled()?, ProfilingNotEnabledSnafu);
let tmp_path = tempfile::tempdir().map_err(|_| {
BuildTempPathSnafu {
path: std::env::temp_dir(),
}
.build()
})?;
let mut path_buf = PathBuf::from(tmp_path.path());
path_buf.push("greptimedb.hprof");
let path = path_buf
.to_str()
.ok_or_else(|| BuildTempPathSnafu { path: &path_buf }.build())?
.to_string();
let mut bytes = CString::new(path.as_str())
.map_err(|_| BuildTempPathSnafu { path: &path_buf }.build())?
.into_bytes_with_nul();
{
// #safety: we always expect a valid temp file path to write profiling data to.
let ptr = bytes.as_mut_ptr() as *mut c_char;
unsafe {
tikv_jemalloc_ctl::raw::write(PROF_DUMP, ptr)
.context(DumpProfileDataSnafu { path: path_buf })?
}
}
let mut f = tokio::fs::File::open(path.as_str())
.await
.context(OpenTempFileSnafu { path: &path })?;
let mut buf = vec![];
let _ = f
.read_to_end(&mut buf)
.await
.context(OpenTempFileSnafu { path })?;
Ok(buf)
}
fn is_prof_enabled() -> error::Result<bool> {
// safety: OPT_PROF variable, if present, is always a boolean value.
Ok(unsafe { tikv_jemalloc_ctl::raw::read::<bool>(OPT_PROF).context(ReadOptProfSnafu)? })
error::ProfilingNotSupportedSnafu.fail()
}

View File

@@ -1,16 +0,0 @@
[package]
name = "common-pprof"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
common-error = { workspace = true }
pprof = { version = "0.11", features = [
"flamegraph",
"prost-codec",
"protobuf",
] }
prost.workspace = true
snafu.workspace = true
tokio.workspace = true

View File

@@ -12,5 +12,48 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(lazy_cell)]
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::LazyLock;
pub mod ports;
pub mod temp_dir;
// Rust is working on an env possibly named `CARGO_WORKSPACE_DIR` to find the root path to the
// workspace, see https://github.com/rust-lang/cargo/issues/3946.
// Until then, use this verbose way.
static WORKSPACE_ROOT: LazyLock<PathBuf> = LazyLock::new(|| {
let output = Command::new(env!("CARGO"))
.args(["locate-project", "--workspace", "--message-format=plain"])
.output()
.unwrap()
.stdout;
let cargo_path = Path::new(std::str::from_utf8(&output).unwrap().trim());
cargo_path.parent().unwrap().to_path_buf()
});
/// Find the absolute path to a file or a directory in the workspace.
/// The input `path` should be the relative path of the file or directory from workspace root.
///
/// For example, if the greptimedb project is placed under directory "/foo/bar/greptimedb/",
/// and this function is invoked with path = "/src/common/test-util/src/lib.rs", you will get the
/// absolute path to this file.
///
/// The return value is [PathBuf]. This is to adapt the Windows file system's style.
/// However, the input argument is Unix style, this is to give user the most convenience.
pub fn find_workspace_path(path: &str) -> PathBuf {
let mut buf = WORKSPACE_ROOT.clone();
// Manually "canonicalize" to avoid annoy Windows specific "\\?" path prefix.
path.split('/').for_each(|x| {
if x == ".." {
buf.pop();
} else if x != "." {
buf = buf.join(x);
}
});
buf
}

View File

@@ -15,10 +15,11 @@
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use chrono::{Local, LocalResult, NaiveDateTime, TimeZone};
use chrono::{LocalResult, NaiveDateTime};
use serde::{Deserialize, Serialize};
use crate::error::{Error, InvalidDateStrSnafu, Result};
use crate::util::{format_utc_datetime, local_datetime_to_utc};
const DATETIME_FORMAT: &str = "%F %T";
const DATETIME_FORMAT_WITH_TZ: &str = "%F %T%z";
@@ -35,9 +36,7 @@ impl Display for DateTime {
write!(
f,
"{}",
Local {}
.from_utc_datetime(&abs_time)
.format(DATETIME_FORMAT_WITH_TZ)
format_utc_datetime(&abs_time, DATETIME_FORMAT_WITH_TZ)
)
} else {
write!(f, "DateTime({})", self.0)
@@ -56,13 +55,12 @@ impl FromStr for DateTime {
fn from_str(s: &str) -> Result<Self> {
let s = s.trim();
let local = Local {};
let timestamp = if let Ok(d) = NaiveDateTime::parse_from_str(s, DATETIME_FORMAT) {
match local.from_local_datetime(&d) {
match local_datetime_to_utc(&d) {
LocalResult::None => {
return InvalidDateStrSnafu { raw: s }.fail();
}
LocalResult::Single(d) | LocalResult::Ambiguous(d, _) => d.naive_utc().timestamp(),
LocalResult::Single(d) | LocalResult::Ambiguous(d, _) => d.timestamp(),
}
} else if let Ok(v) = chrono::DateTime::parse_from_str(s, DATETIME_FORMAT_WITH_TZ) {
v.timestamp()

View File

@@ -15,12 +15,12 @@
use std::cmp::Ordering;
use std::hash::{Hash, Hasher};
use chrono::offset::Local;
use chrono::{NaiveDateTime, NaiveTime, TimeZone as ChronoTimeZone, Utc};
use serde::{Deserialize, Serialize};
use crate::timestamp::TimeUnit;
use crate::timezone::TimeZone;
use crate::util::format_utc_datetime;
/// Time value, represents the elapsed time since midnight in the unit of `TimeUnit`.
#[derive(Debug, Clone, Default, Copy, Serialize, Deserialize)]
@@ -119,10 +119,7 @@ impl Time {
Some(TimeZone::Named(tz)) => {
format!("{}", tz.from_utc_datetime(&datetime).format(pattern))
}
None => {
let local = Local {};
format!("{}", local.from_utc_datetime(&datetime).format(pattern))
}
None => format_utc_datetime(&datetime, pattern),
}
} else {
format!("[Time{}: {}]", self.unit, self.value)

View File

@@ -20,7 +20,6 @@ use std::str::FromStr;
use std::time::Duration;
use arrow::datatypes::TimeUnit as ArrowTimeUnit;
use chrono::offset::Local;
use chrono::{DateTime, LocalResult, NaiveDateTime, TimeZone as ChronoTimeZone, Utc};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -28,7 +27,7 @@ use snafu::{OptionExt, ResultExt};
use crate::error;
use crate::error::{ArithmeticOverflowSnafu, Error, ParseTimestampSnafu, TimestampOverflowSnafu};
use crate::timezone::TimeZone;
use crate::util::div_ceil;
use crate::util::{div_ceil, format_utc_datetime, local_datetime_to_utc};
#[derive(Debug, Clone, Default, Copy, Serialize, Deserialize)]
pub struct Timestamp {
@@ -195,10 +194,7 @@ impl Timestamp {
Some(TimeZone::Named(tz)) => {
format!("{}", tz.from_utc_datetime(&v).format(pattern))
}
None => {
let local = Local {};
format!("{}", local.from_utc_datetime(&v).format(pattern))
}
None => format_utc_datetime(&v, pattern),
}
} else {
format!("[Timestamp{}: {}]", self.unit, self.value)
@@ -265,18 +261,11 @@ fn naive_datetime_to_timestamp(
s: &str,
datetime: NaiveDateTime,
) -> crate::error::Result<Timestamp> {
let l = Local {};
match l.from_local_datetime(&datetime) {
match local_datetime_to_utc(&datetime) {
LocalResult::None => ParseTimestampSnafu { raw: s }.fail(),
LocalResult::Single(local_datetime) => Ok(Timestamp::new(
local_datetime.with_timezone(&Utc).timestamp_nanos(),
TimeUnit::Nanosecond,
)),
LocalResult::Ambiguous(local_datetime, _) => Ok(Timestamp::new(
local_datetime.with_timezone(&Utc).timestamp_nanos(),
TimeUnit::Nanosecond,
)),
LocalResult::Single(utc) | LocalResult::Ambiguous(utc, _) => {
Ok(Timestamp::new(utc.timestamp_nanos(), TimeUnit::Nanosecond))
}
}
}
@@ -421,7 +410,7 @@ impl Hash for Timestamp {
mod tests {
use std::collections::hash_map::DefaultHasher;
use chrono::Offset;
use chrono::{Local, Offset};
use rand::Rng;
use serde_json::Value;

View File

@@ -15,13 +15,14 @@
use std::fmt::Display;
use std::str::FromStr;
use chrono::{FixedOffset, Local};
use chrono::{FixedOffset, Local, Offset};
use chrono_tz::Tz;
use snafu::{OptionExt, ResultExt};
use crate::error::{
InvalidTimeZoneOffsetSnafu, ParseOffsetStrSnafu, ParseTimeZoneNameSnafu, Result,
};
use crate::util::find_tz_from_env;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TimeZone {
@@ -87,7 +88,11 @@ impl Display for TimeZone {
#[inline]
pub fn system_time_zone_name() -> String {
Local::now().offset().to_string()
if let Some(tz) = find_tz_from_env() {
Local::now().with_timezone(&tz).offset().fix().to_string()
} else {
Local::now().offset().to_string()
}
}
#[cfg(test)]

View File

@@ -12,6 +12,38 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::str::FromStr;
use chrono::offset::Local;
use chrono::{LocalResult, NaiveDateTime, TimeZone};
use chrono_tz::Tz;
pub fn format_utc_datetime(utc: &NaiveDateTime, pattern: &str) -> String {
if let Some(tz) = find_tz_from_env() {
format!("{}", tz.from_utc_datetime(utc).format(pattern))
} else {
format!("{}", Local.from_utc_datetime(utc).format(pattern))
}
}
pub fn local_datetime_to_utc(local: &NaiveDateTime) -> LocalResult<NaiveDateTime> {
if let Some(tz) = find_tz_from_env() {
tz.from_local_datetime(local).map(|x| x.naive_utc())
} else {
Local.from_local_datetime(local).map(|x| x.naive_utc())
}
}
pub fn find_tz_from_env() -> Option<Tz> {
// Windows does not support "TZ" env variable, which is used in the `Local` timezone under Unix.
// However, we are used to set "TZ" env as the default timezone without actually providing a
// timezone argument (especially in tests), and it's very convenient to do so, we decide to make
// it work under Windows as well.
std::env::var("TZ")
.ok()
.and_then(|tz| Tz::from_str(&tz).ok())
}
/// Returns the time duration since UNIX_EPOCH in milliseconds.
pub fn current_time_millis() -> i64 {
chrono::Utc::now().timestamp_millis()

View File

@@ -22,7 +22,7 @@ common-telemetry = { workspace = true }
futures-util.workspace = true
futures.workspace = true
protobuf = { version = "2", features = ["bytes"] }
raft-engine = "0.3"
raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "2dcaf5beeea3d5de9ec9c7133a2451d00f508f52" }
snafu = { version = "0.7", features = ["backtraces"] }
store-api = { workspace = true }
tokio-util.workspace = true

View File

@@ -12,14 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::Path;
use crate::raft_engine::log_store::RaftEngineLogStore;
use crate::LogConfig;
/// Create a write log for the provided path, used for test.
pub async fn create_tmp_local_file_log_store(path: &str) -> RaftEngineLogStore {
pub async fn create_tmp_local_file_log_store<P: AsRef<Path>>(path: P) -> RaftEngineLogStore {
let cfg = LogConfig {
file_size: 128 * 1024,
log_file_dir: path.to_string(),
log_file_dir: path.as_ref().display().to_string(),
..Default::default()
};
RaftEngineLogStore::try_new(cfg).await.unwrap()

View File

@@ -88,7 +88,7 @@ impl MetaSrvOptions {
// Options for datanode.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct DatanodeOptions {
client_options: DatanodeClientOptions,
pub client_options: DatanodeClientOptions,
}
// Options for datanode client.

View File

@@ -23,7 +23,6 @@ use datatypes::schema::ColumnSchema;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::test_util::log_store_util;
use object_store::services::Fs;
use object_store::util::join_dir;
use object_store::ObjectStore;
use store_api::storage::RegionId;
@@ -79,9 +78,9 @@ impl TestEnv {
}
async fn create_log_and_object_store(&self) -> (RaftEngineLogStore, ObjectStore) {
let data_home = self.data_home.path().to_str().unwrap();
let wal_path = join_dir(data_home, "wal");
let data_path = join_dir(data_home, "data");
let data_home = self.data_home.path();
let wal_path = data_home.join("wal");
let data_path = data_home.join("data").as_path().display().to_string();
let log_store = log_store_util::create_tmp_local_file_log_store(&wal_path).await;
let mut builder = Fs::default();
@@ -99,13 +98,20 @@ impl TestEnv {
checkpoint_interval: u64,
initial_metadata: Option<RegionMetadataRef>,
) -> Result<Option<RegionManifestManager>> {
let data_home = self.data_home.path().to_str().unwrap();
let manifest_dir = join_dir(data_home, "manifest");
let data_home = self.data_home.path();
let manifest_dir = data_home.join("manifest").as_path().display().to_string();
let mut builder = Fs::default();
builder.root(&manifest_dir);
let object_store = ObjectStore::new(builder).unwrap().finish();
// The "manifest_dir" here should be the relative path from the `object_store`'s root.
// Otherwise the OpenDal's list operation would fail with "StripPrefixError". This is
// because the `object_store`'s root path is "canonicalize"d; and under the Windows,
// canonicalize a path will prepend "\\?\" to it. This behavior will cause the original
// happen-to-be-working list on an absolute path failed on Windows.
let manifest_dir = "/".to_string();
let manifest_opts = RegionManifestOptions {
manifest_dir,
object_store,

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::any::Any;
use std::time::Duration;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
@@ -234,6 +235,12 @@ pub enum Error {
location
))]
TimeIndexNotFound { table: String, location: Location },
#[snafu(display("Failed to add duration '{:?}' to SystemTime, overflowed", duration))]
AddSystemTimeOverflow {
duration: Duration,
location: Location,
},
}
impl ErrorExt for Error {
@@ -253,7 +260,8 @@ impl ErrorExt for Error {
| ParseFloat { .. }
| MissingRequiredField { .. }
| BuildRegex { .. }
| ConvertSchema { .. } => StatusCode::InvalidArguments,
| ConvertSchema { .. }
| AddSystemTimeOverflow { .. } => StatusCode::InvalidArguments,
BuildBackend { .. } | ListObjects { .. } => StatusCode::StorageUnavailable,
EncodeSubstraitLogicalPlan { source, .. } => source.status_code(),

View File

@@ -24,14 +24,14 @@ use common_telemetry::timer;
use promql_parser::parser::ast::{Extension as NodeExtension, ExtensionExpr};
use promql_parser::parser::Expr::Extension;
use promql_parser::parser::{EvalStmt, Expr, ValueType};
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use sql::dialect::GreptimeDbDialect;
use sql::parser::ParserContext;
use sql::statements::statement::Statement;
use crate::error::{
MultipleStatementsSnafu, ParseFloatSnafu, ParseTimestampSnafu, QueryParseSnafu, Result,
UnimplementedSnafu,
AddSystemTimeOverflowSnafu, MultipleStatementsSnafu, ParseFloatSnafu, ParseTimestampSnafu,
QueryParseSnafu, Result, UnimplementedSnafu,
};
use crate::metrics::{METRIC_PARSE_PROMQL_ELAPSED, METRIC_PARSE_SQL_ELAPSED};
@@ -177,24 +177,17 @@ impl QueryLanguageParser {
}
// try float format
timestamp
let secs = timestamp
.parse::<f64>()
.context(ParseFloatSnafu { raw: timestamp })
.map(|float| {
let duration = Duration::from_secs_f64(float);
SystemTime::UNIX_EPOCH
.checked_add(duration)
.unwrap_or(max_system_timestamp())
})
// also report rfc3339 error if float parsing fails
.map_err(|_| rfc3339_result.unwrap_err())
}
}
.map_err(|_| rfc3339_result.unwrap_err())?;
fn max_system_timestamp() -> SystemTime {
SystemTime::UNIX_EPOCH
.checked_add(Duration::from_secs(std::i64::MAX as u64))
.unwrap()
let duration = Duration::from_secs_f64(secs);
SystemTime::UNIX_EPOCH
.checked_add(duration)
.context(AddSystemTimeOverflowSnafu { duration })
}
}
macro_rules! define_node_ast_extension {
@@ -291,11 +284,6 @@ mod test {
),
("0.000", SystemTime::UNIX_EPOCH),
("00", SystemTime::UNIX_EPOCH),
(
// i64::MAX + 1
"9223372036854775808.000",
max_system_timestamp(),
),
(
"2015-07-01T20:10:51.781Z",
SystemTime::UNIX_EPOCH
@@ -320,6 +308,14 @@ mod test {
// assert difference < 0.1 second
assert!(result.abs_diff(expected) < 100);
}
// i64::MAX + 1
let timestamp = "9223372036854775808.000";
let result = QueryLanguageParser::parse_promql_timestamp(timestamp);
assert_eq!(
result.unwrap_err().to_string(),
"Failed to add duration '9223372036854775808s' to SystemTime, overflowed"
);
}
#[test]
@@ -331,6 +327,7 @@ mod test {
step: "1d".to_string(),
};
#[cfg(not(windows))]
let expected = String::from(
"\
Promql(EvalStmt { \
@@ -346,6 +343,22 @@ mod test {
})",
);
// Windows has different debug output for SystemTime.
#[cfg(windows)]
let expected = String::from(
"\
Promql(EvalStmt { \
expr: VectorSelector(VectorSelector { \
name: Some(\"http_request\"), \
matchers: Matchers { matchers: [] }, \
offset: None, at: None }), \
start: SystemTime { intervals: 132892460400000000 }, \
end: SystemTime { intervals: 133207820400000000 }, \
interval: 86400s, \
lookback_delta: 300s \
})",
);
let result = QueryLanguageParser::parse_promql(&promql).unwrap();
assert_eq!(format!("{result:?}"), expected);
}

View File

@@ -334,7 +334,7 @@ impl PyVector {
let ret = Self::from(ret).into_py(py);
Ok(ret)
} else if let Ok(slice) = needle.downcast::<PySlice>(py) {
let indices = slice.indices(self.len() as i64)?;
let indices = slice.indices(self.len() as _)?;
let (start, stop, step, _slicelength) = (
indices.start,
indices.stop,

View File

@@ -7,7 +7,7 @@ license.workspace = true
[features]
dashboard = []
mem-prof = ["dep:common-mem-prof"]
pprof = ["dep:common-pprof"]
pprof = ["dep:pprof"]
[dependencies]
aide = { version = "0.9", features = ["axum"] }
@@ -26,7 +26,6 @@ common-error = { workspace = true }
common-grpc = { workspace = true }
common-grpc-expr = { workspace = true }
common-mem-prof = { workspace = true, optional = true }
common-pprof = { workspace = true, optional = true }
common-query = { workspace = true }
common-recordbatch = { workspace = true }
common-runtime = { workspace = true }
@@ -61,6 +60,11 @@ parking_lot = "0.12"
pgwire = "0.16"
pin-project = "1.0"
postgres-types = { version = "0.2", features = ["with-chrono-0_4"] }
pprof = { version = "0.11", features = [
"flamegraph",
"prost-codec",
"protobuf",
], optional = true }
promql-parser = "0.1.1"
prost.workspace = true
query = { workspace = true }
@@ -80,7 +84,6 @@ snap = "1"
sql = { workspace = true }
strum = { version = "0.24", features = ["derive"] }
table = { workspace = true }
tikv-jemalloc-ctl = { version = "0.5", features = ["use_std"] }
tokio-rustls = "0.24"
tokio-stream = { version = "0.1", features = ["net"] }
tokio.workspace = true
@@ -89,6 +92,9 @@ tonic.workspace = true
tower = { version = "0.4", features = ["full"] }
tower-http = { version = "0.3", features = ["full"] }
[target.'cfg(not(windows))'.dependencies]
tikv-jemalloc-ctl = { version = "0.5", features = ["use_std"] }
[dev-dependencies]
axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" }
catalog = { workspace = true, features = ["testing"] }

View File

@@ -28,12 +28,41 @@ fn fetch_dashboard_assets() {
let message = "Failed to fetch dashboard assets";
let help = r#"
You can manually execute './scripts/fetch-dashboard-assets.sh' to see why,
You can manually execute "fetch-dashboard-assets.sh" to see why,
or it's a network error, just try again or enable/disable some proxy."#;
// It's very unlikely to be failed to get the current dir here, see `current_dir`'s docs.
let mut dir = std::env::current_dir().unwrap();
dir.pop();
dir.pop();
dir.push("scripts");
let out_dir = std::env::var("OUT_DIR").unwrap();
let output = Command::new("./fetch-dashboard-assets.sh")
.arg(&out_dir)
.current_dir("../../scripts")
#[cfg(windows)]
let (program, args) = (
"bash",
[
format!(
"/mnt/{}/fetch-dashboard-assets.sh",
dir.display()
.to_string()
.to_lowercase()
.replace(':', "")
.replace('\\', "/")
),
format!(
"/mnt/{}",
out_dir.to_lowercase().replace(':', "").replace('\\', "/")
),
],
);
#[cfg(not(windows))]
let (program, args) = ("./fetch-dashboard-assets.sh", [out_dir]);
let output = Command::new(program)
.args(args)
.current_dir(dir)
.stdout(Stdio::piped())
.spawn()
.and_then(|p| p.wait_with_output());

View File

@@ -297,11 +297,8 @@ pub enum Error {
#[snafu(display("Failed to dump pprof data, source: {}", source))]
DumpPprof { source: common_pprof::Error },
#[snafu(display("Failed to update jemalloc metrics, source: {source}, location: {location}"))]
UpdateJemallocMetrics {
source: tikv_jemalloc_ctl::Error,
location: Location,
},
#[snafu(display("{source}"))]
Metrics { source: BoxedError },
#[snafu(display("DataFrame operation error, source: {source}, location: {location}"))]
DataFrame {
@@ -418,7 +415,7 @@ impl ErrorExt for Error {
#[cfg(feature = "pprof")]
DumpPprof { source, .. } => source.status_code(),
UpdateJemallocMetrics { .. } => StatusCode::Internal,
Metrics { source } => source.status_code(),
ConvertScalarValue { source, .. } => source.status_code(),
}

View File

@@ -21,14 +21,13 @@ use axum::extract::{Json, Query, State};
use axum::response::{IntoResponse, Response};
use axum::{Extension, Form};
use common_error::status_code::StatusCode;
use common_telemetry::{error, timer};
use common_telemetry::timer;
use query::parser::PromQuery;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use session::context::UserInfo;
use crate::http::{ApiState, GreptimeOptionsConfigState, JsonResponse};
use crate::metrics::JEMALLOC_COLLECTOR;
use crate::metrics_handler::MetricsHandler;
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
@@ -141,9 +140,10 @@ pub async fn metrics(
#[cfg(feature = "metrics-process")]
crate::metrics::PROCESS_COLLECTOR.collect();
if let Some(c) = JEMALLOC_COLLECTOR.as_ref() {
#[cfg(not(windows))]
if let Some(c) = crate::metrics::jemalloc::JEMALLOC_COLLECTOR.as_ref() {
if let Err(e) = c.update() {
error!(e; "Failed to update jemalloc metrics");
common_telemetry::error!(e; "Failed to update jemalloc metrics");
}
}
state.render()

View File

@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(feature = "pprof")]
mod nix;
#[cfg(feature = "pprof")]
pub mod handler {
use std::num::NonZeroI32;
@@ -20,13 +23,13 @@ pub mod handler {
use axum::extract::Query;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use common_pprof::Profiling;
use common_telemetry::logging;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::error::{DumpPprofSnafu, Result};
use crate::http::pprof::nix::Profiling;
/// Output format.
#[derive(Debug, Serialize, Deserialize, JsonSchema)]

View File

@@ -12,22 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(not(windows))]
pub(crate) mod jemalloc;
use std::task::{Context, Poll};
use std::time::Instant;
use common_telemetry::error;
use hyper::Body;
use metrics::gauge;
use once_cell::sync::Lazy;
use snafu::ResultExt;
use tikv_jemalloc_ctl::stats::{allocated_mib, resident_mib};
use tikv_jemalloc_ctl::{epoch, epoch_mib, stats};
use tonic::body::BoxBody;
use tower::{Layer, Service};
use crate::error;
use crate::error::UpdateJemallocMetricsSnafu;
pub(crate) const METRIC_DB_LABEL: &str = "db";
pub(crate) const METRIC_CODE_LABEL: &str = "code";
pub(crate) const METRIC_TYPE_LABEL: &str = "type";
@@ -80,60 +74,15 @@ pub(crate) const METRIC_GRPC_REQUESTS_TOTAL: &str = "servers.grpc_requests_total
pub(crate) const METRIC_GRPC_REQUESTS_ELAPSED: &str = "servers.grpc_requests_elapsed";
pub(crate) const METRIC_METHOD_LABEL: &str = "method";
pub(crate) const METRIC_PATH_LABEL: &str = "path";
pub(crate) const METRIC_JEMALLOC_RESIDENT: &str = "sys.jemalloc.resident";
pub(crate) const METRIC_JEMALLOC_ALLOCATED: &str = "sys.jemalloc.allocated";
/// Prometheus style process metrics collector.
#[cfg(feature = "metrics-process")]
pub(crate) static PROCESS_COLLECTOR: Lazy<metrics_process::Collector> = Lazy::new(|| {
let collector = metrics_process::Collector::default();
// Describe collector.
collector.describe();
collector
});
pub(crate) static JEMALLOC_COLLECTOR: Lazy<Option<JemallocCollector>> = Lazy::new(|| {
let collector = JemallocCollector::try_new()
.map_err(|e| {
error!(e; "Failed to retrieve jemalloc metrics");
e
})
.ok();
collector.map(|c| {
if let Err(e) = c.update() {
error!(e; "Failed to update jemalloc metrics");
};
c
})
});
pub(crate) struct JemallocCollector {
epoch: epoch_mib,
allocated: allocated_mib,
resident: resident_mib,
}
impl JemallocCollector {
pub(crate) fn try_new() -> error::Result<Self> {
let e = epoch::mib().context(UpdateJemallocMetricsSnafu)?;
let allocated = stats::allocated::mib().context(UpdateJemallocMetricsSnafu)?;
let resident = stats::resident::mib().context(UpdateJemallocMetricsSnafu)?;
Ok(Self {
epoch: e,
allocated,
resident,
})
}
pub(crate) fn update(&self) -> error::Result<()> {
let _ = self.epoch.advance().context(UpdateJemallocMetricsSnafu)?;
let allocated = self.allocated.read().context(UpdateJemallocMetricsSnafu)?;
let resident = self.resident.read().context(UpdateJemallocMetricsSnafu)?;
gauge!(METRIC_JEMALLOC_ALLOCATED, allocated as f64);
gauge!(METRIC_JEMALLOC_RESIDENT, resident as f64);
Ok(())
}
}
pub(crate) static PROCESS_COLLECTOR: once_cell::sync::Lazy<metrics_process::Collector> =
once_cell::sync::Lazy::new(|| {
let collector = metrics_process::Collector::default();
collector.describe();
collector
});
// Based on https://github.com/hyperium/tonic/blob/master/examples/src/tower/server.rs
// See https://github.com/hyperium/tonic/issues/242

View File

@@ -0,0 +1,69 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod error;
use common_telemetry::error;
use error::UpdateJemallocMetricsSnafu;
use metrics::gauge;
use once_cell::sync::Lazy;
use snafu::ResultExt;
use tikv_jemalloc_ctl::stats::{allocated_mib, resident_mib};
use tikv_jemalloc_ctl::{epoch, epoch_mib, stats};
pub(crate) const METRIC_JEMALLOC_RESIDENT: &str = "sys.jemalloc.resident";
pub(crate) const METRIC_JEMALLOC_ALLOCATED: &str = "sys.jemalloc.allocated";
pub(crate) static JEMALLOC_COLLECTOR: Lazy<Option<JemallocCollector>> = Lazy::new(|| {
let collector = JemallocCollector::try_new()
.map_err(|e| {
error!(e; "Failed to retrieve jemalloc metrics");
e
})
.ok();
collector.map(|c| {
if let Err(e) = c.update() {
error!(e; "Failed to update jemalloc metrics");
};
c
})
});
pub(crate) struct JemallocCollector {
epoch: epoch_mib,
allocated: allocated_mib,
resident: resident_mib,
}
impl JemallocCollector {
pub(crate) fn try_new() -> crate::error::Result<Self> {
let e = epoch::mib().context(UpdateJemallocMetricsSnafu)?;
let allocated = stats::allocated::mib().context(UpdateJemallocMetricsSnafu)?;
let resident = stats::resident::mib().context(UpdateJemallocMetricsSnafu)?;
Ok(Self {
epoch: e,
allocated,
resident,
})
}
pub(crate) fn update(&self) -> crate::error::Result<()> {
let _ = self.epoch.advance().context(UpdateJemallocMetricsSnafu)?;
let allocated = self.allocated.read().context(UpdateJemallocMetricsSnafu)?;
let resident = self.resident.read().context(UpdateJemallocMetricsSnafu)?;
gauge!(METRIC_JEMALLOC_ALLOCATED, allocated as f64);
gauge!(METRIC_JEMALLOC_RESIDENT, resident as f64);
Ok(())
}
}

View File

@@ -0,0 +1,49 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use snafu::{Location, Snafu};
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to update jemalloc metrics, source: {source}, location: {location}"))]
UpdateJemallocMetrics {
source: tikv_jemalloc_ctl::Error,
location: Location,
},
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::UpdateJemallocMetrics { .. } => StatusCode::Internal,
}
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl From<Error> for crate::error::Error {
fn from(e: Error) -> Self {
Self::Metrics {
source: BoxedError::new(e),
}
}
}

View File

@@ -199,8 +199,7 @@ async fn test_shutdown_mysql_server() -> Result<()> {
for handle in join_handles.iter_mut() {
let result = handle.await.unwrap();
assert!(result.is_err());
let error = result.unwrap_err().to_string();
assert!(error.contains("Connection refused") || error.contains("Connection reset by peer"));
assert!(result.unwrap_err().is_fatal());
}
Ok(())
}

View File

@@ -378,7 +378,7 @@ mod tests {
assert_eq!(expected, actions);
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_fs_region_manifest_checkpoint_compress() {
let duration = Duration::from_millis(50);
let manifest = new_fs_manifest(true, Some(duration)).await;

View File

@@ -20,7 +20,7 @@ use api::v1::meta::Role;
use client::client_manager::DatanodeClients;
use client::Client;
use common_base::Plugins;
use common_grpc::channel_manager::ChannelManager;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::peer::Peer;
use common_meta::DatanodeId;
use common_runtime::Builder as RuntimeBuilder;
@@ -88,7 +88,8 @@ impl GreptimeDbClusterBuilder {
pub async fn build(self) -> GreptimeDbCluster {
let datanodes = self.datanodes.unwrap_or(4);
let datanode_clients = Arc::new(DatanodeClients::default());
let channel_config = ChannelConfig::new().timeout(Duration::from_secs(20));
let datanode_clients = Arc::new(DatanodeClients::new(channel_config));
let meta_srv = self.build_metasrv(datanode_clients.clone()).await;

View File

@@ -29,8 +29,8 @@ use session::context::{QueryContext, QueryContextRef};
use crate::test_util::check_output_stream;
use crate::tests::test_util::{
both_instances_cases, check_unordered_output_stream, distributed, get_data_dir, standalone,
standalone_instance_case, MockInstance,
both_instances_cases, check_unordered_output_stream, distributed, find_testing_resource,
standalone, standalone_instance_case, MockInstance,
};
#[apply(both_instances_cases)]
@@ -518,12 +518,7 @@ async fn test_execute_external_create_without_ts_type(instance: Arc<dyn MockInst
async fn test_execute_query_external_table_parquet(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
let format = "parquet";
let location = get_data_dir("../tests/data/parquet/various_type.parquet")
.canonicalize()
.unwrap()
.display()
.to_string();
let location = find_testing_resource("/tests/data/parquet/various_type.parquet");
let table_name = "various_type_parquet";
let output = execute_sql(
@@ -588,12 +583,7 @@ async fn test_execute_query_external_table_parquet(instance: Arc<dyn MockInstanc
async fn test_execute_query_external_table_orc(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
let format = "orc";
let location = get_data_dir("../src/common/datasource/tests/orc/test.orc")
.canonicalize()
.unwrap()
.display()
.to_string();
let location = find_testing_resource("/src/common/datasource/tests/orc/test.orc");
let table_name = "various_type_orc";
let output = execute_sql(
@@ -668,12 +658,7 @@ async fn test_execute_query_external_table_orc(instance: Arc<dyn MockInstance>)
async fn test_execute_query_external_table_csv(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
let format = "csv";
let location = get_data_dir("../tests/data/csv/various_type.csv")
.canonicalize()
.unwrap()
.display()
.to_string();
let location = find_testing_resource("/tests/data/csv/various_type.csv");
let table_name = "various_type_csv";
let output = execute_sql(
@@ -719,12 +704,7 @@ async fn test_execute_query_external_table_csv(instance: Arc<dyn MockInstance>)
async fn test_execute_query_external_table_json(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
let format = "json";
let location = get_data_dir("../tests/data/json/various_type.json")
.canonicalize()
.unwrap()
.display()
.to_string();
let location = find_testing_resource("/tests/data/json/various_type.json");
let table_name = "various_type_json";
let output = execute_sql(
@@ -776,12 +756,7 @@ async fn test_execute_query_external_table_json(instance: Arc<dyn MockInstance>)
async fn test_execute_query_external_table_json_with_schame(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
let format = "json";
let location = get_data_dir("../tests/data/json/various_type.json")
.canonicalize()
.unwrap()
.display()
.to_string();
let location = find_testing_resource("/tests/data/json/various_type.json");
let table_name = "various_type_json_with_schema";
let output = execute_sql(
@@ -1339,11 +1314,7 @@ async fn test_execute_copy_from_orc(instance: Arc<dyn MockInstance>) {
)
.await, Output::AffectedRows(0)));
let filepath = get_data_dir("../src/common/datasource/tests/orc/test.orc")
.canonicalize()
.unwrap()
.display()
.to_string();
let filepath = find_testing_resource("/src/common/datasource/tests/orc/test.orc");
let output = execute_sql(
&instance,
@@ -1376,11 +1347,8 @@ async fn test_cast_type_issue_1594(instance: Arc<dyn MockInstance>) {
"create table tsbs_cpu(hostname STRING, environment STRING, usage_user DOUBLE, usage_system DOUBLE, usage_idle DOUBLE, usage_nice DOUBLE, usage_iowait DOUBLE, usage_irq DOUBLE, usage_softirq DOUBLE, usage_steal DOUBLE, usage_guest DOUBLE, usage_guest_nice DOUBLE, ts TIMESTAMP TIME INDEX, PRIMARY KEY(hostname));",
)
.await, Output::AffectedRows(0)));
let filepath = get_data_dir("../src/common/datasource/tests/csv/type_cast.csv")
.canonicalize()
.unwrap()
.display()
.to_string();
let filepath = find_testing_resource("/src/common/datasource/tests/csv/type_cast.csv");
let output = execute_sql(
&instance,

View File

@@ -12,11 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::PathBuf;
use std::sync::Arc;
use common_query::Output;
use common_recordbatch::util;
use common_test_util::find_workspace_path;
use frontend::instance::Instance;
use rstest_reuse::{self, template};
@@ -110,8 +110,20 @@ pub(crate) async fn check_unordered_output_stream(output: Output, expected: &str
assert_eq!(pretty_print, expected);
}
pub fn get_data_dir(path: &str) -> PathBuf {
let dir = env!("CARGO_MANIFEST_DIR");
/// Find the testing file resource under workspace root to be used in object store.
pub fn find_testing_resource(path: &str) -> String {
let p = find_workspace_path(path).display().to_string();
PathBuf::from(dir).join(path)
#[cfg(windows)]
let p = {
// We need unix style path even in the Windows, because the path is used in object-store, must
// be delimited with '/'. Inside the object-store, it will be converted to file system needed
// path in the end.
let p = p.replace('\\', "/");
// Prepend a '/' to indicate it's a file system path when parsed as object-store url in Windows.
format!("/{p}")
};
p
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::sync::Arc;
use axum::http::StatusCode;
@@ -411,13 +412,22 @@ pub async fn test_prom_http_api(store_type: StorageType) {
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(
[{"__name__" : "demo","ts":"1970-01-01 00:00:00+0000","cpu":"1.1","host":"host1","memory":"2.2"}]
))
.unwrap()
);
let PrometheusResponse::Series(mut series) = body.data else {
unreachable!()
};
let actual = series
.remove(0)
.into_iter()
.collect::<BTreeMap<String, String>>();
let expected = BTreeMap::from([
("__name__".to_string(), "demo".to_string()),
("ts".to_string(), "1970-01-01 00:00:00+0000".to_string()),
("cpu".to_string(), "1.1".to_string()),
("host".to_string(), "host1".to_string()),
("memory".to_string(), "2.2".to_string()),
]);
assert_eq!(actual, expected);
let res = client
.post("/api/v1/series?match[]=up&match[]=down")

View File

@@ -36,13 +36,11 @@ use crate::util;
const METASRV_ADDR: &str = "127.0.0.1:3002";
const SERVER_ADDR: &str = "127.0.0.1:4001";
const STANDALONE_LOG_FILE: &str = "/tmp/greptime-sqlness-standalone.log";
const METASRV_LOG_FILE: &str = "/tmp/greptime-sqlness-metasrv.log";
const FRONTEND_LOG_FILE: &str = "/tmp/greptime-sqlness-frontend.log";
const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info";
pub struct Env {}
pub struct Env {
data_home: PathBuf,
}
#[allow(clippy::print_stdout)]
#[async_trait]
@@ -51,8 +49,8 @@ impl EnvController for Env {
async fn start(&self, mode: &str, _config: Option<&Path>) -> Self::DB {
match mode {
"standalone" => Self::start_standalone().await,
"distributed" => Self::start_distributed().await,
"standalone" => self.start_standalone().await,
"distributed" => self.start_distributed().await,
_ => panic!("Unexpected mode: {mode}"),
}
}
@@ -65,14 +63,16 @@ impl EnvController for Env {
#[allow(clippy::print_stdout)]
impl Env {
pub async fn start_standalone() -> GreptimeDB {
pub fn new(data_home: PathBuf) -> Self {
Self { data_home }
}
async fn start_standalone(&self) -> GreptimeDB {
Self::build_db().await;
let db_ctx = GreptimeDBContext::new();
let server_process = Self::start_server("standalone", &db_ctx, true).await;
println!("Started, going to test. Log will be write to {STANDALONE_LOG_FILE}");
let server_process = self.start_server("standalone", &db_ctx, true).await;
let client = Client::with_urls(vec![SERVER_ADDR]);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
@@ -84,22 +84,23 @@ impl Env {
client: TokioMutex::new(db),
ctx: db_ctx,
is_standalone: true,
env: Env::new(self.data_home.clone()),
}
}
pub async fn start_distributed() -> GreptimeDB {
async fn start_distributed(&self) -> GreptimeDB {
Self::build_db().await;
let db_ctx = GreptimeDBContext::new();
// start a distributed GreptimeDB
let meta_server = Env::start_server("metasrv", &db_ctx, true).await;
let meta_server = self.start_server("metasrv", &db_ctx, true).await;
let datanode_1 = Env::start_server("datanode", &db_ctx, true).await;
let datanode_2 = Env::start_server("datanode", &db_ctx, true).await;
let datanode_3 = Env::start_server("datanode", &db_ctx, true).await;
let datanode_1 = self.start_server("datanode", &db_ctx, true).await;
let datanode_2 = self.start_server("datanode", &db_ctx, true).await;
let datanode_3 = self.start_server("datanode", &db_ctx, true).await;
let frontend = Env::start_server("frontend", &db_ctx, true).await;
let frontend = self.start_server("frontend", &db_ctx, true).await;
let client = Client::with_urls(vec![SERVER_ADDR]);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
@@ -111,6 +112,7 @@ impl Env {
client: TokioMutex::new(db),
ctx: db_ctx,
is_standalone: false,
env: Env::new(self.data_home.clone()),
}
}
@@ -120,6 +122,7 @@ impl Env {
}
async fn start_server(
&self,
subcommand: &str,
db_ctx: &GreptimeDBContext,
truncate_log: bool,
@@ -127,17 +130,15 @@ impl Env {
let log_file_name = match subcommand {
"datanode" => {
db_ctx.incr_datanode_id();
format!(
"/tmp/greptime-sqlness-datanode-{}.log",
db_ctx.datanode_id()
)
format!("greptime-sqlness-datanode-{}.log", db_ctx.datanode_id())
}
"frontend" => FRONTEND_LOG_FILE.to_string(),
"metasrv" => METASRV_LOG_FILE.to_string(),
"standalone" => STANDALONE_LOG_FILE.to_string(),
"frontend" => "greptime-sqlness-frontend.log".to_string(),
"metasrv" => "greptime-sqlness-metasrv.log".to_string(),
"standalone" => "greptime-sqlness-standalone.log".to_string(),
_ => panic!("Unexpected subcommand: {subcommand}"),
};
let log_file_name = self.data_home.join(log_file_name).display().to_string();
let log_file = OpenOptions::new()
.create(true)
.write(true)
@@ -146,15 +147,15 @@ impl Env {
.unwrap();
let (args, check_ip_addr) = match subcommand {
"datanode" => Self::datanode_start_args(db_ctx),
"datanode" => self.datanode_start_args(db_ctx),
"standalone" => {
let args = vec![
DEFAULT_LOG_LEVEL.to_string(),
subcommand.to_string(),
"start".to_string(),
"-c".to_string(),
Self::generate_config_file(subcommand, db_ctx),
"--http-addr=0.0.0.0:5001".to_string(),
self.generate_config_file(subcommand, db_ctx),
"--http-addr=127.0.0.1:5001".to_string(),
];
(args, SERVER_ADDR.to_string())
}
@@ -163,8 +164,8 @@ impl Env {
DEFAULT_LOG_LEVEL.to_string(),
subcommand.to_string(),
"start".to_string(),
"--metasrv-addr=0.0.0.0:3002".to_string(),
"--http-addr=0.0.0.0:5003".to_string(),
"--metasrv-addr=127.0.0.1:3002".to_string(),
"--http-addr=127.0.0.1:5003".to_string(),
];
(args, SERVER_ADDR.to_string())
}
@@ -174,7 +175,7 @@ impl Env {
subcommand.to_string(),
"start".to_string(),
"--use-memory-store".to_string(),
"--http-addr=0.0.0.0:5001".to_string(),
"--http-addr=127.0.0.1:5001".to_string(),
"--disable-region-failover".to_string(),
];
(args, METASRV_ADDR.to_string())
@@ -189,7 +190,12 @@ impl Env {
);
}
let mut process = Command::new("./greptime")
#[cfg(not(windows))]
let program = "./greptime";
#[cfg(windows)]
let program = "greptime.exe";
let mut process = Command::new(program)
.current_dir(util::get_binary_dir("debug"))
.args(args)
.stdout(log_file)
@@ -206,32 +212,31 @@ impl Env {
process
}
fn datanode_start_args(db_ctx: &GreptimeDBContext) -> (Vec<String>, String) {
fn datanode_start_args(&self, db_ctx: &GreptimeDBContext) -> (Vec<String>, String) {
let id = db_ctx.datanode_id();
let data_home = self
.data_home
.join(format!("greptimedb_datanode_{}_{id}", db_ctx.time));
let wal_dir = data_home.join("wal").display().to_string();
let subcommand = "datanode";
let mut args = vec![
DEFAULT_LOG_LEVEL.to_string(),
subcommand.to_string(),
"start".to_string(),
];
args.push(format!("--rpc-addr=0.0.0.0:410{id}"));
args.push(format!("--http-addr=0.0.0.0:430{id}"));
args.push(format!(
"--data-home=/tmp/greptimedb_datanode_{}",
db_ctx.time
));
args.push(format!(
"--wal-dir=/tmp/greptimedb_datanode_{}_{id}/wal",
db_ctx.time
));
args.push(format!("--rpc-addr=127.0.0.1:410{id}"));
args.push(format!("--http-addr=127.0.0.1:430{id}"));
args.push(format!("--data-home={}", data_home.display()));
args.push(format!("--wal-dir={wal_dir}"));
args.push(format!("--node-id={id}"));
args.push("--metasrv-addr=0.0.0.0:3002".to_string());
(args, format!("0.0.0.0:410{id}"))
args.push("--metasrv-addr=127.0.0.1:3002".to_string());
(args, format!("127.0.0.1:410{id}"))
}
/// stop and restart the server process
async fn restart_server(db: &GreptimeDB) {
async fn restart_server(&self, db: &GreptimeDB) {
{
let mut server_processes = db.server_processes.lock().unwrap();
for server_process in server_processes.iter_mut() {
@@ -241,14 +246,14 @@ impl Env {
// check if the server is distributed or standalone
let new_server_processes = if db.is_standalone {
let new_server_process = Env::start_server("standalone", &db.ctx, false).await;
let new_server_process = self.start_server("standalone", &db.ctx, false).await;
vec![new_server_process]
} else {
db.ctx.reset_datanode_id();
let mut processes = vec![];
for _ in 0..3 {
let new_server_process = Env::start_server("datanode", &db.ctx, false).await;
let new_server_process = self.start_server("datanode", &db.ctx, false).await;
processes.push(new_server_process);
}
processes
@@ -259,11 +264,13 @@ impl Env {
}
/// Generate config file to `/tmp/{subcommand}-{current_time}.toml`
fn generate_config_file(subcommand: &str, db_ctx: &GreptimeDBContext) -> String {
fn generate_config_file(&self, subcommand: &str, db_ctx: &GreptimeDBContext) -> String {
let mut tt = TinyTemplate::new();
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
path.push(format!("../conf/{subcommand}-test.toml.template"));
path.pop();
path.push("conf");
path.push(format!("{subcommand}-test.toml.template"));
let template = std::fs::read_to_string(path).unwrap();
tt.add_template(subcommand, &template).unwrap();
@@ -274,15 +281,24 @@ impl Env {
procedure_dir: String,
}
let greptimedb_dir = format!("/tmp/greptimedb-{subcommand}-{}", db_ctx.time);
let data_home = self
.data_home
.join(format!("greptimedb-{subcommand}-{}", db_ctx.time));
std::fs::create_dir_all(data_home.as_path()).unwrap();
let wal_dir = data_home.join("wal").display().to_string();
let procedure_dir = data_home.join("procedure").display().to_string();
let ctx = Context {
wal_dir: format!("{greptimedb_dir}/wal/"),
data_home: format!("{greptimedb_dir}/"),
procedure_dir: format!("{greptimedb_dir}/procedure/"),
wal_dir,
data_home: data_home.display().to_string(),
procedure_dir,
};
let rendered = tt.render(subcommand, &ctx).unwrap();
let conf_file = format!("/tmp/{subcommand}-{}.toml", db_ctx.time);
let conf_file = data_home
.join(format!("{subcommand}-{}.toml", db_ctx.time))
.display()
.to_string();
println!("Generating {subcommand} config file in {conf_file}, full content:\n{rendered}");
std::fs::write(&conf_file, rendered).unwrap();
@@ -313,13 +329,14 @@ pub struct GreptimeDB {
client: TokioMutex<DB>,
ctx: GreptimeDBContext,
is_standalone: bool,
env: Env,
}
#[async_trait]
impl Database for GreptimeDB {
async fn query(&self, ctx: QueryContext, query: String) -> Box<dyn Display> {
if ctx.context.contains_key("restart") {
Env::restart_server(self).await;
self.env.restart_server(self).await;
}
let mut client = self.client.lock().await;

View File

@@ -27,6 +27,11 @@ async fn main() {
"".to_string()
};
#[cfg(windows)]
let data_home = std::env::temp_dir();
#[cfg(not(windows))]
let data_home = std::path::PathBuf::from("/tmp");
let config = ConfigBuilder::default()
.case_dir(util::get_case_dir())
.fail_fast(true)
@@ -34,6 +39,6 @@ async fn main() {
.follow_links(true)
.build()
.unwrap();
let runner = Runner::new(config, Env {});
let runner = Runner::new(config, Env::new(data_home));
runner.run().await.unwrap();
}