diff --git a/Cargo.lock b/Cargo.lock index 18d16b2682..4d1aff23c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1950,6 +1950,8 @@ dependencies = [ "common-version", "datafusion", "datatypes", + "geohash", + "h3o", "num", "num-traits", "once_cell", @@ -3813,6 +3815,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "float_eq" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28a80e3145d8ad11ba0995949bbcf48b9df2be62772b3d351ef017dff6ecb853" + [[package]] name = "flow" version = "0.9.2" @@ -4211,6 +4219,27 @@ dependencies = [ "version_check", ] +[[package]] +name = "geo-types" +version = "0.7.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ff16065e5720f376fbced200a5ae0f47ace85fd70b7e54269790281353b6d61" +dependencies = [ + "approx", + "num-traits", + "serde", +] + +[[package]] +name = "geohash" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fb94b1a65401d6cbf22958a9040aa364812c26674f841bee538b12c135db1e6" +dependencies = [ + "geo-types", + "libm", +] + [[package]] name = "gethostname" version = "0.2.3" @@ -4301,6 +4330,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h3o" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de3592e1f699692aa0525c42ff7879ec3ee7e36329af20967bc910a1cdc39c7" +dependencies = [ + "ahash 0.8.11", + "either", + "float_eq", + "h3o-bit", + "libm", +] + +[[package]] +name = "h3o-bit" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fb45e8060378c0353781abf67e1917b545a6b710d0342d85b70c125af7ef320" + [[package]] name = "half" version = "1.8.3" @@ -4717,7 +4765,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.7", "tokio", "tower-service", "tracing", @@ -8512,7 +8560,7 @@ dependencies = [ "indoc", "libc", "memoffset 0.9.1", - "parking_lot 0.11.2", + "parking_lot 0.12.3", "portable-atomic", "pyo3-build-config", "pyo3-ffi", @@ -12441,6 +12489,16 @@ dependencies = [ "web-time 0.2.4", ] +[[package]] +name = "tracing-serde" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.18" @@ -12451,12 +12509,15 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log 0.2.0", + "tracing-serde", ] [[package]] diff --git a/config/config.md b/config/config.md index 5e5d8e7f5d..f0ee9e54f8 100644 --- a/config/config.md +++ b/config/config.md @@ -129,6 +129,7 @@ | `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).
- `0`: use the default value (1/4 of cpu cores).<br/>
- `1`: scan in the current thread.<br/>
- `n`: scan in parallelism n. | | `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. | | `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. | +| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.
To align with the old behavior, the default value is 0 (no restrictions). | | `region_engine.mito.index` | -- | -- | The options for index in Mito engine. | | `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for
creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.
The default name for this directory is `index_intermediate` for backward compatibility.

This path contains two subdirectories:
- `__intm`: for storing intermediate files used when creating the index.<br/>
- `staging`: for storing staging files used during searching index. | | `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. | @@ -152,11 +153,12 @@ | `region_engine.mito.memtable.fork_dictionary_bytes` | String | `1GiB` | Max dictionary bytes.
Only available for `partition_tree` memtable. | | `region_engine.file` | -- | -- | Enable the file engine. | | `logging` | -- | -- | The logging options. | -| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. | +| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. | | `logging.level` | String | `None` | The log level. Can be `info`/`debug`/`warn`/`error`. | | `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. | | `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. | | `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. | +| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. | | `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.
Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
ratio > 1 are treated as 1. Fractions < 0 are treated as 0 | | `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- | | `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. | @@ -237,11 +239,12 @@ | `datanode.client.connect_timeout` | String | `10s` | -- | | `datanode.client.tcp_nodelay` | Bool | `true` | -- | | `logging` | -- | -- | The logging options. | -| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. | +| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. | | `logging.level` | String | `None` | The log level. Can be `info`/`debug`/`warn`/`error`. | | `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. | | `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. | | `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. | +| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. | | `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.
Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
ratio > 1 are treated as 1. Fractions < 0 are treated as 0 | | `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- | | `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. | @@ -301,11 +304,12 @@ | `wal.backoff_base` | Integer | `2` | Exponential backoff rate, i.e. next backoff = base * current backoff. | | `wal.backoff_deadline` | String | `5mins` | Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate. | | `logging` | -- | -- | The logging options. | -| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. | +| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. | | `logging.level` | String | `None` | The log level. Can be `info`/`debug`/`warn`/`error`. | | `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. | | `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. | | `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. | +| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. | | `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.
Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
ratio > 1 are treated as 1. Fractions < 0 are treated as 0 | | `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- | | `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. | @@ -428,6 +432,7 @@ | `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).
- `0`: use the default value (1/4 of cpu cores).<br/>
- `1`: scan in the current thread.<br/>
- `n`: scan in parallelism n. | | `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. | | `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. | +| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.
To align with the old behavior, the default value is 0 (no restrictions). | | `region_engine.mito.index` | -- | -- | The options for index in Mito engine. | | `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for
creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.
The default name for this directory is `index_intermediate` for backward compatibility.

This path contains two subdirectories:
- `__intm`: for storing intermediate files used when creating the index.<br/>
- `staging`: for storing staging files used during searching index. | | `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. | @@ -449,11 +454,12 @@ | `region_engine.mito.memtable.fork_dictionary_bytes` | String | `1GiB` | Max dictionary bytes.
Only available for `partition_tree` memtable. | | `region_engine.file` | -- | -- | Enable the file engine. | | `logging` | -- | -- | The logging options. | -| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. | +| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. | | `logging.level` | String | `None` | The log level. Can be `info`/`debug`/`warn`/`error`. | | `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. | | `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. | | `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. | +| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. | | `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.
Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
ratio > 1 are treated as 1. Fractions < 0 are treated as 0 | | `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- | | `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. | @@ -494,11 +500,12 @@ | `heartbeat.interval` | String | `3s` | Interval for sending heartbeat messages to the metasrv. | | `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. | | `logging` | -- | -- | The logging options. | -| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. | +| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. | | `logging.level` | String | `None` | The log level. Can be `info`/`debug`/`warn`/`error`. | | `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. | | `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. | | `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. | +| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. | | `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.
Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
ratio > 1 are treated as 1. Fractions < 0 are treated as 0 | | `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- | | `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 48f37b62ea..07c1df3e2a 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -336,7 +336,7 @@ credential_path = "test" ## The credential of the google cloud storage. ## **It's only used when the storage type is `Gcs`**. ## +toml2docs:none-default -credential= "base64-credential" +credential = "base64-credential" ## The container of the azure account. ## **It's only used when the storage type is `Azblob`**. @@ -455,6 +455,10 @@ parallel_scan_channel_size = 32 ## Whether to allow stale WAL entries read during replay. allow_stale_entries = false +## Minimum time interval between two compactions. +## To align with the old behavior, the default value is 0 (no restrictions). +min_compaction_interval = "0m" + ## The options for index in Mito engine. [region_engine.mito.index] @@ -545,7 +549,7 @@ fork_dictionary_bytes = "1GiB" ## The logging options. [logging] -## The directory to store the log files. +## The directory to store the log files. If set to empty, logs will not be written to files. dir = "/tmp/greptimedb/logs" ## The log level. Can be `info`/`debug`/`warn`/`error`. @@ -561,6 +565,9 @@ otlp_endpoint = "http://localhost:4317" ## Whether to append logs to stdout. append_stdout = true +## The log format. Can be `text`/`json`. +log_format = "text" + ## The percentage of tracing will be sampled and exported. ## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1. ## ratio > 1 are treated as 1. Fractions < 0 are treated as 0 diff --git a/config/flownode.example.toml b/config/flownode.example.toml index 443edb0303..d5640062f7 100644 --- a/config/flownode.example.toml +++ b/config/flownode.example.toml @@ -59,7 +59,7 @@ retry_interval = "3s" ## The logging options. [logging] -## The directory to store the log files. +## The directory to store the log files. If set to empty, logs will not be written to files. dir = "/tmp/greptimedb/logs" ## The log level. Can be `info`/`debug`/`warn`/`error`. @@ -75,6 +75,9 @@ otlp_endpoint = "http://localhost:4317" ## Whether to append logs to stdout. append_stdout = true +## The log format. Can be `text`/`json`. +log_format = "text" + ## The percentage of tracing will be sampled and exported. ## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1. ## ratio > 1 are treated as 1. Fractions < 0 are treated as 0 diff --git a/config/frontend.example.toml b/config/frontend.example.toml index a3b8ef366f..e5a7f5af89 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -166,7 +166,7 @@ tcp_nodelay = true ## The logging options. [logging] -## The directory to store the log files. +## The directory to store the log files. If set to empty, logs will not be written to files. dir = "/tmp/greptimedb/logs" ## The log level. Can be `info`/`debug`/`warn`/`error`. @@ -182,6 +182,9 @@ otlp_endpoint = "http://localhost:4317" ## Whether to append logs to stdout. append_stdout = true +## The log format. Can be `text`/`json`. +log_format = "text" + ## The percentage of tracing will be sampled and exported. 
## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1. ## ratio > 1 are treated as 1. Fractions < 0 are treated as 0 diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 03be77c070..dc5f091166 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -153,7 +153,7 @@ backoff_deadline = "5mins" ## The logging options. [logging] -## The directory to store the log files. +## The directory to store the log files. If set to empty, logs will not be written to files. dir = "/tmp/greptimedb/logs" ## The log level. Can be `info`/`debug`/`warn`/`error`. @@ -169,6 +169,9 @@ otlp_endpoint = "http://localhost:4317" ## Whether to append logs to stdout. append_stdout = true +## The log format. Can be `text`/`json`. +log_format = "text" + ## The percentage of tracing will be sampled and exported. ## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1. ## ratio > 1 are treated as 1. Fractions < 0 are treated as 0 diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 6ca7d917d3..f36c0e2904 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -493,6 +493,10 @@ parallel_scan_channel_size = 32 ## Whether to allow stale WAL entries read during replay. allow_stale_entries = false +## Minimum time interval between two compactions. +## To align with the old behavior, the default value is 0 (no restrictions). +min_compaction_interval = "0m" + ## The options for index in Mito engine. [region_engine.mito.index] @@ -589,7 +593,7 @@ fork_dictionary_bytes = "1GiB" ## The logging options. [logging] -## The directory to store the log files. +## The directory to store the log files. If set to empty, logs will not be written to files. dir = "/tmp/greptimedb/logs" ## The log level. Can be `info`/`debug`/`warn`/`error`. @@ -605,6 +609,9 @@ otlp_endpoint = "http://localhost:4317" ## Whether to append logs to stdout. append_stdout = true +## The log format. Can be `text`/`json`. +log_format = "text" + ## The percentage of tracing will be sampled and exported. ## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1. ## ratio > 1 are treated as 1. Fractions < 0 are treated as 0 diff --git a/docker/ci/ubuntu/Dockerfile b/docker/ci/ubuntu/Dockerfile index 580b73e56f..cc3bed6f25 100644 --- a/docker/ci/ubuntu/Dockerfile +++ b/docker/ci/ubuntu/Dockerfile @@ -11,7 +11,9 @@ RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \ python3.10 \ python3.10-dev \ python3-pip \ - curl + curl \ + mysql-client \ + postgresql-client COPY $DOCKER_BUILD_ROOT/docker/python/requirements.txt /etc/greptime/requirements.txt diff --git a/src/auth/src/error.rs b/src/auth/src/error.rs index 23c5f0d66c..281c45234d 100644 --- a/src/auth/src/error.rs +++ b/src/auth/src/error.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use common_error::ext::ErrorExt; +use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use snafu::{Location, Snafu}; @@ -38,6 +38,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Authentication source failure"))] + AuthBackend { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + source: BoxedError, + }, + #[snafu(display("User not found, username: {}", username))] UserNotFound { username: String }, @@ -81,6 +89,7 @@ impl ErrorExt for Error { Error::FileWatch { .. } => StatusCode::InvalidArguments, Error::InternalState { .. } => StatusCode::Unexpected, Error::Io { .. } => StatusCode::StorageUnavailable, + Error::AuthBackend { .. } => StatusCode::Internal, Error::UserNotFound { .. } => StatusCode::UserNotFound, Error::UnsupportedPasswordType { .. } => StatusCode::UnsupportedPasswordType, diff --git a/src/auth/src/tests.rs b/src/auth/src/tests.rs index 8e3cd17e7a..ef5bf9a6b5 100644 --- a/src/auth/src/tests.rs +++ b/src/auth/src/tests.rs @@ -13,9 +13,11 @@ // limitations under the License. use common_base::secrets::ExposeSecret; +use common_error::ext::BoxedError; +use snafu::{OptionExt, ResultExt}; use crate::error::{ - AccessDeniedSnafu, Result, UnsupportedPasswordTypeSnafu, UserNotFoundSnafu, + AccessDeniedSnafu, AuthBackendSnafu, Result, UnsupportedPasswordTypeSnafu, UserNotFoundSnafu, UserPasswordMismatchSnafu, }; use crate::user_info::DefaultUserInfo; @@ -49,6 +51,19 @@ impl MockUserProvider { info.schema.clone_into(&mut self.schema); info.username.clone_into(&mut self.username); } + + // this is a deliberate function to ref AuthBackendSnafu + // so that it won't get deleted in the future + pub fn ref_auth_backend_snafu(&self) -> Result<()> { + let none_option = None; + + none_option + .context(UserNotFoundSnafu { + username: "no_user".to_string(), + }) + .map_err(BoxedError::new) + .context(AuthBackendSnafu) + } } #[async_trait::async_trait] diff --git a/src/common/function/Cargo.toml b/src/common/function/Cargo.toml index e7d6ee870f..2451b2bcbd 100644 --- a/src/common/function/Cargo.toml +++ b/src/common/function/Cargo.toml @@ -7,6 +7,10 @@ license.workspace = true [lints] workspace = true +[features] +default = ["geo"] +geo = ["geohash", "h3o"] + [dependencies] api.workspace = true arc-swap = "1.0" @@ -23,6 +27,8 @@ common-time.workspace = true common-version.workspace = true datafusion.workspace = true datatypes.workspace = true +geohash = { version = "0.13", optional = true } +h3o = { version = "0.6", optional = true } num = "0.4" num-traits = "0.2" once_cell.workspace = true diff --git a/src/common/function/src/function_registry.rs b/src/common/function/src/function_registry.rs index c2a315d51d..ed863c16aa 100644 --- a/src/common/function/src/function_registry.rs +++ b/src/common/function/src/function_registry.rs @@ -116,6 +116,10 @@ pub static FUNCTION_REGISTRY: Lazy> = Lazy::new(|| { SystemFunction::register(&function_registry); TableFunction::register(&function_registry); + // Geo functions + #[cfg(feature = "geo")] + crate::scalars::geo::GeoFunctions::register(&function_registry); + Arc::new(function_registry) }); diff --git a/src/common/function/src/scalars.rs b/src/common/function/src/scalars.rs index 2b3f463e94..f8dc570d12 100644 --- a/src/common/function/src/scalars.rs +++ b/src/common/function/src/scalars.rs @@ -15,6 +15,8 @@ pub mod aggregate; pub(crate) mod date; pub mod expression; +#[cfg(feature = "geo")] +pub mod geo; pub mod matches; pub mod math; pub mod numpy; 
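The new `geohash` and `h3` scalar functions added below are thin wrappers around the `geohash` and `h3o` crates. As a minimal standalone sketch of the two crate calls they delegate to (the coordinates and precision here are arbitrary illustration values, not taken from the PR):

```rust
use geohash::Coord;
use h3o::{LatLng, Resolution};

fn main() {
    let (lat, lon) = (37.7749_f64, -122.4194_f64);

    // geohash::encode takes a Coord with x = longitude and y = latitude,
    // plus the desired geohash string length (precision).
    let gh = geohash::encode(Coord { x: lon, y: lat }, 9).expect("valid coordinate");

    // h3o validates the coordinate and the resolution separately, then
    // converts the LatLng into a cell index whose Display form is the H3 string.
    let ll = LatLng::new(lat, lon).expect("valid coordinate");
    let res = Resolution::try_from(9u8).expect("resolution must be in 0..=15");
    println!("geohash: {gh}, h3: {}", ll.to_cell(res));
}
```

In the UDF implementations that follow, both error paths are surfaced through `PlainError` and `ExecuteSnafu`, so an invalid coordinate or resolution fails the query instead of panicking.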
diff --git a/src/common/function/src/scalars/geo.rs b/src/common/function/src/scalars/geo.rs new file mode 100644 index 0000000000..4b126f20f0 --- /dev/null +++ b/src/common/function/src/scalars/geo.rs @@ -0,0 +1,31 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +mod geohash; +mod h3; + +use geohash::GeohashFunction; +use h3::H3Function; + +use crate::function_registry::FunctionRegistry; + +pub(crate) struct GeoFunctions; + +impl GeoFunctions { + pub fn register(registry: &FunctionRegistry) { + registry.register(Arc::new(GeohashFunction)); + registry.register(Arc::new(H3Function)); + } +} diff --git a/src/common/function/src/scalars/geo/geohash.rs b/src/common/function/src/scalars/geo/geohash.rs new file mode 100644 index 0000000000..2daa8223cc --- /dev/null +++ b/src/common/function/src/scalars/geo/geohash.rs @@ -0,0 +1,135 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt; + +use common_error::ext::{BoxedError, PlainError}; +use common_error::status_code::StatusCode; +use common_query::error::{self, InvalidFuncArgsSnafu, Result}; +use common_query::prelude::{Signature, TypeSignature}; +use datafusion::logical_expr::Volatility; +use datatypes::prelude::ConcreteDataType; +use datatypes::scalars::ScalarVectorBuilder; +use datatypes::value::Value; +use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef}; +use geohash::Coord; +use snafu::{ensure, ResultExt}; + +use crate::function::{Function, FunctionContext}; + +/// Function that return geohash string for a given geospatial coordinate. 
+#[derive(Clone, Debug, Default)] +pub struct GeohashFunction; + +const NAME: &str = "geohash"; + +impl Function for GeohashFunction { + fn name(&self) -> &str { + NAME + } + + fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result { + Ok(ConcreteDataType::string_datatype()) + } + + fn signature(&self) -> Signature { + let mut signatures = Vec::new(); + for coord_type in &[ + ConcreteDataType::float32_datatype(), + ConcreteDataType::float64_datatype(), + ] { + for resolution_type in &[ + ConcreteDataType::int8_datatype(), + ConcreteDataType::int16_datatype(), + ConcreteDataType::int32_datatype(), + ConcreteDataType::int64_datatype(), + ConcreteDataType::uint8_datatype(), + ConcreteDataType::uint16_datatype(), + ConcreteDataType::uint32_datatype(), + ConcreteDataType::uint64_datatype(), + ] { + signatures.push(TypeSignature::Exact(vec![ + // latitude + coord_type.clone(), + // longitude + coord_type.clone(), + // resolution + resolution_type.clone(), + ])); + } + } + Signature::one_of(signatures, Volatility::Stable) + } + + fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result { + ensure!( + columns.len() == 3, + InvalidFuncArgsSnafu { + err_msg: format!( + "The length of the args is not correct, expect 3, provided : {}", + columns.len() + ), + } + ); + + let lat_vec = &columns[0]; + let lon_vec = &columns[1]; + let resolution_vec = &columns[2]; + + let size = lat_vec.len(); + let mut results = StringVectorBuilder::with_capacity(size); + + for i in 0..size { + let lat = lat_vec.get(i).as_f64_lossy(); + let lon = lon_vec.get(i).as_f64_lossy(); + let r = match resolution_vec.get(i) { + Value::Int8(v) => v as usize, + Value::Int16(v) => v as usize, + Value::Int32(v) => v as usize, + Value::Int64(v) => v as usize, + Value::UInt8(v) => v as usize, + Value::UInt16(v) => v as usize, + Value::UInt32(v) => v as usize, + Value::UInt64(v) => v as usize, + _ => unreachable!(), + }; + + let result = match (lat, lon) { + (Some(lat), Some(lon)) => { + let coord = Coord { x: lon, y: lat }; + let encoded = geohash::encode(coord, r) + .map_err(|e| { + BoxedError::new(PlainError::new( + format!("Geohash error: {}", e), + StatusCode::EngineExecuteQuery, + )) + }) + .context(error::ExecuteSnafu)?; + Some(encoded) + } + _ => None, + }; + + results.push(result.as_deref()); + } + + Ok(results.to_vector()) + } +} + +impl fmt::Display for GeohashFunction { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", NAME) + } +} diff --git a/src/common/function/src/scalars/geo/h3.rs b/src/common/function/src/scalars/geo/h3.rs new file mode 100644 index 0000000000..26ec246997 --- /dev/null +++ b/src/common/function/src/scalars/geo/h3.rs @@ -0,0 +1,145 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::fmt; + +use common_error::ext::{BoxedError, PlainError}; +use common_error::status_code::StatusCode; +use common_query::error::{self, InvalidFuncArgsSnafu, Result}; +use common_query::prelude::{Signature, TypeSignature}; +use datafusion::logical_expr::Volatility; +use datatypes::prelude::ConcreteDataType; +use datatypes::scalars::ScalarVectorBuilder; +use datatypes::value::Value; +use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef}; +use h3o::{LatLng, Resolution}; +use snafu::{ensure, ResultExt}; + +use crate::function::{Function, FunctionContext}; + +/// Function that returns [h3] encoding string for a given geospatial coordinate. +/// +/// [h3]: https://h3geo.org/ +#[derive(Clone, Debug, Default)] +pub struct H3Function; + +const NAME: &str = "h3"; + +impl Function for H3Function { + fn name(&self) -> &str { + NAME + } + + fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result { + Ok(ConcreteDataType::string_datatype()) + } + + fn signature(&self) -> Signature { + let mut signatures = Vec::new(); + for coord_type in &[ + ConcreteDataType::float32_datatype(), + ConcreteDataType::float64_datatype(), + ] { + for resolution_type in &[ + ConcreteDataType::int8_datatype(), + ConcreteDataType::int16_datatype(), + ConcreteDataType::int32_datatype(), + ConcreteDataType::int64_datatype(), + ConcreteDataType::uint8_datatype(), + ConcreteDataType::uint16_datatype(), + ConcreteDataType::uint32_datatype(), + ConcreteDataType::uint64_datatype(), + ] { + signatures.push(TypeSignature::Exact(vec![ + // latitude + coord_type.clone(), + // longitude + coord_type.clone(), + // resolution + resolution_type.clone(), + ])); + } + } + Signature::one_of(signatures, Volatility::Stable) + } + + fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result { + ensure!( + columns.len() == 3, + InvalidFuncArgsSnafu { + err_msg: format!( + "The length of the args is not correct, expect 3, provided : {}", + columns.len() + ), + } + ); + + let lat_vec = &columns[0]; + let lon_vec = &columns[1]; + let resolution_vec = &columns[2]; + + let size = lat_vec.len(); + let mut results = StringVectorBuilder::with_capacity(size); + + for i in 0..size { + let lat = lat_vec.get(i).as_f64_lossy(); + let lon = lon_vec.get(i).as_f64_lossy(); + let r = match resolution_vec.get(i) { + Value::Int8(v) => v as u8, + Value::Int16(v) => v as u8, + Value::Int32(v) => v as u8, + Value::Int64(v) => v as u8, + Value::UInt8(v) => v, + Value::UInt16(v) => v as u8, + Value::UInt32(v) => v as u8, + Value::UInt64(v) => v as u8, + _ => unreachable!(), + }; + + let result = match (lat, lon) { + (Some(lat), Some(lon)) => { + let coord = LatLng::new(lat, lon) + .map_err(|e| { + BoxedError::new(PlainError::new( + format!("H3 error: {}", e), + StatusCode::EngineExecuteQuery, + )) + }) + .context(error::ExecuteSnafu)?; + let r = Resolution::try_from(r) + .map_err(|e| { + BoxedError::new(PlainError::new( + format!("H3 error: {}", e), + StatusCode::EngineExecuteQuery, + )) + }) + .context(error::ExecuteSnafu)?; + let encoded = coord.to_cell(r).to_string(); + Some(encoded) + } + _ => None, + }; + + results.push(result.as_deref()); + } + + Ok(results.to_vector()) + } +} + +impl fmt::Display for H3Function { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", NAME) + } +} diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs index f2114f645f..3eb90b05e7 100644 --- a/src/common/recordbatch/src/error.rs +++ b/src/common/recordbatch/src/error.rs @@ 
-172,12 +172,13 @@ impl ErrorExt for Error { Error::DataTypes { .. } | Error::CreateRecordBatches { .. } - | Error::PollStream { .. } | Error::Format { .. } | Error::ToArrowScalar { .. } | Error::ProjectArrowRecordBatch { .. } | Error::PhysicalExpr { .. } => StatusCode::Internal, + Error::PollStream { .. } => StatusCode::EngineExecuteQuery, + Error::ArrowCompute { .. } => StatusCode::IllegalState, Error::ColumnNotExists { .. } => StatusCode::TableColumnNotFound, diff --git a/src/common/telemetry/Cargo.toml b/src/common/telemetry/Cargo.toml index 3cf2a36a40..20fc52a763 100644 --- a/src/common/telemetry/Cargo.toml +++ b/src/common/telemetry/Cargo.toml @@ -34,4 +34,4 @@ tracing = "0.1" tracing-appender = "0.2" tracing-log = "0.1" tracing-opentelemetry = "0.22.0" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "fmt"] } diff --git a/src/common/telemetry/src/lib.rs b/src/common/telemetry/src/lib.rs index 12f0098a15..e63d6e8af4 100644 --- a/src/common/telemetry/src/lib.rs +++ b/src/common/telemetry/src/lib.rs @@ -21,7 +21,7 @@ mod panic_hook; pub mod tracing_context; mod tracing_sampler; -pub use logging::{init_default_ut_logging, init_global_logging}; +pub use logging::{init_default_ut_logging, init_global_logging, RELOAD_HANDLE}; pub use metric::dump_metrics; pub use panic_hook::set_panic_hook; -pub use {common_error, tracing}; +pub use {common_error, tracing, tracing_subscriber}; diff --git a/src/common/telemetry/src/logging.rs b/src/common/telemetry/src/logging.rs index 04766c20db..de018aa4b6 100644 --- a/src/common/telemetry/src/logging.rs +++ b/src/common/telemetry/src/logging.rs @@ -16,7 +16,7 @@ use std::env; use std::sync::{Arc, Mutex, Once}; -use once_cell::sync::Lazy; +use once_cell::sync::{Lazy, OnceCell}; use opentelemetry::{global, KeyValue}; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::propagation::TraceContextPropagator; @@ -26,6 +26,7 @@ use serde::{Deserialize, Serialize}; use tracing_appender::non_blocking::WorkerGuard; use tracing_appender::rolling::{RollingFileAppender, Rotation}; use tracing_log::LogTracer; +use tracing_subscriber::filter::Targets; use tracing_subscriber::fmt::Layer; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::prelude::*; @@ -35,15 +36,41 @@ use crate::tracing_sampler::{create_sampler, TracingSampleOptions}; pub const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317"; +// Handle for reloading log level +pub static RELOAD_HANDLE: OnceCell> = + OnceCell::new(); + +/// The logging options that used to initialize the logger. #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct LoggingOptions { + /// The directory to store log files. If not set, logs will be written to stdout. pub dir: String, + + /// The log level that can be one of "trace", "debug", "info", "warn", "error". Default is "info". pub level: Option, - pub enable_otlp_tracing: bool, - pub otlp_endpoint: Option, - pub tracing_sample_ratio: Option, + + /// The log format that can be one of "json" or "text". Default is "text". + pub log_format: LogFormat, + + /// Whether to append logs to stdout. Default is true. pub append_stdout: bool, + + /// Whether to enable tracing with OTLP. Default is false. + pub enable_otlp_tracing: bool, + + /// The endpoint of OTLP. Default is "http://localhost:4317". + pub otlp_endpoint: Option, + + /// The tracing sample ratio. 
+ pub tracing_sample_ratio: Option, +} + +#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum LogFormat { + Json, + Text, } impl PartialEq for LoggingOptions { @@ -64,6 +91,7 @@ impl Default for LoggingOptions { Self { dir: "/tmp/greptimedb/logs".to_string(), level: None, + log_format: LogFormat::Text, enable_otlp_tracing: false, otlp_endpoint: None, tracing_sample_ratio: None, @@ -128,62 +156,103 @@ pub fn init_global_logging( let mut guards = vec![]; START.call_once(|| { - let dir = &opts.dir; - let level = &opts.level; - let enable_otlp_tracing = opts.enable_otlp_tracing; - // Enable log compatible layer to convert log record to tracing span. LogTracer::init().expect("log tracer must be valid"); - // stdout log layer. + // Configure the stdout logging layer. let stdout_logging_layer = if opts.append_stdout { - let (stdout_writer, stdout_guard) = tracing_appender::non_blocking(std::io::stdout()); - guards.push(stdout_guard); + let (writer, guard) = tracing_appender::non_blocking(std::io::stdout()); + guards.push(guard); - Some( - Layer::new() - .with_writer(stdout_writer) - .with_ansi(atty::is(atty::Stream::Stdout)), - ) + if opts.log_format == LogFormat::Json { + Some( + Layer::new() + .json() + .with_writer(writer) + .with_ansi(atty::is(atty::Stream::Stdout)) + .boxed(), + ) + } else { + Some( + Layer::new() + .with_writer(writer) + .with_ansi(atty::is(atty::Stream::Stdout)) + .boxed(), + ) + } } else { None }; - // file log layer. - let rolling_appender = RollingFileAppender::new(Rotation::HOURLY, dir, app_name); - let (rolling_writer, rolling_writer_guard) = - tracing_appender::non_blocking(rolling_appender); - let file_logging_layer = Layer::new().with_writer(rolling_writer).with_ansi(false); - guards.push(rolling_writer_guard); + // Configure the file logging layer with rolling policy. + let file_logging_layer = if !opts.dir.is_empty() { + let rolling_appender = + RollingFileAppender::new(Rotation::HOURLY, &opts.dir, "greptimedb"); + let (writer, guard) = tracing_appender::non_blocking(rolling_appender); + guards.push(guard); - // error file log layer. - let err_rolling_appender = - RollingFileAppender::new(Rotation::HOURLY, dir, format!("{}-{}", app_name, "err")); - let (err_rolling_writer, err_rolling_writer_guard) = - tracing_appender::non_blocking(err_rolling_appender); - let err_file_logging_layer = Layer::new() - .with_writer(err_rolling_writer) - .with_ansi(false); - guards.push(err_rolling_writer_guard); + if opts.log_format == LogFormat::Json { + Some( + Layer::new() + .json() + .with_writer(writer) + .with_ansi(false) + .boxed(), + ) + } else { + Some(Layer::new().with_writer(writer).with_ansi(false).boxed()) + } + } else { + None + }; + + // Configure the error file logging layer with rolling policy. 
+ let err_file_logging_layer = if !opts.dir.is_empty() { + let rolling_appender = + RollingFileAppender::new(Rotation::HOURLY, &opts.dir, "greptimedb-err"); + let (writer, guard) = tracing_appender::non_blocking(rolling_appender); + guards.push(guard); + + if opts.log_format == LogFormat::Json { + Some( + Layer::new() + .json() + .with_writer(writer) + .with_ansi(false) + .with_filter(filter::LevelFilter::ERROR) + .boxed(), + ) + } else { + Some( + Layer::new() + .with_writer(writer) + .with_ansi(false) + .with_filter(filter::LevelFilter::ERROR) + .boxed(), + ) + } + } else { + None + }; // resolve log level settings from: // - options from command line or config files // - environment variable: RUST_LOG // - default settings - let rust_log_env = std::env::var(EnvFilter::DEFAULT_ENV).ok(); - let targets_string = level + let filter = opts + .level .as_deref() - .or(rust_log_env.as_deref()) - .unwrap_or(DEFAULT_LOG_TARGETS); - let filter = targets_string + .or(env::var(EnvFilter::DEFAULT_ENV).ok().as_deref()) + .unwrap_or(DEFAULT_LOG_TARGETS) .parse::() .expect("error parsing log level string"); - let sampler = opts - .tracing_sample_ratio - .as_ref() - .map(create_sampler) - .map(Sampler::ParentBased) - .unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))); + + let (dyn_filter, reload_handle) = tracing_subscriber::reload::Layer::new(filter.clone()); + + RELOAD_HANDLE + .set(reload_handle) + .expect("reload handle already set, maybe init_global_logging get called twice?"); + // Must enable 'tokio_unstable' cfg to use this feature. // For example: `RUSTFLAGS="--cfg tokio_unstable" cargo run -F common-telemetry/console -- standalone start` #[cfg(feature = "tokio-console")] @@ -204,65 +273,70 @@ pub fn init_global_logging( None }; - let stdout_logging_layer = stdout_logging_layer.map(|x| x.with_filter(filter.clone())); - - let file_logging_layer = file_logging_layer.with_filter(filter); - Registry::default() + .with(dyn_filter) .with(tokio_console_layer) .with(stdout_logging_layer) .with(file_logging_layer) - .with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR)) + .with(err_file_logging_layer) }; - // consume the `tracing_opts`, to avoid "unused" warnings + // consume the `tracing_opts` to avoid "unused" warnings. 
let _ = tracing_opts; #[cfg(not(feature = "tokio-console"))] let subscriber = Registry::default() - .with(filter) + .with(dyn_filter) .with(stdout_logging_layer) .with(file_logging_layer) - .with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR)); + .with(err_file_logging_layer); - if enable_otlp_tracing { + if opts.enable_otlp_tracing { global::set_text_map_propagator(TraceContextPropagator::new()); - // otlp exporter + + let sampler = opts + .tracing_sample_ratio + .as_ref() + .map(create_sampler) + .map(Sampler::ParentBased) + .unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))); + + let trace_config = opentelemetry_sdk::trace::config() + .with_sampler(sampler) + .with_resource(opentelemetry_sdk::Resource::new(vec![ + KeyValue::new(resource::SERVICE_NAME, app_name.to_string()), + KeyValue::new( + resource::SERVICE_INSTANCE_ID, + node_id.unwrap_or("none".to_string()), + ), + KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")), + KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()), + ])); + + let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint( + opts.otlp_endpoint + .as_ref() + .map(|e| { + if e.starts_with("http") { + e.to_string() + } else { + format!("http://{}", e) + } + }) + .unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()), + ); + let tracer = opentelemetry_otlp::new_pipeline() .tracing() - .with_exporter( - opentelemetry_otlp::new_exporter().tonic().with_endpoint( - opts.otlp_endpoint - .as_ref() - .map(|e| { - if e.starts_with("http") { - e.to_string() - } else { - format!("http://{}", e) - } - }) - .unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()), - ), - ) - .with_trace_config( - opentelemetry_sdk::trace::config() - .with_sampler(sampler) - .with_resource(opentelemetry_sdk::Resource::new(vec![ - KeyValue::new(resource::SERVICE_NAME, app_name.to_string()), - KeyValue::new( - resource::SERVICE_INSTANCE_ID, - node_id.unwrap_or("none".to_string()), - ), - KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")), - KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()), - ])), - ) + .with_exporter(exporter) + .with_trace_config(trace_config) .install_batch(opentelemetry_sdk::runtime::Tokio) .expect("otlp tracer install failed"); - let tracing_layer = Some(tracing_opentelemetry::layer().with_tracer(tracer)); - let subscriber = subscriber.with(tracing_layer); - tracing::subscriber::set_global_default(subscriber) - .expect("error setting global tracing subscriber"); + + tracing::subscriber::set_global_default( + subscriber.with(tracing_opentelemetry::layer().with_tracer(tracer)), + ) + .expect("error setting global tracing subscriber"); } else { tracing::subscriber::set_global_default(subscriber) .expect("error setting global tracing subscriber"); diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index fdb6b38bb6..15aa028f4f 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -268,6 +268,23 @@ impl Value { } } + /// Cast Value to f64. Return None if it's not castable; + pub fn as_f64_lossy(&self) -> Option { + match self { + Value::Float32(v) => Some(v.0 as _), + Value::Float64(v) => Some(v.0), + Value::Int8(v) => Some(*v as _), + Value::Int16(v) => Some(*v as _), + Value::Int32(v) => Some(*v as _), + Value::Int64(v) => Some(*v as _), + Value::UInt8(v) => Some(*v as _), + Value::UInt16(v) => Some(*v as _), + Value::UInt32(v) => Some(*v as _), + Value::UInt64(v) => Some(*v as _), + _ => None, + } + } + /// Returns the logical type of the value. 
pub fn logical_type_id(&self) -> LogicalTypeId { match self { diff --git a/src/meta-srv/src/handler/collect_stats_handler.rs b/src/meta-srv/src/handler/collect_stats_handler.rs index 1389e6896f..ca4df868d1 100644 --- a/src/meta-srv/src/handler/collect_stats_handler.rs +++ b/src/meta-srv/src/handler/collect_stats_handler.rs @@ -148,6 +148,7 @@ impl HeartbeatHandler for CollectStatsHandler { mod tests { use std::sync::Arc; + use common_meta::cache_invalidator::DummyCacheInvalidator; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::sequence::SequenceBuilder; @@ -184,6 +185,7 @@ mod tests { election: None, is_infancy: false, table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), + cache_invalidator: Arc::new(DummyCacheInvalidator), }; let handler = CollectStatsHandler::default(); diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index 8bcc7e9a13..baa7e7ee29 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -49,6 +49,7 @@ mod tests { use std::sync::Arc; use api::v1::meta::{HeartbeatResponse, RequestHeader}; + use common_meta::cache_invalidator::DummyCacheInvalidator; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::sequence::SequenceBuilder; @@ -85,6 +86,7 @@ mod tests { election: None, is_infancy: false, table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), + cache_invalidator: Arc::new(DummyCacheInvalidator), }; let req = HeartbeatRequest { diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index d2a8073408..2beb09859b 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -25,6 +25,7 @@ use common_base::Plugins; use common_config::Configurable; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_grpc::channel_manager; +use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl::ProcedureExecutorRef; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef}; @@ -212,6 +213,7 @@ pub struct Context { pub election: Option, pub is_infancy: bool, pub table_metadata_manager: TableMetadataManagerRef, + pub cache_invalidator: CacheInvalidatorRef, } impl Context { @@ -376,6 +378,7 @@ pub struct Metasrv { greptimedb_telemetry_task: Arc, region_migration_manager: RegionMigrationManagerRef, region_supervisor_ticker: Option, + cache_invalidator: CacheInvalidatorRef, plugins: Plugins, } @@ -617,6 +620,7 @@ impl Metasrv { let mailbox = self.mailbox.clone(); let election = self.election.clone(); let table_metadata_manager = self.table_metadata_manager.clone(); + let cache_invalidator = self.cache_invalidator.clone(); Context { server_addr, @@ -628,6 +632,7 @@ impl Metasrv { election, is_infancy: false, table_metadata_manager, + cache_invalidator, } } } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index b6d1251c72..6b06bab867 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -319,6 +319,7 @@ impl MetasrvBuilder { region_failure_detector_controller.clone(), mailbox.clone(), options.server_addr.clone(), + cache_invalidator.clone(), ), )); region_migration_manager.try_start()?; @@ -346,7 +347,7 @@ impl MetasrvBuilder { DdlManager::try_new( DdlContext 
{ node_manager, - cache_invalidator, + cache_invalidator: cache_invalidator.clone(), memory_region_keeper: memory_region_keeper.clone(), table_metadata_manager: table_metadata_manager.clone(), table_metadata_allocator: table_metadata_allocator.clone(), @@ -434,6 +435,7 @@ impl MetasrvBuilder { memory_region_keeper, region_migration_manager, region_supervisor_ticker, + cache_invalidator, }) } } diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index fe4dec7dda..123a489cd5 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -27,10 +27,10 @@ use std::any::Any; use std::fmt::Debug; use std::time::Duration; -use api::v1::meta::MailboxMessage; use common_error::ext::BoxedError; +use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl::RegionFailureDetectorControllerRef; -use common_meta::instruction::{CacheIdent, Instruction}; +use common_meta::instruction::CacheIdent; use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue}; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; @@ -52,7 +52,7 @@ use tokio::time::Instant; use self::migration_start::RegionMigrationStart; use crate::error::{self, Result}; -use crate::service::mailbox::{BroadcastChannel, MailboxRef}; +use crate::service::mailbox::MailboxRef; /// It's shared in each step and available even after recovering. /// @@ -158,6 +158,7 @@ pub struct DefaultContextFactory { region_failure_detector_controller: RegionFailureDetectorControllerRef, mailbox: MailboxRef, server_addr: String, + cache_invalidator: CacheInvalidatorRef, } impl DefaultContextFactory { @@ -168,6 +169,7 @@ impl DefaultContextFactory { region_failure_detector_controller: RegionFailureDetectorControllerRef, mailbox: MailboxRef, server_addr: String, + cache_invalidator: CacheInvalidatorRef, ) -> Self { Self { volatile_ctx: VolatileContext::default(), @@ -176,6 +178,7 @@ impl DefaultContextFactory { region_failure_detector_controller, mailbox, server_addr, + cache_invalidator, } } } @@ -190,6 +193,7 @@ impl ContextFactory for DefaultContextFactory { region_failure_detector_controller: self.region_failure_detector_controller, mailbox: self.mailbox, server_addr: self.server_addr, + cache_invalidator: self.cache_invalidator, } } } @@ -203,6 +207,7 @@ pub struct Context { region_failure_detector_controller: RegionFailureDetectorControllerRef, mailbox: MailboxRef, server_addr: String, + cache_invalidator: CacheInvalidatorRef, } impl Context { @@ -356,22 +361,13 @@ impl Context { /// Broadcasts the invalidate table cache message. 
pub async fn invalidate_table_cache(&self) -> Result<()> { let table_id = self.region_id().table_id(); - let instruction = Instruction::InvalidateCaches(vec![CacheIdent::TableId(table_id)]); - - let msg = &MailboxMessage::json_message( - "Invalidate Table Cache", - &format!("Metasrv@{}", self.server_addr()), - "Frontend broadcast", - common_time::util::current_time_millis(), - &instruction, - ) - .with_context(|_| error::SerializeToJsonSnafu { - input: instruction.to_string(), - })?; - - self.mailbox - .broadcast(&BroadcastChannel::Frontend, msg) - .await + // ignore the result + let ctx = common_meta::cache_invalidator::Context::default(); + let _ = self + .cache_invalidator + .invalidate(&ctx, &[CacheIdent::TableId(table_id)]) + .await; + Ok(()) } } @@ -497,6 +493,7 @@ mod tests { use std::sync::Arc; use common_meta::distributed_time_constants::REGION_LEASE_SECS; + use common_meta::instruction::Instruction; use common_meta::key::test_utils::new_test_table_info; use common_meta::rpc::router::{Region, RegionRoute}; diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 1fb63b4a34..edfb89515f 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -48,8 +48,10 @@ use super::manager::RegionMigrationProcedureTracker; use super::migration_abort::RegionMigrationAbort; use super::upgrade_candidate_region::UpgradeCandidateRegion; use super::{Context, ContextFactory, DefaultContextFactory, State, VolatileContext}; +use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::error::{self, Error, Result}; use crate::handler::{HeartbeatMailbox, Pusher, Pushers}; +use crate::metasrv::MetasrvInfo; use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; use crate::procedure::region_migration::migration_end::RegionMigrationEnd; use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion; @@ -152,6 +154,12 @@ impl TestingEnv { mailbox: self.mailbox_ctx.mailbox().clone(), server_addr: self.server_addr.to_string(), region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), + cache_invalidator: Arc::new(MetasrvCacheInvalidator::new( + self.mailbox_ctx.mailbox.clone(), + MetasrvInfo { + server_addr: self.server_addr.to_string(), + }, + )), } } diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 9ee2252f2d..0f33471b21 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -160,8 +160,12 @@ impl CompactionScheduler { self.listener.clone(), ); self.region_status.insert(region_id, status); - self.schedule_compaction_request(request, compact_options) - .await + let result = self + .schedule_compaction_request(request, compact_options) + .await; + + self.listener.on_compaction_scheduled(region_id); + result } /// Notifies the scheduler that the compaction job is finished successfully. diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 7af36ab896..bbdff81ff6 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -122,6 +122,11 @@ pub struct MitoConfig { /// Memtable config pub memtable: MemtableConfig, + + /// Minimum time interval between two compactions. + /// To align with the old behavior, the default value is 0 (no restrictions). 
+ #[serde(with = "humantime_serde")] + pub min_compaction_interval: Duration, } impl Default for MitoConfig { @@ -152,6 +157,7 @@ impl Default for MitoConfig { inverted_index: InvertedIndexConfig::default(), fulltext_index: FulltextIndexConfig::default(), memtable: MemtableConfig::default(), + min_compaction_interval: Duration::from_secs(0), }; // Adjust buffer and cache size according to system memory if we can. diff --git a/src/mito2/src/engine/edit_region_test.rs b/src/mito2/src/engine/edit_region_test.rs index b13691fb85..51f2a976b3 100644 --- a/src/mito2/src/engine/edit_region_test.rs +++ b/src/mito2/src/engine/edit_region_test.rs @@ -15,6 +15,7 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; +use common_time::util::current_time_millis; use object_store::ObjectStore; use store_api::region_engine::RegionEngine; use store_api::region_request::RegionRequest; @@ -22,6 +23,7 @@ use store_api::storage::RegionId; use tokio::sync::{oneshot, Barrier}; use crate::config::MitoConfig; +use crate::engine::flush_test::MockTimeProvider; use crate::engine::listener::EventListener; use crate::engine::MitoEngine; use crate::manifest::action::RegionEdit; @@ -29,6 +31,84 @@ use crate::region::MitoRegionRef; use crate::sst::file::{FileId, FileMeta}; use crate::test_util::{CreateRequestBuilder, TestEnv}; +#[tokio::test] +async fn test_edit_region_schedule_compaction() { + let mut env = TestEnv::new(); + + struct EditRegionListener { + tx: Mutex>>, + } + + impl EventListener for EditRegionListener { + fn on_compaction_scheduled(&self, region_id: RegionId) { + let mut tx = self.tx.lock().unwrap(); + tx.take().unwrap().send(region_id).unwrap(); + } + } + + let (tx, mut rx) = oneshot::channel(); + let config = MitoConfig { + min_compaction_interval: Duration::from_secs(60 * 60), + ..Default::default() + }; + let time_provider = Arc::new(MockTimeProvider::new(current_time_millis())); + let engine = env + .create_engine_with_time( + config.clone(), + None, + Some(Arc::new(EditRegionListener { + tx: Mutex::new(Some(tx)), + })), + time_provider.clone(), + ) + .await; + + let region_id = RegionId::new(1, 1); + engine + .handle_request( + region_id, + RegionRequest::Create(CreateRequestBuilder::new().build()), + ) + .await + .unwrap(); + let region = engine.get_region(region_id).unwrap(); + + let new_edit = || RegionEdit { + files_to_add: vec![FileMeta { + region_id: region.region_id, + file_id: FileId::random(), + level: 0, + ..Default::default() + }], + files_to_remove: vec![], + compaction_time_window: None, + flushed_entry_id: None, + flushed_sequence: None, + }; + engine + .edit_region(region.region_id, new_edit()) + .await + .unwrap(); + // Asserts that the compaction of the region is not scheduled, + // because the minimum time interval between two compactions is not passed. + assert_eq!(rx.try_recv(), Err(oneshot::error::TryRecvError::Empty)); + + // Simulates the time has passed the min compaction interval, + time_provider + .set_now(current_time_millis() + config.min_compaction_interval.as_millis() as i64); + // ... then edits the region again, + engine + .edit_region(region.region_id, new_edit()) + .await + .unwrap(); + // ... finally asserts that the compaction of the region is scheduled. 
+ let actual = tokio::time::timeout(Duration::from_secs(9), rx) + .await + .unwrap() + .unwrap(); + assert_eq!(region_id, actual); +} + #[tokio::test] async fn test_edit_region_fill_cache() { let mut env = TestEnv::new(); @@ -151,7 +231,13 @@ async fn test_edit_region_concurrently() { } let mut env = TestEnv::new(); - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + // Suppress the compaction to not impede the speed of this kinda stress testing. + min_compaction_interval: Duration::from_secs(60 * 60), + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); engine diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs index 52fb46dfab..aac02db91e 100644 --- a/src/mito2/src/engine/flush_test.rs +++ b/src/mito2/src/engine/flush_test.rs @@ -306,7 +306,7 @@ async fn test_flush_reopen_region(factory: Option) { } #[derive(Debug)] -struct MockTimeProvider { +pub(crate) struct MockTimeProvider { now: AtomicI64, elapsed: AtomicI64, } @@ -326,14 +326,14 @@ impl TimeProvider for MockTimeProvider { } impl MockTimeProvider { - fn new(now: i64) -> Self { + pub(crate) fn new(now: i64) -> Self { Self { now: AtomicI64::new(now), elapsed: AtomicI64::new(0), } } - fn set_now(&self, now: i64) { + pub(crate) fn set_now(&self, now: i64) { self.now.store(now, Ordering::Relaxed); } diff --git a/src/mito2/src/engine/listener.rs b/src/mito2/src/engine/listener.rs index beea4add1e..a79cb6eafd 100644 --- a/src/mito2/src/engine/listener.rs +++ b/src/mito2/src/engine/listener.rs @@ -66,6 +66,9 @@ pub trait EventListener: Send + Sync { /// Notifies the listener that the file cache is filled when, for example, editing region. fn on_file_cache_filled(&self, _file_id: FileId) {} + + /// Notifies the listener that the compaction is scheduled. + fn on_compaction_scheduled(&self, _region_id: RegionId) {} } pub type EventListenerRef = Arc; diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index b13068dbec..dcf5b4395c 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -20,7 +20,7 @@ use std::time::{Duration, Instant}; use common_error::ext::BoxedError; use common_recordbatch::SendableRecordBatchStream; -use common_telemetry::{debug, error, warn}; +use common_telemetry::{debug, error, tracing, warn}; use common_time::range::TimestampRange; use common_time::Timestamp; use datafusion::physical_plan::DisplayFormatType; @@ -62,6 +62,7 @@ pub(crate) enum Scanner { impl Scanner { /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions. + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] pub(crate) async fn scan(&self) -> Result { match self { Scanner::Seq(seq_scan) => seq_scan.build_stream(), @@ -70,6 +71,7 @@ impl Scanner { } /// Returns a [RegionScanner] to scan the region. 
+ #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] pub(crate) async fn region_scanner(self) -> Result { match self { Scanner::Seq(seq_scan) => Ok(Box::new(seq_scan)), @@ -284,9 +286,10 @@ impl ScanRegion { .collect(); debug!( - "Scan region {}, request: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}", + "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}", self.version.metadata.region_id, self.request, + time_range, memtables.len(), files.len(), self.version.options.append_mode, diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 80fbb3189f..ec5fcf53d3 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -23,7 +23,7 @@ use common_error::ext::BoxedError; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::util::ChainedRecordBatchStream; use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; -use common_telemetry::debug; +use common_telemetry::{debug, tracing}; use datafusion::physical_plan::{DisplayAs, DisplayFormatType}; use datatypes::schema::SchemaRef; use smallvec::smallvec; @@ -112,6 +112,7 @@ impl SeqScan { self.semaphore.clone(), &mut metrics, self.compaction, + self.properties.num_partitions(), ) .await?; // Safety: `build_merge_reader()` always returns a reader if partition is None. @@ -184,10 +185,11 @@ impl SeqScan { semaphore: Arc, metrics: &mut ScannerMetrics, compaction: bool, + parallelism: usize, ) -> Result> { // initialize parts list let mut parts = stream_ctx.parts.lock().await; - Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics).await?; + Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics, parallelism).await?; let parts_len = parts.0.len(); let mut sources = Vec::with_capacity(parts_len); @@ -211,11 +213,12 @@ impl SeqScan { semaphore: Arc, metrics: &mut ScannerMetrics, compaction: bool, + parallelism: usize, ) -> Result> { let mut sources = Vec::new(); let build_start = { let mut parts = stream_ctx.parts.lock().await; - Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics).await?; + Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics, parallelism).await?; let Some(part) = parts.0.get_part(range_id) else { return Ok(None); @@ -245,6 +248,7 @@ impl SeqScan { maybe_reader } + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] async fn build_reader_from_sources( stream_ctx: &StreamContext, mut sources: Vec, @@ -311,12 +315,13 @@ impl SeqScan { let semaphore = self.semaphore.clone(); let partition_ranges = self.properties.partitions[partition].clone(); let compaction = self.compaction; + let parallelism = self.properties.num_partitions(); let stream = try_stream! { let first_poll = stream_ctx.query_start.elapsed(); for partition_range in partition_ranges { let maybe_reader = - Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics, compaction) + Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics, compaction, parallelism) .await .map_err(BoxedError::new) .context(ExternalSnafu)?; @@ -390,6 +395,7 @@ impl SeqScan { let stream_ctx = self.stream_ctx.clone(); let semaphore = self.semaphore.clone(); let compaction = self.compaction; + let parallelism = self.properties.num_partitions(); // build stream let stream = try_stream! 
{ @@ -398,7 +404,7 @@ impl SeqScan { // init parts let parts_len = { let mut parts = stream_ctx.parts.lock().await; - Self::maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics).await + Self::maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics, parallelism).await .map_err(BoxedError::new) .context(ExternalSnafu)?; parts.0.len() @@ -411,6 +417,7 @@ impl SeqScan { semaphore.clone(), &mut metrics, compaction, + parallelism ) .await .map_err(BoxedError::new) @@ -467,6 +474,7 @@ impl SeqScan { input: &ScanInput, part_list: &mut (ScanPartList, Duration), metrics: &mut ScannerMetrics, + parallelism: usize, ) -> Result<()> { if part_list.0.is_none() { let now = Instant::now(); @@ -477,9 +485,7 @@ impl SeqScan { Some(input.mapper.column_ids()), input.predicate.clone(), ); - part_list - .0 - .set_parts(distributor.build_parts(input.parallelism.parallelism)); + part_list.0.set_parts(distributor.build_parts(parallelism)); let build_part_cost = now.elapsed(); part_list.1 = build_part_cost; diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 3bca8e0afe..ec43654e09 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -59,10 +59,11 @@ impl UnorderedScan { /// Creates a new [UnorderedScan]. pub(crate) fn new(input: ScanInput) -> Self { let parallelism = input.parallelism.parallelism.max(1); - let properties = ScannerProperties::default() + let mut properties = ScannerProperties::default() .with_parallelism(parallelism) .with_append_mode(input.append_mode) .with_total_rows(input.total_rows()); + properties.partitions = vec![input.partition_ranges()]; let stream_ctx = Arc::new(StreamContext::new(input)); Self { @@ -148,12 +149,13 @@ impl RegionScanner for UnorderedScan { ..Default::default() }; let stream_ctx = self.stream_ctx.clone(); + let parallelism = self.properties.num_partitions(); let stream = try_stream! { let first_poll = stream_ctx.query_start.elapsed(); let part = { let mut parts = stream_ctx.parts.lock().await; - maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics) + maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics, parallelism) .await .map_err(BoxedError::new) .context(ExternalSnafu)?; @@ -260,6 +262,7 @@ async fn maybe_init_parts( input: &ScanInput, part_list: &mut (ScanPartList, Duration), metrics: &mut ScannerMetrics, + parallelism: usize, ) -> Result<()> { if part_list.0.is_none() { let now = Instant::now(); @@ -270,9 +273,7 @@ async fn maybe_init_parts( Some(input.mapper.column_ids()), input.predicate.clone(), ); - part_list - .0 - .set_parts(distributor.build_parts(input.parallelism.parallelism)); + part_list.0.set_parts(distributor.build_parts(parallelism)); let build_part_cost = now.elapsed(); part_list.1 = build_part_cost; diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 5f945390f9..086fbef7d0 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -102,6 +102,8 @@ pub(crate) struct MitoRegion { pub(crate) provider: Provider, /// Last flush time in millis. last_flush_millis: AtomicI64, + /// Last compaction time in millis. + last_compaction_millis: AtomicI64, /// Provider to get current time. time_provider: TimeProviderRef, /// Memtable builder for the region. @@ -151,6 +153,17 @@ impl MitoRegion { self.last_flush_millis.store(now, Ordering::Relaxed); } + /// Return last compaction time in millis. 
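+    /// The value is initialized to the region open time in `RegionOpener`, so an automatically
+    /// scheduled compaction is held off until `min_compaction_interval` has elapsed since open.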
+ pub(crate) fn last_compaction_millis(&self) -> i64 { + self.last_compaction_millis.load(Ordering::Relaxed) + } + + /// Update compaction time to now millis. + pub(crate) fn update_compaction_millis(&self) { + let now = self.time_provider.current_time_millis(); + self.last_compaction_millis.store(now, Ordering::Relaxed); + } + /// Returns the region dir. pub(crate) fn region_dir(&self) -> &str { self.access_layer.region_dir() diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index f20da8f3d6..64272a183b 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -49,7 +49,7 @@ use crate::schedule::scheduler::SchedulerRef; use crate::sst::file_purger::LocalFilePurger; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; -use crate::time_provider::{StdTimeProvider, TimeProviderRef}; +use crate::time_provider::TimeProviderRef; use crate::wal::entry_reader::WalEntryReader; use crate::wal::{EntryId, Wal}; @@ -66,13 +66,15 @@ pub(crate) struct RegionOpener { skip_wal_replay: bool, puffin_manager_factory: PuffinManagerFactory, intermediate_manager: IntermediateManager, - time_provider: Option, + time_provider: TimeProviderRef, stats: ManifestStats, wal_entry_reader: Option>, } impl RegionOpener { /// Returns a new opener. + // TODO(LFC): Reduce the number of arguments. + #[allow(clippy::too_many_arguments)] pub(crate) fn new( region_id: RegionId, region_dir: &str, @@ -81,6 +83,7 @@ impl RegionOpener { purge_scheduler: SchedulerRef, puffin_manager_factory: PuffinManagerFactory, intermediate_manager: IntermediateManager, + time_provider: TimeProviderRef, ) -> RegionOpener { RegionOpener { region_id, @@ -94,7 +97,7 @@ impl RegionOpener { skip_wal_replay: false, puffin_manager_factory, intermediate_manager, - time_provider: None, + time_provider, stats: Default::default(), wal_entry_reader: None, } @@ -223,9 +226,7 @@ impl RegionOpener { self.puffin_manager_factory, self.intermediate_manager, )); - let time_provider = self - .time_provider - .unwrap_or_else(|| Arc::new(StdTimeProvider)); + let now = self.time_provider.current_time_millis(); Ok(MitoRegion { region_id, @@ -242,8 +243,9 @@ impl RegionOpener { self.cache_manager, )), provider, - last_flush_millis: AtomicI64::new(time_provider.current_time_millis()), - time_provider, + last_flush_millis: AtomicI64::new(now), + last_compaction_millis: AtomicI64::new(now), + time_provider: self.time_provider.clone(), memtable_builder, stats: self.stats, }) @@ -377,10 +379,7 @@ impl RegionOpener { } else { info!("Skip the WAL replay for region: {}", region_id); } - let time_provider = self - .time_provider - .clone() - .unwrap_or_else(|| Arc::new(StdTimeProvider)); + let now = self.time_provider.current_time_millis(); let region = MitoRegion { region_id: self.region_id, @@ -393,8 +392,9 @@ impl RegionOpener { )), file_purger, provider: provider.clone(), - last_flush_millis: AtomicI64::new(time_provider.current_time_millis()), - time_provider, + last_flush_millis: AtomicI64::new(now), + last_compaction_millis: AtomicI64::new(now), + time_provider: self.time_provider.clone(), memtable_builder, stats: self.stats.clone(), }; diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 242d48c45f..0f872e24e4 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -958,6 +958,13 @@ impl WorkerListener { listener.on_file_cache_filled(_file_id); } } + + pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) { + 
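// Like the other test hooks on `WorkerListener`, the body below is compiled only for test builds
// (`cfg(any(test, feature = "test"))`) and is a no-op otherwise.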
#[cfg(any(test, feature = "test"))] + if let Some(listener) = &self.listener { + listener.on_compaction_scheduled(_region_id); + } + } } #[cfg(test)] diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index 6d16d72c1c..bee00fae5e 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -56,6 +56,7 @@ impl RegionWorkerLoop { self.purge_scheduler.clone(), self.puffin_manager_factory.clone(), self.intermediate_manager.clone(), + self.time_provider.clone(), ) .cache(Some(self.cache_manager.clone())) .options(region.version().options.clone())? diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index 080c359784..e088932035 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -12,17 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_telemetry::{error, info}; -use store_api::logstore::LogStore; +use api::v1::region::compact_request; +use common_telemetry::{error, info, warn}; use store_api::region_request::RegionCompactRequest; use store_api::storage::RegionId; use crate::error::RegionNotFoundSnafu; use crate::metrics::COMPACTION_REQUEST_COUNT; +use crate::region::MitoRegionRef; use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx}; use crate::worker::RegionWorkerLoop; -impl RegionWorkerLoop { +impl RegionWorkerLoop { /// Handles compaction request submitted to region worker. pub(crate) async fn handle_compaction_request( &mut self, @@ -68,6 +69,7 @@ impl RegionWorkerLoop { return; } }; + region.update_compaction_millis(); region .version_control @@ -89,4 +91,30 @@ impl RegionWorkerLoop { self.compaction_scheduler .on_compaction_failed(req.region_id, req.err); } + + /// Schedule compaction for the region if necessary. + pub(crate) async fn schedule_compaction(&mut self, region: &MitoRegionRef) { + let now = self.time_provider.current_time_millis(); + if now - region.last_compaction_millis() + >= self.config.min_compaction_interval.as_millis() as i64 + { + if let Err(e) = self + .compaction_scheduler + .schedule_compaction( + region.region_id, + compact_request::Options::Regular(Default::default()), + ®ion.version_control, + ®ion.access_layer, + OptionOutputTx::none(), + ®ion.manifest_ctx, + ) + .await + { + warn!( + "Failed to schedule compaction for region: {}, err: {}", + region.region_id, e + ); + } + } + } } diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index 863435c7ea..200ed1913f 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -62,6 +62,7 @@ impl RegionWorkerLoop { self.purge_scheduler.clone(), self.puffin_manager_factory.clone(), self.intermediate_manager.clone(), + self.time_provider.clone(), ) .metadata(metadata) .parse_options(request.options)? 
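The `schedule_compaction` helper added in `handle_compaction.rs` above is the single place that gates automatically triggered compactions (after a flush or a region edit) on `min_compaction_interval`. A minimal standalone sketch of that check, using illustrative names rather than the worker's actual types:

use std::time::Duration;

/// Illustrative-only sketch of the interval gate: returns true when enough time
/// has passed since the last compaction. A zero interval (the default) always
/// allows scheduling, which matches the old behavior.
fn past_min_compaction_interval(
    now_millis: i64,
    last_compaction_millis: i64,
    min_interval: Duration,
) -> bool {
    now_millis - last_compaction_millis >= min_interval.as_millis() as i64
}

fn main() {
    let min_interval = Duration::from_secs(60 * 60); // 1h, as in the test above
    // An edit 30 minutes after the last compaction does not schedule one...
    assert!(!past_min_compaction_interval(1_800_000, 0, min_interval));
    // ...but once the full hour has elapsed, the next flush or edit does.
    assert!(past_min_compaction_interval(3_600_000, 0, min_interval));
}

Explicit compaction requests handled by `handle_compaction_request` appear to bypass this gate; only the compactions scheduled automatically after a flush or a manifest edit go through it.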
diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 8acb289b24..14a70225bb 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -16,8 +16,7 @@ use std::sync::Arc; -use api::v1::region::compact_request; -use common_telemetry::{error, info, warn}; +use common_telemetry::{error, info}; use store_api::logstore::LogStore; use store_api::region_request::RegionFlushRequest; use store_api::storage::RegionId; @@ -242,23 +241,7 @@ impl RegionWorkerLoop { self.handle_stalled_requests().await; // Schedules compaction. - if let Err(e) = self - .compaction_scheduler - .schedule_compaction( - region.region_id, - compact_request::Options::Regular(Default::default()), - ®ion.version_control, - ®ion.access_layer, - OptionOutputTx::none(), - ®ion.manifest_ctx, - ) - .await - { - warn!( - "Failed to schedule compaction after flush, region: {}, err: {}", - region.region_id, e - ); - } + self.schedule_compaction(®ion).await; self.listener.on_flush_success(region_id); } diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index a7ebb219ce..de5f4e563d 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -148,6 +148,9 @@ impl RegionWorkerLoop { } }; + let need_compaction = + edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty(); + if edit_result.result.is_ok() { // Applies the edit to the region. region @@ -165,6 +168,10 @@ impl RegionWorkerLoop { self.handle_region_edit(request).await; } } + + if need_compaction { + self.schedule_compaction(®ion).await; + } } /// Writes truncate action to the manifest and then applies it to the region in background. diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index fa4f487040..925e2dc8f5 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -97,6 +97,7 @@ impl RegionWorkerLoop { self.purge_scheduler.clone(), self.puffin_manager_factory.clone(), self.intermediate_manager.clone(), + self.time_provider.clone(), ) .skip_wal_replay(request.skip_wal_replay) .cache(Some(self.cache_manager.clone())) diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 2b959f91f3..ad4ff52225 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -74,6 +74,7 @@ use crate::query_handler::{ use crate::server::Server; pub mod authorize; +pub mod dyn_log; pub mod event; pub mod handler; pub mod header; @@ -708,6 +709,15 @@ impl HttpServer { authorize::check_http_auth, )), ) + .nest( + "/debug", + Router::new() + // handler for changing log level dynamically + .route( + "/log_level", + routing::get(dyn_log::dyn_log_handler).post(dyn_log::dyn_log_handler), + ), + ) // Handlers for debug, we don't expect a timeout. .nest( &format!("/{HTTP_API_VERSION}/prof"), diff --git a/src/servers/src/http/dyn_log.rs b/src/servers/src/http/dyn_log.rs new file mode 100644 index 0000000000..a34601aff9 --- /dev/null +++ b/src/servers/src/http/dyn_log.rs @@ -0,0 +1,54 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use axum::http::StatusCode; +use axum::response::IntoResponse; +use common_telemetry::tracing_subscriber::filter; +use common_telemetry::{info, RELOAD_HANDLE}; +use snafu::OptionExt; + +use crate::error::{InternalSnafu, InvalidParameterSnafu, Result}; + +#[axum_macros::debug_handler] +pub async fn dyn_log_handler(level: String) -> Result { + let new_filter = level.parse::().map_err(|e| { + InvalidParameterSnafu { + reason: format!("Invalid filter \"{level}\": {e:?}"), + } + .build() + })?; + let mut old_filter = None; + RELOAD_HANDLE + .get() + .context(InternalSnafu { + err_msg: "Reload handle not initialized", + })? + .modify(|filter| { + old_filter = Some(filter.clone()); + *filter = new_filter.clone() + }) + .map_err(|e| { + InternalSnafu { + err_msg: format!("Fail to modify filter: {e:?}"), + } + .build() + })?; + let change_note = format!( + "Log Level changed from {} to {}", + old_filter.map(|f| f.to_string()).unwrap_or_default(), + new_filter + ); + info!("{}", change_note.clone()); + Ok((StatusCode::OK, change_note)) +} diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 4d9c6ceb87..497ea4969c 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -823,8 +823,8 @@ max_retry_times = 3 retry_delay = "500ms" [logging] -enable_otlp_tracing = false append_stdout = true +enable_otlp_tracing = false [[region_engine]] @@ -841,6 +841,7 @@ experimental_write_cache_size = "512MiB" sst_write_buffer_size = "8MiB" parallel_scan_channel_size = 32 allow_stale_entries = false +min_compaction_interval = "0s" [region_engine.mito.index] aux_path = "" @@ -883,6 +884,7 @@ write_interval = "30s" fn drop_lines_with_inconsistent_results(input: String) -> String { let inconsistent_results = [ "dir =", + "log_format =", "data_home =", "bucket =", "root =", diff --git a/tests/cases/standalone/common/function/geo.result b/tests/cases/standalone/common/function/geo.result new file mode 100644 index 0000000000..6d44c3ac04 --- /dev/null +++ b/tests/cases/standalone/common/function/geo.result @@ -0,0 +1,192 @@ +SELECT h3(37.76938, -122.3889, 0); + ++---------------------------------------------------+ +| h3(Float64(37.76938),Float64(-122.3889),Int64(0)) | ++---------------------------------------------------+ +| 8029fffffffffff | ++---------------------------------------------------+ + +SELECT h3(37.76938, -122.3889, 1); + ++---------------------------------------------------+ +| h3(Float64(37.76938),Float64(-122.3889),Int64(1)) | ++---------------------------------------------------+ +| 81283ffffffffff | ++---------------------------------------------------+ + +SELECT h3(37.76938, -122.3889, 8); + ++---------------------------------------------------+ +| h3(Float64(37.76938),Float64(-122.3889),Int64(8)) | ++---------------------------------------------------+ +| 88283082e7fffff | ++---------------------------------------------------+ + +SELECT h3(37.76938, -122.3889, 100); + +Error: 3001(EngineExecuteQuery), H3 error: invalid resolution (got Some(100)): out of range + +SELECT h3(37.76938, -122.3889, -1); + +Error: 
3001(EngineExecuteQuery), H3 error: invalid resolution (got Some(255)): out of range + +SELECT h3(37.76938, -122.3889, 8::Int8); + ++---------------------------------------------------+ +| h3(Float64(37.76938),Float64(-122.3889),Int64(8)) | ++---------------------------------------------------+ +| 88283082e7fffff | ++---------------------------------------------------+ + +SELECT h3(37.76938, -122.3889, 8::Int16); + ++-----------------------------------------------------------------------------+ +| h3(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(8),Utf8("Int16"))) | ++-----------------------------------------------------------------------------+ +| 88283082e7fffff | ++-----------------------------------------------------------------------------+ + +SELECT h3(37.76938, -122.3889, 8::Int32); + ++-----------------------------------------------------------------------------+ +| h3(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(8),Utf8("Int32"))) | ++-----------------------------------------------------------------------------+ +| 88283082e7fffff | ++-----------------------------------------------------------------------------+ + +SELECT h3(37.76938, -122.3889, 8::Int64); + ++-----------------------------------------------------------------------------+ +| h3(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(8),Utf8("Int64"))) | ++-----------------------------------------------------------------------------+ +| 88283082e7fffff | ++-----------------------------------------------------------------------------+ + +SELECT h3(37.76938, -122.3889, 8::UInt8); + ++-----------------------------------------------------------------------------+ +| h3(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(8),Utf8("UInt8"))) | ++-----------------------------------------------------------------------------+ +| 88283082e7fffff | ++-----------------------------------------------------------------------------+ + +SELECT h3(37.76938, -122.3889, 8::UInt16); + ++------------------------------------------------------------------------------+ +| h3(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(8),Utf8("UInt16"))) | ++------------------------------------------------------------------------------+ +| 88283082e7fffff | ++------------------------------------------------------------------------------+ + +SELECT h3(37.76938, -122.3889, 8::UInt32); + ++------------------------------------------------------------------------------+ +| h3(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(8),Utf8("UInt32"))) | ++------------------------------------------------------------------------------+ +| 88283082e7fffff | ++------------------------------------------------------------------------------+ + +SELECT h3(37.76938, -122.3889, 8::UInt64); + ++------------------------------------------------------------------------------+ +| h3(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(8),Utf8("UInt64"))) | ++------------------------------------------------------------------------------+ +| 88283082e7fffff | ++------------------------------------------------------------------------------+ + +SELECT geohash(37.76938, -122.3889, 9); + ++--------------------------------------------------------+ +| geohash(Float64(37.76938),Float64(-122.3889),Int64(9)) | ++--------------------------------------------------------+ +| 9q8yygxne | ++--------------------------------------------------------+ + +SELECT geohash(37.76938, -122.3889, 10); + ++---------------------------------------------------------+ +| 
geohash(Float64(37.76938),Float64(-122.3889),Int64(10)) | ++---------------------------------------------------------+ +| 9q8yygxnef | ++---------------------------------------------------------+ + +SELECT geohash(37.76938, -122.3889, 11); + ++---------------------------------------------------------+ +| geohash(Float64(37.76938),Float64(-122.3889),Int64(11)) | ++---------------------------------------------------------+ +| 9q8yygxneft | ++---------------------------------------------------------+ + +SELECT geohash(37.76938, -122.3889, 100); + +Error: 3001(EngineExecuteQuery), Geohash error: Invalid length specified: 100. Accepted values are between 1 and 12, inclusive + +SELECT geohash(37.76938, -122.3889, -1); + +Error: 3001(EngineExecuteQuery), Geohash error: Invalid length specified: 18446744073709551615. Accepted values are between 1 and 12, inclusive + +SELECT geohash(37.76938, -122.3889, 11::Int8); + ++---------------------------------------------------------+ +| geohash(Float64(37.76938),Float64(-122.3889),Int64(11)) | ++---------------------------------------------------------+ +| 9q8yygxneft | ++---------------------------------------------------------+ + +SELECT geohash(37.76938, -122.3889, 11::Int16); + ++-----------------------------------------------------------------------------------+ +| geohash(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(11),Utf8("Int16"))) | ++-----------------------------------------------------------------------------------+ +| 9q8yygxneft | ++-----------------------------------------------------------------------------------+ + +SELECT geohash(37.76938, -122.3889, 11::Int32); + ++-----------------------------------------------------------------------------------+ +| geohash(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(11),Utf8("Int32"))) | ++-----------------------------------------------------------------------------------+ +| 9q8yygxneft | ++-----------------------------------------------------------------------------------+ + +SELECT geohash(37.76938, -122.3889, 11::Int64); + ++-----------------------------------------------------------------------------------+ +| geohash(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(11),Utf8("Int64"))) | ++-----------------------------------------------------------------------------------+ +| 9q8yygxneft | ++-----------------------------------------------------------------------------------+ + +SELECT geohash(37.76938, -122.3889, 11::UInt8); + ++-----------------------------------------------------------------------------------+ +| geohash(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(11),Utf8("UInt8"))) | ++-----------------------------------------------------------------------------------+ +| 9q8yygxneft | ++-----------------------------------------------------------------------------------+ + +SELECT geohash(37.76938, -122.3889, 11::UInt16); + ++------------------------------------------------------------------------------------+ +| geohash(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(11),Utf8("UInt16"))) | ++------------------------------------------------------------------------------------+ +| 9q8yygxneft | ++------------------------------------------------------------------------------------+ + +SELECT geohash(37.76938, -122.3889, 11::UInt32); + ++------------------------------------------------------------------------------------+ +| geohash(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(11),Utf8("UInt32"))) | 
++------------------------------------------------------------------------------------+ +| 9q8yygxneft | ++------------------------------------------------------------------------------------+ + +SELECT geohash(37.76938, -122.3889, 11::UInt64); + ++------------------------------------------------------------------------------------+ +| geohash(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(11),Utf8("UInt64"))) | ++------------------------------------------------------------------------------------+ +| 9q8yygxneft | ++------------------------------------------------------------------------------------+ + diff --git a/tests/cases/standalone/common/function/geo.sql b/tests/cases/standalone/common/function/geo.sql new file mode 100644 index 0000000000..8f6f70f4a4 --- /dev/null +++ b/tests/cases/standalone/common/function/geo.sql @@ -0,0 +1,51 @@ +SELECT h3(37.76938, -122.3889, 0); + +SELECT h3(37.76938, -122.3889, 1); + +SELECT h3(37.76938, -122.3889, 8); + +SELECT h3(37.76938, -122.3889, 100); + +SELECT h3(37.76938, -122.3889, -1); + +SELECT h3(37.76938, -122.3889, 8::Int8); + +SELECT h3(37.76938, -122.3889, 8::Int16); + +SELECT h3(37.76938, -122.3889, 8::Int32); + +SELECT h3(37.76938, -122.3889, 8::Int64); + +SELECT h3(37.76938, -122.3889, 8::UInt8); + +SELECT h3(37.76938, -122.3889, 8::UInt16); + +SELECT h3(37.76938, -122.3889, 8::UInt32); + +SELECT h3(37.76938, -122.3889, 8::UInt64); + +SELECT geohash(37.76938, -122.3889, 9); + +SELECT geohash(37.76938, -122.3889, 10); + +SELECT geohash(37.76938, -122.3889, 11); + +SELECT geohash(37.76938, -122.3889, 100); + +SELECT geohash(37.76938, -122.3889, -1); + +SELECT geohash(37.76938, -122.3889, 11::Int8); + +SELECT geohash(37.76938, -122.3889, 11::Int16); + +SELECT geohash(37.76938, -122.3889, 11::Int32); + +SELECT geohash(37.76938, -122.3889, 11::Int64); + +SELECT geohash(37.76938, -122.3889, 11::UInt8); + +SELECT geohash(37.76938, -122.3889, 11::UInt16); + +SELECT geohash(37.76938, -122.3889, 11::UInt32); + +SELECT geohash(37.76938, -122.3889, 11::UInt64); diff --git a/tests/cases/standalone/common/select/flush_append_only.result b/tests/cases/standalone/common/select/flush_append_only.result new file mode 100644 index 0000000000..38f221aa93 --- /dev/null +++ b/tests/cases/standalone/common/select/flush_append_only.result @@ -0,0 +1,108 @@ +create table + t ( + ts timestamp time index, + host string primary key, + not_pk string, + val double, + ) +with + ( + append_mode = 'true', + 'compaction.type' = 'twcs', + 'compaction.twcs.max_active_window_files' = '8', + 'compaction.twcs.max_inactive_window_files' = '8' + ); + +Affected Rows: 0 + +insert into + t +values + (0, 'a', '🌕', 1.0), + (1, 'b', '🌖', 2.0), + (1, 'a', '🌗', 3.0), + (1, 'c', '🌘', 4.0), + (2, 'a', '🌑', 5.0), + (2, 'b', '🌒', 6.0), + (2, 'a', '🌓', 7.0), + (3, 'c', '🌔', 8.0), + (3, 'd', '🌕', 9.0); + +Affected Rows: 9 + +admin flush_table ('t'); + ++------------------------+ +| ADMIN flush_table('t') | ++------------------------+ +| 0 | ++------------------------+ + +insert into + t +values + (10, 'a', '🌕', 1.0), + (11, 'b', '🌖', 2.0), + (11, 'a', '🌗', 3.0), + (11, 'c', '🌘', 4.0), + (12, 'a', '🌑', 5.0), + (12, 'b', '🌒', 6.0), + (12, 'a', '🌓', 7.0), + (13, 'c', '🌔', 8.0), + (13, 'd', '🌕', 9.0); + +Affected Rows: 9 + +admin flush_table ('t'); + ++------------------------+ +| ADMIN flush_table('t') | ++------------------------+ +| 0 | ++------------------------+ + +select + count(ts) +from + t; + ++-------------+ +| COUNT(t.ts) | ++-------------+ +| 18 | ++-------------+ + +select + 
ts +from + t +order by + ts; + ++-------------------------+ +| ts | ++-------------------------+ +| 1970-01-01T00:00:00 | +| 1970-01-01T00:00:00.001 | +| 1970-01-01T00:00:00.001 | +| 1970-01-01T00:00:00.001 | +| 1970-01-01T00:00:00.002 | +| 1970-01-01T00:00:00.002 | +| 1970-01-01T00:00:00.002 | +| 1970-01-01T00:00:00.003 | +| 1970-01-01T00:00:00.003 | +| 1970-01-01T00:00:00.010 | +| 1970-01-01T00:00:00.011 | +| 1970-01-01T00:00:00.011 | +| 1970-01-01T00:00:00.011 | +| 1970-01-01T00:00:00.012 | +| 1970-01-01T00:00:00.012 | +| 1970-01-01T00:00:00.012 | +| 1970-01-01T00:00:00.013 | +| 1970-01-01T00:00:00.013 | ++-------------------------+ + +drop table t; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/select/flush_append_only.sql b/tests/cases/standalone/common/select/flush_append_only.sql new file mode 100644 index 0000000000..e8d6defea2 --- /dev/null +++ b/tests/cases/standalone/common/select/flush_append_only.sql @@ -0,0 +1,58 @@ +create table + t ( + ts timestamp time index, + host string primary key, + not_pk string, + val double, + ) +with + ( + append_mode = 'true', + 'compaction.type' = 'twcs', + 'compaction.twcs.max_active_window_files' = '8', + 'compaction.twcs.max_inactive_window_files' = '8' + ); + +insert into + t +values + (0, 'a', '🌕', 1.0), + (1, 'b', '🌖', 2.0), + (1, 'a', '🌗', 3.0), + (1, 'c', '🌘', 4.0), + (2, 'a', '🌑', 5.0), + (2, 'b', '🌒', 6.0), + (2, 'a', '🌓', 7.0), + (3, 'c', '🌔', 8.0), + (3, 'd', '🌕', 9.0); + +admin flush_table ('t'); + +insert into + t +values + (10, 'a', '🌕', 1.0), + (11, 'b', '🌖', 2.0), + (11, 'a', '🌗', 3.0), + (11, 'c', '🌘', 4.0), + (12, 'a', '🌑', 5.0), + (12, 'b', '🌒', 6.0), + (12, 'a', '🌓', 7.0), + (13, 'c', '🌔', 8.0), + (13, 'd', '🌕', 9.0); + +admin flush_table ('t'); + +select + count(ts) +from + t; + +select + ts +from + t +order by + ts; + +drop table t;
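As a side note on the `/debug/log_level` endpoint added in `src/servers/src/http/dyn_log.rs` above: the handler swaps the active filter through `common_telemetry`'s `RELOAD_HANDLE`. A minimal standalone sketch of the same mechanism using `tracing-subscriber` directly (the project's wrapper is assumed, not reproduced):

use tracing_subscriber::{filter::Targets, prelude::*, reload};

fn main() {
    // Wrap an info-level default filter in a reloadable layer and keep the handle.
    let (filter, handle) = reload::Layer::new(Targets::new().with_default(tracing::Level::INFO));
    tracing_subscriber::registry()
        .with(filter)
        .with(tracing_subscriber::fmt::layer())
        .init();

    tracing::debug!("suppressed: the default filter is `info`");

    // The handler essentially does this: parse the request string as a `Targets`
    // filter and swap it in through the reload handle.
    let new_filter: Targets = "debug".parse().expect("valid filter directive");
    handle
        .modify(|f| *f = new_filter)
        .expect("reload handle still alive");

    tracing::debug!("emitted: the filter was reloaded to `debug`");
}

With the route registered in `http.rs`, changing the level should amount to sending a filter directive such as `debug` or `mito2=debug` as the request body to `/debug/log_level`.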