Compare commits

...

37 Commits

Author SHA1 Message Date
dennis zhuang
03a28320d6 feat!: enable read cache and write cache when using remote object stores (#5093)
* feat: enable read cache and write cache when using remote object stores

* feat: make the read cache aware of remote store names

* chore: docs

* chore: apply review suggestions

* chore: trim write cache path

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
2024-12-10 04:03:44 +00:00
Lei, HUANG
ce86ba3425 chore: Reduce FETCH_OPTION_TIMEOUT from 10 to 3 seconds in config.rs (#5117)
Reduce FETCH_OPTION_TIMEOUT from 10 to 3 seconds in config.rs
2024-12-09 13:39:18 +00:00
Yingwen
2fcb95f50a fix!: fix regression caused by unbalanced partitions and splitting ranges (#5090)
* feat: assign partition ranges by rows

* feat: balance partition rows

* feat: get upper bound for part nums

* feat: only split in non-compaction seq scan

* fix: parallel scan on multiple sources

* fix: can split check

* feat: scanner prepare by request

* feat: remove scan_parallelism

* docs: update docs

* chore: update comment

* style: fix clippy

* feat: skip merge and dedup if there is only one source

* chore: Revert "feat: skip merge and dedup if there is only one source"

Since memtable won't do dedup jobs

This reverts commit 2fc7a54b11.

* test: avoid compaction in sqlness window sort test

* chore: do not create semaphore if num partitions is enough

* chore: more assertions

* chore: fix typo

* fix: compaction flag not set

* chore: address review comments
2024-12-09 12:50:57 +00:00
ZonaHe
1b642ea6a9 feat: update dashboard to v0.7.1 (#5123)
Co-authored-by: sunchanglong <sunchanglong@users.noreply.github.com>
2024-12-09 10:27:35 +00:00
Weny Xu
b35221ccb6 ci: set meta replicas to 1 (#5111) 2024-12-09 07:22:47 +00:00
Zhenchi
bac7e7bac9 refactor: extract implicit conversion helper functions of vector type (#5118)
refactor: extract implicit conversion helper functions of vector

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-12-09 07:19:00 +00:00
dennis zhuang
903da8f4cb fix: show create table doesn't quote option keys which contain dots (#5108)
* fix: show create table doesn't quote option keys which contain dots

* fix: compile
2024-12-09 03:27:46 +00:00
Ning Sun
c0f498b00c feat: update pgwire to 0.28 (#5113)
* feat: update pgwire to 0.28

* test: update tests
2024-12-09 03:12:11 +00:00
Lin Yihai
19373d806d chore: Add timeout setting for find_ttl. (#5088) 2024-12-06 15:02:15 +00:00
Ning Sun
3133f3fb4e feat: add cursor statements (#5094)
* feat: add sql parsers for cursor operations

* feat: cursor operator

* feat: implement RecordBatchStreamCursor

* feat: implement cursor storage and execution

* test: add tests

* chore: update docstring

* feat: add a temporary sql rewrite for cast in limit

this issue is described in #5097

* test: add more sql for cursor integration test

* feat: reject non-select query for cursor statement

* refactor: address review issues

* test: add empty result case

* feat: address review comments
2024-12-06 09:32:22 +00:00
discord9
8b944268da feat: ttl=0/instant/forever/humantime&ttl refactor (#5089)
* feat: ttl zero filter

* refactor: use TimeToLive enum

* fix: unit test

* tests: sqlness

* refactor: Option<TTL> None means UNSET

* tests: sqlness

* fix: 10000 years --> forever

* chore: minor refactor from reviews

* chore: rename back TimeToLive

* refactor: split imme request from normal requests

* fix: use correct lifetime

* refactor: rename immediate to instant

* tests: flow sink table default ttl

* refactor: per review

* tests: sqlness

* fix: ttl alter to instant

* tests: sqlness

* refactor: per review

* chore: per review

* feat: add db ttl type&forbid instant for db

* tests: more unit test
2024-12-06 09:20:42 +00:00
Ning Sun
dc83b0aa15 feat: add more transaction related statement for postgres interface (#5081)
* fix: add match for start and abort transactions

* feat: add commit transaction statement

* feat: add warning on transaction start

* chore: update message
2024-12-06 08:22:25 +00:00
Weny Xu
2b699e735c chore: correct example config file (#5105)
* chore: correct example config file

* fix: fix unit test
2024-12-06 03:14:08 +00:00
Yingwen
7a3d6f2bd5 docs: remove lg_prof_interval from env (#5103) 2024-12-06 02:59:16 +00:00
Ruihang Xia
f9ebb58a12 fix: put PipelineChecker at the end (#5092)
fix: put PipelineChecker at the end

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-12-06 02:10:17 +00:00
ZonaHe
c732016fa0 feat: update dashboard to v0.7.0 (#5100)
Co-authored-by: sunchanglong <sunchanglong@users.noreply.github.com>
2024-12-05 13:42:36 +00:00
Weny Xu
01a308fe6b refactor: relocate CLI to a dedicated directory (#5101)
* refactor: relocate CLI to a dedicated directory

* chore: expose method and const

* refactor: use BoxedError

* chore: expose DatabaseClient

* chore: add clone derive
2024-12-05 12:12:24 +00:00
Ruihang Xia
cf0c84bed1 feat!: remove GET method in /debug path (#5102)
* feat: remove GET method in /debug path

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update how-to document as well

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-12-05 08:21:55 +00:00
Yingwen
66c0445974 perf: take a new batch to reduce last row cache usage (#5095)
* feat: take and cache last row to save memory

* style: fix clippy
2024-12-05 03:59:28 +00:00
Ruihang Xia
7d8b256942 refactor: replace LogHandler with PipelineHandler (#5096)
* refactor: replace LogHandler with PipelineHandler

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change method name

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename transform to insert

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-12-04 11:48:55 +00:00
Ruihang Xia
5092f5f451 feat: define basic structures and implement TimeFilter (#5086)
* feat: define basic structures and implement TimeFilter

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* document column filter

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* define context

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change variable name to avoid typo checker

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change error referring style

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refine context definition

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-12-04 07:39:33 +00:00
dennis zhuang
ff4c153d4b test: adds sqlness test for TTL (#5063)
* test: adds sqlness test for TTL

* chore: restart cluster

* fix: typo

* test: adds database TTL with metric engine tables
2024-12-03 11:32:40 +00:00
Lei, HUANG
a51853846a fix: schema cache invalidation (#5067)
* fix: use SchemaCache to locate database metadata

* main:
 Refactor SchemaMetadataManager to use TableInfoCacheRef

 - Replace TableInfoManagerRef with TableInfoCacheRef in SchemaMetadataManager
 - Update DatanodeBuilder to pass TableInfoCacheRef to SchemaMetadataManager
 - Rename error MissingCacheRegistrySnafu to MissingCacheSnafu in datanode module
 - Adjust tests to use new mock_schema_metadata_manager with TableInfoCacheRef

* fix/schema-cache-invalidation: Add cache module and integrate cache registry into datanode

 • Implement build_datanode_cache_registry function to create cache registry for datanode
 • Integrate cache registry into datanode by modifying DatanodeBuilder and HeartbeatTask
 • Refactor InvalidateTableCacheHandler to InvalidateCacheHandler and move to common-meta crate
 • Update Cargo.toml to include cache as a dev-dependency for datanode
 • Adjust related modules (flownode, frontend, tests-integration, standalone) to use new cache handler and registry
 • Remove obsolete handler module from frontend crate

* fix: fuzz imports

* chore: add some docs for cache builder functions

* refactor: change table info cache to table schema cache

* fix: remove unused variants

* fix fuzz

* chore: apply suggestion

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* chore: apply suggestion

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* fix: compile

---------

Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: Weny Xu <wenymedia@gmail.com>
2024-12-03 10:44:29 +00:00
Weny Xu
51c6eafb16 feat: recover file cache index asynchronously (#5087)
* feat: recover file cache index asynchronously

* chore: apply suggestions from CR
2024-12-03 09:33:52 +00:00
Weny Xu
5bdea1a755 fix: correct is_exceeded_size_limit behavior for in-memory store (#5082)
* fix: correct `is_exceeded_size_limit` behavior for in-memory store

* chore: rename `MetaClientExceededSizeLimit` to `ResponseExceededSizeLimit`
2024-12-02 12:26:02 +00:00
Ning Sun
bcadce3988 chore: remove openssl deps (#5079)
* chore: remove openssl deps

* ci: add ci task to check blacklisted dependencies
2024-12-02 07:12:15 +00:00
Weny Xu
0f116c8501 feat: enable compression for metasrv client (#5078)
* feat: enable compression for metasrv client

* refactor: simplify gRPC service router registration

* chore: fix unit tests
2024-12-02 05:18:25 +00:00
Ruihang Xia
c049ce6ab1 feat: add decolorize processor (#5065)
* feat: add decolorize processor

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/pipeline/src/etl/processor/cmcd.rs

* add crate level integration test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-11-29 11:18:02 +00:00
dennis zhuang
6308e86e21 docs: tweak readme and AUTHOR (#5069)
* docs: tweak readme

* chore: links

* fix: typo

* fix: link

* docs: update AUTHOR
2024-11-29 04:08:53 +00:00
Ning Sun
36263830bb refactor: remove built-in apidocs and schemars (#5068)
* feat: feature gate apidocs

* refactor: remove built-in apidocs and schemars

* remove redoc html
2024-11-29 03:31:02 +00:00
discord9
d931389a4c fix(flow): minor fix about count(*)&sink keyword (#5061)
* fix: SiNk

* fix: sink&count(*)

* tests: SiNk

* refactor: per review
2024-11-29 03:06:27 +00:00
Lanqing Yang
8bdef776b3 fix: allow physical region to alter region options (#5046)
allow physical region to alter region options
2024-11-27 08:24:34 +00:00
Zhenchi
91e933517a chore: bump version of main branch to v0.11.0 (#5057)
chore: bump nightly version to v0.11

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-11-27 02:19:24 +00:00
Lei, HUANG
a617e0dbef feat: use cache kv manager for SchemaMetadataManager (#5053)
* feat: add cache for schema options

* fix/use-cache-kv-manager: Add cache invalidation handling to Datanode's heartbeat task

 • Implement InvalidateSchemaCacheHandler in heartbeat.rs to handle cache invalidation instructions.
 • Update HeartbeatTask constructor to accept cached_kv_backend and pass it to InvalidateSchemaCacheHandler.
 • Modify DatanodeBuilder to clone cached_kv_backend when creating schema_metadata_manager.
 • Refactor MetasrvCacheInvalidator in cache_invalidator.rs to reuse MailboxMessage for broadcasting to different channels.

* fix: only remove schema related cache entries

* chore: add more tests

* fix/use-cache-kv-manager: Moved InvalidateSchemaCacheHandler to a separate module

 • Extracted InvalidateSchemaCacheHandler and associated tests into a new file cache_invalidator.rs
 • Removed async_trait and CacheInvalidator related code from heartbeat.rs
 • Added cache_invalidator module declaration in handler.rs

* fix: unit tests

* fix/use-cache-kv-manager:
 Standardize TODO comment format in CachedKvBackend txn method

* Update src/datanode/src/heartbeat/handler/cache_invalidator.rs

* Update src/datanode/src/heartbeat/handler/cache_invalidator.rs

* Update src/datanode/src/heartbeat/handler/cache_invalidator.rs

---------

Co-authored-by: jeremyhi <jiachun_feng@proton.me>
2024-11-26 12:24:47 +00:00
Yingwen
6130c70b63 fix: pass series row selector to file range reader (#5054) 2024-11-26 11:13:00 +00:00
Lei, HUANG
fae141ad0a fix(metric-engine): set ttl also on opening metadata regions (#5051)
* fix/metric-metadata-region-options: Remove APPEND_MODE_KEY and refactor TTL option handling in MetricEngineInner

* fix/metric-metadata-region-options: Refactor metadata region options into a shared function

 • Extract metadata region options into region_options_for_metadata_region function
 • Replace inline options map with a call to the new shared function in both create.rs and open.rs files

* fix: exclude typos

* fix/metric-metadata-region-options:
 Refactor metadata region options to accept original options and remove APPEND_MODE_KEY
2024-11-26 08:14:41 +00:00
LFC
57f31d14c8 refactor: expose configs for http clients used in object store (#5041) 2024-11-25 03:49:54 +00:00
227 changed files with 7298 additions and 2033 deletions

View File

@@ -8,7 +8,7 @@ inputs:
default: 2
description: "Number of Datanode replicas"
meta-replicas:
default: 3
default: 1
description: "Number of Metasrv replicas"
image-registry:
default: "docker.io"
@@ -58,7 +58,7 @@ runs:
--set image.tag=${{ inputs.image-tag }} \
--set base.podTemplate.main.resources.requests.cpu=50m \
--set base.podTemplate.main.resources.requests.memory=256Mi \
--set base.podTemplate.main.resources.limits.cpu=1000m \
--set base.podTemplate.main.resources.limits.cpu=2000m \
--set base.podTemplate.main.resources.limits.memory=2Gi \
--set frontend.replicas=${{ inputs.frontend-replicas }} \
--set datanode.replicas=${{ inputs.datanode-replicas }} \

2
.github/cargo-blacklist.txt vendored Normal file
View File

@@ -0,0 +1,2 @@
native-tls
openssl

36
.github/workflows/dependency-check.yml vendored Normal file
View File

@@ -0,0 +1,36 @@
name: Check Dependencies
on:
push:
branches:
- main
pull_request:
branches:
- main
jobs:
check-dependencies:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Rust
uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Run cargo tree
run: cargo tree --prefix none > dependencies.txt
- name: Extract dependency names
run: awk '{print $1}' dependencies.txt > dependency_names.txt
- name: Check for blacklisted crates
run: |
while read -r dep; do
if grep -qFx "$dep" dependency_names.txt; then
echo "Blacklisted crate '$dep' found in dependencies."
exit 1
fi
done < .github/cargo-blacklist.txt
echo "No blacklisted crates found."

View File

@@ -7,6 +7,8 @@
* [NiwakaDev](https://github.com/NiwakaDev)
* [etolbakov](https://github.com/etolbakov)
* [irenjj](https://github.com/irenjj)
* [tisonkun](https://github.com/tisonkun)
* [Lanqing Yang](https://github.com/lyang24)
## Team Members (in alphabetical order)
@@ -30,7 +32,6 @@
* [shuiyisong](https://github.com/shuiyisong)
* [sunchanglong](https://github.com/sunchanglong)
* [sunng87](https://github.com/sunng87)
* [tisonkun](https://github.com/tisonkun)
* [v0y4g3r](https://github.com/v0y4g3r)
* [waynexia](https://github.com/waynexia)
* [xtang](https://github.com/xtang)

699
Cargo.lock generated

File diff suppressed because it is too large.

View File

@@ -4,6 +4,7 @@ members = [
"src/auth",
"src/cache",
"src/catalog",
"src/cli",
"src/client",
"src/cmd",
"src/common/base",
@@ -40,6 +41,7 @@ members = [
"src/flow",
"src/frontend",
"src/index",
"src/log-query",
"src/log-store",
"src/meta-client",
"src/meta-srv",
@@ -66,7 +68,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.10.1"
version = "0.11.0"
edition = "2021"
license = "Apache-2.0"
@@ -167,7 +169,6 @@ rstest = "0.21"
rstest_reuse = "0.7"
rust_decimal = "1.33"
rustc-hash = "2.0"
schemars = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["float_roundtrip"] }
serde_with = "3"
@@ -200,6 +201,7 @@ api = { path = "src/api" }
auth = { path = "src/auth" }
cache = { path = "src/cache" }
catalog = { path = "src/catalog" }
cli = { path = "src/cli" }
client = { path = "src/client" }
cmd = { path = "src/cmd", default-features = false }
common-base = { path = "src/common/base" }

View File

@@ -56,7 +56,7 @@
- [Project Status](#project-status)
- [Join the community](#community)
- [Contributing](#contributing)
- [Extension](#extension )
- [Tools & Extensions](#tools--extensions)
- [License](#license)
- [Acknowledgement](#acknowledgement)
@@ -66,31 +66,33 @@
## Why GreptimeDB
Our core developers have been building time-series data platforms for years. Based on our best-practices, GreptimeDB is born to give you:
Our core developers have been building time-series data platforms for years. Based on our best practices, GreptimeDB was born to give you:
* **Unified all kinds of time series**
* **Unified Processing of Metrics, Logs, and Events**
GreptimeDB treats all time series as contextual events with timestamp, and thus unifies the processing of metrics, logs, and events. It supports analyzing metrics, logs, and events with SQL and PromQL, and doing streaming with continuous aggregation.
GreptimeDB unifies time series data processing by treating all data - whether metrics, logs, or events - as timestamped events with context. Users can analyze this data using either [SQL](https://docs.greptime.com/user-guide/query-data/sql) or [PromQL](https://docs.greptime.com/user-guide/query-data/promql) and leverage stream processing ([Flow](https://docs.greptime.com/user-guide/continuous-aggregation/overview)) to enable continuous aggregation. [Read more](https://docs.greptime.com/user-guide/concepts/data-model).
* **Cloud-Edge collaboration**
* **Cloud-native Distributed Database**
GreptimeDB can be deployed on ARM architecture-compatible Android/Linux systems as well as cloud environments from various vendors. Both sides run the same software, providing identical APIs and control planes, so your application can run at the edge or on the cloud without modification, and data synchronization also becomes extremely easy and efficient.
* **Cloud-native distributed database**
By leveraging object storage (S3 and others), separating compute and storage, scaling stateless compute nodes arbitrarily, GreptimeDB implements seamless scalability. It also supports cross-cloud deployment with a built-in unified data access layer over different object storages.
Built for [Kubernetes](https://docs.greptime.com/user-guide/deployments/deploy-on-kubernetes/greptimedb-operator-management). GreptimeDB achieves seamless scalability with its [cloud-native architecture](https://docs.greptime.com/user-guide/concepts/architecture) of separated compute and storage, built on object storage (AWS S3, Azure Blob Storage, etc.) while enabling cross-cloud deployment through a unified data access layer.
* **Performance and Cost-effective**
Flexible indexing capabilities and distributed, parallel-processing query engine, tackling high cardinality issues down. Optimized columnar layout for handling time-series data; compacted, compressed, and stored on various storage backends, particularly cloud object storage with 50x cost efficiency.
Written in pure Rust for superior performance and reliability. GreptimeDB features a distributed query engine with intelligent indexing to handle high cardinality data efficiently. Its optimized columnar storage achieves 50x cost efficiency on cloud object storage through advanced compression. [Benchmark reports](https://www.greptime.com/blogs/2024-09-09-report-summary).
* **Compatible with InfluxDB, Prometheus and more protocols**
* **Cloud-Edge Collaboration**
Widely adopted database protocols and APIs, including MySQL, PostgreSQL, and Prometheus Remote Storage, etc. [Read more](https://docs.greptime.com/user-guide/protocols/overview).
GreptimeDB seamlessly operates across cloud and edge (ARM/Android/Linux), providing consistent APIs and control plane for unified data management and efficient synchronization. [Learn how to run on Android](https://docs.greptime.com/user-guide/deployments/run-on-android/).
* **Multi-protocol Ingestion, SQL & PromQL Ready**
Widely adopted database protocols and APIs, including MySQL, PostgreSQL, InfluxDB, OpenTelemetry, Loki and Prometheus, etc. Effortless Adoption & Seamless Migration. [Supported Protocols Overview](https://docs.greptime.com/user-guide/protocols/overview).
For more detailed info please read [Why GreptimeDB](https://docs.greptime.com/user-guide/concepts/why-greptimedb).
## Try GreptimeDB
### 1. [GreptimePlay](https://greptime.com/playground)
### 1. [Live Demo](https://greptime.com/playground)
Try out the features of GreptimeDB right from your browser.
@@ -109,9 +111,18 @@ docker pull greptime/greptimedb
Start a GreptimeDB container with:
```shell
docker run --rm --name greptime --net=host greptime/greptimedb standalone start
docker run -p 127.0.0.1:4000-4003:4000-4003 \
-v "$(pwd)/greptimedb:/tmp/greptimedb" \
--name greptime --rm \
greptime/greptimedb:latest standalone start \
--http-addr 0.0.0.0:4000 \
--rpc-addr 0.0.0.0:4001 \
--mysql-addr 0.0.0.0:4002 \
--postgres-addr 0.0.0.0:4003
```
Access the dashboard via `http://localhost:4000/dashboard`.
Read more about [Installation](https://docs.greptime.com/getting-started/installation/overview) on docs.
## Getting Started
@@ -141,7 +152,7 @@ Run a standalone server:
cargo run -- standalone start
```
## Extension
## Tools & Extensions
### Dashboard
@@ -158,14 +169,19 @@ cargo run -- standalone start
### Grafana Dashboard
Our official Grafana dashboard is available at [grafana](grafana/README.md) directory.
Our official Grafana dashboard for monitoring GreptimeDB is available at [grafana](grafana/README.md) directory.
## Project Status
The current version has not yet reached the standards for General Availability.
According to our Greptime 2024 Roadmap, we aim to achieve a production-level version with the release of v1.0 by the end of 2024. [Join Us](https://github.com/GreptimeTeam/greptimedb/issues/3412)
GreptimeDB is currently in Beta. We are targeting GA (General Availability) with v1.0 release by Early 2025.
We welcome you to test and use GreptimeDB. Some users have already adopted it in their production environments. If you're interested in trying it out, please use the latest stable release available.
While in Beta, GreptimeDB is already:
* Being used in production by early adopters
* Actively maintained with regular releases, [about version number](https://docs.greptime.com/nightly/reference/about-greptimedb-version)
* Suitable for testing and evaluation
For production use, we recommend using the latest stable release.
## Community
@@ -184,12 +200,12 @@ In addition, you may:
- Connect us with [Linkedin](https://www.linkedin.com/company/greptime/)
- Follow us on [Twitter](https://twitter.com/greptime)
## Commerial Support
## Commercial Support
If you are running GreptimeDB OSS in your organization, we offer additional
enterprise addons, installation service, training and consulting. [Contact
enterprise add-ons, installation services, training, and consulting. [Contact
us](https://greptime.com/contactus) and we will reach out to you with more
detail of our commerial license.
detail of our commercial license.
## License

View File

@@ -93,7 +93,7 @@
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Cache configuration for object storage such as 'S3' etc. It is recommended to configure it when using object storage for better performance.<br/>The local file cache directory. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}/object_cache/read`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. |
| `storage.root` | String | Unset | The S3 data will be stored in the specified prefix, for example, `s3://${bucket}/${root}`.<br/>**It's only used when the storage type is `S3`, `Oss` and `Azblob`**. |
@@ -109,6 +109,11 @@
| `storage.sas_token` | String | Unset | The sas token of the azure account.<br/>**It's only used when the storage type is `Azblob`**. |
| `storage.endpoint` | String | Unset | The endpoint of the S3 service.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.region` | String | Unset | The region of the S3 service.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.http_client` | -- | -- | The http client options to the storage.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.http_client.pool_max_idle_per_host` | Integer | `1024` | The maximum idle connection per host allowed in the pool. |
| `storage.http_client.connect_timeout` | String | `30s` | The timeout for only the connect phase of a http client. |
| `storage.http_client.timeout` | String | `30s` | The total request timeout, applied from when the request starts connecting until the response body has finished.<br/>Also considered a total deadline. |
| `storage.http_client.pool_idle_timeout` | String | `90s` | The timeout for idle sockets being kept-alive. |
| `[[region_engine]]` | -- | -- | The region engine options. You can configure multiple region engines. |
| `region_engine.mito` | -- | -- | The Mito engine options. |
| `region_engine.mito.num_workers` | Integer | `8` | Number of region workers. |
@@ -126,12 +131,11 @@
| `region_engine.mito.vector_cache_size` | String | Auto | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/8 of OS memory. |
| `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache. It is recommended to enable it when using object storage for better performance. |
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/write_cache`. |
| `region_engine.mito.experimental_write_cache_size` | String | `1GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. |
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/object_cache/write`. |
| `region_engine.mito.experimental_write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).<br/>- `0`: using the default value (1/4 of cpu cores).<br/>- `1`: scan in current thread.<br/>- `n`: scan in parallelism n. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
@@ -281,7 +285,7 @@
| `data_home` | String | `/tmp/metasrv/` | The working home directory. |
| `bind_addr` | String | `127.0.0.1:3002` | The bind address of metasrv. |
| `server_addr` | String | `127.0.0.1:3002` | The communication server address for frontend and datanode to connect to metasrv, "127.0.0.1:3002" by default for localhost. |
| `store_addr` | String | `127.0.0.1:2379` | Store server address default to etcd store. |
| `store_addrs` | Array | -- | Store server address default to etcd store. |
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
| `use_memory_store` | Bool | `false` | Store data in memory. |
| `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. |
@@ -416,7 +420,7 @@
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Cache configuration for object storage such as 'S3' etc. It is recommended to configure it when using object storage for better performance.<br/>The local file cache directory. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}/object_cache/read`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. |
| `storage.root` | String | Unset | The S3 data will be stored in the specified prefix, for example, `s3://${bucket}/${root}`.<br/>**It's only used when the storage type is `S3`, `Oss` and `Azblob`**. |
@@ -432,6 +436,11 @@
| `storage.sas_token` | String | Unset | The sas token of the azure account.<br/>**It's only used when the storage type is `Azblob`**. |
| `storage.endpoint` | String | Unset | The endpoint of the S3 service.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.region` | String | Unset | The region of the S3 service.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.http_client` | -- | -- | The http client options to the storage.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.http_client.pool_max_idle_per_host` | Integer | `1024` | The maximum idle connection per host allowed in the pool. |
| `storage.http_client.connect_timeout` | String | `30s` | The timeout for only the connect phase of a http client. |
| `storage.http_client.timeout` | String | `30s` | The total request timeout, applied from when the request starts connecting until the response body has finished.<br/>Also considered a total deadline. |
| `storage.http_client.pool_idle_timeout` | String | `90s` | The timeout for idle sockets being kept-alive. |
| `[[region_engine]]` | -- | -- | The region engine options. You can configure multiple region engines. |
| `region_engine.mito` | -- | -- | The Mito engine options. |
| `region_engine.mito.num_workers` | Integer | `8` | Number of region workers. |
@@ -449,12 +458,11 @@
| `region_engine.mito.vector_cache_size` | String | Auto | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/8 of OS memory. |
| `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache. It is recommended to enable it when using object storage for better performance. |
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/write_cache`. |
| `region_engine.mito.experimental_write_cache_size` | String | `1GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. |
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/object_cache/write`. |
| `region_engine.mito.experimental_write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).<br/>- `0`: using the default value (1/4 of cpu cores).<br/>- `1`: scan in current thread.<br/>- `n`: scan in parallelism n. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |

View File

@@ -294,14 +294,14 @@ data_home = "/tmp/greptimedb/"
## - `Oss`: the data is stored in the Aliyun OSS.
type = "File"
## Cache configuration for object storage such as 'S3' etc. It is recommended to configure it when using object storage for better performance.
## The local file cache directory.
## Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.
## A local file directory, defaults to `{data_home}/object_cache/read`. An empty string means disabling.
## @toml2docs:none-default
cache_path = "/path/local_cache"
#+ cache_path = ""
## The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger.
## @toml2docs:none-default
cache_capacity = "1GiB"
cache_capacity = "5GiB"
## The S3 bucket name.
## **It's only used when the storage type is `S3`, `Oss` and `Gcs`**.
@@ -375,6 +375,23 @@ endpoint = "https://s3.amazonaws.com"
## @toml2docs:none-default
region = "us-west-2"
## The http client options to the storage.
## **It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**.
[storage.http_client]
## The maximum idle connection per host allowed in the pool.
pool_max_idle_per_host = 1024
## The timeout for only the connect phase of a http client.
connect_timeout = "30s"
## The total request timeout, applied from when the request starts connecting until the response body has finished.
## Also considered a total deadline.
timeout = "30s"
## The timeout for idle sockets being kept-alive.
pool_idle_timeout = "90s"
# Custom storage options
# [[storage.providers]]
# name = "S3"
@@ -459,14 +476,14 @@ auto_flush_interval = "1h"
## @toml2docs:none-default="Auto"
#+ selector_result_cache_size = "512MB"
## Whether to enable the experimental write cache. It is recommended to enable it when using object storage for better performance.
## Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance.
enable_experimental_write_cache = false
## File system path for write cache, defaults to `{data_home}/write_cache`.
## File system path for write cache, defaults to `{data_home}/object_cache/write`.
experimental_write_cache_path = ""
## Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger.
experimental_write_cache_size = "1GiB"
experimental_write_cache_size = "5GiB"
## TTL for write cache.
## @toml2docs:none-default
@@ -475,12 +492,6 @@ experimental_write_cache_ttl = "8h"
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"
## Parallelism to scan a region (default: 1/4 of cpu cores).
## - `0`: using the default value (1/4 of cpu cores).
## - `1`: scan in current thread.
## - `n`: scan in parallelism n.
scan_parallelism = 0
## Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size = 32

View File

@@ -8,7 +8,7 @@ bind_addr = "127.0.0.1:3002"
server_addr = "127.0.0.1:3002"
## Store server address default to etcd store.
store_addr = "127.0.0.1:2379"
store_addrs = ["127.0.0.1:2379"]
## Datanode selector type.
## - `round_robin` (default value)
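
Since `store_addrs` is now an array (see the config table above), multiple etcd endpoints can be listed. The snippet below is a hypothetical illustration only — the endpoint addresses are made up and are not part of the diff above:

```toml
## Hypothetical example: point metasrv at a three-node etcd cluster.
store_addrs = ["10.0.0.1:2379", "10.0.0.2:2379", "10.0.0.3:2379"]
```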

View File

@@ -332,14 +332,14 @@ data_home = "/tmp/greptimedb/"
## - `Oss`: the data is stored in the Aliyun OSS.
type = "File"
## Cache configuration for object storage such as 'S3' etc. It is recommended to configure it when using object storage for better performance.
## The local file cache directory.
## Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.
## A local file directory, defaults to `{data_home}/object_cache/read`. An empty string means disabling.
## @toml2docs:none-default
cache_path = "/path/local_cache"
#+ cache_path = ""
## The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger.
## @toml2docs:none-default
cache_capacity = "1GiB"
cache_capacity = "5GiB"
## The S3 bucket name.
## **It's only used when the storage type is `S3`, `Oss` and `Gcs`**.
@@ -413,6 +413,23 @@ endpoint = "https://s3.amazonaws.com"
## @toml2docs:none-default
region = "us-west-2"
## The http client options to the storage.
## **It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**.
[storage.http_client]
## The maximum idle connection per host allowed in the pool.
pool_max_idle_per_host = 1024
## The timeout for only the connect phase of a http client.
connect_timeout = "30s"
## The total request timeout, applied from when the request starts connecting until the response body has finished.
## Also considered a total deadline.
timeout = "30s"
## The timeout for idle sockets being kept-alive.
pool_idle_timeout = "90s"
# Custom storage options
# [[storage.providers]]
# name = "S3"
@@ -497,14 +514,14 @@ auto_flush_interval = "1h"
## @toml2docs:none-default="Auto"
#+ selector_result_cache_size = "512MB"
## Whether to enable the experimental write cache. It is recommended to enable it when using object storage for better performance.
## Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance.
enable_experimental_write_cache = false
## File system path for write cache, defaults to `{data_home}/write_cache`.
## File system path for write cache, defaults to `{data_home}/object_cache/write`.
experimental_write_cache_path = ""
## Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger.
experimental_write_cache_size = "1GiB"
experimental_write_cache_size = "5GiB"
## TTL for write cache.
## @toml2docs:none-default
@@ -513,12 +530,6 @@ experimental_write_cache_ttl = "8h"
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"
## Parallelism to scan a region (default: 1/4 of cpu cores).
## - `0`: using the default value (1/4 of cpu cores).
## - `1`: scan in current thread.
## - `n`: scan in parallelism n.
scan_parallelism = 0
## Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size = 32

View File

@@ -3,7 +3,7 @@
## HTTP API
Sample at 99 Hertz, for 5 seconds, output report in [protobuf format](https://github.com/google/pprof/blob/master/proto/profile.proto).
```bash
curl -s '0:4000/debug/prof/cpu' > /tmp/pprof.out
curl -X POST -s '0:4000/debug/prof/cpu' > /tmp/pprof.out
```
Then you can use `pprof` command with the protobuf file.
@@ -13,10 +13,10 @@ go tool pprof -top /tmp/pprof.out
Sample at 99 Hertz, for 60 seconds, output report in flamegraph format.
```bash
curl -s '0:4000/debug/prof/cpu?seconds=60&output=flamegraph' > /tmp/pprof.svg
curl -X POST -s '0:4000/debug/prof/cpu?seconds=60&output=flamegraph' > /tmp/pprof.svg
```
Sample at 49 Hertz, for 10 seconds, output report in text format.
```bash
curl -s '0:4000/debug/prof/cpu?seconds=10&frequency=49&output=text' > /tmp/pprof.txt
curl -X POST -s '0:4000/debug/prof/cpu?seconds=10&frequency=49&output=text' > /tmp/pprof.txt
```

View File

@@ -23,13 +23,13 @@ curl https://raw.githubusercontent.com/brendangregg/FlameGraph/master/flamegraph
Start GreptimeDB instance with environment variables:
```bash
MALLOC_CONF=prof:true,lg_prof_interval:28 ./target/debug/greptime standalone start
MALLOC_CONF=prof:true ./target/debug/greptime standalone start
```
Dump memory profiling data through HTTP API:
```bash
curl localhost:4000/debug/prof/mem > greptime.hprof
curl -X POST localhost:4000/debug/prof/mem > greptime.hprof
```
You can periodically dump profiling data and compare them to find the delta memory usage.

62
src/cache/src/lib.rs vendored
View File

@@ -19,9 +19,9 @@ use std::time::Duration;
use catalog::kvbackend::new_table_cache;
use common_meta::cache::{
new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache,
new_table_route_cache, new_view_info_cache, CacheRegistry, CacheRegistryBuilder,
LayeredCacheRegistryBuilder,
new_schema_cache, new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache,
new_table_route_cache, new_table_schema_cache, new_view_info_cache, CacheRegistry,
CacheRegistryBuilder, LayeredCacheRegistryBuilder,
};
use common_meta::kv_backend::KvBackendRef;
use moka::future::CacheBuilder;
@@ -37,9 +37,47 @@ pub const TABLE_INFO_CACHE_NAME: &str = "table_info_cache";
pub const VIEW_INFO_CACHE_NAME: &str = "view_info_cache";
pub const TABLE_NAME_CACHE_NAME: &str = "table_name_cache";
pub const TABLE_CACHE_NAME: &str = "table_cache";
pub const SCHEMA_CACHE_NAME: &str = "schema_cache";
pub const TABLE_SCHEMA_NAME_CACHE_NAME: &str = "table_schema_name_cache";
pub const TABLE_FLOWNODE_SET_CACHE_NAME: &str = "table_flownode_set_cache";
pub const TABLE_ROUTE_CACHE_NAME: &str = "table_route_cache";
/// Builds cache registry for datanode, including:
/// - Schema cache.
/// - Table id to schema name cache.
pub fn build_datanode_cache_registry(kv_backend: KvBackendRef) -> CacheRegistry {
// Builds table id schema name cache that never expires.
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY).build();
let table_id_schema_cache = Arc::new(new_table_schema_cache(
TABLE_SCHEMA_NAME_CACHE_NAME.to_string(),
cache,
kv_backend.clone(),
));
// Builds schema cache
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let schema_cache = Arc::new(new_schema_cache(
SCHEMA_CACHE_NAME.to_string(),
cache,
kv_backend.clone(),
));
CacheRegistryBuilder::default()
.add_cache(table_id_schema_cache)
.add_cache(schema_cache)
.build()
}
/// Builds cache registry for frontend and datanode, including:
/// - Table info cache
/// - Table name cache
/// - Table route cache
/// - Table flow node cache
/// - View cache
/// - Schema cache
pub fn build_fundamental_cache_registry(kv_backend: KvBackendRef) -> CacheRegistry {
// Builds table info cache
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
@@ -95,12 +133,30 @@ pub fn build_fundamental_cache_registry(kv_backend: KvBackendRef) -> CacheRegist
kv_backend.clone(),
));
// Builds schema cache
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let schema_cache = Arc::new(new_schema_cache(
SCHEMA_CACHE_NAME.to_string(),
cache,
kv_backend.clone(),
));
let table_id_schema_cache = Arc::new(new_table_schema_cache(
TABLE_SCHEMA_NAME_CACHE_NAME.to_string(),
CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY).build(),
kv_backend,
));
CacheRegistryBuilder::default()
.add_cache(table_info_cache)
.add_cache(table_name_cache)
.add_cache(table_route_cache)
.add_cache(view_info_cache)
.add_cache(table_flownode_set_cache)
.add_cache(schema_cache)
.add_cache(table_id_schema_cache)
.build()
}

View File

@@ -0,0 +1,92 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::ProcedureStatus;
use common_error::ext::BoxedError;
use common_meta::cluster::{ClusterInfo, NodeInfo};
use common_meta::datanode::RegionStat;
use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
use common_meta::rpc::procedure;
use common_procedure::{ProcedureInfo, ProcedureState};
use meta_client::MetaClientRef;
use snafu::ResultExt;
use crate::error;
use crate::information_schema::InformationExtension;
pub struct DistributedInformationExtension {
meta_client: MetaClientRef,
}
impl DistributedInformationExtension {
pub fn new(meta_client: MetaClientRef) -> Self {
Self { meta_client }
}
}
#[async_trait::async_trait]
impl InformationExtension for DistributedInformationExtension {
type Error = crate::error::Error;
async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
self.meta_client
.list_nodes(None)
.await
.map_err(BoxedError::new)
.context(error::ListNodesSnafu)
}
async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
let procedures = self
.meta_client
.list_procedures(&ExecutorContext::default())
.await
.map_err(BoxedError::new)
.context(error::ListProceduresSnafu)?
.procedures;
let mut result = Vec::with_capacity(procedures.len());
for procedure in procedures {
let pid = match procedure.id {
Some(pid) => pid,
None => return error::ProcedureIdNotFoundSnafu {}.fail(),
};
let pid = procedure::pb_pid_to_pid(&pid)
.map_err(BoxedError::new)
.context(error::ConvertProtoDataSnafu)?;
let status = ProcedureStatus::try_from(procedure.status)
.map(|v| v.as_str_name())
.unwrap_or("Unknown")
.to_string();
let procedure_info = ProcedureInfo {
id: pid,
type_name: procedure.type_name,
start_time_ms: procedure.start_time_ms,
end_time_ms: procedure.end_time_ms,
state: ProcedureState::Running,
lock_keys: procedure.lock_keys,
};
result.push((status, procedure_info));
}
Ok(result)
}
async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
self.meta_client
.list_region_stats()
.await
.map_err(BoxedError::new)
.context(error::ListRegionStatsSnafu)
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub use client::{CachedMetaKvBackend, CachedMetaKvBackendBuilder, MetaKvBackend};
pub use client::{CachedKvBackend, CachedKvBackendBuilder, MetaKvBackend};
mod client;
mod manager;

View File

@@ -22,6 +22,7 @@ use common_error::ext::BoxedError;
use common_meta::cache_invalidator::KvCacheInvalidator;
use common_meta::error::Error::CacheNotGet;
use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result};
use common_meta::kv_backend::txn::{Txn, TxnResponse};
use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
@@ -42,20 +43,20 @@ const DEFAULT_CACHE_MAX_CAPACITY: u64 = 10000;
const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
const DEFAULT_CACHE_TTI: Duration = Duration::from_secs(5 * 60);
pub struct CachedMetaKvBackendBuilder {
pub struct CachedKvBackendBuilder {
cache_max_capacity: Option<u64>,
cache_ttl: Option<Duration>,
cache_tti: Option<Duration>,
meta_client: Arc<MetaClient>,
inner: KvBackendRef,
}
impl CachedMetaKvBackendBuilder {
pub fn new(meta_client: Arc<MetaClient>) -> Self {
impl CachedKvBackendBuilder {
pub fn new(inner: KvBackendRef) -> Self {
Self {
cache_max_capacity: None,
cache_ttl: None,
cache_tti: None,
meta_client,
inner,
}
}
@@ -74,7 +75,7 @@ impl CachedMetaKvBackendBuilder {
self
}
pub fn build(self) -> CachedMetaKvBackend {
pub fn build(self) -> CachedKvBackend {
let cache_max_capacity = self
.cache_max_capacity
.unwrap_or(DEFAULT_CACHE_MAX_CAPACITY);
@@ -85,14 +86,11 @@ impl CachedMetaKvBackendBuilder {
.time_to_live(cache_ttl)
.time_to_idle(cache_tti)
.build();
let kv_backend = Arc::new(MetaKvBackend {
client: self.meta_client,
});
let kv_backend = self.inner;
let name = format!("CachedKvBackend({})", kv_backend.name());
let version = AtomicUsize::new(0);
CachedMetaKvBackend {
CachedKvBackend {
kv_backend,
cache,
name,
@@ -112,19 +110,29 @@ pub type CacheBackend = Cache<Vec<u8>, KeyValue>;
/// Therefore, it is recommended to use CachedMetaKvBackend to only read metadata related
/// information. Note: If you read other information, you may read expired data, which depends on
/// TTL and TTI for cache.
pub struct CachedMetaKvBackend {
pub struct CachedKvBackend {
kv_backend: KvBackendRef,
cache: CacheBackend,
name: String,
version: AtomicUsize,
}
impl TxnService for CachedMetaKvBackend {
#[async_trait::async_trait]
impl TxnService for CachedKvBackend {
type Error = Error;
async fn txn(&self, txn: Txn) -> std::result::Result<TxnResponse, Self::Error> {
// TODO(hl): txn of CachedKvBackend simply pass through to inner backend without invalidating caches.
self.kv_backend.txn(txn).await
}
fn max_txn_ops(&self) -> usize {
self.kv_backend.max_txn_ops()
}
}
#[async_trait::async_trait]
impl KvBackend for CachedMetaKvBackend {
impl KvBackend for CachedKvBackend {
fn name(&self) -> &str {
&self.name
}
@@ -305,7 +313,7 @@ impl KvBackend for CachedMetaKvBackend {
}
#[async_trait::async_trait]
impl KvCacheInvalidator for CachedMetaKvBackend {
impl KvCacheInvalidator for CachedKvBackend {
async fn invalidate_key(&self, key: &[u8]) {
self.create_new_version();
self.cache.invalidate(key).await;
@@ -313,7 +321,7 @@ impl KvCacheInvalidator for CachedMetaKvBackend {
}
}
impl CachedMetaKvBackend {
impl CachedKvBackend {
// only for test
#[cfg(test)]
fn wrap(kv_backend: KvBackendRef) -> Self {
@@ -466,7 +474,7 @@ mod tests {
use common_meta::rpc::KeyValue;
use dashmap::DashMap;
use super::CachedMetaKvBackend;
use super::CachedKvBackend;
#[derive(Default)]
pub struct SimpleKvBackend {
@@ -540,7 +548,7 @@ mod tests {
async fn test_cached_kv_backend() {
let simple_kv = Arc::new(SimpleKvBackend::default());
let get_execute_times = simple_kv.get_execute_times.clone();
let cached_kv = CachedMetaKvBackend::wrap(simple_kv);
let cached_kv = CachedKvBackend::wrap(simple_kv);
add_some_vals(&cached_kv).await;

View File

@@ -30,6 +30,7 @@ use table::TableRef;
use crate::error::Result;
pub mod error;
pub mod information_extension;
pub mod kvbackend;
pub mod memory;
mod metrics;

65
src/cli/Cargo.toml Normal file
View File

@@ -0,0 +1,65 @@
[package]
name = "cli"
version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
async-trait.workspace = true
auth.workspace = true
base64.workspace = true
cache.workspace = true
catalog.workspace = true
chrono.workspace = true
clap.workspace = true
client.workspace = true
common-base.workspace = true
common-catalog.workspace = true
common-config.workspace = true
common-error.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-options.workspace = true
common-procedure.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry = { workspace = true, features = [
"deadlock_detection",
] }
common-time.workspace = true
common-version.workspace = true
common-wal.workspace = true
datatypes.workspace = true
either = "1.8"
etcd-client.workspace = true
futures.workspace = true
humantime.workspace = true
meta-client.workspace = true
nu-ansi-term = "0.46"
query.workspace = true
rand.workspace = true
reqwest.workspace = true
rustyline = "10.1"
serde.workspace = true
serde_json.workspace = true
servers.workspace = true
session.workspace = true
snafu.workspace = true
store-api.workspace = true
substrait.workspace = true
table.workspace = true
tokio.workspace = true
tracing-appender.workspace = true
[dev-dependencies]
client = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
common-version.workspace = true
serde.workspace = true
temp-env = "0.3"
tempfile.workspace = true

View File

@@ -19,6 +19,7 @@ use std::time::Duration;
use async_trait::async_trait;
use clap::Parser;
use common_error::ext::BoxedError;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::peer::Peer;
@@ -30,11 +31,9 @@ use rand::Rng;
use store_api::storage::RegionNumber;
use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType};
use table::table_name::TableName;
use tracing_appender::non_blocking::WorkerGuard;
use self::metadata::TableMetadataBencher;
use crate::cli::{Instance, Tool};
use crate::error::Result;
use crate::Tool;
mod metadata;
@@ -62,7 +61,7 @@ pub struct BenchTableMetadataCommand {
}
impl BenchTableMetadataCommand {
pub async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr], 128)
.await
.unwrap();
@@ -73,7 +72,7 @@ impl BenchTableMetadataCommand {
table_metadata_manager,
count: self.count,
};
Ok(Instance::new(Box::new(tool), guard))
Ok(Box::new(tool))
}
}
@@ -84,7 +83,7 @@ struct BenchTableMetadata {
#[async_trait]
impl Tool for BenchTableMetadata {
async fn do_work(&self) -> Result<()> {
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
let bencher = TableMetadataBencher::new(self.table_metadata_manager.clone(), self.count);
bencher.bench_create().await;
bencher.bench_get().await;

View File

@@ -18,7 +18,7 @@ use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManagerRef;
use table::table_name::TableName;
use crate::cli::bench::{
use crate::bench::{
bench_self_recorded, create_region_routes, create_region_wal_options, create_table_info,
};

View File

@@ -26,7 +26,8 @@ use snafu::ResultExt;
use crate::error::{HttpQuerySqlSnafu, Result, SerdeJsonSnafu};
pub(crate) struct DatabaseClient {
#[derive(Debug, Clone)]
pub struct DatabaseClient {
addr: String,
catalog: String,
auth_header: Option<String>,

src/cli/src/error.rs Normal file
View File

@@ -0,0 +1,316 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use rustyline::error::ReadlineError;
use snafu::{Location, Snafu};
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to install ring crypto provider: {}", msg))]
InitTlsProvider {
#[snafu(implicit)]
location: Location,
msg: String,
},
#[snafu(display("Failed to create default catalog and schema"))]
InitMetadata {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to init DDL manager"))]
InitDdlManager {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to init default timezone"))]
InitTimezone {
#[snafu(implicit)]
location: Location,
source: common_time::error::Error,
},
#[snafu(display("Failed to start procedure manager"))]
StartProcedureManager {
#[snafu(implicit)]
location: Location,
source: common_procedure::error::Error,
},
#[snafu(display("Failed to stop procedure manager"))]
StopProcedureManager {
#[snafu(implicit)]
location: Location,
source: common_procedure::error::Error,
},
#[snafu(display("Failed to start wal options allocator"))]
StartWalOptionsAllocator {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Missing config, msg: {}", msg))]
MissingConfig {
msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Illegal config: {}", msg))]
IllegalConfig {
msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid REPL command: {reason}"))]
InvalidReplCommand { reason: String },
#[snafu(display("Cannot create REPL"))]
ReplCreation {
#[snafu(source)]
error: ReadlineError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Error reading command"))]
Readline {
#[snafu(source)]
error: ReadlineError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to request database, sql: {sql}"))]
RequestDatabase {
sql: String,
#[snafu(source)]
source: client::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to collect RecordBatches"))]
CollectRecordBatches {
#[snafu(implicit)]
location: Location,
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to pretty print Recordbatches"))]
PrettyPrintRecordBatches {
#[snafu(implicit)]
location: Location,
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to start Meta client"))]
StartMetaClient {
#[snafu(implicit)]
location: Location,
source: meta_client::error::Error,
},
#[snafu(display("Failed to parse SQL: {}", sql))]
ParseSql {
sql: String,
#[snafu(implicit)]
location: Location,
source: query::error::Error,
},
#[snafu(display("Failed to plan statement"))]
PlanStatement {
#[snafu(implicit)]
location: Location,
source: query::error::Error,
},
#[snafu(display("Failed to encode logical plan in substrait"))]
SubstraitEncodeLogicalPlan {
#[snafu(implicit)]
location: Location,
source: substrait::error::Error,
},
#[snafu(display("Failed to load layered config"))]
LoadLayeredConfig {
#[snafu(source(from(common_config::error::Error, Box::new)))]
source: Box<common_config::error::Error>,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to connect to Etcd at {etcd_addr}"))]
ConnectEtcd {
etcd_addr: String,
#[snafu(source)]
error: etcd_client::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serde json"))]
SerdeJson {
#[snafu(source)]
error: serde_json::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to run http request: {reason}"))]
HttpQuerySql {
reason: String,
#[snafu(source)]
error: reqwest::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Empty result from output"))]
EmptyResult {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to manipulate file"))]
FileIo {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: std::io::Error,
},
#[snafu(display("Failed to create directory {}", dir))]
CreateDir {
dir: String,
#[snafu(source)]
error: std::io::Error,
},
#[snafu(display("Failed to spawn thread"))]
SpawnThread {
#[snafu(source)]
error: std::io::Error,
},
#[snafu(display("Other error"))]
Other {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to build runtime"))]
BuildRuntime {
#[snafu(implicit)]
location: Location,
source: common_runtime::error::Error,
},
#[snafu(display("Failed to get cache from cache registry: {}", name))]
CacheRequired {
#[snafu(implicit)]
location: Location,
name: String,
},
#[snafu(display("Failed to build cache registry"))]
BuildCacheRegistry {
#[snafu(implicit)]
location: Location,
source: cache::error::Error,
},
#[snafu(display("Failed to initialize meta client"))]
MetaClientInit {
#[snafu(implicit)]
location: Location,
source: meta_client::error::Error,
},
#[snafu(display("Cannot find schema {schema} in catalog {catalog}"))]
SchemaNotFound {
catalog: String,
schema: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => {
source.status_code()
}
Error::MissingConfig { .. }
| Error::LoadLayeredConfig { .. }
| Error::IllegalConfig { .. }
| Error::InvalidReplCommand { .. }
| Error::InitTimezone { .. }
| Error::ConnectEtcd { .. }
| Error::CreateDir { .. }
| Error::EmptyResult { .. } => StatusCode::InvalidArguments,
Error::StartProcedureManager { source, .. }
| Error::StopProcedureManager { source, .. } => source.status_code(),
Error::StartWalOptionsAllocator { source, .. } => source.status_code(),
Error::ReplCreation { .. } | Error::Readline { .. } | Error::HttpQuerySql { .. } => {
StatusCode::Internal
}
Error::RequestDatabase { source, .. } => source.status_code(),
Error::CollectRecordBatches { source, .. }
| Error::PrettyPrintRecordBatches { source, .. } => source.status_code(),
Error::StartMetaClient { source, .. } => source.status_code(),
Error::ParseSql { source, .. } | Error::PlanStatement { source, .. } => {
source.status_code()
}
Error::SubstraitEncodeLogicalPlan { source, .. } => source.status_code(),
Error::SerdeJson { .. }
| Error::FileIo { .. }
| Error::SpawnThread { .. }
| Error::InitTlsProvider { .. } => StatusCode::Unexpected,
Error::Other { source, .. } => source.status_code(),
Error::BuildRuntime { source, .. } => source.status_code(),
Error::CacheRequired { .. } | Error::BuildCacheRegistry { .. } => StatusCode::Internal,
Error::MetaClientInit { source, .. } => source.status_code(),
Error::SchemaNotFound { .. } => StatusCode::DatabaseNotFound,
}
}
fn as_any(&self) -> &dyn Any {
self
}
}
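These variants are consumed through snafu's generated context selectors (MissingConfigSnafu, ConnectEtcdSnafu, and so on), the same pattern the export/import code below relies on. A minimal sketch of a caller inside this crate; the helper function and its message are hypothetical:
use snafu::OptionExt;
use crate::error::{MissingConfigSnafu, Result};
/// Hypothetical helper: reject a missing `--addr` argument as Error::MissingConfig.
fn require_addr(addr: Option<&str>) -> Result<&str> {
    addr.context(MissingConfigSnafu {
        msg: "--addr is required",
    })
}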

View File

@@ -19,6 +19,7 @@ use std::time::Duration;
use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use common_error::ext::BoxedError;
use common_telemetry::{debug, error, info};
use serde_json::Value;
use snafu::{OptionExt, ResultExt};
@@ -26,11 +27,10 @@ use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use tracing_appender::non_blocking::WorkerGuard;
use crate::cli::database::DatabaseClient;
use crate::cli::{database, Instance, Tool};
use crate::database::DatabaseClient;
use crate::error::{EmptyResultSnafu, Error, FileIoSnafu, Result, SchemaNotFoundSnafu};
use crate::{database, Tool};
type TableReference = (String, String, String);
@@ -94,8 +94,9 @@ pub struct ExportCommand {
}
impl ExportCommand {
pub async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
let (catalog, schema) = database::split_database(&self.database)?;
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
let (catalog, schema) =
database::split_database(&self.database).map_err(BoxedError::new)?;
let database_client = DatabaseClient::new(
self.addr.clone(),
@@ -105,19 +106,16 @@ impl ExportCommand {
self.timeout.unwrap_or_default(),
);
Ok(Instance::new(
Box::new(Export {
catalog,
schema,
database_client,
output_dir: self.output_dir.clone(),
parallelism: self.export_jobs,
target: self.target.clone(),
start_time: self.start_time.clone(),
end_time: self.end_time.clone(),
}),
guard,
))
Ok(Box::new(Export {
catalog,
schema,
database_client,
output_dir: self.output_dir.clone(),
parallelism: self.export_jobs,
target: self.target.clone(),
start_time: self.start_time.clone(),
end_time: self.end_time.clone(),
}))
}
}
@@ -465,97 +463,22 @@ impl Export {
#[async_trait]
impl Tool for Export {
async fn do_work(&self) -> Result<()> {
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
match self.target {
ExportTarget::Schema => {
self.export_create_database().await?;
self.export_create_table().await
self.export_create_database()
.await
.map_err(BoxedError::new)?;
self.export_create_table().await.map_err(BoxedError::new)
}
ExportTarget::Data => self.export_database_data().await,
ExportTarget::Data => self.export_database_data().await.map_err(BoxedError::new),
ExportTarget::All => {
self.export_create_database().await?;
self.export_create_table().await?;
self.export_database_data().await
self.export_create_database()
.await
.map_err(BoxedError::new)?;
self.export_create_table().await.map_err(BoxedError::new)?;
self.export_database_data().await.map_err(BoxedError::new)
}
}
}
}
#[cfg(test)]
mod tests {
use clap::Parser;
use client::{Client, Database};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_telemetry::logging::LoggingOptions;
use crate::error::Result as CmdResult;
use crate::options::GlobalOptions;
use crate::{cli, standalone, App};
#[tokio::test(flavor = "multi_thread")]
async fn test_export_create_table_with_quoted_names() -> CmdResult<()> {
let output_dir = tempfile::tempdir().unwrap();
let standalone = standalone::Command::parse_from([
"standalone",
"start",
"--data-home",
&*output_dir.path().to_string_lossy(),
]);
let standalone_opts = standalone.load_options(&GlobalOptions::default()).unwrap();
let mut instance = standalone.build(standalone_opts).await?;
instance.start().await?;
let client = Client::with_urls(["127.0.0.1:4001"]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
database
.sql(r#"CREATE DATABASE "cli.export.create_table";"#)
.await
.unwrap();
database
.sql(
r#"CREATE TABLE "cli.export.create_table"."a.b.c"(
ts TIMESTAMP,
TIME INDEX (ts)
) engine=mito;
"#,
)
.await
.unwrap();
let output_dir = tempfile::tempdir().unwrap();
let cli = cli::Command::parse_from([
"cli",
"export",
"--addr",
"127.0.0.1:4000",
"--output-dir",
&*output_dir.path().to_string_lossy(),
"--target",
"schema",
]);
let mut cli_app = cli.build(LoggingOptions::default()).await?;
cli_app.start().await?;
instance.stop().await?;
let output_file = output_dir
.path()
.join("greptime")
.join("cli.export.create_table")
.join("create_tables.sql");
let res = std::fs::read_to_string(output_file).unwrap();
let expect = r#"CREATE TABLE IF NOT EXISTS "a.b.c" (
"ts" TIMESTAMP(3) NOT NULL,
TIME INDEX ("ts")
)
ENGINE=mito
;
"#;
assert_eq!(res.trim(), expect.trim());
Ok(())
}
}

View File

@@ -19,7 +19,7 @@ use rustyline::highlight::{Highlighter, MatchingBracketHighlighter};
use rustyline::hint::{Hinter, HistoryHinter};
use rustyline::validate::{ValidationContext, ValidationResult, Validator};
use crate::cli::cmd::ReplCommand;
use crate::cmd::ReplCommand;
pub(crate) struct RustylineHelper {
hinter: HistoryHinter,

View File

@@ -19,15 +19,15 @@ use std::time::Duration;
use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_error::ext::BoxedError;
use common_telemetry::{error, info, warn};
use snafu::{OptionExt, ResultExt};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use tracing_appender::non_blocking::WorkerGuard;
use crate::cli::database::DatabaseClient;
use crate::cli::{database, Instance, Tool};
use crate::database::DatabaseClient;
use crate::error::{Error, FileIoSnafu, Result, SchemaNotFoundSnafu};
use crate::{database, Tool};
#[derive(Debug, Default, Clone, ValueEnum)]
enum ImportTarget {
@@ -79,8 +79,9 @@ pub struct ImportCommand {
}
impl ImportCommand {
pub async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
let (catalog, schema) = database::split_database(&self.database)?;
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
let (catalog, schema) =
database::split_database(&self.database).map_err(BoxedError::new)?;
let database_client = DatabaseClient::new(
self.addr.clone(),
catalog.clone(),
@@ -89,17 +90,14 @@ impl ImportCommand {
self.timeout.unwrap_or_default(),
);
Ok(Instance::new(
Box::new(Import {
catalog,
schema,
database_client,
input_dir: self.input_dir.clone(),
parallelism: self.import_jobs,
target: self.target.clone(),
}),
guard,
))
Ok(Box::new(Import {
catalog,
schema,
database_client,
input_dir: self.input_dir.clone(),
parallelism: self.import_jobs,
target: self.target.clone(),
}))
}
}
@@ -218,13 +216,13 @@ impl Import {
#[async_trait]
impl Tool for Import {
async fn do_work(&self) -> Result<()> {
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
match self.target {
ImportTarget::Schema => self.import_create_table().await,
ImportTarget::Data => self.import_database_data().await,
ImportTarget::Schema => self.import_create_table().await.map_err(BoxedError::new),
ImportTarget::Data => self.import_database_data().await.map_err(BoxedError::new),
ImportTarget::All => {
self.import_create_table().await?;
self.import_database_data().await
self.import_create_table().await.map_err(BoxedError::new)?;
self.import_database_data().await.map_err(BoxedError::new)
}
}
}

src/cli/src/lib.rs Normal file
View File

@@ -0,0 +1,60 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod bench;
pub mod error;
// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
#[allow(unused)]
mod cmd;
mod export;
mod helper;
// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
mod database;
mod import;
#[allow(unused)]
mod repl;
use async_trait::async_trait;
use clap::Parser;
use common_error::ext::BoxedError;
pub use database::DatabaseClient;
use error::Result;
pub use repl::Repl;
pub use crate::bench::BenchTableMetadataCommand;
pub use crate::export::ExportCommand;
pub use crate::import::ImportCommand;
#[async_trait]
pub trait Tool: Send + Sync {
async fn do_work(&self) -> std::result::Result<(), BoxedError>;
}
#[derive(Debug, Parser)]
pub(crate) struct AttachCommand {
#[clap(long)]
pub(crate) grpc_addr: String,
#[clap(long)]
pub(crate) meta_addr: Option<String>,
#[clap(long, action)]
pub(crate) disable_helper: bool,
}
impl AttachCommand {
#[allow(dead_code)]
async fn build(self) -> Result<Box<dyn Tool>> {
unimplemented!("Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373")
}
}
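The crate's public surface stays small: a tool implements Tool::do_work and returns a BoxedError, and the cmd crate wraps it into its own Instance. A sketch of what an implementation looks like from the outside (NoopTool is hypothetical):
use async_trait::async_trait;
use common_error::ext::BoxedError;
struct NoopTool;
#[async_trait]
impl cli::Tool for NoopTool {
    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
        // A real tool runs its bench/export/import logic here.
        Ok(())
    }
}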

View File

@@ -20,14 +20,16 @@ use cache::{
build_fundamental_cache_registry, with_default_composite_cache_registry, TABLE_CACHE_NAME,
TABLE_ROUTE_CACHE_NAME,
};
use catalog::information_extension::DistributedInformationExtension;
use catalog::kvbackend::{
CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend,
CachedKvBackend, CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend,
};
use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_base::Plugins;
use common_config::Mode;
use common_error::ext::ErrorExt;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::kv_backend::KvBackendRef;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::debug;
@@ -43,15 +45,14 @@ use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use crate::cli::cmd::ReplCommand;
use crate::cli::helper::RustylineHelper;
use crate::cli::AttachCommand;
use crate::cmd::ReplCommand;
use crate::error::{
CollectRecordBatchesSnafu, ParseSqlSnafu, PlanStatementSnafu, PrettyPrintRecordBatchesSnafu,
ReadlineSnafu, ReplCreationSnafu, RequestDatabaseSnafu, Result, StartMetaClientSnafu,
SubstraitEncodeLogicalPlanSnafu,
};
use crate::{error, DistributedInformationExtension};
use crate::helper::RustylineHelper;
use crate::{error, AttachCommand};
/// Captures the state of the repl, gathers commands and executes them one by one
pub struct Repl {
@@ -258,8 +259,9 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
.context(StartMetaClientSnafu)?;
let meta_client = Arc::new(meta_client);
let cached_meta_backend =
Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build());
let cached_meta_backend = Arc::new(
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone()))).build(),
);
let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
CacheRegistryBuilder::default()
.add_cache(cached_meta_backend.clone())

View File

@@ -25,6 +25,7 @@ cache.workspace = true
catalog.workspace = true
chrono.workspace = true
clap.workspace = true
cli.workspace = true
client.workspace = true
common-base.workspace = true
common-catalog.workspace = true

View File

@@ -12,39 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod bench;
// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
#[allow(unused)]
mod cmd;
mod export;
mod helper;
// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
mod database;
mod import;
#[allow(unused)]
mod repl;
use async_trait::async_trait;
use bench::BenchTableMetadataCommand;
use clap::Parser;
use cli::Tool;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
pub use repl::Repl;
use plugins::SubCommand;
use snafu::ResultExt;
use tracing_appender::non_blocking::WorkerGuard;
use self::export::ExportCommand;
use crate::cli::import::ImportCommand;
use crate::error::Result;
use crate::options::GlobalOptions;
use crate::App;
use crate::{error, App, Result};
pub const APP_NAME: &str = "greptime-cli";
#[async_trait]
pub trait Tool: Send + Sync {
async fn do_work(&self) -> Result<()>;
}
use async_trait::async_trait;
pub struct Instance {
tool: Box<dyn Tool>,
@@ -54,12 +32,16 @@ pub struct Instance {
}
impl Instance {
fn new(tool: Box<dyn Tool>, guard: Vec<WorkerGuard>) -> Self {
pub fn new(tool: Box<dyn Tool>, guard: Vec<WorkerGuard>) -> Self {
Self {
tool,
_guard: guard,
}
}
pub async fn start(&mut self) -> Result<()> {
self.tool.do_work().await.context(error::StartCliSnafu)
}
}
#[async_trait]
@@ -69,7 +51,8 @@ impl App for Instance {
}
async fn start(&mut self) -> Result<()> {
self.tool.do_work().await
self.start().await.unwrap();
Ok(())
}
fn wait_signal(&self) -> bool {
@@ -96,7 +79,12 @@ impl Command {
None,
);
self.cmd.build(guard).await
let tool = self.cmd.build().await.context(error::BuildCliSnafu)?;
let instance = Instance {
tool,
_guard: guard,
};
Ok(instance)
}
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<LoggingOptions> {
@@ -112,38 +100,81 @@ impl Command {
}
}
#[derive(Parser)]
enum SubCommand {
// Attach(AttachCommand),
Bench(BenchTableMetadataCommand),
Export(ExportCommand),
Import(ImportCommand),
}
#[cfg(test)]
mod tests {
use clap::Parser;
use client::{Client, Database};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_telemetry::logging::LoggingOptions;
impl SubCommand {
async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
match self {
// SubCommand::Attach(cmd) => cmd.build().await,
SubCommand::Bench(cmd) => cmd.build(guard).await,
SubCommand::Export(cmd) => cmd.build(guard).await,
SubCommand::Import(cmd) => cmd.build(guard).await,
}
}
}
#[derive(Debug, Parser)]
pub(crate) struct AttachCommand {
#[clap(long)]
pub(crate) grpc_addr: String,
#[clap(long)]
pub(crate) meta_addr: Option<String>,
#[clap(long, action)]
pub(crate) disable_helper: bool,
}
impl AttachCommand {
#[allow(dead_code)]
async fn build(self) -> Result<Instance> {
unimplemented!("Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373")
use crate::error::Result as CmdResult;
use crate::options::GlobalOptions;
use crate::{cli, standalone, App};
#[tokio::test(flavor = "multi_thread")]
async fn test_export_create_table_with_quoted_names() -> CmdResult<()> {
let output_dir = tempfile::tempdir().unwrap();
let standalone = standalone::Command::parse_from([
"standalone",
"start",
"--data-home",
&*output_dir.path().to_string_lossy(),
]);
let standalone_opts = standalone.load_options(&GlobalOptions::default()).unwrap();
let mut instance = standalone.build(standalone_opts).await?;
instance.start().await?;
let client = Client::with_urls(["127.0.0.1:4001"]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
database
.sql(r#"CREATE DATABASE "cli.export.create_table";"#)
.await
.unwrap();
database
.sql(
r#"CREATE TABLE "cli.export.create_table"."a.b.c"(
ts TIMESTAMP,
TIME INDEX (ts)
) engine=mito;
"#,
)
.await
.unwrap();
let output_dir = tempfile::tempdir().unwrap();
let cli = cli::Command::parse_from([
"cli",
"export",
"--addr",
"127.0.0.1:4000",
"--output-dir",
&*output_dir.path().to_string_lossy(),
"--target",
"schema",
]);
let mut cli_app = cli.build(LoggingOptions::default()).await?;
cli_app.start().await?;
instance.stop().await?;
let output_file = output_dir
.path()
.join("greptime")
.join("cli.export.create_table")
.join("create_tables.sql");
let res = std::fs::read_to_string(output_file).unwrap();
let expect = r#"CREATE TABLE IF NOT EXISTS "a.b.c" (
"ts" TIMESTAMP(3) NOT NULL,
TIME INDEX ("ts")
)
ENGINE=mito
;
"#;
assert_eq!(res.trim(), expect.trim());
Ok(())
}
}

View File

@@ -16,10 +16,12 @@ use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use cache::build_datanode_cache_registry;
use catalog::kvbackend::MetaKvBackend;
use clap::Parser;
use common_base::Plugins;
use common_config::Configurable;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_telemetry::logging::TracingOptions;
use common_telemetry::{info, warn};
use common_version::{short_version, version};
@@ -300,9 +302,17 @@ impl StartCommand {
client: meta_client.clone(),
});
// Builds cache registry for datanode.
let layered_cache_registry = Arc::new(
LayeredCacheRegistryBuilder::default()
.add_cache_registry(build_datanode_cache_registry(meta_backend.clone()))
.build(),
);
let mut datanode = DatanodeBuilder::new(opts.clone(), plugins)
.with_meta_client(meta_client)
.with_kv_backend(meta_backend)
.with_cache_registry(layered_cache_registry)
.build()
.await
.context(StartDatanodeSnafu)?;

View File

@@ -114,6 +114,20 @@ pub enum Error {
source: frontend::error::Error,
},
#[snafu(display("Failed to build cli"))]
BuildCli {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to start cli"))]
StartCli {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to build meta server"))]
BuildMetaServer {
#[snafu(implicit)]
@@ -346,6 +360,8 @@ impl ErrorExt for Error {
Error::ShutdownMetaServer { source, .. } => source.status_code(),
Error::BuildMetaServer { source, .. } => source.status_code(),
Error::UnsupportedSelectorType { source, .. } => source.status_code(),
Error::BuildCli { source, .. } => source.status_code(),
Error::StartCli { source, .. } => source.status_code(),
Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => {
source.status_code()

View File

@@ -15,13 +15,15 @@
use std::sync::Arc;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use catalog::information_extension::DistributedInformationExtension;
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
use common_config::Configurable;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::key::flow::FlowMetadataManager;
@@ -30,7 +32,6 @@ use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_version::{short_version, version};
use flow::{FlownodeBuilder, FlownodeInstance, FrontendInvoker};
use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
use meta_client::{MetaClientOptions, MetaClientType};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
@@ -41,7 +42,7 @@ use crate::error::{
MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{log_versions, App, DistributedInformationExtension};
use crate::{log_versions, App};
pub const APP_NAME: &str = "greptime-flownode";
@@ -246,11 +247,12 @@ impl StartCommand {
let cache_tti = meta_config.metadata_cache_tti;
// TODO(discord9): add a helper function to ease the creation of the cache registry and related components
let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone())
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend =
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);
// Builds cache registry
@@ -287,9 +289,7 @@ impl StartCommand {
let executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(InvalidateTableCacheHandler::new(
layered_cache_registry.clone(),
)),
Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
]);
let heartbeat_task = flow::heartbeat::HeartbeatTask::new(

View File

@@ -17,20 +17,21 @@ use std::time::Duration;
use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use catalog::information_extension::DistributedInformationExtension;
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
use common_config::Configurable;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_time::timezone::set_default_timezone;
use common_version::{short_version, version};
use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
@@ -46,7 +47,7 @@ use crate::error::{
Result, StartFrontendSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{log_versions, App, DistributedInformationExtension};
use crate::{log_versions, App};
type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
@@ -293,11 +294,12 @@ impl StartCommand {
.context(MetaClientInitSnafu)?;
// TODO(discord9): add a helper function to ease the creation of the cache registry and related components
let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone())
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend =
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);
// Builds cache registry
@@ -327,9 +329,7 @@ impl StartCommand {
let executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(InvalidateTableCacheHandler::new(
layered_cache_registry.clone(),
)),
Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
]);
let heartbeat_task = HeartbeatTask::new(

View File

@@ -15,17 +15,7 @@
#![feature(assert_matches, let_chains)]
use async_trait::async_trait;
use catalog::information_schema::InformationExtension;
use client::api::v1::meta::ProcedureStatus;
use common_error::ext::BoxedError;
use common_meta::cluster::{ClusterInfo, NodeInfo};
use common_meta::datanode::RegionStat;
use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
use common_meta::rpc::procedure;
use common_procedure::{ProcedureInfo, ProcedureState};
use common_telemetry::{error, info};
use meta_client::MetaClientRef;
use snafu::ResultExt;
use crate::error::Result;
@@ -130,69 +120,3 @@ fn log_env_flags() {
info!("argument: {}", argument);
}
}
pub struct DistributedInformationExtension {
meta_client: MetaClientRef,
}
impl DistributedInformationExtension {
pub fn new(meta_client: MetaClientRef) -> Self {
Self { meta_client }
}
}
#[async_trait::async_trait]
impl InformationExtension for DistributedInformationExtension {
type Error = catalog::error::Error;
async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
self.meta_client
.list_nodes(None)
.await
.map_err(BoxedError::new)
.context(catalog::error::ListNodesSnafu)
}
async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
let procedures = self
.meta_client
.list_procedures(&ExecutorContext::default())
.await
.map_err(BoxedError::new)
.context(catalog::error::ListProceduresSnafu)?
.procedures;
let mut result = Vec::with_capacity(procedures.len());
for procedure in procedures {
let pid = match procedure.id {
Some(pid) => pid,
None => return catalog::error::ProcedureIdNotFoundSnafu {}.fail(),
};
let pid = procedure::pb_pid_to_pid(&pid)
.map_err(BoxedError::new)
.context(catalog::error::ConvertProtoDataSnafu)?;
let status = ProcedureStatus::try_from(procedure.status)
.map(|v| v.as_str_name())
.unwrap_or("Unknown")
.to_string();
let procedure_info = ProcedureInfo {
id: pid,
type_name: procedure.type_name,
start_time_ms: procedure.start_time_ms,
end_time_ms: procedure.end_time_ms,
state: ProcedureState::Running,
lock_keys: procedure.lock_keys,
};
result.push((status, procedure_info));
}
Ok(result)
}
async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
self.meta_client
.list_region_stats()
.await
.map_err(BoxedError::new)
.context(catalog::error::ListRegionStatsSnafu)
}
}

View File

@@ -497,6 +497,7 @@ impl StartCommand {
let datanode = DatanodeBuilder::new(dn_opts, plugins.clone())
.with_kv_backend(kv_backend.clone())
.with_cache_registry(layered_cache_registry.clone())
.build()
.await
.context(StartDatanodeSnafu)?;

View File

@@ -69,7 +69,6 @@ fn test_load_datanode_example_config() {
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig {
auto_flush_interval: Duration::from_secs(3600),
scan_parallelism: 0,
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
..Default::default()
}),
@@ -205,7 +204,6 @@ fn test_load_standalone_example_config() {
RegionEngineConfig::Mito(MitoConfig {
auto_flush_interval: Duration::from_secs(3600),
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
scan_parallelism: 0,
..Default::default()
}),
RegionEngineConfig::File(EngineConfig {}),

View File

@@ -14,6 +14,7 @@
mod convert;
mod distance;
pub(crate) mod impl_conv;
use std::sync::Arc;

View File

@@ -18,18 +18,17 @@ mod l2sq;
use std::borrow::Cow;
use std::fmt::Display;
use std::sync::Arc;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::value::ValueRef;
use datatypes::vectors::{Float32VectorBuilder, MutableVector, Vector, VectorRef};
use datatypes::vectors::{Float32VectorBuilder, MutableVector, VectorRef};
use snafu::ensure;
use crate::function::{Function, FunctionContext};
use crate::helper;
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const};
macro_rules! define_distance_function {
($StructName:ident, $display_name:expr, $similarity_method:path) => {
@@ -80,17 +79,17 @@ macro_rules! define_distance_function {
return Ok(result.to_vector());
}
let arg0_const = parse_if_constant_string(arg0)?;
let arg1_const = parse_if_constant_string(arg1)?;
let arg0_const = as_veclit_if_const(arg0)?;
let arg1_const = as_veclit_if_const(arg1)?;
for i in 0..size {
let vec0 = match arg0_const.as_ref() {
Some(a) => Some(Cow::Borrowed(a.as_slice())),
None => as_vector(arg0.get_ref(i))?,
Some(a) => Some(Cow::Borrowed(a.as_ref())),
None => as_veclit(arg0.get_ref(i))?,
};
let vec1 = match arg1_const.as_ref() {
Some(b) => Some(Cow::Borrowed(b.as_slice())),
None => as_vector(arg1.get_ref(i))?,
Some(b) => Some(Cow::Borrowed(b.as_ref())),
None => as_veclit(arg1.get_ref(i))?,
};
if let (Some(vec0), Some(vec1)) = (vec0, vec1) {
@@ -129,98 +128,6 @@ define_distance_function!(CosDistanceFunction, "vec_cos_distance", cos::cos);
define_distance_function!(L2SqDistanceFunction, "vec_l2sq_distance", l2sq::l2sq);
define_distance_function!(DotProductFunction, "vec_dot_product", dot::dot);
/// Parse a vector value if the value is a constant string.
fn parse_if_constant_string(arg: &Arc<dyn Vector>) -> Result<Option<Vec<f32>>> {
if !arg.is_const() {
return Ok(None);
}
if arg.data_type() != ConcreteDataType::string_datatype() {
return Ok(None);
}
arg.get_ref(0)
.as_string()
.unwrap() // Safe: checked if it is a string
.map(parse_f32_vector_from_string)
.transpose()
}
/// Convert a value to a vector value.
/// Supported data types are binary and string.
fn as_vector(arg: ValueRef<'_>) -> Result<Option<Cow<'_, [f32]>>> {
match arg.data_type() {
ConcreteDataType::Binary(_) => arg
.as_binary()
.unwrap() // Safe: checked if it is a binary
.map(binary_as_vector)
.transpose(),
ConcreteDataType::String(_) => arg
.as_string()
.unwrap() // Safe: checked if it is a string
.map(|s| Ok(Cow::Owned(parse_f32_vector_from_string(s)?)))
.transpose(),
ConcreteDataType::Null(_) => Ok(None),
_ => InvalidFuncArgsSnafu {
err_msg: format!("Unsupported data type: {:?}", arg.data_type()),
}
.fail(),
}
}
/// Convert a u8 slice to a vector value.
fn binary_as_vector(bytes: &[u8]) -> Result<Cow<'_, [f32]>> {
if bytes.len() % std::mem::size_of::<f32>() != 0 {
return InvalidFuncArgsSnafu {
err_msg: format!("Invalid binary length of vector: {}", bytes.len()),
}
.fail();
}
if cfg!(target_endian = "little") {
Ok(unsafe {
let vec = std::slice::from_raw_parts(
bytes.as_ptr() as *const f32,
bytes.len() / std::mem::size_of::<f32>(),
);
Cow::Borrowed(vec)
})
} else {
let v = bytes
.chunks_exact(std::mem::size_of::<f32>())
.map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
.collect::<Vec<f32>>();
Ok(Cow::Owned(v))
}
}
/// Parse a string to a vector value.
/// Valid inputs are strings like "[1.0, 2.0, 3.0]".
fn parse_f32_vector_from_string(s: &str) -> Result<Vec<f32>> {
let trimmed = s.trim();
if !trimmed.starts_with('[') || !trimmed.ends_with(']') {
return InvalidFuncArgsSnafu {
err_msg: format!(
"Failed to parse {s} to Vector value: not properly enclosed in brackets"
),
}
.fail();
}
let content = trimmed[1..trimmed.len() - 1].trim();
if content.is_empty() {
return Ok(Vec::new());
}
content
.split(',')
.map(|s| s.trim().parse::<f32>())
.collect::<std::result::Result<_, _>>()
.map_err(|e| {
InvalidFuncArgsSnafu {
err_msg: format!("Failed to parse {s} to Vector value: {e}"),
}
.build()
})
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -456,27 +363,4 @@ mod tests {
assert!(result.is_err());
}
}
#[test]
fn test_parse_vector_from_string() {
let result = parse_f32_vector_from_string("[1.0, 2.0, 3.0]").unwrap();
assert_eq!(result, vec![1.0, 2.0, 3.0]);
let result = parse_f32_vector_from_string("[]").unwrap();
assert_eq!(result, Vec::<f32>::new());
let result = parse_f32_vector_from_string("[1.0, a, 3.0]");
assert!(result.is_err());
}
#[test]
fn test_binary_as_vector() {
let bytes = [0, 0, 128, 63];
let result = binary_as_vector(&bytes).unwrap();
assert_eq!(result.as_ref(), &[1.0]);
let invalid_bytes = [0, 0, 128];
let result = binary_as_vector(&invalid_bytes);
assert!(result.is_err());
}
}

View File

@@ -0,0 +1,156 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use std::sync::Arc;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use datatypes::prelude::ConcreteDataType;
use datatypes::value::ValueRef;
use datatypes::vectors::Vector;
/// Convert a constant string or binary literal to a vector literal.
pub fn as_veclit_if_const(arg: &Arc<dyn Vector>) -> Result<Option<Cow<'_, [f32]>>> {
if !arg.is_const() {
return Ok(None);
}
if arg.data_type() != ConcreteDataType::string_datatype()
&& arg.data_type() != ConcreteDataType::binary_datatype()
{
return Ok(None);
}
as_veclit(arg.get_ref(0))
}
/// Convert a string or binary literal to a vector literal.
pub fn as_veclit(arg: ValueRef<'_>) -> Result<Option<Cow<'_, [f32]>>> {
match arg.data_type() {
ConcreteDataType::Binary(_) => arg
.as_binary()
.unwrap() // Safe: checked if it is a binary
.map(binlit_as_veclit)
.transpose(),
ConcreteDataType::String(_) => arg
.as_string()
.unwrap() // Safe: checked if it is a string
.map(|s| Ok(Cow::Owned(parse_veclit_from_strlit(s)?)))
.transpose(),
ConcreteDataType::Null(_) => Ok(None),
_ => InvalidFuncArgsSnafu {
err_msg: format!("Unsupported data type: {:?}", arg.data_type()),
}
.fail(),
}
}
/// Convert a u8 slice to a vector literal.
pub fn binlit_as_veclit(bytes: &[u8]) -> Result<Cow<'_, [f32]>> {
if bytes.len() % std::mem::size_of::<f32>() != 0 {
return InvalidFuncArgsSnafu {
err_msg: format!("Invalid binary length of vector: {}", bytes.len()),
}
.fail();
}
if cfg!(target_endian = "little") {
Ok(unsafe {
let vec = std::slice::from_raw_parts(
bytes.as_ptr() as *const f32,
bytes.len() / std::mem::size_of::<f32>(),
);
Cow::Borrowed(vec)
})
} else {
let v = bytes
.chunks_exact(std::mem::size_of::<f32>())
.map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
.collect::<Vec<f32>>();
Ok(Cow::Owned(v))
}
}
/// Parse a string literal to a vector literal.
/// Valid inputs are strings like "[1.0, 2.0, 3.0]".
pub fn parse_veclit_from_strlit(s: &str) -> Result<Vec<f32>> {
let trimmed = s.trim();
if !trimmed.starts_with('[') || !trimmed.ends_with(']') {
return InvalidFuncArgsSnafu {
err_msg: format!(
"Failed to parse {s} to Vector value: not properly enclosed in brackets"
),
}
.fail();
}
let content = trimmed[1..trimmed.len() - 1].trim();
if content.is_empty() {
return Ok(Vec::new());
}
content
.split(',')
.map(|s| s.trim().parse::<f32>())
.collect::<std::result::Result<_, _>>()
.map_err(|e| {
InvalidFuncArgsSnafu {
err_msg: format!("Failed to parse {s} to Vector value: {e}"),
}
.build()
})
}
#[allow(unused)]
/// Convert a vector literal to a binary literal.
pub fn veclit_to_binlit(vec: &[f32]) -> Vec<u8> {
if cfg!(target_endian = "little") {
unsafe {
std::slice::from_raw_parts(vec.as_ptr() as *const u8, std::mem::size_of_val(vec))
.to_vec()
}
} else {
let mut bytes = Vec::with_capacity(std::mem::size_of_val(vec));
for e in vec {
bytes.extend_from_slice(&e.to_le_bytes());
}
bytes
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_veclit_from_strlit() {
let result = parse_veclit_from_strlit("[1.0, 2.0, 3.0]").unwrap();
assert_eq!(result, vec![1.0, 2.0, 3.0]);
let result = parse_veclit_from_strlit("[]").unwrap();
assert_eq!(result, Vec::<f32>::new());
let result = parse_veclit_from_strlit("[1.0, a, 3.0]");
assert!(result.is_err());
}
#[test]
fn test_binlit_as_veclit() {
let vec = &[1.0, 2.0, 3.0];
let bytes = veclit_to_binlit(vec);
let result = binlit_as_veclit(&bytes).unwrap();
assert_eq!(result.as_ref(), vec);
let invalid_bytes = [0, 0, 128];
let result = binlit_as_veclit(&invalid_bytes);
assert!(result.is_err());
}
}
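Taken together, the helpers round-trip a vector literal from a string to Vec<f32> to little-endian bytes and back; a sketch mirroring the tests above, assuming it lives inside this crate:
use crate::scalars::vector::impl_conv::{binlit_as_veclit, parse_veclit_from_strlit, veclit_to_binlit};
fn veclit_round_trip() -> common_query::error::Result<()> {
    let parsed = parse_veclit_from_strlit("[1.0, 2.0, 3.0]")?; // Vec<f32>
    let bytes = veclit_to_binlit(&parsed); // little-endian f32 bytes
    assert_eq!(binlit_as_veclit(&bytes)?.as_ref(), parsed.as_slice());
    Ok(())
}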

View File

@@ -24,7 +24,8 @@ pub use registry::{
LayeredCacheRegistryBuilder, LayeredCacheRegistryRef,
};
pub use table::{
new_table_info_cache, new_table_name_cache, new_table_route_cache, new_view_info_cache,
TableInfoCache, TableInfoCacheRef, TableNameCache, TableNameCacheRef, TableRoute,
TableRouteCache, TableRouteCacheRef, ViewInfoCache, ViewInfoCacheRef,
new_schema_cache, new_table_info_cache, new_table_name_cache, new_table_route_cache,
new_table_schema_cache, new_view_info_cache, SchemaCache, SchemaCacheRef, TableInfoCache,
TableInfoCacheRef, TableNameCache, TableNameCacheRef, TableRoute, TableRouteCache,
TableRouteCacheRef, TableSchemaCache, TableSchemaCacheRef, ViewInfoCache, ViewInfoCacheRef,
};

View File

@@ -12,12 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod schema;
mod table_info;
mod table_name;
mod table_route;
mod table_schema;
mod view_info;
pub use schema::{new_schema_cache, SchemaCache, SchemaCacheRef};
pub use table_info::{new_table_info_cache, TableInfoCache, TableInfoCacheRef};
pub use table_name::{new_table_name_cache, TableNameCache, TableNameCacheRef};
pub use table_route::{new_table_route_cache, TableRoute, TableRouteCache, TableRouteCacheRef};
pub use table_schema::{new_table_schema_cache, TableSchemaCache, TableSchemaCacheRef};
pub use view_info::{new_view_info_cache, ViewInfoCache, ViewInfoCacheRef};

View File

@@ -0,0 +1,73 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use futures_util::future::BoxFuture;
use moka::future::Cache;
use snafu::OptionExt;
use crate::cache::{CacheContainer, Initializer};
use crate::error::ValueNotExistSnafu;
use crate::instruction::CacheIdent;
use crate::key::schema_name::{SchemaManager, SchemaName, SchemaNameKey, SchemaNameValue};
use crate::kv_backend::KvBackendRef;
pub type SchemaCache = CacheContainer<SchemaName, Arc<SchemaNameValue>, CacheIdent>;
pub type SchemaCacheRef = Arc<SchemaCache>;
/// Constructs a [SchemaCache].
pub fn new_schema_cache(
name: String,
cache: Cache<SchemaName, Arc<SchemaNameValue>>,
kv_backend: KvBackendRef,
) -> SchemaCache {
let schema_manager = SchemaManager::new(kv_backend.clone());
let init = init_factory(schema_manager);
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
}
fn init_factory(schema_manager: SchemaManager) -> Initializer<SchemaName, Arc<SchemaNameValue>> {
Arc::new(move |schema_name| {
let manager = schema_manager.clone();
Box::pin(async move {
let schema_value = manager
.get(SchemaNameKey {
catalog: &schema_name.catalog_name,
schema: &schema_name.schema_name,
})
.await?
.context(ValueNotExistSnafu)?
.into_inner();
Ok(Some(Arc::new(schema_value)))
})
})
}
fn invalidator<'a>(
cache: &'a Cache<SchemaName, Arc<SchemaNameValue>>,
ident: &'a CacheIdent,
) -> BoxFuture<'a, crate::error::Result<()>> {
Box::pin(async move {
if let CacheIdent::SchemaName(schema_name) = ident {
cache.invalidate(schema_name).await
}
Ok(())
})
}
fn filter(ident: &CacheIdent) -> bool {
matches!(ident, CacheIdent::SchemaName(_))
}
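Wiring the cache up mirrors the other caches in this module; a minimal sketch, assuming an in-memory backend and an arbitrary capacity:
use std::sync::Arc;
use moka::future::Cache;
use crate::cache::{new_schema_cache, SchemaCacheRef};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackendRef;
fn build_schema_cache() -> SchemaCacheRef {
    // In-memory backend, as used by the unit tests in this crate.
    let kv_backend: KvBackendRef = Arc::new(MemoryKvBackend::new());
    Arc::new(new_schema_cache(
        "schema_cache".to_string(),
        Cache::new(1024), // arbitrary capacity
        kv_backend,
    ))
}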

View File

@@ -0,0 +1,76 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Cache for table id to schema name mapping.
use std::sync::Arc;
use futures_util::future::BoxFuture;
use moka::future::Cache;
use snafu::OptionExt;
use store_api::storage::TableId;
use crate::cache::{CacheContainer, Initializer};
use crate::error;
use crate::instruction::CacheIdent;
use crate::key::schema_name::SchemaName;
use crate::key::table_info::TableInfoManager;
use crate::kv_backend::KvBackendRef;
pub type TableSchemaCache = CacheContainer<TableId, Arc<SchemaName>, CacheIdent>;
pub type TableSchemaCacheRef = Arc<TableSchemaCache>;
/// Constructs a [TableSchemaCache].
pub fn new_table_schema_cache(
name: String,
cache: Cache<TableId, Arc<SchemaName>>,
kv_backend: KvBackendRef,
) -> TableSchemaCache {
let table_info_manager = TableInfoManager::new(kv_backend);
let init = init_factory(table_info_manager);
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
}
fn init_factory(table_info_manager: TableInfoManager) -> Initializer<TableId, Arc<SchemaName>> {
Arc::new(move |table_id| {
let table_info_manager = table_info_manager.clone();
Box::pin(async move {
let raw_table_info = table_info_manager
.get(*table_id)
.await?
.context(error::ValueNotExistSnafu)?
.into_inner()
.table_info;
Ok(Some(Arc::new(SchemaName {
catalog_name: raw_table_info.catalog_name,
schema_name: raw_table_info.schema_name,
})))
})
})
}
/// Never invalidates table id schema cache.
fn invalidator<'a>(
_cache: &'a Cache<TableId, Arc<SchemaName>>,
_ident: &'a CacheIdent,
) -> BoxFuture<'a, error::Result<()>> {
Box::pin(std::future::ready(Ok(())))
}
/// Never invalidates table id schema cache.
fn filter(_ident: &CacheIdent) -> bool {
false
}
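At lookup time, a miss loads the table info through TableInfoManager and caches only the catalog/schema pair; a hedged usage sketch, written as if it sat next to the definitions above:
use std::sync::Arc;
use store_api::storage::TableId;
use crate::error::Result;
use crate::key::schema_name::SchemaName;
async fn schema_of(cache: &TableSchemaCacheRef, table_id: TableId) -> Result<Option<Arc<SchemaName>>> {
    // `get` resolves the entry through TableInfoManager on a miss and caches it.
    cache.get(table_id).await
}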

View File

@@ -46,11 +46,7 @@ fn build_new_schema_value(
for option in options.0.iter() {
match option {
SetDatabaseOption::Ttl(ttl) => {
if ttl.is_zero() {
value.ttl = None;
} else {
value.ttl = Some(*ttl);
}
value.ttl = Some(*ttl);
}
}
}
@@ -230,12 +226,12 @@ mod tests {
#[test]
fn test_build_new_schema_value() {
let set_ttl = AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(vec![
SetDatabaseOption::Ttl(Duration::from_secs(10)),
SetDatabaseOption::Ttl(Duration::from_secs(10).into()),
]));
let current_schema_value = SchemaNameValue::default();
let new_schema_value =
build_new_schema_value(current_schema_value.clone(), &set_ttl).unwrap();
assert_eq!(new_schema_value.ttl, Some(Duration::from_secs(10)));
assert_eq!(new_schema_value.ttl, Some(Duration::from_secs(10).into()));
let unset_ttl_alter_kind =
AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(vec![

View File

@@ -425,6 +425,13 @@ pub enum Error {
source: BoxedError,
},
#[snafu(display("The response exceeded size limit"))]
ResponseExceededSizeLimit {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Invalid heartbeat response"))]
InvalidHeartbeatResponse {
#[snafu(implicit)]
@@ -763,6 +770,7 @@ impl ErrorExt for Error {
| StopProcedureManager { source, .. } => source.status_code(),
RegisterProcedureLoader { source, .. } => source.status_code(),
External { source, .. } => source.status_code(),
ResponseExceededSizeLimit { source, .. } => source.status_code(),
OperateDatanode { source, .. } => source.status_code(),
Table { source, .. } => source.status_code(),
RetryLater { source, .. } => source.status_code(),
@@ -805,13 +813,13 @@ impl Error {
/// Returns true if the response exceeds the size limit.
pub fn is_exceeded_size_limit(&self) -> bool {
if let Error::EtcdFailed {
error: etcd_client::Error::GRpcStatus(status),
..
} = self
{
return status.code() == tonic::Code::OutOfRange;
match self {
Error::EtcdFailed {
error: etcd_client::Error::GRpcStatus(status),
..
} => status.code() == tonic::Code::OutOfRange,
Error::ResponseExceededSizeLimit { .. } => true,
_ => false,
}
false
}
}
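Callers that previously only recognized the etcd OutOfRange status now also catch sources boxed into the new variant; a hypothetical helper built on top of the check:
/// Hypothetical: halve a range-request page size after an oversized response.
fn shrink_page_on_overflow(err: &Error, page_size: usize) -> usize {
    if err.is_exceeded_size_limit() {
        (page_size / 2).max(1)
    } else {
        page_size
    }
}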

View File

@@ -21,6 +21,7 @@ use common_telemetry::error;
use crate::error::Result;
use crate::heartbeat::mailbox::{IncomingMessage, MailboxRef};
pub mod invalidate_table_cache;
pub mod parse_mailbox_message;
#[cfg(test)]
mod tests;

View File

@@ -13,21 +13,22 @@
// limitations under the License.
use async_trait::async_trait;
use common_meta::cache_invalidator::{CacheInvalidatorRef, Context};
use common_meta::error::Result as MetaResult;
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::Instruction;
use common_telemetry::debug;
use crate::cache_invalidator::{CacheInvalidatorRef, Context};
use crate::error::Result as MetaResult;
use crate::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use crate::instruction::Instruction;
#[derive(Clone)]
pub struct InvalidateTableCacheHandler {
pub struct InvalidateCacheHandler {
cache_invalidator: CacheInvalidatorRef,
}
#[async_trait]
impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
impl HeartbeatResponseHandler for InvalidateCacheHandler {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
matches!(
ctx.incoming_message.as_ref(),
@@ -37,13 +38,10 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
let Some((_, Instruction::InvalidateCaches(caches))) = ctx.incoming_message.take() else {
unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'")
unreachable!("InvalidateCacheHandler: should be guarded by 'is_acceptable'")
};
debug!(
"InvalidateTableCacheHandler: invalidating caches: {:?}",
caches
);
debug!("InvalidateCacheHandler: invalidating caches: {:?}", caches);
// Invalidating the local cache always succeeds
let _ = self
@@ -55,7 +53,7 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
}
}
impl InvalidateTableCacheHandler {
impl InvalidateCacheHandler {
pub fn new(cache_invalidator: CacheInvalidatorRef) -> Self {
Self { cache_invalidator }
}

View File

@@ -149,7 +149,7 @@ use crate::DatanodeId;
pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
pub const MAINTENANCE_KEY: &str = "__maintenance";
const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";

View File

@@ -19,41 +19,39 @@ use std::sync::Arc;
use snafu::OptionExt;
use store_api::storage::TableId;
use crate::cache::{SchemaCacheRef, TableSchemaCacheRef};
use crate::error::TableInfoNotFoundSnafu;
use crate::key::schema_name::{SchemaManager, SchemaNameKey};
use crate::key::table_info::{TableInfoManager, TableInfoManagerRef};
use crate::kv_backend::KvBackendRef;
use crate::{error, SchemaOptions};
pub type SchemaMetadataManagerRef = Arc<SchemaMetadataManager>;
pub struct SchemaMetadataManager {
table_info_manager: TableInfoManagerRef,
schema_manager: SchemaManager,
table_id_schema_cache: TableSchemaCacheRef,
schema_cache: SchemaCacheRef,
#[cfg(any(test, feature = "testing"))]
kv_backend: KvBackendRef,
kv_backend: crate::kv_backend::KvBackendRef,
}
impl SchemaMetadataManager {
/// Creates a new database meta
#[cfg(not(any(test, feature = "testing")))]
pub fn new(kv_backend: KvBackendRef) -> Self {
let table_info_manager = Arc::new(TableInfoManager::new(kv_backend.clone()));
let schema_manager = SchemaManager::new(kv_backend);
pub fn new(table_id_schema_cache: TableSchemaCacheRef, schema_cache: SchemaCacheRef) -> Self {
Self {
table_info_manager,
schema_manager,
table_id_schema_cache,
schema_cache,
}
}
/// Creates a new database meta
#[cfg(any(test, feature = "testing"))]
pub fn new(kv_backend: KvBackendRef) -> Self {
let table_info_manager = Arc::new(TableInfoManager::new(kv_backend.clone()));
let schema_manager = SchemaManager::new(kv_backend.clone());
pub fn new(
kv_backend: crate::kv_backend::KvBackendRef,
table_id_schema_cache: TableSchemaCacheRef,
schema_cache: SchemaCacheRef,
) -> Self {
Self {
table_info_manager,
schema_manager,
table_id_schema_cache,
schema_cache,
kv_backend,
}
}
@@ -62,23 +60,16 @@ impl SchemaMetadataManager {
pub async fn get_schema_options_by_table_id(
&self,
table_id: TableId,
) -> error::Result<Option<SchemaOptions>> {
let table_info = self
.table_info_manager
) -> error::Result<Option<Arc<SchemaOptions>>> {
let schema_name = self
.table_id_schema_cache
.get(table_id)
.await?
.with_context(|| TableInfoNotFoundSnafu {
table: format!("table id: {}", table_id),
})?;
let key = SchemaNameKey::new(
&table_info.table_info.catalog_name,
&table_info.table_info.schema_name,
);
self.schema_manager
.get(key)
.await
.map(|v| v.map(|v| v.into_inner()))
self.schema_cache.get_by_ref(&schema_name).await
}
#[cfg(any(test, feature = "testing"))]
@@ -100,17 +91,19 @@ impl SchemaMetadataManager {
meta: Default::default(),
table_type: TableType::Base,
});
let (txn, _) = self
.table_info_manager
let table_info_manager =
crate::key::table_info::TableInfoManager::new(self.kv_backend.clone());
let (txn, _) = table_info_manager
.build_create_txn(table_id, &value)
.unwrap();
let resp = self.kv_backend.txn(txn).await.unwrap();
assert!(resp.succeeded, "Failed to create table metadata");
let key = SchemaNameKey {
let key = crate::key::schema_name::SchemaNameKey {
catalog: catalog_name,
schema: schema_name,
};
self.schema_manager
crate::key::schema_name::SchemaManager::new(self.kv_backend.clone())
.create(key, schema_value, false)
.await
.expect("Failed to create schema metadata");
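With the table-info and schema managers gone from its fields, the manager is now just the composition of the two caches introduced above; a minimal sketch of the non-test wiring, written as if it lived in the same module (cache names and capacities are illustrative):
use std::sync::Arc;
use moka::future::Cache;
use store_api::storage::TableId;
use crate::cache::{new_schema_cache, new_table_schema_cache};
use crate::kv_backend::KvBackendRef;
use crate::SchemaOptions;
async fn schema_opts_for(
    kv_backend: KvBackendRef,
    table_id: TableId,
) -> crate::error::Result<Option<Arc<SchemaOptions>>> {
    let table_id_schema_cache = Arc::new(new_table_schema_cache(
        "table_id_schema_cache".to_string(),
        Cache::new(1024),
        kv_backend.clone(),
    ));
    let schema_cache = Arc::new(new_schema_cache(
        "schema_cache".to_string(),
        Cache::new(1024),
        kv_backend,
    ));
    SchemaMetadataManager::new(table_id_schema_cache, schema_cache)
        .get_schema_options_by_table_id(table_id)
        .await
}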

View File

@@ -15,9 +15,9 @@
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::Arc;
use std::time::Duration;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_time::DatabaseTimeToLive;
use futures::stream::BoxStream;
use humantime_serde::re::humantime;
use serde::{Deserialize, Serialize};
@@ -57,15 +57,13 @@ impl Default for SchemaNameKey<'_> {
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
pub struct SchemaNameValue {
#[serde(default)]
#[serde(with = "humantime_serde")]
pub ttl: Option<Duration>,
pub ttl: Option<DatabaseTimeToLive>,
}
impl Display for SchemaNameValue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(ttl) = self.ttl {
let ttl = humantime::format_duration(ttl);
write!(f, "ttl='{ttl}'")?;
if let Some(ttl) = self.ttl.map(|i| i.to_string()) {
write!(f, "ttl='{}'", ttl)?;
}
Ok(())
@@ -96,11 +94,8 @@ impl TryFrom<&HashMap<String, String>> for SchemaNameValue {
impl From<SchemaNameValue> for HashMap<String, String> {
fn from(value: SchemaNameValue) -> Self {
let mut opts = HashMap::new();
if let Some(ttl) = value.ttl {
opts.insert(
OPT_KEY_TTL.to_string(),
format!("{}", humantime::format_duration(ttl)),
);
if let Some(ttl) = value.ttl.map(|ttl| ttl.to_string()) {
opts.insert(OPT_KEY_TTL.to_string(), ttl);
}
opts
}
@@ -171,6 +166,7 @@ impl<'a> TryFrom<&'a str> for SchemaNameKey<'a> {
}
}
#[derive(Clone)]
pub struct SchemaManager {
kv_backend: KvBackendRef,
}
@@ -312,6 +308,7 @@ impl<'a> From<&'a SchemaName> for SchemaNameKey<'a> {
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
@@ -322,9 +319,14 @@ mod tests {
assert_eq!("", schema_value.to_string());
let schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(9)),
ttl: Some(Duration::from_secs(9).into()),
};
assert_eq!("ttl='9s'", schema_value.to_string());
let schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(0).into()),
};
assert_eq!("ttl='forever'", schema_value.to_string());
}
#[test]
@@ -337,17 +339,36 @@ mod tests {
assert_eq!(key, parsed);
let value = SchemaNameValue {
ttl: Some(Duration::from_secs(10)),
ttl: Some(Duration::from_secs(10).into()),
};
let mut opts: HashMap<String, String> = HashMap::new();
opts.insert("ttl".to_string(), "10s".to_string());
let from_value = SchemaNameValue::try_from(&opts).unwrap();
assert_eq!(value, from_value);
let parsed = SchemaNameValue::try_from_raw_value("{\"ttl\":\"10s\"}".as_bytes()).unwrap();
let parsed = SchemaNameValue::try_from_raw_value(
serde_json::json!({"ttl": "10s"}).to_string().as_bytes(),
)
.unwrap();
assert_eq!(Some(value), parsed);
let forever = SchemaNameValue {
ttl: Some(Default::default()),
};
let parsed = SchemaNameValue::try_from_raw_value(
serde_json::json!({"ttl": "forever"}).to_string().as_bytes(),
)
.unwrap();
assert_eq!(Some(forever), parsed);
let instant_err = SchemaNameValue::try_from_raw_value(
serde_json::json!({"ttl": "instant"}).to_string().as_bytes(),
);
assert!(instant_err.is_err());
let none = SchemaNameValue::try_from_raw_value("null".as_bytes()).unwrap();
assert!(none.is_none());
let err_empty = SchemaNameValue::try_from_raw_value("".as_bytes());
assert!(err_empty.is_err());
}
@@ -373,7 +394,7 @@ mod tests {
let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
let new_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(10)),
ttl: Some(Duration::from_secs(10).into()),
};
manager
.update(schema_key, &current_schema_value, &new_schema_value)
@@ -387,10 +408,10 @@ mod tests {
.unwrap();
let new_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(40)),
ttl: Some(Duration::from_secs(40).into()),
};
let incorrect_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(20)),
ttl: Some(Duration::from_secs(20).into()),
}
.try_as_raw_value()
.unwrap();
@@ -401,5 +422,15 @@ mod tests {
.update(schema_key, &incorrect_schema_value, &new_schema_value)
.await
.unwrap_err();
let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
let new_schema_value = SchemaNameValue { ttl: None };
manager
.update(schema_key, &current_schema_value, &new_schema_value)
.await
.unwrap();
let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
assert_eq!(new_schema_value, *current_schema_value);
}
}


@@ -136,6 +136,13 @@ pub struct Txn {
c_else: bool,
}
#[cfg(any(test, feature = "testing"))]
impl Txn {
pub fn req(&self) -> &TxnRequest {
&self.req
}
}
impl Txn {
pub fn merge_all<T: IntoIterator<Item = Txn>>(values: T) -> Self {
values


@@ -14,7 +14,6 @@
use std::collections::{HashMap, HashSet};
use std::result;
use std::time::Duration;
use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
use api::v1::meta::ddl_task_request::Task;
@@ -36,7 +35,7 @@ use api::v1::{
};
use base64::engine::general_purpose;
use base64::Engine as _;
use humantime_serde::re::humantime;
use common_time::DatabaseTimeToLive;
use prost::Message;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DefaultOnNull};
@@ -1009,12 +1008,8 @@ impl TryFrom<PbOption> for SetDatabaseOption {
fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
match key.to_ascii_lowercase().as_str() {
TTL_KEY => {
let ttl = if value.is_empty() {
Duration::from_secs(0)
} else {
humantime::parse_duration(&value)
.map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?
};
let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
.map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
Ok(SetDatabaseOption::Ttl(ttl))
}
@@ -1025,7 +1020,7 @@ impl TryFrom<PbOption> for SetDatabaseOption {
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetDatabaseOption {
Ttl(Duration),
Ttl(DatabaseTimeToLive),
}
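With the option value now carried as a `DatabaseTimeToLive`, the alter-database path accepts humantime durations, `forever`, or an empty string, and rejects `instant`. A rough sketch of the conversion above (assuming `TTL_KEY` is "ttl" and `PbOption` exposes public `key`/`value` strings):

    // Sketch only: exercising TryFrom<PbOption> for SetDatabaseOption.
    let ttl = SetDatabaseOption::try_from(PbOption { key: "ttl".into(), value: "7d".into() });
    assert!(matches!(ttl, Ok(SetDatabaseOption::Ttl(DatabaseTimeToLive::Duration(_)))));
    let forever = SetDatabaseOption::try_from(PbOption { key: "ttl".into(), value: "".into() }).unwrap();
    assert_eq!(forever, SetDatabaseOption::Ttl(DatabaseTimeToLive::Forever));
    assert!(SetDatabaseOption::try_from(PbOption { key: "ttl".into(), value: "instant".into() }).is_err());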
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]


@@ -0,0 +1,173 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use futures::StreamExt;
use tokio::sync::Mutex;
use crate::error::Result;
use crate::recordbatch::merge_record_batches;
use crate::{RecordBatch, SendableRecordBatchStream};
struct Inner {
stream: SendableRecordBatchStream,
current_row_index: usize,
current_batch: Option<RecordBatch>,
total_rows_in_current_batch: usize,
}
/// A cursor on RecordBatchStream that fetches data batch by batch
pub struct RecordBatchStreamCursor {
inner: Mutex<Inner>,
}
impl RecordBatchStreamCursor {
pub fn new(stream: SendableRecordBatchStream) -> RecordBatchStreamCursor {
Self {
inner: Mutex::new(Inner {
stream,
current_row_index: 0,
current_batch: None,
total_rows_in_current_batch: 0,
}),
}
}
/// Takes up to `size` rows from the `RecordBatchStream` and creates a new
/// `RecordBatch` from them.
pub async fn take(&self, size: usize) -> Result<RecordBatch> {
let mut remaining_rows_to_take = size;
let mut accumulated_rows = Vec::new();
let mut inner = self.inner.lock().await;
while remaining_rows_to_take > 0 {
// Ensure we have a current batch or fetch the next one
if inner.current_batch.is_none()
|| inner.current_row_index >= inner.total_rows_in_current_batch
{
match inner.stream.next().await {
Some(Ok(batch)) => {
inner.total_rows_in_current_batch = batch.num_rows();
inner.current_batch = Some(batch);
inner.current_row_index = 0;
}
Some(Err(e)) => return Err(e),
None => {
// Stream is exhausted
break;
}
}
}
// If we still have no batch after attempting to fetch
let current_batch = match &inner.current_batch {
Some(batch) => batch,
None => break,
};
// Calculate how many rows we can take from this batch
let rows_to_take_from_batch = remaining_rows_to_take
.min(inner.total_rows_in_current_batch - inner.current_row_index);
// Slice the current batch to get the desired rows
let taken_batch =
current_batch.slice(inner.current_row_index, rows_to_take_from_batch)?;
// Add the taken batch to accumulated rows
accumulated_rows.push(taken_batch);
// Update cursor and remaining rows
inner.current_row_index += rows_to_take_from_batch;
remaining_rows_to_take -= rows_to_take_from_batch;
}
// If no rows were accumulated, return empty
if accumulated_rows.is_empty() {
return Ok(RecordBatch::new_empty(inner.stream.schema()));
}
// If only one batch was accumulated, return it directly
if accumulated_rows.len() == 1 {
return Ok(accumulated_rows.remove(0));
}
// Merge multiple batches
merge_record_batches(inner.stream.schema(), &accumulated_rows)
}
}
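This cursor backs the new cursor statements: each `take(n)` returns at most `n` rows, carving them out of the buffered batch and pulling from the stream only when needed. A minimal usage sketch, assuming a `SendableRecordBatchStream` is already at hand:

    // Sketch: page through a stream 100 rows at a time until it is exhausted.
    async fn drain_in_pages(stream: SendableRecordBatchStream) -> Result<usize> {
        let cursor = RecordBatchStreamCursor::new(stream);
        let mut total_rows = 0;
        loop {
            let page = cursor.take(100).await?;
            if page.num_rows() == 0 {
                break; // the stream is exhausted
            }
            total_rows += page.num_rows();
        }
        Ok(total_rows)
    }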
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::StringVector;
use super::*;
use crate::RecordBatches;
#[tokio::test]
async fn test_cursor() {
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"a",
ConcreteDataType::string_datatype(),
false,
)]));
let rbs = RecordBatches::try_from_columns(
schema.clone(),
vec![Arc::new(StringVector::from(vec!["hello", "world"])) as _],
)
.unwrap();
let cursor = RecordBatchStreamCursor::new(rbs.as_stream());
let result_rb = cursor.take(1).await.expect("take from cursor failed");
assert_eq!(result_rb.num_rows(), 1);
let result_rb = cursor.take(1).await.expect("take from cursor failed");
assert_eq!(result_rb.num_rows(), 1);
let result_rb = cursor.take(1).await.expect("take from cursor failed");
assert_eq!(result_rb.num_rows(), 0);
let rb = RecordBatch::new(
schema.clone(),
vec![Arc::new(StringVector::from(vec!["hello", "world"])) as _],
)
.unwrap();
let rbs2 =
RecordBatches::try_new(schema.clone(), vec![rb.clone(), rb.clone(), rb]).unwrap();
let cursor = RecordBatchStreamCursor::new(rbs2.as_stream());
let result_rb = cursor.take(3).await.expect("take from cursor failed");
assert_eq!(result_rb.num_rows(), 3);
let result_rb = cursor.take(2).await.expect("take from cursor failed");
assert_eq!(result_rb.num_rows(), 2);
let result_rb = cursor.take(2).await.expect("take from cursor failed");
assert_eq!(result_rb.num_rows(), 1);
let result_rb = cursor.take(2).await.expect("take from cursor failed");
assert_eq!(result_rb.num_rows(), 0);
let rb = RecordBatch::new(
schema.clone(),
vec![Arc::new(StringVector::from(vec!["hello", "world"])) as _],
)
.unwrap();
let rbs3 =
RecordBatches::try_new(schema.clone(), vec![rb.clone(), rb.clone(), rb]).unwrap();
let cursor = RecordBatchStreamCursor::new(rbs3.as_stream());
let result_rb = cursor.take(10).await.expect("take from cursor failed");
assert_eq!(result_rb.num_rows(), 6);
}
}


@@ -168,6 +168,13 @@ pub enum Error {
#[snafu(source)]
error: tokio::time::error::Elapsed,
},
#[snafu(display("RecordBatch slice index overflow: {visit_index} > {size}"))]
RecordBatchSliceIndexOverflow {
#[snafu(implicit)]
location: Location,
size: usize,
visit_index: usize,
},
}
impl ErrorExt for Error {
@@ -182,7 +189,8 @@ impl ErrorExt for Error {
| Error::Format { .. }
| Error::ToArrowScalar { .. }
| Error::ProjectArrowRecordBatch { .. }
| Error::PhysicalExpr { .. } => StatusCode::Internal,
| Error::PhysicalExpr { .. }
| Error::RecordBatchSliceIndexOverflow { .. } => StatusCode::Internal,
Error::PollStream { .. } => StatusCode::EngineExecuteQuery,


@@ -15,6 +15,7 @@
#![feature(never_type)]
pub mod adapter;
pub mod cursor;
pub mod error;
pub mod filter;
mod recordbatch;


@@ -23,7 +23,7 @@ use datatypes::value::Value;
use datatypes::vectors::{Helper, VectorRef};
use serde::ser::{Error, SerializeStruct};
use serde::{Serialize, Serializer};
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use crate::error::{
self, CastVectorSnafu, ColumnNotExistsSnafu, DataTypesSnafu, ProjectArrowRecordBatchSnafu,
@@ -194,6 +194,19 @@ impl RecordBatch {
.map(|t| t.to_string())
.unwrap_or("failed to pretty display a record batch".to_string())
}
/// Returns a sliced record batch that starts at `offset` and contains `len` rows
pub fn slice(&self, offset: usize, len: usize) -> Result<RecordBatch> {
ensure!(
offset + len <= self.num_rows(),
error::RecordBatchSliceIndexOverflowSnafu {
size: self.num_rows(),
visit_index: offset + len
}
);
let columns = self.columns.iter().map(|vector| vector.slice(offset, len));
RecordBatch::new(self.schema.clone(), columns)
}
}
impl Serialize for RecordBatch {
@@ -256,6 +269,36 @@ impl Iterator for RecordBatchRowIterator<'_> {
}
}
/// Merges multiple record batches into a single one
pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Result<RecordBatch> {
let batches_len = batches.len();
if batches_len == 0 {
return Ok(RecordBatch::new_empty(schema));
}
let n_rows = batches.iter().map(|b| b.num_rows()).sum();
let n_columns = schema.num_columns();
// Collect arrays from each batch
let mut merged_columns = Vec::with_capacity(n_columns);
for col_idx in 0..n_columns {
let mut acc = schema.column_schemas()[col_idx]
.data_type
.create_mutable_vector(n_rows);
for batch in batches {
let column = batch.column(col_idx);
acc.extend_slice_of(column.as_ref(), 0, column.len())
.context(error::DataTypesSnafu)?;
}
merged_columns.push(acc.to_vector());
}
// Create a new RecordBatch with merged columns
RecordBatch::new(schema, merged_columns)
}
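`slice` and `merge_record_batches` are the two primitives the cursor builds on: slice the partially consumed batch, then stitch the accumulated slices back into one batch by extending mutable vectors sized to the total row count. A small sketch of that round trip:

    // Sketch: splitting a batch in two and merging it back preserves all rows
    // (assumes the batch has at least two rows).
    fn split_and_merge(schema: SchemaRef, batch: &RecordBatch) -> Result<RecordBatch> {
        let head = batch.slice(0, 2)?;                    // rows [0, 2)
        let tail = batch.slice(2, batch.num_rows() - 2)?; // rows [2, num_rows)
        merge_record_batches(schema, &[head, tail])
    }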
#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -375,4 +418,80 @@ mod tests {
assert!(record_batch_iter.next().is_none());
}
#[test]
fn test_record_batch_slice() {
let column_schemas = vec![
ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
];
let schema = Arc::new(Schema::new(column_schemas));
let columns: Vec<VectorRef> = vec![
Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
Arc::new(StringVector::from(vec![
None,
Some("hello"),
Some("greptime"),
None,
])),
];
let recordbatch = RecordBatch::new(schema, columns).unwrap();
let recordbatch = recordbatch.slice(1, 2).expect("recordbatch slice");
let mut record_batch_iter = recordbatch.rows();
assert_eq!(
vec![Value::UInt32(2), Value::String("hello".into())],
record_batch_iter
.next()
.unwrap()
.into_iter()
.collect::<Vec<Value>>()
);
assert_eq!(
vec![Value::UInt32(3), Value::String("greptime".into())],
record_batch_iter
.next()
.unwrap()
.into_iter()
.collect::<Vec<Value>>()
);
assert!(record_batch_iter.next().is_none());
assert!(recordbatch.slice(1, 5).is_err());
}
#[test]
fn test_merge_record_batch() {
let column_schemas = vec![
ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
];
let schema = Arc::new(Schema::new(column_schemas));
let columns: Vec<VectorRef> = vec![
Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
Arc::new(StringVector::from(vec![
None,
Some("hello"),
Some("greptime"),
None,
])),
];
let recordbatch = RecordBatch::new(schema.clone(), columns).unwrap();
let columns: Vec<VectorRef> = vec![
Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
Arc::new(StringVector::from(vec![
None,
Some("hello"),
Some("greptime"),
None,
])),
];
let recordbatch2 = RecordBatch::new(schema.clone(), columns).unwrap();
let merged = merge_record_batches(schema.clone(), &[recordbatch, recordbatch2])
.expect("merge recordbatch");
assert_eq!(merged.num_rows(), 8);
}
}


@@ -13,6 +13,8 @@ chrono.workspace = true
chrono-tz = "0.8"
common-error.workspace = true
common-macro.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
once_cell.workspace = true
serde = { version = "1.0", features = ["derive"] }
serde_json.workspace = true


@@ -93,12 +93,28 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse duration"))]
ParseDuration {
#[snafu(source)]
error: humantime::DurationError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Database's TTL can't be `instant`"))]
InvalidDatabaseTtl {
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::ParseDateStr { .. }
| Error::ParseDuration { .. }
| Error::InvalidDatabaseTtl { .. }
| Error::ParseTimestamp { .. }
| Error::InvalidTimezoneOffset { .. }
| Error::Format { .. }


@@ -22,6 +22,7 @@ pub mod time;
pub mod timestamp;
pub mod timestamp_millis;
pub mod timezone;
pub mod ttl;
pub mod util;
pub use date::Date;
@@ -32,3 +33,4 @@ pub use range::RangeMillis;
pub use timestamp::Timestamp;
pub use timestamp_millis::TimestampMillis;
pub use timezone::Timezone;
pub use ttl::{DatabaseTimeToLive, TimeToLive, FOREVER, INSTANT};

src/common/time/src/ttl.rs (new file)

@@ -0,0 +1,266 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::error::{Error, InvalidDatabaseTtlSnafu, ParseDurationSnafu};
use crate::Timestamp;
pub const INSTANT: &str = "instant";
pub const FOREVER: &str = "forever";
/// Time to live for a database, which can be `Forever` or a `Duration`, but can't be `Instant`.
///
/// Unlike `TimeToLive`, which can be `Instant`, `Forever`, or a `Duration`.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DatabaseTimeToLive {
/// Keep the data forever
#[default]
Forever,
/// Duration to keep the data; this duration should be non-zero
#[serde(untagged, with = "humantime_serde")]
Duration(Duration),
}
impl Display for DatabaseTimeToLive {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DatabaseTimeToLive::Forever => write!(f, "{}", FOREVER),
DatabaseTimeToLive::Duration(d) => write!(f, "{}", humantime::Duration::from(*d)),
}
}
}
impl DatabaseTimeToLive {
/// Parses a string that is either `forever` or a duration into a `DatabaseTimeToLive`.
///
/// Note that an empty string or a zero duration (a duration that spans no time) is treated as `forever` too.
pub fn from_humantime_or_str(s: &str) -> Result<Self, Error> {
let ttl = match s.to_lowercase().as_ref() {
INSTANT => InvalidDatabaseTtlSnafu.fail()?,
FOREVER | "" => Self::Forever,
_ => {
let d = humantime::parse_duration(s).context(ParseDurationSnafu)?;
Self::from(d)
}
};
Ok(ttl)
}
}
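The parsing rules for database TTLs in one place: `forever`, an empty string, and a zero duration all normalize to `Forever`, `instant` is rejected, and anything else must be a humantime duration. A quick sketch, complementing the tests at the end of the file:

    // Sketch of DatabaseTimeToLive::from_humantime_or_str behavior.
    assert_eq!(DatabaseTimeToLive::from_humantime_or_str("").unwrap(), DatabaseTimeToLive::Forever);
    assert_eq!(DatabaseTimeToLive::from_humantime_or_str("0s").unwrap(), DatabaseTimeToLive::Forever);
    assert!(matches!(DatabaseTimeToLive::from_humantime_or_str("7d").unwrap(), DatabaseTimeToLive::Duration(_)));
    assert!(DatabaseTimeToLive::from_humantime_or_str("instant").is_err());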
impl TryFrom<TimeToLive> for DatabaseTimeToLive {
type Error = Error;
fn try_from(value: TimeToLive) -> Result<Self, Self::Error> {
match value {
TimeToLive::Instant => InvalidDatabaseTtlSnafu.fail()?,
TimeToLive::Forever => Ok(Self::Forever),
TimeToLive::Duration(d) => Ok(Self::from(d)),
}
}
}
impl From<DatabaseTimeToLive> for TimeToLive {
fn from(value: DatabaseTimeToLive) -> Self {
match value {
DatabaseTimeToLive::Forever => TimeToLive::Forever,
DatabaseTimeToLive::Duration(d) => TimeToLive::from(d),
}
}
}
impl From<Duration> for DatabaseTimeToLive {
fn from(duration: Duration) -> Self {
if duration.is_zero() {
Self::Forever
} else {
Self::Duration(duration)
}
}
}
impl From<humantime::Duration> for DatabaseTimeToLive {
fn from(duration: humantime::Duration) -> Self {
Self::from(*duration)
}
}
/// Time To Live
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TimeToLive {
/// Instantly discard upon insert
Instant,
/// Keep the data forever
#[default]
Forever,
/// Duration to keep the data; this duration should be non-zero
#[serde(untagged, with = "humantime_serde")]
Duration(Duration),
}
impl Display for TimeToLive {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TimeToLive::Instant => write!(f, "{}", INSTANT),
TimeToLive::Duration(d) => write!(f, "{}", humantime::Duration::from(*d)),
TimeToLive::Forever => write!(f, "{}", FOREVER),
}
}
}
impl TimeToLive {
/// Parses a string that is either `instant`, `forever`, or a duration into a `TimeToLive`.
///
/// Note that an empty string or a zero duration (a duration that spans no time) is treated as `forever` too.
pub fn from_humantime_or_str(s: &str) -> Result<Self, Error> {
match s.to_lowercase().as_ref() {
INSTANT => Ok(TimeToLive::Instant),
FOREVER | "" => Ok(TimeToLive::Forever),
_ => {
let d = humantime::parse_duration(s).context(ParseDurationSnafu)?;
Ok(TimeToLive::from(d))
}
}
}
/// Checks whether the TimeToLive is expired
/// with the given `created_at` and `now` timestamps
pub fn is_expired(
&self,
created_at: &Timestamp,
now: &Timestamp,
) -> crate::error::Result<bool> {
Ok(match self {
TimeToLive::Instant => true,
TimeToLive::Forever => false,
TimeToLive::Duration(d) => now.sub_duration(*d)? > *created_at,
})
}
/// Returns true if this is the `Instant` variant
pub fn is_instant(&self) -> bool {
matches!(self, TimeToLive::Instant)
}
/// Returns true if this is the default value, `Forever`
pub fn is_forever(&self) -> bool {
matches!(self, TimeToLive::Forever)
}
}
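`is_expired` is the check engines use to decide whether a row written at `created_at` is still visible at `now`: `Instant` always expires, `Forever` never does, and a duration compares `now - ttl` against `created_at`. A hedged sketch (assuming a `Timestamp::new_second` constructor is available in this crate):

    use std::time::Duration;
    // Sketch: a one-hour TTL expires rows older than an hour.
    let ttl = TimeToLive::from(Duration::from_secs(3600));
    let created_at = Timestamp::new_second(0);
    let now = Timestamp::new_second(7200);
    assert!(ttl.is_expired(&created_at, &now).unwrap());
    assert!(!TimeToLive::Forever.is_expired(&created_at, &now).unwrap());
    assert!(TimeToLive::Instant.is_expired(&created_at, &now).unwrap());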
impl From<Duration> for TimeToLive {
fn from(duration: Duration) -> Self {
if duration.is_zero() {
// For compatibility with old code, and in line with Cassandra's behavior when TTL is set to 0
TimeToLive::Forever
} else {
TimeToLive::Duration(duration)
}
}
}
impl From<humantime::Duration> for TimeToLive {
fn from(duration: humantime::Duration) -> Self {
Self::from(*duration)
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_db_ttl_table_ttl() {
// test from ttl to db ttl
let ttl = TimeToLive::from(Duration::from_secs(10));
let db_ttl: DatabaseTimeToLive = ttl.try_into().unwrap();
assert_eq!(db_ttl, DatabaseTimeToLive::from(Duration::from_secs(10)));
assert_eq!(TimeToLive::from(db_ttl), ttl);
let ttl = TimeToLive::from(Duration::from_secs(0));
let db_ttl: DatabaseTimeToLive = ttl.try_into().unwrap();
assert_eq!(db_ttl, DatabaseTimeToLive::Forever);
assert_eq!(TimeToLive::from(db_ttl), ttl);
let ttl = TimeToLive::Instant;
let err_instant = DatabaseTimeToLive::try_from(ttl);
assert!(err_instant.is_err());
// test 0 duration
let ttl = Duration::from_secs(0);
let db_ttl: DatabaseTimeToLive = ttl.into();
assert_eq!(db_ttl, DatabaseTimeToLive::Forever);
let ttl = Duration::from_secs(10);
let db_ttl: DatabaseTimeToLive = ttl.into();
assert_eq!(
db_ttl,
DatabaseTimeToLive::Duration(Duration::from_secs(10))
);
let ttl = DatabaseTimeToLive::from_humantime_or_str("10s").unwrap();
let ttl: TimeToLive = ttl.into();
assert_eq!(ttl, TimeToLive::from(Duration::from_secs(10)));
let ttl = DatabaseTimeToLive::from_humantime_or_str("forever").unwrap();
let ttl: TimeToLive = ttl.into();
assert_eq!(ttl, TimeToLive::Forever);
assert!(DatabaseTimeToLive::from_humantime_or_str("instant").is_err());
// test 0s
let ttl = DatabaseTimeToLive::from_humantime_or_str("0s").unwrap();
let ttl: TimeToLive = ttl.into();
assert_eq!(ttl, TimeToLive::Forever);
}
#[test]
fn test_serde() {
let cases = vec![
("\"instant\"", TimeToLive::Instant),
("\"forever\"", TimeToLive::Forever),
("\"10d\"", Duration::from_secs(86400 * 10).into()),
(
"\"10000 years\"",
humantime::parse_duration("10000 years").unwrap().into(),
),
];
for (s, expected) in cases {
let serialized = serde_json::to_string(&expected).unwrap();
let deserialized: TimeToLive = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized, expected);
let deserialized: TimeToLive = serde_json::from_str(s).unwrap_or_else(|err| {
panic!("Actual serialized: {}, s=`{s}`, err: {:?}", serialized, err)
});
assert_eq!(deserialized, expected);
// test db ttl too
if s == "\"instant\"" {
assert!(serde_json::from_str::<DatabaseTimeToLive>(s).is_err());
continue;
}
let db_ttl: DatabaseTimeToLive = serde_json::from_str(s).unwrap();
let re_serialized = serde_json::to_string(&db_ttl).unwrap();
assert_eq!(re_serialized, serialized);
}
}
}


@@ -8,11 +8,10 @@ license.workspace = true
workspace = true
[features]
codec = ["dep:serde", "dep:schemars"]
codec = ["dep:serde"]
[dependencies]
const_format = "0.2"
schemars = { workspace = true, optional = true }
serde = { workspace = true, optional = true }
shadow-rs.workspace = true


@@ -49,10 +49,7 @@ impl Display for BuildInfo {
}
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(
feature = "codec",
derive(serde::Serialize, serde::Deserialize, schemars::JsonSchema)
)]
#[cfg_attr(feature = "codec", derive(serde::Serialize, serde::Deserialize))]
pub struct OwnedBuildInfo {
pub branch: String,
pub commit: String,


@@ -65,6 +65,7 @@ toml.workspace = true
tonic.workspace = true
[dev-dependencies]
cache.workspace = true
client.workspace = true
common-meta = { workspace = true, features = ["testing"] }
common-query.workspace = true


@@ -14,6 +14,8 @@
//! Datanode configurations
use core::time::Duration;
use common_base::readable_size::ReadableSize;
use common_base::secrets::{ExposeSecret, SecretString};
use common_config::Configurable;
@@ -30,7 +32,7 @@ use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use servers::Mode;
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(1);
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
/// Default data home in file storage
const DEFAULT_DATA_HOME: &str = "/tmp/greptimedb";
@@ -58,6 +60,11 @@ impl ObjectStoreConfig {
}
}
/// Returns true when it's a remote object storage such as AWS S3, etc.
pub fn is_object_storage(&self) -> bool {
!matches!(self, Self::File(_))
}
/// Returns the object storage configuration name, or the provider name if it's empty.
pub fn config_name(&self) -> &str {
let name = match self {
@@ -89,6 +96,13 @@ pub struct StorageConfig {
pub providers: Vec<ObjectStoreConfig>,
}
impl StorageConfig {
/// Returns true when the default storage config is a remote object storage service such as AWS S3, etc.
pub fn is_object_storage(&self) -> bool {
self.store.is_object_storage()
}
}
impl Default for StorageConfig {
fn default() -> Self {
Self {
@@ -112,6 +126,38 @@ pub struct ObjectStorageCacheConfig {
pub cache_capacity: Option<ReadableSize>,
}
/// The http client options to the storage.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct HttpClientConfig {
/// The maximum number of idle connections per host allowed in the pool.
pub(crate) pool_max_idle_per_host: u32,
/// The timeout for only the connect phase of an HTTP client.
#[serde(with = "humantime_serde")]
pub(crate) connect_timeout: Duration,
/// The total request timeout, applied from when the request starts connecting until the response body has finished.
/// Also considered a total deadline.
#[serde(with = "humantime_serde")]
pub(crate) timeout: Duration,
/// The timeout for idle sockets being kept-alive.
#[serde(with = "humantime_serde")]
pub(crate) pool_idle_timeout: Duration,
}
impl Default for HttpClientConfig {
fn default() -> Self {
Self {
pool_max_idle_per_host: 1024,
connect_timeout: Duration::from_secs(30),
timeout: Duration::from_secs(30),
pool_idle_timeout: Duration::from_secs(90),
}
}
}
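These fields replace the previously hard-coded reqwest settings, and the `_GREPTIMEDB_HTTP_*` environment variables now only act as overrides (see the object-store changes below). A sketch of tightening the defaults from inside the config module (the fields are crate-private):

    use std::time::Duration;
    // Sketch: stricter client settings than the 30s/30s/90s/1024 defaults.
    let http_client = HttpClientConfig {
        pool_max_idle_per_host: 256,
        connect_timeout: Duration::from_secs(5),
        timeout: Duration::from_secs(10),
        pool_idle_timeout: Duration::from_secs(60),
    };
    let s3 = S3Config { http_client, ..Default::default() };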
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct S3Config {
@@ -126,6 +172,7 @@ pub struct S3Config {
pub region: Option<String>,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for S3Config {
@@ -138,6 +185,7 @@ impl PartialEq for S3Config {
&& self.endpoint == other.endpoint
&& self.region == other.region
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
@@ -154,6 +202,7 @@ pub struct OssConfig {
pub endpoint: String,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for OssConfig {
@@ -165,6 +214,7 @@ impl PartialEq for OssConfig {
&& self.access_key_secret.expose_secret() == other.access_key_secret.expose_secret()
&& self.endpoint == other.endpoint
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
@@ -182,6 +232,7 @@ pub struct AzblobConfig {
pub sas_token: Option<String>,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for AzblobConfig {
@@ -194,6 +245,7 @@ impl PartialEq for AzblobConfig {
&& self.endpoint == other.endpoint
&& self.sas_token == other.sas_token
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
@@ -211,6 +263,7 @@ pub struct GcsConfig {
pub endpoint: String,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for GcsConfig {
@@ -223,6 +276,7 @@ impl PartialEq for GcsConfig {
&& self.credential.expose_secret() == other.credential.expose_secret()
&& self.endpoint == other.endpoint
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
@@ -237,6 +291,7 @@ impl Default for S3Config {
endpoint: Option::default(),
region: Option::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
@@ -251,6 +306,7 @@ impl Default for OssConfig {
access_key_secret: SecretString::from(String::default()),
endpoint: String::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
@@ -266,6 +322,7 @@ impl Default for AzblobConfig {
endpoint: String::default(),
sas_token: Option::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
@@ -281,6 +338,7 @@ impl Default for GcsConfig {
credential: SecretString::from(String::default()),
endpoint: String::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
@@ -406,6 +464,20 @@ mod tests {
assert_eq!("S3", s3_config.provider_name());
}
#[test]
fn test_is_object_storage() {
let store = ObjectStoreConfig::default();
assert!(!store.is_object_storage());
let s3_config = ObjectStoreConfig::S3(S3Config::default());
assert!(s3_config.is_object_storage());
let oss_config = ObjectStoreConfig::Oss(OssConfig::default());
assert!(oss_config.is_object_storage());
let gcs_config = ObjectStoreConfig::Gcs(GcsConfig::default());
assert!(gcs_config.is_object_storage());
let azblob_config = ObjectStoreConfig::Azblob(AzblobConfig::default());
assert!(azblob_config.is_object_storage());
}
#[test]
fn test_secstr() {
let toml_str = r#"


@@ -22,6 +22,7 @@ use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue};
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
@@ -56,9 +57,9 @@ use tokio::sync::Notify;
use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use crate::error::{
self, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu,
MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu, ShutdownServerSnafu,
StartServerSnafu,
self, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, MissingCacheSnafu,
MissingKvBackendSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu,
ShutdownServerSnafu, StartServerSnafu,
};
use crate::event_listener::{
new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef,
@@ -159,6 +160,7 @@ pub struct DatanodeBuilder {
plugins: Plugins,
meta_client: Option<MetaClientRef>,
kv_backend: Option<KvBackendRef>,
cache_registry: Option<Arc<LayeredCacheRegistry>>,
}
impl DatanodeBuilder {
@@ -170,6 +172,7 @@ impl DatanodeBuilder {
plugins,
meta_client: None,
kv_backend: None,
cache_registry: None,
}
}
@@ -180,6 +183,13 @@ impl DatanodeBuilder {
}
}
pub fn with_cache_registry(self, cache_registry: Arc<LayeredCacheRegistry>) -> Self {
Self {
cache_registry: Some(cache_registry),
..self
}
}
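Because the datanode now pulls `TableSchemaCache` and `SchemaCache` out of the registry when building `SchemaMetadataManager`, standalone and test setups must pass a cache registry or the builder fails with `MissingCache`. A sketch of the wiring, mirroring the test further down (option values are placeholders):

    // Sketch: build the datanode cache registry on top of the kv backend
    // and hand both to the builder.
    let kv_backend = Arc::new(MemoryKvBackend::new());
    let cache_registry = Arc::new(
        LayeredCacheRegistryBuilder::default()
            .add_cache_registry(build_datanode_cache_registry(kv_backend.clone()))
            .build(),
    );
    let builder = DatanodeBuilder::new(DatanodeOptions::default(), Plugins::default())
        .with_kv_backend(kv_backend as _)
        .with_cache_registry(cache_registry);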
pub fn with_kv_backend(self, kv_backend: KvBackendRef) -> Self {
Self {
kv_backend: Some(kv_backend),
@@ -208,7 +218,16 @@ impl DatanodeBuilder {
(Box::new(NoopRegionServerEventListener) as _, None)
};
let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(kv_backend.clone()));
let cache_registry = self.cache_registry.take().context(MissingCacheSnafu)?;
let schema_cache: SchemaCacheRef = cache_registry.get().context(MissingCacheSnafu)?;
let table_id_schema_cache: TableSchemaCacheRef =
cache_registry.get().context(MissingCacheSnafu)?;
let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(
kv_backend.clone(),
table_id_schema_cache,
schema_cache,
));
let region_server = self
.new_region_server(schema_metadata_manager, region_event_listener)
.await?;
@@ -239,7 +258,15 @@ impl DatanodeBuilder {
}
let heartbeat_task = if let Some(meta_client) = meta_client {
Some(HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?)
Some(
HeartbeatTask::try_new(
&self.opts,
region_server.clone(),
meta_client,
cache_registry,
)
.await?,
)
} else {
None
};
@@ -401,10 +428,16 @@ impl DatanodeBuilder {
async fn build_mito_engine(
opts: &DatanodeOptions,
object_store_manager: ObjectStoreManagerRef,
config: MitoConfig,
mut config: MitoConfig,
schema_metadata_manager: SchemaMetadataManagerRef,
plugins: Plugins,
) -> Result<MitoEngine> {
if opts.storage.is_object_storage() {
// Enable the write cache when setting object storage
config.enable_experimental_write_cache = true;
info!("Configured 'enable_experimental_write_cache=true' for mito engine.");
}
let mito_engine = match &opts.wal {
DatanodeWalConfig::RaftEngine(raft_engine_config) => MitoEngine::new(
&opts.storage.data_home,
@@ -579,7 +612,9 @@ mod tests {
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use cache::build_datanode_cache_registry;
use common_base::Plugins;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::key::datanode_table::DatanodeTableManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
@@ -616,13 +651,21 @@ mod tests {
mock_region_server.register_engine(mock_region.clone());
let kv_backend = Arc::new(MemoryKvBackend::new());
let layered_cache_registry = Arc::new(
LayeredCacheRegistryBuilder::default()
.add_cache_registry(build_datanode_cache_registry(kv_backend))
.build(),
);
let builder = DatanodeBuilder::new(
DatanodeOptions {
node_id: Some(0),
..Default::default()
},
Plugins::default(),
);
)
.with_cache_registry(layered_cache_registry);
let kv = Arc::new(MemoryKvBackend::default()) as _;
setup_table_datanode(&kv).await;


@@ -364,6 +364,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Cache not found in registry"))]
MissingCache {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -431,6 +437,7 @@ impl ErrorExt for Error {
ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. } => {
StatusCode::RegionBusy
}
MissingCache { .. } => StatusCode::Internal,
}
}


@@ -18,8 +18,10 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::datanode::REGION_STATISTIC_KEY;
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
@@ -70,6 +72,7 @@ impl HeartbeatTask {
opts: &DatanodeOptions,
region_server: RegionServer,
meta_client: MetaClientRef,
cache_invalidator: CacheInvalidatorRef,
) -> Result<Self> {
let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
region_server.clone(),
@@ -79,6 +82,7 @@ impl HeartbeatTask {
region_alive_keeper.clone(),
Arc::new(ParseMailboxMessageHandler),
Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())),
Arc::new(InvalidateCacheHandler::new(cache_invalidator)),
]));
Ok(Self {


@@ -134,7 +134,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
}
});
Ok(HandleControl::Done)
Ok(HandleControl::Continue)
}
}
@@ -285,7 +285,7 @@ mod tests {
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
@@ -340,7 +340,7 @@ mod tests {
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
@@ -373,7 +373,7 @@ mod tests {
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
@@ -420,7 +420,7 @@ mod tests {
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
@@ -442,7 +442,7 @@ mod tests {
});
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();


@@ -13,6 +13,7 @@
// limitations under the License.
#![feature(assert_matches)]
#![feature(let_chains)]
pub mod alive_keeper;
pub mod config;


@@ -19,21 +19,20 @@ mod fs;
mod gcs;
mod oss;
mod s3;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use std::{env, path};
use common_base::readable_size::ReadableSize;
use common_telemetry::{info, warn};
use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder, OBJECT_CACHE_DIR};
use snafu::prelude::*;
use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, Result};
use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, CreateDirSnafu, Result};
pub(crate) async fn new_raw_object_store(
store: &ObjectStoreConfig,
@@ -68,7 +67,7 @@ pub(crate) async fn new_object_store_without_cache(
) -> Result<ObjectStore> {
let object_store = new_raw_object_store(store, data_home).await?;
// Enable retry layer and cache layer for non-fs object storages
let object_store = if !matches!(store, ObjectStoreConfig::File(..)) {
let object_store = if store.is_object_storage() {
// Adds retry layer
with_retry_layers(object_store)
} else {
@@ -85,8 +84,8 @@ pub(crate) async fn new_object_store(
) -> Result<ObjectStore> {
let object_store = new_raw_object_store(&store, data_home).await?;
// Enable retry layer and cache layer for non-fs object storages
let object_store = if !matches!(store, ObjectStoreConfig::File(..)) {
let object_store = if let Some(cache_layer) = build_cache_layer(&store).await? {
let object_store = if store.is_object_storage() {
let object_store = if let Some(cache_layer) = build_cache_layer(&store, data_home).await? {
// Adds cache layer
object_store.layer(cache_layer)
} else {
@@ -105,44 +104,72 @@ pub(crate) async fn new_object_store(
async fn build_cache_layer(
store_config: &ObjectStoreConfig,
data_home: &str,
) -> Result<Option<LruCacheLayer<impl Access>>> {
let (cache_path, cache_capacity) = match store_config {
let (name, mut cache_path, cache_capacity) = match store_config {
ObjectStoreConfig::S3(s3_config) => {
let path = s3_config.cache.cache_path.as_ref();
let path = s3_config.cache.cache_path.clone();
let name = &s3_config.name;
let capacity = s3_config
.cache
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(path, capacity)
(name, path, capacity)
}
ObjectStoreConfig::Oss(oss_config) => {
let path = oss_config.cache.cache_path.as_ref();
let path = oss_config.cache.cache_path.clone();
let name = &oss_config.name;
let capacity = oss_config
.cache
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(path, capacity)
(name, path, capacity)
}
ObjectStoreConfig::Azblob(azblob_config) => {
let path = azblob_config.cache.cache_path.as_ref();
let path = azblob_config.cache.cache_path.clone();
let name = &azblob_config.name;
let capacity = azblob_config
.cache
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(path, capacity)
(name, path, capacity)
}
ObjectStoreConfig::Gcs(gcs_config) => {
let path = gcs_config.cache.cache_path.as_ref();
let path = gcs_config.cache.cache_path.clone();
let name = &gcs_config.name;
let capacity = gcs_config
.cache
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(path, capacity)
(name, path, capacity)
}
_ => (None, ReadableSize(0)),
_ => unreachable!("Already checked above"),
};
if let Some(path) = cache_path {
// Enable object cache by default
// Set the cache_path to be `${data_home}/object_cache/read/{name}` by default
// if it's not present
if cache_path.is_none() {
let object_cache_path = join_dir(data_home, OBJECT_CACHE_DIR);
let read_cache_path = join_dir(&object_cache_path, "read");
let read_cache_path = join_dir(&read_cache_path, &name.to_lowercase());
tokio::fs::create_dir_all(Path::new(&read_cache_path))
.await
.context(CreateDirSnafu {
dir: &read_cache_path,
})?;
info!(
"The object storage cache path is not set for '{}', using the default path: '{}'",
name, &read_cache_path
);
cache_path = Some(read_cache_path);
}
if let Some(path) = cache_path.as_ref()
&& !path.trim().is_empty()
{
let atomic_temp_dir = join_dir(path, ".tmp/");
clean_temp_dir(&atomic_temp_dir)?;
@@ -153,9 +180,8 @@ async fn build_cache_layer(
.context(error::InitBackendSnafu)?;
let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
.await
.context(error::InitBackendSnafu)?;
cache_layer.recover_cache(false).await;
info!(
"Enabled local object storage cache, path: {}, capacity: {}.",
path, cache_capacity
@@ -177,7 +203,7 @@ pub(crate) fn clean_temp_dir(dir: &str) -> Result<()> {
Ok(())
}
pub(crate) fn build_http_client() -> Result<HttpClient> {
pub(crate) fn build_http_client(config: &HttpClientConfig) -> Result<HttpClient> {
let http_builder = {
let mut builder = reqwest::ClientBuilder::new();
@@ -186,25 +212,28 @@ pub(crate) fn build_http_client() -> Result<HttpClient> {
let pool_max_idle_per_host = env::var("_GREPTIMEDB_HTTP_POOL_MAX_IDLE_PER_HOST")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or(usize::MAX);
.inspect(|_| warn!("'_GREPTIMEDB_HTTP_POOL_MAX_IDLE_PER_HOST' might be deprecated in the future. Please set it in the config file instead."))
.unwrap_or(config.pool_max_idle_per_host as usize);
builder = builder.pool_max_idle_per_host(pool_max_idle_per_host);
// Connect timeout defaults to 30s.
let connect_timeout = env::var("_GREPTIMEDB_HTTP_CONNECT_TIMEOUT")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(30);
builder = builder.connect_timeout(Duration::from_secs(connect_timeout));
.and_then(|v| v.parse::<u64>().ok().map(Duration::from_secs))
.inspect(|_| warn!("'_GREPTIMEDB_HTTP_CONNECT_TIMEOUT' might be deprecated in the future. Please set it in the config file instead."))
.unwrap_or(config.connect_timeout);
builder = builder.connect_timeout(connect_timeout);
// Pool connection idle timeout defaults to 90s.
let idle_timeout = env::var("_GREPTIMEDB_HTTP_POOL_IDLE_TIMEOUT")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(90);
.and_then(|v| v.parse::<u64>().ok().map(Duration::from_secs))
.inspect(|_| warn!("'_GREPTIMEDB_HTTP_POOL_IDLE_TIMEOUT' might be deprecated in the future. Please set it in the config file instead."))
.unwrap_or(config.pool_idle_timeout);
builder = builder.pool_idle_timeout(Duration::from_secs(idle_timeout));
builder = builder.pool_idle_timeout(idle_timeout);
builder
builder.timeout(config.timeout)
};
HttpClient::build(http_builder).context(error::InitBackendSnafu)
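The `_GREPTIMEDB_HTTP_*` environment variables still win over the config file, but each use now logs a warning that they may be deprecated. A tiny sketch of the precedence:

    // Sketch: the env var (in seconds) overrides HttpClientConfig::connect_timeout and warns.
    std::env::set_var("_GREPTIMEDB_HTTP_CONNECT_TIMEOUT", "10");
    let client = build_http_client(&HttpClientConfig::default())?; // connect timeout becomes 10s, not 30s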


@@ -30,13 +30,15 @@ pub(crate) async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Res
azblob_config.container, &root
);
let client = build_http_client(&azblob_config.http_client)?;
let mut builder = Azblob::default()
.root(&root)
.container(&azblob_config.container)
.endpoint(&azblob_config.endpoint)
.account_name(azblob_config.account_name.expose_secret())
.account_key(azblob_config.account_key.expose_secret())
.http_client(build_http_client()?);
.http_client(client);
if let Some(token) = &azblob_config.sas_token {
builder = builder.sas_token(token);


@@ -29,6 +29,8 @@ pub(crate) async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<Objec
gcs_config.bucket, &root
);
let client = build_http_client(&gcs_config.http_client);
let builder = Gcs::default()
.root(&root)
.bucket(&gcs_config.bucket)
@@ -36,7 +38,7 @@ pub(crate) async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<Objec
.credential_path(gcs_config.credential_path.expose_secret())
.credential(gcs_config.credential.expose_secret())
.endpoint(&gcs_config.endpoint)
.http_client(build_http_client()?);
.http_client(client?);
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?


@@ -29,13 +29,15 @@ pub(crate) async fn new_oss_object_store(oss_config: &OssConfig) -> Result<Objec
oss_config.bucket, &root
);
let client = build_http_client(&oss_config.http_client)?;
let builder = Oss::default()
.root(&root)
.bucket(&oss_config.bucket)
.endpoint(&oss_config.endpoint)
.access_key_id(oss_config.access_key_id.expose_secret())
.access_key_secret(oss_config.access_key_secret.expose_secret())
.http_client(build_http_client()?);
.http_client(client);
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?


@@ -30,12 +30,14 @@ pub(crate) async fn new_s3_object_store(s3_config: &S3Config) -> Result<ObjectSt
s3_config.bucket, &root
);
let client = build_http_client(&s3_config.http_client)?;
let mut builder = S3::default()
.root(&root)
.bucket(&s3_config.bucket)
.access_key_id(s3_config.access_key_id.expose_secret())
.secret_access_key(s3_config.secret_access_key.expose_secret())
.http_client(build_http_client()?);
.http_client(client);
if s3_config.endpoint.is_some() {
builder = builder.endpoint(s3_config.endpoint.as_ref().unwrap());


@@ -23,6 +23,7 @@ use common_error::ext::BoxedError;
use common_telemetry::debug;
use datafusion::config::ConfigOptions;
use datafusion::error::DataFusionError;
use datafusion::optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
use datafusion::optimizer::analyzer::type_coercion::TypeCoercion;
use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
use datafusion::optimizer::optimize_projections::OptimizeProjections;
@@ -59,6 +60,7 @@ pub async fn apply_df_optimizer(
) -> Result<datafusion_expr::LogicalPlan, Error> {
let cfg = ConfigOptions::new();
let analyzer = Analyzer::with_rules(vec![
Arc::new(CountWildcardRule::new()),
Arc::new(AvgExpandRule::new()),
Arc::new(TumbleExpandRule::new()),
Arc::new(CheckGroupByRule::new()),


@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(test)]
mod tests;
use std::sync::Arc;
use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer};
@@ -34,8 +37,6 @@ use crate::error::Result;
use crate::frontend::FrontendOptions;
use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
pub mod handler;
/// The frontend heartbeat task, which sends `[HeartbeatRequest]`s to Metasrv periodically in the background.
#[derive(Clone)]
pub struct HeartbeatTask {


@@ -17,6 +17,7 @@ use std::sync::{Arc, Mutex};
use api::v1::meta::HeartbeatResponse;
use common_meta::cache_invalidator::KvCacheInvalidator;
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
};
@@ -29,8 +30,6 @@ use partition::manager::TableRouteCacheInvalidator;
use table::metadata::TableId;
use tokio::sync::mpsc;
use super::invalidate_table_cache::InvalidateTableCacheHandler;
#[derive(Default)]
pub struct MockKvCacheInvalidator {
inner: Mutex<HashMap<Vec<u8>, i32>>,
@@ -85,7 +84,7 @@ async fn test_invalidate_table_cache_handler() {
});
let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new(
InvalidateTableCacheHandler::new(backend.clone()),
InvalidateCacheHandler::new(backend.clone()),
)]));
let (tx, _) = mpsc::channel(8);
@@ -124,7 +123,7 @@ async fn test_invalidate_schema_key_handler() {
});
let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new(
InvalidateTableCacheHandler::new(backend.clone()),
InvalidateCacheHandler::new(backend.clone()),
)]));
let (tx, _) = mpsc::channel(8);


@@ -64,8 +64,8 @@ use servers::prometheus_handler::PrometheusHandler;
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::{
InfluxdbLineProtocolHandler, LogHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler,
PromStoreProtocolHandler, ScriptHandler,
InfluxdbLineProtocolHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler,
PipelineHandler, PromStoreProtocolHandler, ScriptHandler,
};
use servers::server::ServerHandlers;
use session::context::QueryContextRef;
@@ -98,7 +98,7 @@ pub trait FrontendInstance:
+ OpenTelemetryProtocolHandler
+ ScriptHandler
+ PrometheusHandler
+ LogHandler
+ PipelineHandler
+ Send
+ Sync
+ 'static
@@ -487,7 +487,11 @@ pub fn check_permission(
// TODO(dennis): add a hook for admin commands.
Statement::Admin(_) => {}
// These are executed by query engine, and will be checked there.
Statement::Query(_) | Statement::Explain(_) | Statement::Tql(_) | Statement::Delete(_) => {}
Statement::Query(_)
| Statement::Explain(_)
| Statement::Tql(_)
| Statement::Delete(_)
| Statement::DeclareCursor(_) => {}
// database ops won't be checked
Statement::CreateDatabase(_)
| Statement::ShowDatabases(_)
@@ -580,6 +584,8 @@ pub fn check_permission(
Statement::TruncateTable(stmt) => {
validate_param(stmt.table_name(), query_ctx)?;
}
// Cursor operations are always allowed once the cursor is created
Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
}
Ok(())
}


@@ -24,19 +24,15 @@ use servers::error::{
AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult,
};
use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use servers::query_handler::LogHandler;
use servers::query_handler::PipelineHandler;
use session::context::QueryContextRef;
use snafu::ResultExt;
use crate::instance::Instance;
#[async_trait]
impl LogHandler for Instance {
async fn insert_logs(
&self,
log: RowInsertRequests,
ctx: QueryContextRef,
) -> ServerResult<Output> {
impl PipelineHandler for Instance {
async fn insert(&self, log: RowInsertRequests, ctx: QueryContextRef) -> ServerResult<Output> {
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()

src/log-query/Cargo.toml (new file)

@@ -0,0 +1,15 @@
[package]
name = "log-query"
version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
chrono.workspace = true
common-error.workspace = true
common-macro.workspace = true
snafu.workspace = true
table.workspace = true


@@ -0,0 +1,46 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::ErrorExt;
use common_macro::stack_trace_debug;
use snafu::Snafu;
use crate::TimeFilter;
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Invalid time filter: {filter:?}"))]
InvalidTimeFilter { filter: TimeFilter },
#[snafu(display("Invalid date format: {input}"))]
InvalidDateFormat { input: String },
#[snafu(display("Invalid span format: {input}"))]
InvalidSpanFormat { input: String },
#[snafu(display("End time {end} is before start time {start}"))]
EndBeforeStart { start: String, end: String },
}
impl ErrorExt for Error {
fn as_any(&self) -> &dyn Any {
self
}
}
pub type Result<T> = std::result::Result<T, Error>;


@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod invalidate_table_cache;
pub mod error;
mod log_query;
#[cfg(test)]
pub(crate) mod tests;
pub use log_query::*;


@@ -0,0 +1,322 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use chrono::{DateTime, Datelike, Duration, NaiveDate, NaiveTime, TimeZone, Utc};
use table::table_name::TableName;
use crate::error::{
EndBeforeStartSnafu, InvalidDateFormatSnafu, InvalidSpanFormatSnafu, InvalidTimeFilterSnafu,
Result,
};
/// GreptimeDB's log query request.
pub struct LogQuery {
/// A fully qualified table name to query logs from.
pub table_name: TableName,
/// Specifies the time range for the log query. See [`TimeFilter`] for more details.
pub time_filter: TimeFilter,
/// Columns with filters to query.
pub columns: Vec<ColumnFilters>,
/// Maximum number of logs to return. If not provided, it will return all matched logs.
pub limit: Option<usize>,
/// Adjacent lines to return.
pub context: Context,
}
/// Represents a time range for log query.
///
/// This struct allows various formats to express a time range from the user side
/// for best flexibility:
/// - Only `start` is provided: the `start` string can be any valid "date" or vaguer
/// content. For example: "2024-12-01", "2024-12", "2024", etc. It will be treated
/// as a time range corresponding to the provided date. E.g., "2024-12-01" refers
/// to the entire 24 hours in that day. In this case, the `start` field cannot be a
/// timestamp (like "2024-12-01T12:00:00Z").
/// - Both `start` and `end` are provided: the `start` and `end` strings can be either
/// a date or a timestamp. The `end` field is exclusive (`[start, end)`). When
/// `start` is a date it implies the start of the day, and when `end` is a date it
/// implies the end of the day.
/// - `span` with `start` OR `end`: the `span` string can be any valid "interval".
/// For example: "1024s", "1 week", "1 month", etc. The `span` field is applied to
/// the `start` or `end` field to calculate the other one correspondingly. If `start`
/// is provided, `end` is calculated as `start + span` and vice versa.
/// - Only `span` is provided: the `span` string can be any valid "interval" as mentioned
/// above. In this case, the current time (on the server side) is considered as the `end`.
/// - All fields are provided: in this case, the `start` and `end` fields are considered
/// with higher priority, and the `span` field is ignored.
///
/// This struct doesn't require a timezone to be present. When the timezone is not
/// provided, the default timezone is filled in, following the same rules as other queries.
#[derive(Debug, Clone)]
pub struct TimeFilter {
pub start: Option<String>,
pub end: Option<String>,
pub span: Option<String>,
}
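For instance, a filter with only `start = "2024-12-01"` canonicalizes to that entire day, with both bounds rewritten as RFC 3339 timestamps. A minimal sketch (assuming the crate is imported as `log_query`):

    // Sketch of TimeFilter::canonicalize for a date-only `start`.
    let mut filter = TimeFilter {
        start: Some("2024-12-01".to_string()),
        end: None,
        span: None,
    };
    filter.canonicalize().unwrap();
    // filter.start / filter.end now cover [2024-12-01T00:00:00+00:00, 2024-12-02T00:00:00+00:00)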
impl TimeFilter {
/// Validate and canonicalize the time filter.
///
/// This function will try to fill the missing fields and convert all dates to timestamps
// false positive
#[allow(unused_assignments)]
pub fn canonicalize(&mut self) -> Result<()> {
let mut start_dt = None;
let mut end_dt = None;
if self.start.is_some() && self.end.is_none() && self.span.is_none() {
// Only 'start' is provided
let s = self.start.as_ref().unwrap();
let (start, end_opt) = Self::parse_datetime(s)?;
if end_opt.is_none() {
return Err(InvalidTimeFilterSnafu {
filter: self.clone(),
}
.build());
}
start_dt = Some(start);
end_dt = end_opt;
} else if self.start.is_some() && self.end.is_some() {
// Both 'start' and 'end' are provided
let (start, _) = Self::parse_datetime(self.start.as_ref().unwrap())?;
let (end, _) = Self::parse_datetime(self.end.as_ref().unwrap())?;
start_dt = Some(start);
end_dt = Some(end);
} else if self.span.is_some() && (self.start.is_some() || self.end.is_some()) {
// 'span' with 'start' or 'end'
let span = Self::parse_span(self.span.as_ref().unwrap())?;
if self.start.is_some() {
let (start, _) = Self::parse_datetime(self.start.as_ref().unwrap())?;
let end = start + span;
start_dt = Some(start);
end_dt = Some(end);
} else {
let (end, _) = Self::parse_datetime(self.end.as_ref().unwrap())?;
let start = end - span;
start_dt = Some(start);
end_dt = Some(end);
}
} else if self.span.is_some() && self.start.is_none() && self.end.is_none() {
// Only 'span' is provided
let span = Self::parse_span(self.span.as_ref().unwrap())?;
let end = Utc::now();
let start = end - span;
start_dt = Some(start);
end_dt = Some(end);
} else if self.start.is_some() && self.span.is_some() && self.end.is_some() {
// All fields are provided; 'start' and 'end' take priority
let (start, _) = Self::parse_datetime(self.start.as_ref().unwrap())?;
let (end, _) = Self::parse_datetime(self.end.as_ref().unwrap())?;
start_dt = Some(start);
end_dt = Some(end);
} else {
// Exception
return Err(InvalidTimeFilterSnafu {
filter: self.clone(),
}
.build());
}
// Validate that end is after start
if let (Some(start), Some(end)) = (&start_dt, &end_dt) {
if end <= start {
return Err(EndBeforeStartSnafu {
start: start.to_rfc3339(),
end: end.to_rfc3339(),
}
.build());
}
}
// Update the fields with canonicalized timestamps
if let Some(start) = start_dt {
self.start = Some(start.to_rfc3339());
}
if let Some(end) = end_dt {
self.end = Some(end.to_rfc3339());
}
Ok(())
}
/// Utility function that returns a start and an optional end `DateTime`.
fn parse_datetime(s: &str) -> Result<(DateTime<Utc>, Option<DateTime<Utc>>)> {
if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
Ok((dt.with_timezone(&Utc), None))
} else {
let formats = ["%Y-%m-%d", "%Y-%m", "%Y"];
for format in &formats {
if let Ok(naive_date) = NaiveDate::parse_from_str(s, format) {
let start = Utc.from_utc_datetime(
&naive_date.and_time(NaiveTime::from_hms_opt(0, 0, 0).unwrap()),
);
let end = match *format {
"%Y-%m-%d" => start + Duration::days(1),
"%Y-%m" => {
let next_month = if naive_date.month() == 12 {
NaiveDate::from_ymd_opt(naive_date.year() + 1, 1, 1).unwrap()
} else {
NaiveDate::from_ymd_opt(
naive_date.year(),
naive_date.month() + 1,
1,
)
.unwrap()
};
Utc.from_utc_datetime(&next_month.and_hms_opt(0, 0, 0).unwrap())
}
"%Y" => {
let next_year =
NaiveDate::from_ymd_opt(naive_date.year() + 1, 1, 1).unwrap();
Utc.from_utc_datetime(&next_year.and_hms_opt(0, 0, 0).unwrap())
}
_ => unreachable!(),
};
return Ok((start, Some(end)));
}
}
Err(InvalidDateFormatSnafu {
input: s.to_string(),
}
.build())
}
}
/// Utility function that parses a span string. Human-readable durations like "1 week" or
/// "1 month" are not implemented yet; only a plain number of seconds (e.g., "86400") is accepted.
fn parse_span(s: &str) -> Result<Duration> {
// Simplified parsing logic
if let Ok(seconds) = s.parse::<i64>() {
Ok(Duration::seconds(seconds))
} else {
Err(InvalidSpanFormatSnafu {
input: s.to_string(),
}
.build())
}
}
}
/// Represents a column with filters to query.
pub struct ColumnFilters {
/// Case-sensitive column name to query.
pub column_name: String,
/// Filters to apply to the column. Can be empty.
pub filters: Vec<ContentFilter>,
}
pub enum ContentFilter {
/// Only match the exact content.
///
/// For example, if the content is "pale blue dot", the filter "pale" or "pale blue" will match.
Exact(String),
/// Match the content with a prefix.
///
/// For example, if the content is "error message", the filter "err" or "error mess" will match.
Prefix(String),
/// Match the content with a postfix. Similar to `Prefix`.
Postfix(String),
/// Match the content with a substring.
Contains(String),
/// Match the content with a regex pattern. The pattern should be a valid Rust regex.
Regex(String),
Compound(Vec<ContentFilter>, BinaryOperator),
}
pub enum BinaryOperator {
And,
Or,
}
/// Controls how many adjacent lines to return.
pub enum Context {
None,
/// Specify the number of lines before and after the matched line separately.
Lines(usize, usize),
/// Specify the number of seconds before and after the matched line occurred.
Seconds(usize, usize),
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::Error;
#[test]
fn test_canonicalize() {
// with 'start' only
let mut tf = TimeFilter {
start: Some("2023-10-01".to_string()),
end: None,
span: None,
};
tf.canonicalize().unwrap();
assert!(tf.end.is_some());
// with 'start' and 'span'
let mut tf = TimeFilter {
start: Some("2023-10-01T00:00:00Z".to_string()),
end: None,
span: Some("86400".to_string()), // 1 day in seconds
};
tf.canonicalize().unwrap();
assert_eq!(tf.end.as_ref().unwrap(), "2023-10-02T00:00:00+00:00");
// with 'end' and 'span'
let mut tf = TimeFilter {
start: None,
end: Some("2023-10-02T00:00:00Z".to_string()),
span: Some("86400".to_string()), // 1 day in seconds
};
tf.canonicalize().unwrap();
assert_eq!(tf.start.as_ref().unwrap(), "2023-10-01T00:00:00+00:00");
// with both 'start' and 'end'
let mut tf = TimeFilter {
start: Some("2023-10-01T00:00:00Z".to_string()),
end: Some("2023-10-02T00:00:00Z".to_string()),
span: None,
};
tf.canonicalize().unwrap();
assert_eq!(tf.start.as_ref().unwrap(), "2023-10-01T00:00:00+00:00");
assert_eq!(tf.end.as_ref().unwrap(), "2023-10-02T00:00:00+00:00");
// with invalid date format
let mut tf = TimeFilter {
start: Some("invalid-date".to_string()),
end: None,
span: None,
};
let result = tf.canonicalize();
assert!(matches!(result, Err(Error::InvalidDateFormat { .. })));
// with missing 'start' and 'end'
let mut tf = TimeFilter {
start: None,
end: None,
span: None,
};
let result = tf.canonicalize();
assert!(matches!(result, Err(Error::InvalidTimeFilter { .. })));
// 'end' is before 'start'
let mut tf = TimeFilter {
start: Some("2023-10-02T00:00:00Z".to_string()),
end: Some("2023-10-01T00:00:00Z".to_string()),
span: None,
};
let result = tf.canonicalize();
assert!(matches!(result, Err(Error::EndBeforeStart { .. })));
}
}
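
For orientation, a minimal sketch of how a caller might assemble a LogQuery from the types above and canonicalize its TimeFilter first. The crate paths (`log_query::…`), the `TableName::new` constructor, and the catalog/schema/table names here are assumptions for illustration, not something this change prescribes.

use log_query::error::Result;
use log_query::{ColumnFilters, ContentFilter, Context, LogQuery, TimeFilter};
use table::table_name::TableName;

fn build_log_query() -> Result<LogQuery> {
    // "2024-12-01" alone is expanded by canonicalize() into the [start, end)
    // range covering that whole day.
    let mut time_filter = TimeFilter {
        start: Some("2024-12-01".to_string()),
        end: None,
        span: None,
    };
    time_filter.canonicalize()?;

    Ok(LogQuery {
        // Placeholder catalog/schema/table names.
        table_name: TableName::new("greptime", "public", "app_logs"),
        time_filter,
        columns: vec![ColumnFilters {
            column_name: "message".to_string(),
            filters: vec![ContentFilter::Contains("timeout".to_string())],
        }],
        // Return at most 100 matched lines, each with 2 lines of context before and after.
        limit: Some(100),
        context: Context::Lines(2, 2),
    })
}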

View File

@@ -543,24 +543,29 @@ impl MetaClient {
#[cfg(test)]
mod tests {
use api::v1::meta::{HeartbeatRequest, Peer};
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use rand::Rng;
use super::*;
use crate::{error, mocks};
use crate::error;
use crate::mocks::{self, MockMetaContext};
const TEST_KEY_PREFIX: &str = "__unit_test__meta__";
struct TestClient {
ns: String,
client: MetaClient,
meta_ctx: MockMetaContext,
}
impl TestClient {
async fn new(ns: impl Into<String>) -> Self {
// can also test with etcd: mocks::mock_client_with_etcdstore("127.0.0.1:2379").await;
let client = mocks::mock_client_with_memstore().await;
let (client, meta_ctx) = mocks::mock_client_with_memstore().await;
Self {
ns: ns.into(),
client,
meta_ctx,
}
}
@@ -585,6 +590,15 @@ mod tests {
let res = self.client.delete_range(req).await;
let _ = res.unwrap();
}
#[allow(dead_code)]
fn kv_backend(&self) -> KvBackendRef {
self.meta_ctx.kv_backend.clone()
}
fn in_memory(&self) -> Option<ResettableKvBackendRef> {
self.meta_ctx.in_memory.clone()
}
}
async fn new_client(ns: impl Into<String>) -> TestClient {
@@ -940,4 +954,37 @@ mod tests {
);
}
}
fn mock_decoder(_kv: KeyValue) -> MetaResult<()> {
Ok(())
}
#[tokio::test]
async fn test_cluster_client_adaptive_range() {
let tx = new_client("test_cluster_client").await;
let in_memory = tx.in_memory().unwrap();
let cluster_client = tx.client.cluster_client().unwrap();
let mut rng = rand::thread_rng();
// Generates roughly 10MB of data, which is larger than the default gRPC message size limit.
for i in 0..10 {
let data: Vec<u8> = (0..1024 * 1024).map(|_| rng.gen()).collect();
in_memory
.put(
PutRequest::new()
.with_key(format!("__prefix/{i}").as_bytes())
.with_value(data.clone()),
)
.await
.unwrap();
}
let req = RangeRequest::new().with_prefix(b"__prefix/");
let stream =
PaginationStream::new(Arc::new(cluster_client), req, 10, Arc::new(mock_decoder))
.into_stream();
let res = stream.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(10, res.len());
}
}

View File

@@ -21,7 +21,9 @@ use api::v1::meta::cluster_client::ClusterClient;
use api::v1::meta::{MetasrvNodeInfo, MetasrvPeersRequest, ResponseHeader, Role};
use common_error::ext::BoxedError;
use common_grpc::channel_manager::ChannelManager;
use common_meta::error::{Error as MetaError, ExternalSnafu, Result as MetaResult};
use common_meta::error::{
Error as MetaError, ExternalSnafu, ResponseExceededSizeLimitSnafu, Result as MetaResult,
};
use common_meta::kv_backend::{KvBackend, TxnService};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
@@ -31,6 +33,7 @@ use common_meta::rpc::store::{
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
use tokio::sync::RwLock;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;
use tonic::Status;
@@ -102,10 +105,14 @@ impl KvBackend for Client {
}
async fn range(&self, req: RangeRequest) -> MetaResult<RangeResponse> {
self.range(req)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
let resp = self.range(req).await;
match resp {
Ok(resp) => Ok(resp),
Err(err) if err.is_exceeded_size_limit() => {
Err(BoxedError::new(err)).context(ResponseExceededSizeLimitSnafu)
}
Err(err) => Err(BoxedError::new(err)).context(ExternalSnafu),
}
}
async fn put(&self, _: PutRequest) -> MetaResult<PutResponse> {
@@ -173,7 +180,10 @@ impl Inner {
fn make_client(&self, addr: impl AsRef<str>) -> Result<ClusterClient<Channel>> {
let channel = self.channel_manager.get(addr).context(CreateChannelSnafu)?;
Ok(ClusterClient::new(channel))
Ok(ClusterClient::new(channel)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd))
}
#[inline]

View File

@@ -23,6 +23,7 @@ use common_telemetry::tracing_context::TracingContext;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::{mpsc, RwLock};
use tokio_stream::wrappers::ReceiverStream;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;
use tonic::Streaming;
@@ -249,7 +250,10 @@ impl Inner {
.get(addr)
.context(error::CreateChannelSnafu)?;
Ok(HeartbeatClient::new(channel))
Ok(HeartbeatClient::new(channel)
.accept_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd))
}
#[inline]

View File

@@ -27,6 +27,7 @@ use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
use tokio::sync::RwLock;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;
use tonic::Status;
@@ -141,7 +142,10 @@ impl Inner {
.get(addr)
.context(error::CreateChannelSnafu)?;
Ok(ProcedureServiceClient::new(channel))
Ok(ProcedureServiceClient::new(channel)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd))
}
#[inline]

View File

@@ -25,6 +25,7 @@ use common_grpc::channel_manager::ChannelManager;
use common_telemetry::tracing_context::TracingContext;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::RwLock;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;
use crate::client::{load_balance as lb, Id};
@@ -236,7 +237,10 @@ impl Inner {
.get(addr)
.context(error::CreateChannelSnafu)?;
Ok(StoreClient::new(channel))
Ok(StoreClient::new(channel)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd))
}
#[inline]

View File

@@ -31,7 +31,11 @@ pub enum Error {
},
#[snafu(display("{}", msg))]
MetaServer { code: StatusCode, msg: String },
MetaServer {
code: StatusCode,
msg: String,
tonic_code: tonic::Code,
},
#[snafu(display("No leader, should ask leader first"))]
NoLeader {
@@ -127,6 +131,18 @@ impl ErrorExt for Error {
}
}
impl Error {
pub fn is_exceeded_size_limit(&self) -> bool {
matches!(
self,
Error::MetaServer {
tonic_code: tonic::Code::OutOfRange,
..
}
)
}
}
// FIXME(dennis): partial duplicated with src/client/src/error.rs
impl From<Status> for Error {
fn from(e: Status) -> Self {
@@ -149,6 +165,10 @@ impl From<Status> for Error {
let msg = get_metadata_value(&e, GREPTIME_DB_HEADER_ERROR_MSG)
.unwrap_or_else(|| e.message().to_string());
Self::MetaServer { code, msg }
Self::MetaServer {
code,
msg,
tonic_code: e.code(),
}
}
}

View File

@@ -12,31 +12,57 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_grpc::channel_manager::ChannelManager;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use meta_srv::mocks as server_mock;
use meta_srv::mocks::MockInfo;
use crate::client::{MetaClient, MetaClientBuilder};
pub async fn mock_client_with_memstore() -> MetaClient {
let mock_info = server_mock::mock_with_memstore().await;
mock_client_by(mock_info).await
pub struct MockMetaContext {
pub kv_backend: KvBackendRef,
pub in_memory: Option<ResettableKvBackendRef>,
}
#[allow(dead_code)]
pub async fn mock_client_with_etcdstore(addr: &str) -> MetaClient {
let mock_info = server_mock::mock_with_etcdstore(addr).await;
mock_client_by(mock_info).await
}
pub async fn mock_client_by(mock_info: MockInfo) -> MetaClient {
pub async fn mock_client_with_memstore() -> (MetaClient, MockMetaContext) {
let MockInfo {
server_addr,
channel_manager,
kv_backend,
in_memory,
..
} = mock_info;
} = server_mock::mock_with_memstore().await;
(
mock_client_by(server_addr, channel_manager).await,
MockMetaContext {
kv_backend,
in_memory,
},
)
}
#[allow(dead_code)]
pub async fn mock_client_with_etcdstore(addr: &str) -> (MetaClient, MockMetaContext) {
let MockInfo {
server_addr,
channel_manager,
kv_backend,
in_memory,
..
} = server_mock::mock_with_etcdstore(addr).await;
(
mock_client_by(server_addr, channel_manager).await,
MockMetaContext {
kv_backend,
in_memory,
},
)
}
pub async fn mock_client_by(server_addr: String, channel_manager: ChannelManager) -> MetaClient {
let id = (1000u64, 2000u64);
let mut meta_client = MetaClientBuilder::datanode_default_options(id.0, id.1)
.enable_access_cluster_info()
.channel_manager(channel_manager)
.build();
meta_client.start(&[&server_addr]).await.unwrap();

View File

@@ -41,6 +41,7 @@ use tokio::net::TcpListener;
use tokio::sync::mpsc::{self, Receiver, Sender};
#[cfg(feature = "pg_kvbackend")]
use tokio_postgres::NoTls;
use tonic::codec::CompressionEncoding;
use tonic::transport::server::{Router, TcpIncoming};
use crate::election::etcd::EtcdElection;
@@ -178,14 +179,26 @@ pub async fn bootstrap_metasrv_with_router(
Ok(())
}
#[macro_export]
macro_rules! add_compressed_service {
($builder:expr, $server:expr) => {
$builder.add_service(
$server
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd),
)
};
}
pub fn router(metasrv: Arc<Metasrv>) -> Router {
tonic::transport::Server::builder()
.accept_http1(true) // for admin services
.add_service(HeartbeatServer::from_arc(metasrv.clone()))
.add_service(StoreServer::from_arc(metasrv.clone()))
.add_service(ClusterServer::from_arc(metasrv.clone()))
.add_service(ProcedureServiceServer::from_arc(metasrv.clone()))
.add_service(admin::make_admin_service(metasrv))
let mut router = tonic::transport::Server::builder().accept_http1(true); // for admin services
let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, ProcedureServiceServer::from_arc(metasrv.clone()));
router.add_service(admin::make_admin_service(metasrv))
}
pub async fn metasrv_builder(
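
For context on the compression settings that recur in the client builders above and in `add_compressed_service!`: on a tonic-generated client or service, `send_compressed` enables an encoding for outgoing messages, while `accept_compressed` declares which encodings it can decode on receive; the encoding actually applied to a call is chosen from what both sides have enabled. Below is a rough, self-contained sketch of how the two sides pair up for the heartbeat service; the function names are illustrative only, and the generic-server form is an assumption that stands in for wrapping the concrete `Metasrv` service.

use std::sync::Arc;

use api::v1::meta::heartbeat_client::HeartbeatClient;
use api::v1::meta::heartbeat_server::{Heartbeat, HeartbeatServer};
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;

// Server side: the wrapping that `add_compressed_service!` applies before a service is
// registered on the router. Gzip- and zstd-compressed requests can be decoded, and
// responses may be compressed with gzip or zstd, depending on what the caller accepts.
fn compressed_heartbeat_service<S: Heartbeat>(svc: Arc<S>) -> HeartbeatServer<S> {
    HeartbeatServer::from_arc(svc)
        .accept_compressed(CompressionEncoding::Gzip)
        .accept_compressed(CompressionEncoding::Zstd)
        .send_compressed(CompressionEncoding::Gzip)
        .send_compressed(CompressionEncoding::Zstd)
}

// Client side, mirroring the builders in the meta-client hunks above: requests go out
// zstd-compressed, and gzip- or zstd-compressed responses can be decoded.
fn compressed_heartbeat_client(channel: Channel) -> HeartbeatClient<Channel> {
    HeartbeatClient::new(channel)
        .accept_compressed(CompressionEncoding::Zstd)
        .accept_compressed(CompressionEncoding::Gzip)
        .send_compressed(CompressionEncoding::Zstd)
}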

View File

@@ -44,7 +44,7 @@ impl MetasrvCacheInvalidator {
.clone()
.unwrap_or_else(|| DEFAULT_SUBJECT.to_string());
let msg = &MailboxMessage::json_message(
let mut msg = MailboxMessage::json_message(
subject,
&format!("Metasrv@{}", self.info.server_addr),
"Frontend broadcast",
@@ -54,22 +54,21 @@ impl MetasrvCacheInvalidator {
.with_context(|_| meta_error::SerdeJsonSnafu)?;
self.mailbox
.broadcast(&BroadcastChannel::Frontend, msg)
.broadcast(&BroadcastChannel::Frontend, &msg)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
let msg = &MailboxMessage::json_message(
subject,
&format!("Metasrv@{}", self.info.server_addr),
"Flownode broadcast",
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| meta_error::SerdeJsonSnafu)?;
msg.to = "Datanode broadcast".to_string();
self.mailbox
.broadcast(&BroadcastChannel::Flownode, msg)
.broadcast(&BroadcastChannel::Datanode, &msg)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
msg.to = "Flownode broadcast".to_string();
self.mailbox
.broadcast(&BroadcastChannel::Flownode, &msg)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)

View File

@@ -38,6 +38,7 @@ pub mod selector;
pub mod service;
pub mod state;
pub mod table_meta_alloc;
pub use crate::error::Result;
mod greptimedb_telemetry;

View File

@@ -179,7 +179,7 @@ impl Default for MetasrvOptions {
impl Configurable for MetasrvOptions {
fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["wal.broker_endpoints"])
Some(&["wal.broker_endpoints", "store_addrs"])
}
}

View File

@@ -15,6 +15,7 @@
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::cluster_server::ClusterServer;
use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::procedure_service_server::ProcedureServiceServer;
use api::v1::meta::store_server::StoreServer;
@@ -23,9 +24,11 @@ use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use tonic::codec::CompressionEncoding;
use tower::service_fn;
use crate::add_compressed_service;
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
@@ -34,21 +37,24 @@ pub struct MockInfo {
pub server_addr: String,
pub channel_manager: ChannelManager,
pub metasrv: Arc<Metasrv>,
pub kv_backend: KvBackendRef,
pub in_memory: Option<ResettableKvBackendRef>,
}
pub async fn mock_with_memstore() -> MockInfo {
let kv_backend = Arc::new(MemoryKvBackend::new());
mock(Default::default(), kv_backend, None, None).await
let in_memory = Arc::new(MemoryKvBackend::new());
mock(Default::default(), kv_backend, None, None, Some(in_memory)).await
}
pub async fn mock_with_etcdstore(addr: &str) -> MockInfo {
let kv_backend = EtcdStore::with_endpoints([addr], 128).await.unwrap();
mock(Default::default(), kv_backend, None, None).await
mock(Default::default(), kv_backend, None, None, None).await
}
pub async fn mock_with_memstore_and_selector(selector: SelectorRef) -> MockInfo {
let kv_backend = Arc::new(MemoryKvBackend::new());
mock(Default::default(), kv_backend, Some(selector), None).await
mock(Default::default(), kv_backend, Some(selector), None, None).await
}
pub async fn mock(
@@ -56,13 +62,16 @@ pub async fn mock(
kv_backend: KvBackendRef,
selector: Option<SelectorRef>,
datanode_clients: Option<Arc<NodeClients>>,
in_memory: Option<ResettableKvBackendRef>,
) -> MockInfo {
let server_addr = opts.server_addr.clone();
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
table_metadata_manager.init().await.unwrap();
let builder = MetasrvBuilder::new().options(opts).kv_backend(kv_backend);
let builder = MetasrvBuilder::new()
.options(opts)
.kv_backend(kv_backend.clone());
let builder = match selector {
Some(s) => builder.selector(s),
@@ -74,17 +83,26 @@ pub async fn mock(
None => builder,
};
let builder = match &in_memory {
Some(in_memory) => builder.in_memory(in_memory.clone()),
None => builder,
};
let metasrv = builder.build().await.unwrap();
metasrv.try_start().await.unwrap();
let (client, server) = tokio::io::duplex(1024);
let metasrv = Arc::new(metasrv);
let service = metasrv.clone();
let _handle = tokio::spawn(async move {
tonic::transport::Server::builder()
.add_service(HeartbeatServer::from_arc(service.clone()))
.add_service(StoreServer::from_arc(service.clone()))
.add_service(ProcedureServiceServer::from_arc(service.clone()))
let mut router = tonic::transport::Server::builder();
let router = add_compressed_service!(router, HeartbeatServer::from_arc(service.clone()));
let router = add_compressed_service!(router, StoreServer::from_arc(service.clone()));
let router =
add_compressed_service!(router, ProcedureServiceServer::from_arc(service.clone()));
let router = add_compressed_service!(router, ClusterServer::from_arc(service.clone()));
router
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
});
@@ -121,5 +139,7 @@ pub async fn mock(
server_addr,
channel_manager,
metasrv,
kv_backend,
in_memory,
}
}

View File

@@ -12,6 +12,7 @@ api.workspace = true
aquamarine.workspace = true
async-trait.workspace = true
base64.workspace = true
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-query.workspace = true

View File

@@ -27,9 +27,10 @@ use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{ConcreteDataType, RegionId};
use crate::error::{
ColumnTypeMismatchSnafu, MitoReadOperationSnafu, MitoWriteOperationSnafu, Result,
ColumnTypeMismatchSnafu, ForbiddenPhysicalAlterSnafu, MitoReadOperationSnafu,
MitoWriteOperationSnafu, Result,
};
use crate::metrics::MITO_DDL_DURATION;
use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_DDL_DURATION};
use crate::utils;
const MAX_RETRIES: usize = 5;
@@ -186,6 +187,30 @@ impl DataRegion {
.context(MitoReadOperationSnafu)?;
Ok(metadata.column_metadatas.clone())
}
pub async fn alter_region_options(
&self,
region_id: RegionId,
request: RegionAlterRequest,
) -> Result<AffectedRows> {
match request.kind {
AlterKind::SetRegionOptions { options: _ }
| AlterKind::UnsetRegionOptions { keys: _ } => {
let region_id = utils::to_data_region_id(region_id);
self.mito
.handle_request(region_id, RegionRequest::Alter(request))
.await
.context(MitoWriteOperationSnafu)
.map(|result| result.affected_rows)
}
_ => {
info!("Metric region received alter request {request:?} on physical region {region_id:?}");
FORBIDDEN_OPERATION_COUNT.inc();
ForbiddenPhysicalAlterSnafu.fail()
}
}
}
}
#[cfg(test)]

View File

@@ -96,9 +96,10 @@ use crate::utils;
/// | Read | ✅ | ✅ |
/// | Close | ✅ | ✅ |
/// | Open | ✅ | ✅ |
/// | Alter | ✅ | |
/// | Alter | ✅ | ❓* |
///
/// *: Physical region can be dropped only when all related logical regions are dropped.
/// *: Alter: Physical regions only support altering region options.
///
/// ## Internal Columns
///
