Merge branch 'main' into transform-count-min-max

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Author: Ruihang Xia
Date: 2024-09-05 14:30:42 +08:00
51 changed files with 1354 additions and 181 deletions

Cargo.lock (generated)

@@ -1950,6 +1950,8 @@ dependencies = [
"common-version",
"datafusion",
"datatypes",
"geohash",
"h3o",
"num",
"num-traits",
"once_cell",
@@ -3813,6 +3815,12 @@ dependencies = [
"num-traits",
]
[[package]]
name = "float_eq"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28a80e3145d8ad11ba0995949bbcf48b9df2be62772b3d351ef017dff6ecb853"
[[package]]
name = "flow"
version = "0.9.2"
@@ -4211,6 +4219,27 @@ dependencies = [
"version_check",
]
[[package]]
name = "geo-types"
version = "0.7.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ff16065e5720f376fbced200a5ae0f47ace85fd70b7e54269790281353b6d61"
dependencies = [
"approx",
"num-traits",
"serde",
]
[[package]]
name = "geohash"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fb94b1a65401d6cbf22958a9040aa364812c26674f841bee538b12c135db1e6"
dependencies = [
"geo-types",
"libm",
]
[[package]]
name = "gethostname"
version = "0.2.3"
@@ -4301,6 +4330,25 @@ dependencies = [
"tracing",
]
[[package]]
name = "h3o"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0de3592e1f699692aa0525c42ff7879ec3ee7e36329af20967bc910a1cdc39c7"
dependencies = [
"ahash 0.8.11",
"either",
"float_eq",
"h3o-bit",
"libm",
]
[[package]]
name = "h3o-bit"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fb45e8060378c0353781abf67e1917b545a6b710d0342d85b70c125af7ef320"
[[package]]
name = "half"
version = "1.8.3"
@@ -4717,7 +4765,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"socket2 0.4.10",
"socket2 0.5.7",
"tokio",
"tower-service",
"tracing",
@@ -8512,7 +8560,7 @@ dependencies = [
"indoc",
"libc",
"memoffset 0.9.1",
"parking_lot 0.11.2",
"parking_lot 0.12.3",
"portable-atomic",
"pyo3-build-config",
"pyo3-ffi",
@@ -12441,6 +12489,16 @@ dependencies = [
"web-time 0.2.4",
]
[[package]]
name = "tracing-serde"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1"
dependencies = [
"serde",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.18"
@@ -12451,12 +12509,15 @@ dependencies = [
"nu-ansi-term",
"once_cell",
"regex",
"serde",
"serde_json",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log 0.2.0",
"tracing-serde",
]
[[package]]


@@ -129,6 +129,7 @@
| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).<br/>- `0`: using the default value (1/4 of cpu cores).<br/>- `1`: scan in current thread.<br/>- `n`: scan in parallelism n. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for<br/>creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.<br/>The default name for this directory is `index_intermediate` for backward compatibility.<br/><br/>This path contains two subdirectories:<br/>- `__intm`: for storing intermediate files used during creating index.<br/>- `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |
@@ -152,11 +153,12 @@
| `region_engine.mito.memtable.fork_dictionary_bytes` | String | `1GiB` | Max dictionary bytes.<br/>Only available for `partition_tree` memtable. |
| `region_engine.file` | -- | -- | Enable the file engine. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | `None` | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
@@ -237,11 +239,12 @@
| `datanode.client.connect_timeout` | String | `10s` | -- |
| `datanode.client.tcp_nodelay` | Bool | `true` | -- |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | `None` | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
@@ -301,11 +304,12 @@
| `wal.backoff_base` | Integer | `2` | Exponential backoff rate, i.e. next backoff = base * current backoff. |
| `wal.backoff_deadline` | String | `5mins` | Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | `None` | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
@@ -428,6 +432,7 @@
| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).<br/>- `0`: using the default value (1/4 of cpu cores).<br/>- `1`: scan in current thread.<br/>- `n`: scan in parallelism n. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for<br/>creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.<br/>The default name for this directory is `index_intermediate` for backward compatibility.<br/><br/>This path contains two subdirectories:<br/>- `__intm`: for storing intermediate files used during creating index.<br/>- `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |
@@ -449,11 +454,12 @@
| `region_engine.mito.memtable.fork_dictionary_bytes` | String | `1GiB` | Max dictionary bytes.<br/>Only available for `partition_tree` memtable. |
| `region_engine.file` | -- | -- | Enable the file engine. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | `None` | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
@@ -494,11 +500,12 @@
| `heartbeat.interval` | String | `3s` | Interval for sending heartbeat messages to the metasrv. |
| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | `None` | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |


@@ -336,7 +336,7 @@ credential_path = "test"
## The credential of the google cloud storage.
## **It's only used when the storage type is `Gcs`**.
## +toml2docs:none-default
credential= "base64-credential"
credential = "base64-credential"
## The container of the azure account.
## **It's only used when the storage type is `Azblob`**.
@@ -455,6 +455,10 @@ parallel_scan_channel_size = 32
## Whether to allow stale WAL entries read during replay.
allow_stale_entries = false
## Minimum time interval between two compactions.
## To align with the old behavior, the default value is 0 (no restrictions).
min_compaction_interval = "0m"
## The options for index in Mito engine.
[region_engine.mito.index]
@@ -545,7 +549,7 @@ fork_dictionary_bytes = "1GiB"
## The logging options.
[logging]
## The directory to store the log files.
## The directory to store the log files. If set to empty, logs will not be written to files.
dir = "/tmp/greptimedb/logs"
## The log level. Can be `info`/`debug`/`warn`/`error`.
@@ -561,6 +565,9 @@ otlp_endpoint = "http://localhost:4317"
## Whether to append logs to stdout.
append_stdout = true
## The log format. Can be `text`/`json`.
log_format = "text"
## The percentage of tracing will be sampled and exported.
## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
## ratio > 1 are treated as 1. Fractions < 0 are treated as 0


@@ -59,7 +59,7 @@ retry_interval = "3s"
## The logging options.
[logging]
## The directory to store the log files.
## The directory to store the log files. If set to empty, logs will not be written to files.
dir = "/tmp/greptimedb/logs"
## The log level. Can be `info`/`debug`/`warn`/`error`.
@@ -75,6 +75,9 @@ otlp_endpoint = "http://localhost:4317"
## Whether to append logs to stdout.
append_stdout = true
## The log format. Can be `text`/`json`.
log_format = "text"
## The percentage of tracing will be sampled and exported.
## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
## ratio > 1 are treated as 1. Fractions < 0 are treated as 0


@@ -166,7 +166,7 @@ tcp_nodelay = true
## The logging options.
[logging]
## The directory to store the log files.
## The directory to store the log files. If set to empty, logs will not be written to files.
dir = "/tmp/greptimedb/logs"
## The log level. Can be `info`/`debug`/`warn`/`error`.
@@ -182,6 +182,9 @@ otlp_endpoint = "http://localhost:4317"
## Whether to append logs to stdout.
append_stdout = true
## The log format. Can be `text`/`json`.
log_format = "text"
## The percentage of tracing will be sampled and exported.
## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
## ratio > 1 are treated as 1. Fractions < 0 are treated as 0


@@ -153,7 +153,7 @@ backoff_deadline = "5mins"
## The logging options.
[logging]
## The directory to store the log files.
## The directory to store the log files. If set to empty, logs will not be written to files.
dir = "/tmp/greptimedb/logs"
## The log level. Can be `info`/`debug`/`warn`/`error`.
@@ -169,6 +169,9 @@ otlp_endpoint = "http://localhost:4317"
## Whether to append logs to stdout.
append_stdout = true
## The log format. Can be `text`/`json`.
log_format = "text"
## The percentage of tracing will be sampled and exported.
## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
## ratio > 1 are treated as 1. Fractions < 0 are treated as 0


@@ -493,6 +493,10 @@ parallel_scan_channel_size = 32
## Whether to allow stale WAL entries read during replay.
allow_stale_entries = false
## Minimum time interval between two compactions.
## To align with the old behavior, the default value is 0 (no restrictions).
min_compaction_interval = "0m"
## The options for index in Mito engine.
[region_engine.mito.index]
@@ -589,7 +593,7 @@ fork_dictionary_bytes = "1GiB"
## The logging options.
[logging]
## The directory to store the log files.
## The directory to store the log files. If set to empty, logs will not be written to files.
dir = "/tmp/greptimedb/logs"
## The log level. Can be `info`/`debug`/`warn`/`error`.
@@ -605,6 +609,9 @@ otlp_endpoint = "http://localhost:4317"
## Whether to append logs to stdout.
append_stdout = true
## The log format. Can be `text`/`json`.
log_format = "text"
## The percentage of tracing will be sampled and exported.
## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
## ratio > 1 are treated as 1. Fractions < 0 are treated as 0


@@ -11,7 +11,9 @@ RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
python3.10 \
python3.10-dev \
python3-pip \
curl
curl \
mysql-client \
postgresql-client
COPY $DOCKER_BUILD_ROOT/docker/python/requirements.txt /etc/greptime/requirements.txt


@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::ErrorExt;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
@@ -38,6 +38,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Authentication source failure"))]
AuthBackend {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
source: BoxedError,
},
#[snafu(display("User not found, username: {}", username))]
UserNotFound { username: String },
@@ -81,6 +89,7 @@ impl ErrorExt for Error {
Error::FileWatch { .. } => StatusCode::InvalidArguments,
Error::InternalState { .. } => StatusCode::Unexpected,
Error::Io { .. } => StatusCode::StorageUnavailable,
Error::AuthBackend { .. } => StatusCode::Internal,
Error::UserNotFound { .. } => StatusCode::UserNotFound,
Error::UnsupportedPasswordType { .. } => StatusCode::UnsupportedPasswordType,


@@ -13,9 +13,11 @@
// limitations under the License.
use common_base::secrets::ExposeSecret;
use common_error::ext::BoxedError;
use snafu::{OptionExt, ResultExt};
use crate::error::{
AccessDeniedSnafu, Result, UnsupportedPasswordTypeSnafu, UserNotFoundSnafu,
AccessDeniedSnafu, AuthBackendSnafu, Result, UnsupportedPasswordTypeSnafu, UserNotFoundSnafu,
UserPasswordMismatchSnafu,
};
use crate::user_info::DefaultUserInfo;
@@ -49,6 +51,19 @@ impl MockUserProvider {
info.schema.clone_into(&mut self.schema);
info.username.clone_into(&mut self.username);
}
// this is a deliberate function to ref AuthBackendSnafu
// so that it won't get deleted in the future
pub fn ref_auth_backend_snafu(&self) -> Result<()> {
let none_option = None;
none_option
.context(UserNotFoundSnafu {
username: "no_user".to_string(),
})
.map_err(BoxedError::new)
.context(AuthBackendSnafu)
}
}
#[async_trait::async_trait]


@@ -7,6 +7,10 @@ license.workspace = true
[lints]
workspace = true
[features]
default = ["geo"]
geo = ["geohash", "h3o"]
[dependencies]
api.workspace = true
arc-swap = "1.0"
@@ -23,6 +27,8 @@ common-time.workspace = true
common-version.workspace = true
datafusion.workspace = true
datatypes.workspace = true
geohash = { version = "0.13", optional = true }
h3o = { version = "0.6", optional = true }
num = "0.4"
num-traits = "0.2"
once_cell.workspace = true


@@ -116,6 +116,10 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
SystemFunction::register(&function_registry);
TableFunction::register(&function_registry);
// Geo functions
#[cfg(feature = "geo")]
crate::scalars::geo::GeoFunctions::register(&function_registry);
Arc::new(function_registry)
});


@@ -15,6 +15,8 @@
pub mod aggregate;
pub(crate) mod date;
pub mod expression;
#[cfg(feature = "geo")]
pub mod geo;
pub mod matches;
pub mod math;
pub mod numpy;


@@ -0,0 +1,31 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
mod geohash;
mod h3;
use geohash::GeohashFunction;
use h3::H3Function;
use crate::function_registry::FunctionRegistry;
pub(crate) struct GeoFunctions;
impl GeoFunctions {
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(GeohashFunction));
registry.register(Arc::new(H3Function));
}
}


@@ -0,0 +1,135 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use common_query::error::{self, InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, TypeSignature};
use datafusion::logical_expr::Volatility;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::value::Value;
use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef};
use geohash::Coord;
use snafu::{ensure, ResultExt};
use crate::function::{Function, FunctionContext};
/// Function that return geohash string for a given geospatial coordinate.
#[derive(Clone, Debug, Default)]
pub struct GeohashFunction;
const NAME: &str = "geohash";
impl Function for GeohashFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::string_datatype())
}
fn signature(&self) -> Signature {
let mut signatures = Vec::new();
for coord_type in &[
ConcreteDataType::float32_datatype(),
ConcreteDataType::float64_datatype(),
] {
for resolution_type in &[
ConcreteDataType::int8_datatype(),
ConcreteDataType::int16_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::uint8_datatype(),
ConcreteDataType::uint16_datatype(),
ConcreteDataType::uint32_datatype(),
ConcreteDataType::uint64_datatype(),
] {
signatures.push(TypeSignature::Exact(vec![
// latitude
coord_type.clone(),
// longitude
coord_type.clone(),
// resolution
resolution_type.clone(),
]));
}
}
Signature::one_of(signatures, Volatility::Stable)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 3,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 3, provided : {}",
columns.len()
),
}
);
let lat_vec = &columns[0];
let lon_vec = &columns[1];
let resolution_vec = &columns[2];
let size = lat_vec.len();
let mut results = StringVectorBuilder::with_capacity(size);
for i in 0..size {
let lat = lat_vec.get(i).as_f64_lossy();
let lon = lon_vec.get(i).as_f64_lossy();
let r = match resolution_vec.get(i) {
Value::Int8(v) => v as usize,
Value::Int16(v) => v as usize,
Value::Int32(v) => v as usize,
Value::Int64(v) => v as usize,
Value::UInt8(v) => v as usize,
Value::UInt16(v) => v as usize,
Value::UInt32(v) => v as usize,
Value::UInt64(v) => v as usize,
_ => unreachable!(),
};
let result = match (lat, lon) {
(Some(lat), Some(lon)) => {
let coord = Coord { x: lon, y: lat };
let encoded = geohash::encode(coord, r)
.map_err(|e| {
BoxedError::new(PlainError::new(
format!("Geohash error: {}", e),
StatusCode::EngineExecuteQuery,
))
})
.context(error::ExecuteSnafu)?;
Some(encoded)
}
_ => None,
};
results.push(result.as_deref());
}
Ok(results.to_vector())
}
}
impl fmt::Display for GeohashFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", NAME)
}
}
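The new `geohash` UDF wraps the `geohash` crate: for each row it reads latitude, longitude, and a resolution (the number of hash characters) and encodes them into a geohash string. A minimal standalone sketch of that per-row computation, assuming only the `geohash` 0.13 crate added in this commit (the coordinate values below are made up):

```rust
use geohash::Coord;

fn main() {
    let (lat, lon) = (37.7749_f64, -122.4194_f64); // hypothetical coordinate
    let resolution = 9;                            // number of geohash characters
    // `Coord` takes longitude as `x` and latitude as `y`, the same mapping
    // `GeohashFunction::eval` performs on its input columns.
    let encoded = geohash::encode(Coord { x: lon, y: lat }, resolution)
        .expect("valid coordinate and resolution");
    println!("{encoded}"); // a 9-character geohash string
}
```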


@@ -0,0 +1,145 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use common_query::error::{self, InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, TypeSignature};
use datafusion::logical_expr::Volatility;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::value::Value;
use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef};
use h3o::{LatLng, Resolution};
use snafu::{ensure, ResultExt};
use crate::function::{Function, FunctionContext};
/// Function that returns [h3] encoding string for a given geospatial coordinate.
///
/// [h3]: https://h3geo.org/
#[derive(Clone, Debug, Default)]
pub struct H3Function;
const NAME: &str = "h3";
impl Function for H3Function {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::string_datatype())
}
fn signature(&self) -> Signature {
let mut signatures = Vec::new();
for coord_type in &[
ConcreteDataType::float32_datatype(),
ConcreteDataType::float64_datatype(),
] {
for resolution_type in &[
ConcreteDataType::int8_datatype(),
ConcreteDataType::int16_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::uint8_datatype(),
ConcreteDataType::uint16_datatype(),
ConcreteDataType::uint32_datatype(),
ConcreteDataType::uint64_datatype(),
] {
signatures.push(TypeSignature::Exact(vec![
// latitude
coord_type.clone(),
// longitude
coord_type.clone(),
// resolution
resolution_type.clone(),
]));
}
}
Signature::one_of(signatures, Volatility::Stable)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 3,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 3, provided : {}",
columns.len()
),
}
);
let lat_vec = &columns[0];
let lon_vec = &columns[1];
let resolution_vec = &columns[2];
let size = lat_vec.len();
let mut results = StringVectorBuilder::with_capacity(size);
for i in 0..size {
let lat = lat_vec.get(i).as_f64_lossy();
let lon = lon_vec.get(i).as_f64_lossy();
let r = match resolution_vec.get(i) {
Value::Int8(v) => v as u8,
Value::Int16(v) => v as u8,
Value::Int32(v) => v as u8,
Value::Int64(v) => v as u8,
Value::UInt8(v) => v,
Value::UInt16(v) => v as u8,
Value::UInt32(v) => v as u8,
Value::UInt64(v) => v as u8,
_ => unreachable!(),
};
let result = match (lat, lon) {
(Some(lat), Some(lon)) => {
let coord = LatLng::new(lat, lon)
.map_err(|e| {
BoxedError::new(PlainError::new(
format!("H3 error: {}", e),
StatusCode::EngineExecuteQuery,
))
})
.context(error::ExecuteSnafu)?;
let r = Resolution::try_from(r)
.map_err(|e| {
BoxedError::new(PlainError::new(
format!("H3 error: {}", e),
StatusCode::EngineExecuteQuery,
))
})
.context(error::ExecuteSnafu)?;
let encoded = coord.to_cell(r).to_string();
Some(encoded)
}
_ => None,
};
results.push(result.as_deref());
}
Ok(results.to_vector())
}
}
impl fmt::Display for H3Function {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", NAME)
}
}
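Similarly, the `h3` UDF delegates to the `h3o` crate: latitude/longitude become a `LatLng`, the resolution argument becomes an `h3o::Resolution`, and the resulting cell index is rendered as a string. A minimal sketch under the same assumptions (made-up coordinates, the `h3o` 0.6 calls used above):

```rust
use h3o::{LatLng, Resolution};

fn main() {
    let coord = LatLng::new(37.7749, -122.4194).expect("valid lat/lng"); // hypothetical point
    let resolution = Resolution::try_from(8u8).expect("H3 resolutions are 0..=15");
    let cell = coord.to_cell(resolution); // `CellIndex`, displayed as a hex-like string
    println!("{cell}");
}
```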


@@ -172,12 +172,13 @@ impl ErrorExt for Error {
Error::DataTypes { .. }
| Error::CreateRecordBatches { .. }
| Error::PollStream { .. }
| Error::Format { .. }
| Error::ToArrowScalar { .. }
| Error::ProjectArrowRecordBatch { .. }
| Error::PhysicalExpr { .. } => StatusCode::Internal,
Error::PollStream { .. } => StatusCode::EngineExecuteQuery,
Error::ArrowCompute { .. } => StatusCode::IllegalState,
Error::ColumnNotExists { .. } => StatusCode::TableColumnNotFound,


@@ -34,4 +34,4 @@ tracing = "0.1"
tracing-appender = "0.2"
tracing-log = "0.1"
tracing-opentelemetry = "0.22.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "fmt"] }


@@ -21,7 +21,7 @@ mod panic_hook;
pub mod tracing_context;
mod tracing_sampler;
pub use logging::{init_default_ut_logging, init_global_logging};
pub use logging::{init_default_ut_logging, init_global_logging, RELOAD_HANDLE};
pub use metric::dump_metrics;
pub use panic_hook::set_panic_hook;
pub use {common_error, tracing};
pub use {common_error, tracing, tracing_subscriber};


@@ -16,7 +16,7 @@
use std::env;
use std::sync::{Arc, Mutex, Once};
use once_cell::sync::Lazy;
use once_cell::sync::{Lazy, OnceCell};
use opentelemetry::{global, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::propagation::TraceContextPropagator;
@@ -26,6 +26,7 @@ use serde::{Deserialize, Serialize};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_log::LogTracer;
use tracing_subscriber::filter::Targets;
use tracing_subscriber::fmt::Layer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::prelude::*;
@@ -35,15 +36,41 @@ use crate::tracing_sampler::{create_sampler, TracingSampleOptions};
pub const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";
// Handle for reloading log level
pub static RELOAD_HANDLE: OnceCell<tracing_subscriber::reload::Handle<Targets, Registry>> =
OnceCell::new();
/// The logging options that used to initialize the logger.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct LoggingOptions {
/// The directory to store log files. If not set, logs will be written to stdout.
pub dir: String,
/// The log level that can be one of "trace", "debug", "info", "warn", "error". Default is "info".
pub level: Option<String>,
pub enable_otlp_tracing: bool,
pub otlp_endpoint: Option<String>,
pub tracing_sample_ratio: Option<TracingSampleOptions>,
/// The log format that can be one of "json" or "text". Default is "text".
pub log_format: LogFormat,
/// Whether to append logs to stdout. Default is true.
pub append_stdout: bool,
/// Whether to enable tracing with OTLP. Default is false.
pub enable_otlp_tracing: bool,
/// The endpoint of OTLP. Default is "http://localhost:4317".
pub otlp_endpoint: Option<String>,
/// The tracing sample ratio.
pub tracing_sample_ratio: Option<TracingSampleOptions>,
}
#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LogFormat {
Json,
Text,
}
impl PartialEq for LoggingOptions {
@@ -64,6 +91,7 @@ impl Default for LoggingOptions {
Self {
dir: "/tmp/greptimedb/logs".to_string(),
level: None,
log_format: LogFormat::Text,
enable_otlp_tracing: false,
otlp_endpoint: None,
tracing_sample_ratio: None,
@@ -128,62 +156,103 @@ pub fn init_global_logging(
let mut guards = vec![];
START.call_once(|| {
let dir = &opts.dir;
let level = &opts.level;
let enable_otlp_tracing = opts.enable_otlp_tracing;
// Enable log compatible layer to convert log record to tracing span.
LogTracer::init().expect("log tracer must be valid");
// stdout log layer.
// Configure the stdout logging layer.
let stdout_logging_layer = if opts.append_stdout {
let (stdout_writer, stdout_guard) = tracing_appender::non_blocking(std::io::stdout());
guards.push(stdout_guard);
let (writer, guard) = tracing_appender::non_blocking(std::io::stdout());
guards.push(guard);
Some(
Layer::new()
.with_writer(stdout_writer)
.with_ansi(atty::is(atty::Stream::Stdout)),
)
if opts.log_format == LogFormat::Json {
Some(
Layer::new()
.json()
.with_writer(writer)
.with_ansi(atty::is(atty::Stream::Stdout))
.boxed(),
)
} else {
Some(
Layer::new()
.with_writer(writer)
.with_ansi(atty::is(atty::Stream::Stdout))
.boxed(),
)
}
} else {
None
};
// file log layer.
let rolling_appender = RollingFileAppender::new(Rotation::HOURLY, dir, app_name);
let (rolling_writer, rolling_writer_guard) =
tracing_appender::non_blocking(rolling_appender);
let file_logging_layer = Layer::new().with_writer(rolling_writer).with_ansi(false);
guards.push(rolling_writer_guard);
// Configure the file logging layer with rolling policy.
let file_logging_layer = if !opts.dir.is_empty() {
let rolling_appender =
RollingFileAppender::new(Rotation::HOURLY, &opts.dir, "greptimedb");
let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
guards.push(guard);
// error file log layer.
let err_rolling_appender =
RollingFileAppender::new(Rotation::HOURLY, dir, format!("{}-{}", app_name, "err"));
let (err_rolling_writer, err_rolling_writer_guard) =
tracing_appender::non_blocking(err_rolling_appender);
let err_file_logging_layer = Layer::new()
.with_writer(err_rolling_writer)
.with_ansi(false);
guards.push(err_rolling_writer_guard);
if opts.log_format == LogFormat::Json {
Some(
Layer::new()
.json()
.with_writer(writer)
.with_ansi(false)
.boxed(),
)
} else {
Some(Layer::new().with_writer(writer).with_ansi(false).boxed())
}
} else {
None
};
// Configure the error file logging layer with rolling policy.
let err_file_logging_layer = if !opts.dir.is_empty() {
let rolling_appender =
RollingFileAppender::new(Rotation::HOURLY, &opts.dir, "greptimedb-err");
let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
guards.push(guard);
if opts.log_format == LogFormat::Json {
Some(
Layer::new()
.json()
.with_writer(writer)
.with_ansi(false)
.with_filter(filter::LevelFilter::ERROR)
.boxed(),
)
} else {
Some(
Layer::new()
.with_writer(writer)
.with_ansi(false)
.with_filter(filter::LevelFilter::ERROR)
.boxed(),
)
}
} else {
None
};
// resolve log level settings from:
// - options from command line or config files
// - environment variable: RUST_LOG
// - default settings
let rust_log_env = std::env::var(EnvFilter::DEFAULT_ENV).ok();
let targets_string = level
let filter = opts
.level
.as_deref()
.or(rust_log_env.as_deref())
.unwrap_or(DEFAULT_LOG_TARGETS);
let filter = targets_string
.or(env::var(EnvFilter::DEFAULT_ENV).ok().as_deref())
.unwrap_or(DEFAULT_LOG_TARGETS)
.parse::<filter::Targets>()
.expect("error parsing log level string");
let sampler = opts
.tracing_sample_ratio
.as_ref()
.map(create_sampler)
.map(Sampler::ParentBased)
.unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)));
let (dyn_filter, reload_handle) = tracing_subscriber::reload::Layer::new(filter.clone());
RELOAD_HANDLE
.set(reload_handle)
.expect("reload handle already set, maybe init_global_logging get called twice?");
// Must enable 'tokio_unstable' cfg to use this feature.
// For example: `RUSTFLAGS="--cfg tokio_unstable" cargo run -F common-telemetry/console -- standalone start`
#[cfg(feature = "tokio-console")]
@@ -204,65 +273,70 @@ pub fn init_global_logging(
None
};
let stdout_logging_layer = stdout_logging_layer.map(|x| x.with_filter(filter.clone()));
let file_logging_layer = file_logging_layer.with_filter(filter);
Registry::default()
.with(dyn_filter)
.with(tokio_console_layer)
.with(stdout_logging_layer)
.with(file_logging_layer)
.with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR))
.with(err_file_logging_layer)
};
// consume the `tracing_opts`, to avoid "unused" warnings
// consume the `tracing_opts` to avoid "unused" warnings.
let _ = tracing_opts;
#[cfg(not(feature = "tokio-console"))]
let subscriber = Registry::default()
.with(filter)
.with(dyn_filter)
.with(stdout_logging_layer)
.with(file_logging_layer)
.with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR));
.with(err_file_logging_layer);
if enable_otlp_tracing {
if opts.enable_otlp_tracing {
global::set_text_map_propagator(TraceContextPropagator::new());
// otlp exporter
let sampler = opts
.tracing_sample_ratio
.as_ref()
.map(create_sampler)
.map(Sampler::ParentBased)
.unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)));
let trace_config = opentelemetry_sdk::trace::config()
.with_sampler(sampler)
.with_resource(opentelemetry_sdk::Resource::new(vec![
KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
KeyValue::new(
resource::SERVICE_INSTANCE_ID,
node_id.unwrap_or("none".to_string()),
),
KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
]));
let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(
opts.otlp_endpoint
.as_ref()
.map(|e| {
if e.starts_with("http") {
e.to_string()
} else {
format!("http://{}", e)
}
})
.unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()),
);
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter().tonic().with_endpoint(
opts.otlp_endpoint
.as_ref()
.map(|e| {
if e.starts_with("http") {
e.to_string()
} else {
format!("http://{}", e)
}
})
.unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()),
),
)
.with_trace_config(
opentelemetry_sdk::trace::config()
.with_sampler(sampler)
.with_resource(opentelemetry_sdk::Resource::new(vec![
KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
KeyValue::new(
resource::SERVICE_INSTANCE_ID,
node_id.unwrap_or("none".to_string()),
),
KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
])),
)
.with_exporter(exporter)
.with_trace_config(trace_config)
.install_batch(opentelemetry_sdk::runtime::Tokio)
.expect("otlp tracer install failed");
let tracing_layer = Some(tracing_opentelemetry::layer().with_tracer(tracer));
let subscriber = subscriber.with(tracing_layer);
tracing::subscriber::set_global_default(subscriber)
.expect("error setting global tracing subscriber");
tracing::subscriber::set_global_default(
subscriber.with(tracing_opentelemetry::layer().with_tracer(tracer)),
)
.expect("error setting global tracing subscriber");
} else {
tracing::subscriber::set_global_default(subscriber)
.expect("error setting global tracing subscriber");
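The newly exported `RELOAD_HANDLE` lets other components change the log filter at runtime through `tracing_subscriber`'s reload layer. A hedged sketch of how a caller might use it; the crate paths are assumed from the `pub use` re-exports shown earlier, and this helper is not part of the diff:

```rust
use common_telemetry::tracing_subscriber::filter::Targets;
use common_telemetry::RELOAD_HANDLE;

/// Hypothetical helper: swap the active log filter, e.g. "debug,hyper=warn".
fn set_log_filter(spec: &str) -> Result<(), String> {
    let targets: Targets = spec
        .parse()
        .map_err(|e| format!("invalid filter {spec:?}: {e}"))?;
    RELOAD_HANDLE
        .get()
        .ok_or_else(|| "logging is not initialized".to_string())?
        .reload(targets)
        .map_err(|e| e.to_string())
}
```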


@@ -268,6 +268,23 @@ impl Value {
}
}
/// Cast Value to f64. Return None if it's not castable;
pub fn as_f64_lossy(&self) -> Option<f64> {
match self {
Value::Float32(v) => Some(v.0 as _),
Value::Float64(v) => Some(v.0),
Value::Int8(v) => Some(*v as _),
Value::Int16(v) => Some(*v as _),
Value::Int32(v) => Some(*v as _),
Value::Int64(v) => Some(*v as _),
Value::UInt8(v) => Some(*v as _),
Value::UInt16(v) => Some(*v as _),
Value::UInt32(v) => Some(*v as _),
Value::UInt64(v) => Some(*v as _),
_ => None,
}
}
/// Returns the logical type of the value.
pub fn logical_type_id(&self) -> LogicalTypeId {
match self {
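`as_f64_lossy` is what lets the geo UDFs above accept any numeric column type for latitude and longitude. A tiny illustrative check, based only on the match arms added here and written as a test sketch rather than code from the commit:

```rust
#[test]
fn as_f64_lossy_accepts_any_numeric_value() {
    use datatypes::value::Value;

    assert_eq!(Value::Int32(42).as_f64_lossy(), Some(42.0));
    assert_eq!(Value::UInt64(7).as_f64_lossy(), Some(7.0));
    // Non-numeric values fall through to the `_ => None` arm.
    assert_eq!(Value::Null.as_f64_lossy(), None);
}
```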


@@ -148,6 +148,7 @@ impl HeartbeatHandler for CollectStatsHandler {
mod tests {
use std::sync::Arc;
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::sequence::SequenceBuilder;
@@ -184,6 +185,7 @@ mod tests {
election: None,
is_infancy: false,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
cache_invalidator: Arc::new(DummyCacheInvalidator),
};
let handler = CollectStatsHandler::default();


@@ -49,6 +49,7 @@ mod tests {
use std::sync::Arc;
use api::v1::meta::{HeartbeatResponse, RequestHeader};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::sequence::SequenceBuilder;
@@ -85,6 +86,7 @@ mod tests {
election: None,
is_infancy: false,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
cache_invalidator: Arc::new(DummyCacheInvalidator),
};
let req = HeartbeatRequest {


@@ -25,6 +25,7 @@ use common_base::Plugins;
use common_config::Configurable;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_grpc::channel_manager;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
@@ -212,6 +213,7 @@ pub struct Context {
pub election: Option<ElectionRef>,
pub is_infancy: bool,
pub table_metadata_manager: TableMetadataManagerRef,
pub cache_invalidator: CacheInvalidatorRef,
}
impl Context {
@@ -376,6 +378,7 @@ pub struct Metasrv {
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
region_migration_manager: RegionMigrationManagerRef,
region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
cache_invalidator: CacheInvalidatorRef,
plugins: Plugins,
}
@@ -617,6 +620,7 @@ impl Metasrv {
let mailbox = self.mailbox.clone();
let election = self.election.clone();
let table_metadata_manager = self.table_metadata_manager.clone();
let cache_invalidator = self.cache_invalidator.clone();
Context {
server_addr,
@@ -628,6 +632,7 @@ impl Metasrv {
election,
is_infancy: false,
table_metadata_manager,
cache_invalidator,
}
}
}


@@ -319,6 +319,7 @@ impl MetasrvBuilder {
region_failure_detector_controller.clone(),
mailbox.clone(),
options.server_addr.clone(),
cache_invalidator.clone(),
),
));
region_migration_manager.try_start()?;
@@ -346,7 +347,7 @@ impl MetasrvBuilder {
DdlManager::try_new(
DdlContext {
node_manager,
cache_invalidator,
cache_invalidator: cache_invalidator.clone(),
memory_region_keeper: memory_region_keeper.clone(),
table_metadata_manager: table_metadata_manager.clone(),
table_metadata_allocator: table_metadata_allocator.clone(),
@@ -434,6 +435,7 @@ impl MetasrvBuilder {
memory_region_keeper,
region_migration_manager,
region_supervisor_ticker,
cache_invalidator,
})
}
}


@@ -27,10 +27,10 @@ use std::any::Any;
use std::fmt::Debug;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::RegionFailureDetectorControllerRef;
use common_meta::instruction::{CacheIdent, Instruction};
use common_meta::instruction::CacheIdent;
use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
@@ -52,7 +52,7 @@ use tokio::time::Instant;
use self::migration_start::RegionMigrationStart;
use crate::error::{self, Result};
use crate::service::mailbox::{BroadcastChannel, MailboxRef};
use crate::service::mailbox::MailboxRef;
/// It's shared in each step and available even after recovering.
///
@@ -158,6 +158,7 @@ pub struct DefaultContextFactory {
region_failure_detector_controller: RegionFailureDetectorControllerRef,
mailbox: MailboxRef,
server_addr: String,
cache_invalidator: CacheInvalidatorRef,
}
impl DefaultContextFactory {
@@ -168,6 +169,7 @@ impl DefaultContextFactory {
region_failure_detector_controller: RegionFailureDetectorControllerRef,
mailbox: MailboxRef,
server_addr: String,
cache_invalidator: CacheInvalidatorRef,
) -> Self {
Self {
volatile_ctx: VolatileContext::default(),
@@ -176,6 +178,7 @@ impl DefaultContextFactory {
region_failure_detector_controller,
mailbox,
server_addr,
cache_invalidator,
}
}
}
@@ -190,6 +193,7 @@ impl ContextFactory for DefaultContextFactory {
region_failure_detector_controller: self.region_failure_detector_controller,
mailbox: self.mailbox,
server_addr: self.server_addr,
cache_invalidator: self.cache_invalidator,
}
}
}
@@ -203,6 +207,7 @@ pub struct Context {
region_failure_detector_controller: RegionFailureDetectorControllerRef,
mailbox: MailboxRef,
server_addr: String,
cache_invalidator: CacheInvalidatorRef,
}
impl Context {
@@ -356,22 +361,13 @@ impl Context {
/// Broadcasts the invalidate table cache message.
pub async fn invalidate_table_cache(&self) -> Result<()> {
let table_id = self.region_id().table_id();
let instruction = Instruction::InvalidateCaches(vec![CacheIdent::TableId(table_id)]);
let msg = &MailboxMessage::json_message(
"Invalidate Table Cache",
&format!("Metasrv@{}", self.server_addr()),
"Frontend broadcast",
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: instruction.to_string(),
})?;
self.mailbox
.broadcast(&BroadcastChannel::Frontend, msg)
.await
// ignore the result
let ctx = common_meta::cache_invalidator::Context::default();
let _ = self
.cache_invalidator
.invalidate(&ctx, &[CacheIdent::TableId(table_id)])
.await;
Ok(())
}
}
@@ -497,6 +493,7 @@ mod tests {
use std::sync::Arc;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::Instruction;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::rpc::router::{Region, RegionRoute};


@@ -48,8 +48,10 @@ use super::manager::RegionMigrationProcedureTracker;
use super::migration_abort::RegionMigrationAbort;
use super::upgrade_candidate_region::UpgradeCandidateRegion;
use super::{Context, ContextFactory, DefaultContextFactory, State, VolatileContext};
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::error::{self, Error, Result};
use crate::handler::{HeartbeatMailbox, Pusher, Pushers};
use crate::metasrv::MetasrvInfo;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
@@ -152,6 +154,12 @@ impl TestingEnv {
mailbox: self.mailbox_ctx.mailbox().clone(),
server_addr: self.server_addr.to_string(),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
cache_invalidator: Arc::new(MetasrvCacheInvalidator::new(
self.mailbox_ctx.mailbox.clone(),
MetasrvInfo {
server_addr: self.server_addr.to_string(),
},
)),
}
}


@@ -160,8 +160,12 @@ impl CompactionScheduler {
self.listener.clone(),
);
self.region_status.insert(region_id, status);
self.schedule_compaction_request(request, compact_options)
.await
let result = self
.schedule_compaction_request(request, compact_options)
.await;
self.listener.on_compaction_scheduled(region_id);
result
}
/// Notifies the scheduler that the compaction job is finished successfully.


@@ -122,6 +122,11 @@ pub struct MitoConfig {
/// Memtable config
pub memtable: MemtableConfig,
/// Minimum time interval between two compactions.
/// To align with the old behavior, the default value is 0 (no restrictions).
#[serde(with = "humantime_serde")]
pub min_compaction_interval: Duration,
}
impl Default for MitoConfig {
@@ -152,6 +157,7 @@ impl Default for MitoConfig {
inverted_index: InvertedIndexConfig::default(),
fulltext_index: FulltextIndexConfig::default(),
memtable: MemtableConfig::default(),
min_compaction_interval: Duration::from_secs(0),
};
// Adjust buffer and cache size according to system memory if we can.
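Because the field is tagged `#[serde(with = "humantime_serde")]`, the TOML value is a human-readable duration string. A small sketch of that mapping, assuming the `serde`, `toml`, and `humantime_serde` crates; the struct below is a stand-in, not `MitoConfig` itself:

```rust
use std::time::Duration;

use serde::Deserialize;

#[derive(Deserialize)]
struct CompactionConf {
    #[serde(with = "humantime_serde")]
    min_compaction_interval: Duration,
}

fn main() {
    let conf: CompactionConf = toml::from_str(r#"min_compaction_interval = "1h""#).unwrap();
    assert_eq!(conf.min_compaction_interval, Duration::from_secs(3600));

    // The default "0m" keeps the old behavior: no minimum gap between compactions.
    let zero: CompactionConf = toml::from_str(r#"min_compaction_interval = "0m""#).unwrap();
    assert_eq!(zero.min_compaction_interval, Duration::ZERO);
}
```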


@@ -15,6 +15,7 @@
use std::sync::{Arc, Mutex};
use std::time::Duration;
use common_time::util::current_time_millis;
use object_store::ObjectStore;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
@@ -22,6 +23,7 @@ use store_api::storage::RegionId;
use tokio::sync::{oneshot, Barrier};
use crate::config::MitoConfig;
use crate::engine::flush_test::MockTimeProvider;
use crate::engine::listener::EventListener;
use crate::engine::MitoEngine;
use crate::manifest::action::RegionEdit;
@@ -29,6 +31,84 @@ use crate::region::MitoRegionRef;
use crate::sst::file::{FileId, FileMeta};
use crate::test_util::{CreateRequestBuilder, TestEnv};
#[tokio::test]
async fn test_edit_region_schedule_compaction() {
let mut env = TestEnv::new();
struct EditRegionListener {
tx: Mutex<Option<oneshot::Sender<RegionId>>>,
}
impl EventListener for EditRegionListener {
fn on_compaction_scheduled(&self, region_id: RegionId) {
let mut tx = self.tx.lock().unwrap();
tx.take().unwrap().send(region_id).unwrap();
}
}
let (tx, mut rx) = oneshot::channel();
let config = MitoConfig {
min_compaction_interval: Duration::from_secs(60 * 60),
..Default::default()
};
let time_provider = Arc::new(MockTimeProvider::new(current_time_millis()));
let engine = env
.create_engine_with_time(
config.clone(),
None,
Some(Arc::new(EditRegionListener {
tx: Mutex::new(Some(tx)),
})),
time_provider.clone(),
)
.await;
let region_id = RegionId::new(1, 1);
engine
.handle_request(
region_id,
RegionRequest::Create(CreateRequestBuilder::new().build()),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
let new_edit = || RegionEdit {
files_to_add: vec![FileMeta {
region_id: region.region_id,
file_id: FileId::random(),
level: 0,
..Default::default()
}],
files_to_remove: vec![],
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
};
engine
.edit_region(region.region_id, new_edit())
.await
.unwrap();
// Asserts that the compaction of the region is not scheduled,
// because the minimum time interval between two compactions is not passed.
assert_eq!(rx.try_recv(), Err(oneshot::error::TryRecvError::Empty));
// Simulates the time has passed the min compaction interval,
time_provider
.set_now(current_time_millis() + config.min_compaction_interval.as_millis() as i64);
// ... then edits the region again,
engine
.edit_region(region.region_id, new_edit())
.await
.unwrap();
// ... finally asserts that the compaction of the region is scheduled.
let actual = tokio::time::timeout(Duration::from_secs(9), rx)
.await
.unwrap()
.unwrap();
assert_eq!(region_id, actual);
}
#[tokio::test]
async fn test_edit_region_fill_cache() {
let mut env = TestEnv::new();
@@ -151,7 +231,13 @@ async fn test_edit_region_concurrently() {
}
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let engine = env
.create_engine(MitoConfig {
// Suppress the compaction to not impede the speed of this kinda stress testing.
min_compaction_interval: Duration::from_secs(60 * 60),
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
engine


@@ -306,7 +306,7 @@ async fn test_flush_reopen_region(factory: Option<LogStoreFactory>) {
}
#[derive(Debug)]
struct MockTimeProvider {
pub(crate) struct MockTimeProvider {
now: AtomicI64,
elapsed: AtomicI64,
}
@@ -326,14 +326,14 @@ impl TimeProvider for MockTimeProvider {
}
impl MockTimeProvider {
fn new(now: i64) -> Self {
pub(crate) fn new(now: i64) -> Self {
Self {
now: AtomicI64::new(now),
elapsed: AtomicI64::new(0),
}
}
fn set_now(&self, now: i64) {
pub(crate) fn set_now(&self, now: i64) {
self.now.store(now, Ordering::Relaxed);
}


@@ -66,6 +66,9 @@ pub trait EventListener: Send + Sync {
/// Notifies the listener that the file cache is filled when, for example, editing region.
fn on_file_cache_filled(&self, _file_id: FileId) {}
/// Notifies the listener that the compaction is scheduled.
fn on_compaction_scheduled(&self, _region_id: RegionId) {}
}
pub type EventListenerRef = Arc<dyn EventListener>;


@@ -20,7 +20,7 @@ use std::time::{Duration, Instant};
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{debug, error, warn};
use common_telemetry::{debug, error, tracing, warn};
use common_time::range::TimestampRange;
use common_time::Timestamp;
use datafusion::physical_plan::DisplayFormatType;
@@ -62,6 +62,7 @@ pub(crate) enum Scanner {
impl Scanner {
/// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
match self {
Scanner::Seq(seq_scan) => seq_scan.build_stream(),
@@ -70,6 +71,7 @@ impl Scanner {
}
/// Returns a [RegionScanner] to scan the region.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
match self {
Scanner::Seq(seq_scan) => Ok(Box::new(seq_scan)),
@@ -284,9 +286,10 @@ impl ScanRegion {
.collect();
debug!(
"Scan region {}, request: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
"Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
self.version.metadata.region_id,
self.request,
time_range,
memtables.len(),
files.len(),
self.version.options.append_mode,


@@ -23,7 +23,7 @@ use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::util::ChainedRecordBatchStream;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::debug;
use common_telemetry::{debug, tracing};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use smallvec::smallvec;
@@ -112,6 +112,7 @@ impl SeqScan {
self.semaphore.clone(),
&mut metrics,
self.compaction,
self.properties.num_partitions(),
)
.await?;
// Safety: `build_merge_reader()` always returns a reader if partition is None.
@@ -184,10 +185,11 @@ impl SeqScan {
semaphore: Arc<Semaphore>,
metrics: &mut ScannerMetrics,
compaction: bool,
parallelism: usize,
) -> Result<Option<BoxedBatchReader>> {
// initialize parts list
let mut parts = stream_ctx.parts.lock().await;
Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics).await?;
Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics, parallelism).await?;
let parts_len = parts.0.len();
let mut sources = Vec::with_capacity(parts_len);
@@ -211,11 +213,12 @@ impl SeqScan {
semaphore: Arc<Semaphore>,
metrics: &mut ScannerMetrics,
compaction: bool,
parallelism: usize,
) -> Result<Option<BoxedBatchReader>> {
let mut sources = Vec::new();
let build_start = {
let mut parts = stream_ctx.parts.lock().await;
Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics).await?;
Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics, parallelism).await?;
let Some(part) = parts.0.get_part(range_id) else {
return Ok(None);
@@ -245,6 +248,7 @@ impl SeqScan {
maybe_reader
}
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
async fn build_reader_from_sources(
stream_ctx: &StreamContext,
mut sources: Vec<Source>,
@@ -311,12 +315,13 @@ impl SeqScan {
let semaphore = self.semaphore.clone();
let partition_ranges = self.properties.partitions[partition].clone();
let compaction = self.compaction;
let parallelism = self.properties.num_partitions();
let stream = try_stream! {
let first_poll = stream_ctx.query_start.elapsed();
for partition_range in partition_ranges {
let maybe_reader =
Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics, compaction)
Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics, compaction, parallelism)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
@@ -390,6 +395,7 @@ impl SeqScan {
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.semaphore.clone();
let compaction = self.compaction;
let parallelism = self.properties.num_partitions();
// build stream
let stream = try_stream! {
@@ -398,7 +404,7 @@ impl SeqScan {
// init parts
let parts_len = {
let mut parts = stream_ctx.parts.lock().await;
Self::maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics).await
Self::maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics, parallelism).await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
parts.0.len()
@@ -411,6 +417,7 @@ impl SeqScan {
semaphore.clone(),
&mut metrics,
compaction,
parallelism
)
.await
.map_err(BoxedError::new)
@@ -467,6 +474,7 @@ impl SeqScan {
input: &ScanInput,
part_list: &mut (ScanPartList, Duration),
metrics: &mut ScannerMetrics,
parallelism: usize,
) -> Result<()> {
if part_list.0.is_none() {
let now = Instant::now();
@@ -477,9 +485,7 @@ impl SeqScan {
Some(input.mapper.column_ids()),
input.predicate.clone(),
);
part_list
.0
.set_parts(distributor.build_parts(input.parallelism.parallelism));
part_list.0.set_parts(distributor.build_parts(parallelism));
let build_part_cost = now.elapsed();
part_list.1 = build_part_cost;
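
The change above (and the matching one in UnorderedScan further down) makes the scanner derive its scan-part count from `properties.num_partitions()`, which the caller controls, instead of the region's configured `input.parallelism`. A minimal sketch of the general idea, assuming a simple round-robin distributor; the `split_into_parts` helper below is hypothetical illustration only, and the real `build_parts` may group ranges by time window or series instead:

// Hypothetical helper, not part of the codebase: distribute ranges across
// `parallelism` scan parts in round-robin order.
fn split_into_parts<T>(ranges: Vec<T>, parallelism: usize) -> Vec<Vec<T>> {
    let parallelism = parallelism.max(1);
    let mut parts: Vec<Vec<T>> = (0..parallelism).map(|_| Vec::new()).collect();
    for (i, range) in ranges.into_iter().enumerate() {
        parts[i % parallelism].push(range);
    }
    parts
}

fn main() {
    let parts = split_into_parts((0..10).collect(), 4);
    // Every range ends up in exactly one part.
    assert_eq!(parts.iter().map(|part| part.len()).sum::<usize>(), 10);
}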

View File

@@ -59,10 +59,11 @@ impl UnorderedScan {
/// Creates a new [UnorderedScan].
pub(crate) fn new(input: ScanInput) -> Self {
let parallelism = input.parallelism.parallelism.max(1);
let properties = ScannerProperties::default()
let mut properties = ScannerProperties::default()
.with_parallelism(parallelism)
.with_append_mode(input.append_mode)
.with_total_rows(input.total_rows());
properties.partitions = vec![input.partition_ranges()];
let stream_ctx = Arc::new(StreamContext::new(input));
Self {
@@ -148,12 +149,13 @@ impl RegionScanner for UnorderedScan {
..Default::default()
};
let stream_ctx = self.stream_ctx.clone();
let parallelism = self.properties.num_partitions();
let stream = try_stream! {
let first_poll = stream_ctx.query_start.elapsed();
let part = {
let mut parts = stream_ctx.parts.lock().await;
maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics)
maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics, parallelism)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
@@ -260,6 +262,7 @@ async fn maybe_init_parts(
input: &ScanInput,
part_list: &mut (ScanPartList, Duration),
metrics: &mut ScannerMetrics,
parallelism: usize,
) -> Result<()> {
if part_list.0.is_none() {
let now = Instant::now();
@@ -270,9 +273,7 @@ async fn maybe_init_parts(
Some(input.mapper.column_ids()),
input.predicate.clone(),
);
part_list
.0
.set_parts(distributor.build_parts(input.parallelism.parallelism));
part_list.0.set_parts(distributor.build_parts(parallelism));
let build_part_cost = now.elapsed();
part_list.1 = build_part_cost;

View File

@@ -102,6 +102,8 @@ pub(crate) struct MitoRegion {
pub(crate) provider: Provider,
/// Last flush time in millis.
last_flush_millis: AtomicI64,
/// Last compaction time in millis.
last_compaction_millis: AtomicI64,
/// Provider to get current time.
time_provider: TimeProviderRef,
/// Memtable builder for the region.
@@ -151,6 +153,17 @@ impl MitoRegion {
self.last_flush_millis.store(now, Ordering::Relaxed);
}
/// Returns the last compaction time in millis.
pub(crate) fn last_compaction_millis(&self) -> i64 {
self.last_compaction_millis.load(Ordering::Relaxed)
}
/// Updates the last compaction time to the current time in millis.
pub(crate) fn update_compaction_millis(&self) {
let now = self.time_provider.current_time_millis();
self.last_compaction_millis.store(now, Ordering::Relaxed);
}
/// Returns the region dir.
pub(crate) fn region_dir(&self) -> &str {
self.access_layer.region_dir()
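
Pulling the scattered hunks together: the region now remembers when it was last compacted, using the same relaxed-atomic pattern as `last_flush_millis`. A minimal, self-contained sketch of that pattern, with the region's `TimeProvider` replaced by an explicit `now` argument for simplicity (simplified names, not the actual region code):

use std::sync::atomic::{AtomicI64, Ordering};

// Tracks the wall-clock time of the last compaction in milliseconds.
struct CompactionClock {
    last_compaction_millis: AtomicI64,
}

impl CompactionClock {
    fn new(now: i64) -> Self {
        Self { last_compaction_millis: AtomicI64::new(now) }
    }
    fn last_compaction_millis(&self) -> i64 {
        self.last_compaction_millis.load(Ordering::Relaxed)
    }
    fn update_compaction_millis(&self, now: i64) {
        self.last_compaction_millis.store(now, Ordering::Relaxed);
    }
}

fn main() {
    let clock = CompactionClock::new(0);
    clock.update_compaction_millis(42);
    assert_eq!(clock.last_compaction_millis(), 42);
}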

View File

@@ -49,7 +49,7 @@ use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::time_provider::TimeProviderRef;
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};
@@ -66,13 +66,15 @@ pub(crate) struct RegionOpener {
skip_wal_replay: bool,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
time_provider: Option<TimeProviderRef>,
time_provider: TimeProviderRef,
stats: ManifestStats,
wal_entry_reader: Option<Box<dyn WalEntryReader>>,
}
impl RegionOpener {
/// Returns a new opener.
// TODO(LFC): Reduce the number of arguments.
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
region_id: RegionId,
region_dir: &str,
@@ -81,6 +83,7 @@ impl RegionOpener {
purge_scheduler: SchedulerRef,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
time_provider: TimeProviderRef,
) -> RegionOpener {
RegionOpener {
region_id,
@@ -94,7 +97,7 @@ impl RegionOpener {
skip_wal_replay: false,
puffin_manager_factory,
intermediate_manager,
time_provider: None,
time_provider,
stats: Default::default(),
wal_entry_reader: None,
}
@@ -223,9 +226,7 @@ impl RegionOpener {
self.puffin_manager_factory,
self.intermediate_manager,
));
let time_provider = self
.time_provider
.unwrap_or_else(|| Arc::new(StdTimeProvider));
let now = self.time_provider.current_time_millis();
Ok(MitoRegion {
region_id,
@@ -242,8 +243,9 @@ impl RegionOpener {
self.cache_manager,
)),
provider,
last_flush_millis: AtomicI64::new(time_provider.current_time_millis()),
time_provider,
last_flush_millis: AtomicI64::new(now),
last_compaction_millis: AtomicI64::new(now),
time_provider: self.time_provider.clone(),
memtable_builder,
stats: self.stats,
})
@@ -377,10 +379,7 @@ impl RegionOpener {
} else {
info!("Skip the WAL replay for region: {}", region_id);
}
let time_provider = self
.time_provider
.clone()
.unwrap_or_else(|| Arc::new(StdTimeProvider));
let now = self.time_provider.current_time_millis();
let region = MitoRegion {
region_id: self.region_id,
@@ -393,8 +392,9 @@ impl RegionOpener {
)),
file_purger,
provider: provider.clone(),
last_flush_millis: AtomicI64::new(time_provider.current_time_millis()),
time_provider,
last_flush_millis: AtomicI64::new(now),
last_compaction_millis: AtomicI64::new(now),
time_provider: self.time_provider.clone(),
memtable_builder,
stats: self.stats.clone(),
};

View File

@@ -958,6 +958,13 @@ impl WorkerListener {
listener.on_file_cache_filled(_file_id);
}
}
pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {
listener.on_compaction_scheduled(_region_id);
}
}
}
#[cfg(test)]

View File

@@ -56,6 +56,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.purge_scheduler.clone(),
self.puffin_manager_factory.clone(),
self.intermediate_manager.clone(),
self.time_provider.clone(),
)
.cache(Some(self.cache_manager.clone()))
.options(region.version().options.clone())?

View File

@@ -12,17 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::{error, info};
use store_api::logstore::LogStore;
use api::v1::region::compact_request;
use common_telemetry::{error, info, warn};
use store_api::region_request::RegionCompactRequest;
use store_api::storage::RegionId;
use crate::error::RegionNotFoundSnafu;
use crate::metrics::COMPACTION_REQUEST_COUNT;
use crate::region::MitoRegionRef;
use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx};
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
impl<S> RegionWorkerLoop<S> {
/// Handles a compaction request submitted to the region worker.
pub(crate) async fn handle_compaction_request(
&mut self,
@@ -68,6 +69,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
}
};
region.update_compaction_millis();
region
.version_control
@@ -89,4 +91,30 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.compaction_scheduler
.on_compaction_failed(req.region_id, req.err);
}
/// Schedules a regular compaction for the region if the minimum compaction interval has elapsed since the last one.
pub(crate) async fn schedule_compaction(&mut self, region: &MitoRegionRef) {
let now = self.time_provider.current_time_millis();
if now - region.last_compaction_millis()
>= self.config.min_compaction_interval.as_millis() as i64
{
if let Err(e) = self
.compaction_scheduler
.schedule_compaction(
region.region_id,
compact_request::Options::Regular(Default::default()),
&region.version_control,
&region.access_layer,
OptionOutputTx::none(),
&region.manifest_ctx,
)
.await
{
warn!(
"Failed to schedule compaction for region: {}, err: {}",
region.region_id, e
);
}
}
}
}
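
This interval gate is what the new `min_compaction_interval` option controls: a regular compaction is scheduled only when at least that much time has passed since the region's last compaction, and the default of 0 keeps the old always-schedule behavior. A minimal sketch of the check under those assumptions (simplified names, not the worker code itself):

use std::time::Duration;

// Returns true when enough wall-clock time has elapsed since the last compaction.
fn should_schedule_compaction(now_millis: i64, last_compaction_millis: i64, min_interval: Duration) -> bool {
    now_millis - last_compaction_millis >= min_interval.as_millis() as i64
}

fn main() {
    // With the default interval of 0, compaction is always eligible.
    assert!(should_schedule_compaction(1_000, 1_000, Duration::ZERO));
    // With a 5-minute interval, a compaction 2 minutes after the last one is skipped...
    assert!(!should_schedule_compaction(2 * 60_000, 0, Duration::from_secs(300)));
    // ...but one 6 minutes later goes ahead.
    assert!(should_schedule_compaction(6 * 60_000, 0, Duration::from_secs(300)));
}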

View File

@@ -62,6 +62,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.purge_scheduler.clone(),
self.puffin_manager_factory.clone(),
self.intermediate_manager.clone(),
self.time_provider.clone(),
)
.metadata(metadata)
.parse_options(request.options)?

View File

@@ -16,8 +16,7 @@
use std::sync::Arc;
use api::v1::region::compact_request;
use common_telemetry::{error, info, warn};
use common_telemetry::{error, info};
use store_api::logstore::LogStore;
use store_api::region_request::RegionFlushRequest;
use store_api::storage::RegionId;
@@ -242,23 +241,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.handle_stalled_requests().await;
// Schedules compaction.
if let Err(e) = self
.compaction_scheduler
.schedule_compaction(
region.region_id,
compact_request::Options::Regular(Default::default()),
&region.version_control,
&region.access_layer,
OptionOutputTx::none(),
&region.manifest_ctx,
)
.await
{
warn!(
"Failed to schedule compaction after flush, region: {}, err: {}",
region.region_id, e
);
}
self.schedule_compaction(&region).await;
self.listener.on_flush_success(region_id);
}

View File

@@ -148,6 +148,9 @@ impl<S> RegionWorkerLoop<S> {
}
};
let need_compaction =
edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty();
if edit_result.result.is_ok() {
// Applies the edit to the region.
region
@@ -165,6 +168,10 @@ impl<S> RegionWorkerLoop<S> {
self.handle_region_edit(request).await;
}
}
if need_compaction {
self.schedule_compaction(&region).await;
}
}
/// Writes the truncate action to the manifest and then applies it to the region in the background.

View File

@@ -97,6 +97,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.purge_scheduler.clone(),
self.puffin_manager_factory.clone(),
self.intermediate_manager.clone(),
self.time_provider.clone(),
)
.skip_wal_replay(request.skip_wal_replay)
.cache(Some(self.cache_manager.clone()))

View File

@@ -74,6 +74,7 @@ use crate::query_handler::{
use crate::server::Server;
pub mod authorize;
pub mod dyn_log;
pub mod event;
pub mod handler;
pub mod header;
@@ -708,6 +709,15 @@ impl HttpServer {
authorize::check_http_auth,
)),
)
.nest(
"/debug",
Router::new()
// handler for changing log level dynamically
.route(
"/log_level",
routing::get(dyn_log::dyn_log_handler).post(dyn_log::dyn_log_handler),
),
)
// Handlers for debugging; we don't expect a timeout.
.nest(
&format!("/{HTTP_API_VERSION}/prof"),

View File

@@ -0,0 +1,54 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use axum::http::StatusCode;
use axum::response::IntoResponse;
use common_telemetry::tracing_subscriber::filter;
use common_telemetry::{info, RELOAD_HANDLE};
use snafu::OptionExt;
use crate::error::{InternalSnafu, InvalidParameterSnafu, Result};
#[axum_macros::debug_handler]
pub async fn dyn_log_handler(level: String) -> Result<impl IntoResponse> {
let new_filter = level.parse::<filter::Targets>().map_err(|e| {
InvalidParameterSnafu {
reason: format!("Invalid filter \"{level}\": {e:?}"),
}
.build()
})?;
let mut old_filter = None;
RELOAD_HANDLE
.get()
.context(InternalSnafu {
err_msg: "Reload handle not initialized",
})?
.modify(|filter| {
old_filter = Some(filter.clone());
*filter = new_filter.clone()
})
.map_err(|e| {
InternalSnafu {
err_msg: format!("Fail to modify filter: {e:?}"),
}
.build()
})?;
let change_note = format!(
"Log Level changed from {} to {}",
old_filter.map(|f| f.to_string()).unwrap_or_default(),
new_filter
);
info!("{}", change_note.clone());
Ok((StatusCode::OK, change_note))
}
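
With the `/debug/log_level` route registered above, the filter can be changed at runtime by sending a tracing directive string as the request body. A hypothetical client-side sketch; the host, port, and use of `reqwest` (with the `blocking` feature) are assumptions here, and any HTTP client would do:

// Assumes `reqwest = { version = "*", features = ["blocking"] }` in Cargo.toml.
fn main() -> Result<(), reqwest::Error> {
    let response = reqwest::blocking::Client::new()
        // The address is an assumption; use whatever the HTTP server is bound to.
        .post("http://127.0.0.1:4000/debug/log_level")
        // Parsed server-side as a `tracing_subscriber` `Targets` filter.
        .body("mito2=debug,info")
        .send()?;
    // Expected to echo something like "Log Level changed from info to mito2=debug,info".
    println!("{}", response.text()?);
    Ok(())
}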

View File

@@ -823,8 +823,8 @@ max_retry_times = 3
retry_delay = "500ms"
[logging]
enable_otlp_tracing = false
append_stdout = true
enable_otlp_tracing = false
[[region_engine]]
@@ -841,6 +841,7 @@ experimental_write_cache_size = "512MiB"
sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
allow_stale_entries = false
min_compaction_interval = "0s"
[region_engine.mito.index]
aux_path = ""
@@ -883,6 +884,7 @@ write_interval = "30s"
fn drop_lines_with_inconsistent_results(input: String) -> String {
let inconsistent_results = [
"dir =",
"log_format =",
"data_home =",
"bucket =",
"root =",

View File

@@ -0,0 +1,192 @@
SELECT h3(37.76938, -122.3889, 0);
+---------------------------------------------------+
| h3(Float64(37.76938),Float64(-122.3889),Int64(0)) |
+---------------------------------------------------+
| 8029fffffffffff |
+---------------------------------------------------+
SELECT h3(37.76938, -122.3889, 1);
+---------------------------------------------------+
| h3(Float64(37.76938),Float64(-122.3889),Int64(1)) |
+---------------------------------------------------+
| 81283ffffffffff |
+---------------------------------------------------+
SELECT h3(37.76938, -122.3889, 8);
+---------------------------------------------------+
| h3(Float64(37.76938),Float64(-122.3889),Int64(8)) |
+---------------------------------------------------+
| 88283082e7fffff |
+---------------------------------------------------+
SELECT h3(37.76938, -122.3889, 100);
Error: 3001(EngineExecuteQuery), H3 error: invalid resolution (got Some(100)): out of range
SELECT h3(37.76938, -122.3889, -1);
Error: 3001(EngineExecuteQuery), H3 error: invalid resolution (got Some(255)): out of range
SELECT h3(37.76938, -122.3889, 8::Int8);
+---------------------------------------------------+
| h3(Float64(37.76938),Float64(-122.3889),Int64(8)) |
+---------------------------------------------------+
| 88283082e7fffff |
+---------------------------------------------------+
SELECT h3(37.76938, -122.3889, 8::Int16);
+-----------------------------------------------------------------------------+
| h3(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(8),Utf8("Int16"))) |
+-----------------------------------------------------------------------------+
| 88283082e7fffff |
+-----------------------------------------------------------------------------+
SELECT h3(37.76938, -122.3889, 8::Int32);
+-----------------------------------------------------------------------------+
| h3(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(8),Utf8("Int32"))) |
+-----------------------------------------------------------------------------+
| 88283082e7fffff |
+-----------------------------------------------------------------------------+
SELECT h3(37.76938, -122.3889, 8::Int64);
+-----------------------------------------------------------------------------+
| h3(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(8),Utf8("Int64"))) |
+-----------------------------------------------------------------------------+
| 88283082e7fffff |
+-----------------------------------------------------------------------------+
SELECT h3(37.76938, -122.3889, 8::UInt8);
+-----------------------------------------------------------------------------+
| h3(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(8),Utf8("UInt8"))) |
+-----------------------------------------------------------------------------+
| 88283082e7fffff |
+-----------------------------------------------------------------------------+
SELECT h3(37.76938, -122.3889, 8::UInt16);
+------------------------------------------------------------------------------+
| h3(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(8),Utf8("UInt16"))) |
+------------------------------------------------------------------------------+
| 88283082e7fffff |
+------------------------------------------------------------------------------+
SELECT h3(37.76938, -122.3889, 8::UInt32);
+------------------------------------------------------------------------------+
| h3(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(8),Utf8("UInt32"))) |
+------------------------------------------------------------------------------+
| 88283082e7fffff |
+------------------------------------------------------------------------------+
SELECT h3(37.76938, -122.3889, 8::UInt64);
+------------------------------------------------------------------------------+
| h3(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(8),Utf8("UInt64"))) |
+------------------------------------------------------------------------------+
| 88283082e7fffff |
+------------------------------------------------------------------------------+
SELECT geohash(37.76938, -122.3889, 9);
+--------------------------------------------------------+
| geohash(Float64(37.76938),Float64(-122.3889),Int64(9)) |
+--------------------------------------------------------+
| 9q8yygxne |
+--------------------------------------------------------+
SELECT geohash(37.76938, -122.3889, 10);
+---------------------------------------------------------+
| geohash(Float64(37.76938),Float64(-122.3889),Int64(10)) |
+---------------------------------------------------------+
| 9q8yygxnef |
+---------------------------------------------------------+
SELECT geohash(37.76938, -122.3889, 11);
+---------------------------------------------------------+
| geohash(Float64(37.76938),Float64(-122.3889),Int64(11)) |
+---------------------------------------------------------+
| 9q8yygxneft |
+---------------------------------------------------------+
SELECT geohash(37.76938, -122.3889, 100);
Error: 3001(EngineExecuteQuery), Geohash error: Invalid length specified: 100. Accepted values are between 1 and 12, inclusive
SELECT geohash(37.76938, -122.3889, -1);
Error: 3001(EngineExecuteQuery), Geohash error: Invalid length specified: 18446744073709551615. Accepted values are between 1 and 12, inclusive
SELECT geohash(37.76938, -122.3889, 11::Int8);
+---------------------------------------------------------+
| geohash(Float64(37.76938),Float64(-122.3889),Int64(11)) |
+---------------------------------------------------------+
| 9q8yygxneft |
+---------------------------------------------------------+
SELECT geohash(37.76938, -122.3889, 11::Int16);
+-----------------------------------------------------------------------------------+
| geohash(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(11),Utf8("Int16"))) |
+-----------------------------------------------------------------------------------+
| 9q8yygxneft |
+-----------------------------------------------------------------------------------+
SELECT geohash(37.76938, -122.3889, 11::Int32);
+-----------------------------------------------------------------------------------+
| geohash(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(11),Utf8("Int32"))) |
+-----------------------------------------------------------------------------------+
| 9q8yygxneft |
+-----------------------------------------------------------------------------------+
SELECT geohash(37.76938, -122.3889, 11::Int64);
+-----------------------------------------------------------------------------------+
| geohash(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(11),Utf8("Int64"))) |
+-----------------------------------------------------------------------------------+
| 9q8yygxneft |
+-----------------------------------------------------------------------------------+
SELECT geohash(37.76938, -122.3889, 11::UInt8);
+-----------------------------------------------------------------------------------+
| geohash(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(11),Utf8("UInt8"))) |
+-----------------------------------------------------------------------------------+
| 9q8yygxneft |
+-----------------------------------------------------------------------------------+
SELECT geohash(37.76938, -122.3889, 11::UInt16);
+------------------------------------------------------------------------------------+
| geohash(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(11),Utf8("UInt16"))) |
+------------------------------------------------------------------------------------+
| 9q8yygxneft |
+------------------------------------------------------------------------------------+
SELECT geohash(37.76938, -122.3889, 11::UInt32);
+------------------------------------------------------------------------------------+
| geohash(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(11),Utf8("UInt32"))) |
+------------------------------------------------------------------------------------+
| 9q8yygxneft |
+------------------------------------------------------------------------------------+
SELECT geohash(37.76938, -122.3889, 11::UInt64);
+------------------------------------------------------------------------------------+
| geohash(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(11),Utf8("UInt64"))) |
+------------------------------------------------------------------------------------+
| 9q8yygxneft |
+------------------------------------------------------------------------------------+

View File

@@ -0,0 +1,51 @@
SELECT h3(37.76938, -122.3889, 0);
SELECT h3(37.76938, -122.3889, 1);
SELECT h3(37.76938, -122.3889, 8);
SELECT h3(37.76938, -122.3889, 100);
SELECT h3(37.76938, -122.3889, -1);
SELECT h3(37.76938, -122.3889, 8::Int8);
SELECT h3(37.76938, -122.3889, 8::Int16);
SELECT h3(37.76938, -122.3889, 8::Int32);
SELECT h3(37.76938, -122.3889, 8::Int64);
SELECT h3(37.76938, -122.3889, 8::UInt8);
SELECT h3(37.76938, -122.3889, 8::UInt16);
SELECT h3(37.76938, -122.3889, 8::UInt32);
SELECT h3(37.76938, -122.3889, 8::UInt64);
SELECT geohash(37.76938, -122.3889, 9);
SELECT geohash(37.76938, -122.3889, 10);
SELECT geohash(37.76938, -122.3889, 11);
SELECT geohash(37.76938, -122.3889, 100);
SELECT geohash(37.76938, -122.3889, -1);
SELECT geohash(37.76938, -122.3889, 11::Int8);
SELECT geohash(37.76938, -122.3889, 11::Int16);
SELECT geohash(37.76938, -122.3889, 11::Int32);
SELECT geohash(37.76938, -122.3889, 11::Int64);
SELECT geohash(37.76938, -122.3889, 11::UInt8);
SELECT geohash(37.76938, -122.3889, 11::UInt16);
SELECT geohash(37.76938, -122.3889, 11::UInt32);
SELECT geohash(37.76938, -122.3889, 11::UInt64);

View File

@@ -0,0 +1,108 @@
create table
t (
ts timestamp time index,
host string primary key,
not_pk string,
val double,
)
with
(
append_mode = 'true',
'compaction.type' = 'twcs',
'compaction.twcs.max_active_window_files' = '8',
'compaction.twcs.max_inactive_window_files' = '8'
);
Affected Rows: 0
insert into
t
values
(0, 'a', '🌕', 1.0),
(1, 'b', '🌖', 2.0),
(1, 'a', '🌗', 3.0),
(1, 'c', '🌘', 4.0),
(2, 'a', '🌑', 5.0),
(2, 'b', '🌒', 6.0),
(2, 'a', '🌓', 7.0),
(3, 'c', '🌔', 8.0),
(3, 'd', '🌕', 9.0);
Affected Rows: 9
admin flush_table ('t');
+------------------------+
| ADMIN flush_table('t') |
+------------------------+
| 0 |
+------------------------+
insert into
t
values
(10, 'a', '🌕', 1.0),
(11, 'b', '🌖', 2.0),
(11, 'a', '🌗', 3.0),
(11, 'c', '🌘', 4.0),
(12, 'a', '🌑', 5.0),
(12, 'b', '🌒', 6.0),
(12, 'a', '🌓', 7.0),
(13, 'c', '🌔', 8.0),
(13, 'd', '🌕', 9.0);
Affected Rows: 9
admin flush_table ('t');
+------------------------+
| ADMIN flush_table('t') |
+------------------------+
| 0 |
+------------------------+
select
count(ts)
from
t;
+-------------+
| COUNT(t.ts) |
+-------------+
| 18 |
+-------------+
select
ts
from
t
order by
ts;
+-------------------------+
| ts |
+-------------------------+
| 1970-01-01T00:00:00 |
| 1970-01-01T00:00:00.001 |
| 1970-01-01T00:00:00.001 |
| 1970-01-01T00:00:00.001 |
| 1970-01-01T00:00:00.002 |
| 1970-01-01T00:00:00.002 |
| 1970-01-01T00:00:00.002 |
| 1970-01-01T00:00:00.003 |
| 1970-01-01T00:00:00.003 |
| 1970-01-01T00:00:00.010 |
| 1970-01-01T00:00:00.011 |
| 1970-01-01T00:00:00.011 |
| 1970-01-01T00:00:00.011 |
| 1970-01-01T00:00:00.012 |
| 1970-01-01T00:00:00.012 |
| 1970-01-01T00:00:00.012 |
| 1970-01-01T00:00:00.013 |
| 1970-01-01T00:00:00.013 |
+-------------------------+
drop table t;
Affected Rows: 0

View File

@@ -0,0 +1,58 @@
create table
t (
ts timestamp time index,
host string primary key,
not_pk string,
val double,
)
with
(
append_mode = 'true',
'compaction.type' = 'twcs',
'compaction.twcs.max_active_window_files' = '8',
'compaction.twcs.max_inactive_window_files' = '8'
);
insert into
t
values
(0, 'a', '🌕', 1.0),
(1, 'b', '🌖', 2.0),
(1, 'a', '🌗', 3.0),
(1, 'c', '🌘', 4.0),
(2, 'a', '🌑', 5.0),
(2, 'b', '🌒', 6.0),
(2, 'a', '🌓', 7.0),
(3, 'c', '🌔', 8.0),
(3, 'd', '🌕', 9.0);
admin flush_table ('t');
insert into
t
values
(10, 'a', '🌕', 1.0),
(11, 'b', '🌖', 2.0),
(11, 'a', '🌗', 3.0),
(11, 'c', '🌘', 4.0),
(12, 'a', '🌑', 5.0),
(12, 'b', '🌒', 6.0),
(12, 'a', '🌓', 7.0),
(13, 'c', '🌔', 8.0),
(13, 'd', '🌕', 9.0);
admin flush_table ('t');
select
count(ts)
from
t;
select
ts
from
t
order by
ts;
drop table t;