Compare commits

..

61 Commits

Author SHA1 Message Date
Yingwen
17d75c767c chore: update all arm64 builders (#5215) 2024-12-21 18:14:45 +08:00
Yingwen
a1ed450c0c chore: arm64 use 8xlarge runner (#5213) 2024-12-20 22:29:25 +08:00
evenyag
ea4ce9d1e3 chore: set version to 0.11.1 2024-12-20 14:12:19 +08:00
evenyag
1f7d9666b7 chore: Downgrade opendal for releasing 0.11.1
Revert "feat: bump opendal and switch prometheus layer to the upstream impl (#5179)"

This reverts commit 422d18da8b.
2024-12-20 14:12:19 +08:00
LFC
9f1a0d78b2 ci: install latest protobuf in dev-builder image (#5196) 2024-12-20 14:12:19 +08:00
discord9
ed8e418716 fix: auto created table ttl check (#5203)
* fix: auto created table ttl check

* tests: with hint
2024-12-20 14:12:19 +08:00
discord9
9e7121c1bb fix(flow): batch builder with type (#5195)
* fix: typed builder

* chore: clippy

* chore: rename

* fix: unit tests

* refactor: per review
2024-12-20 14:12:19 +08:00
dennis zhuang
94a49ed4f0 chore: update PR template (#5199) 2024-12-20 14:12:19 +08:00
discord9
f5e743379f feat: show flow's mem usage in INFORMATION_SCHEMA.FLOWS (#4890)
* feat: add flow mem size to sys table

* chore: rm dup def

* chore: remove unused variant

* chore: minor refactor

* refactor: per review
2024-12-20 14:12:19 +08:00
Ruihang Xia
6735e5867e feat: bump opendal and switch prometheus layer to the upstream impl (#5179)
* feat: bump opendal and switch prometheus layer to the upstream impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused files

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused things

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove root dir on recovering cache

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* filter out non-files entry in test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-12-20 14:12:19 +08:00
Weny Xu
925525726b fix: ensure table route metadata is eventually rolled back on failure (#5174)
* fix: ensure table route metadata is eventually rolled back on procedure failure

* fix(fuzz): enhance procedure condition checking

* chore: add logs

* feat: close downgraded leader region actively

* chore: apply suggestions from CR
2024-12-20 14:12:19 +08:00
Ning Sun
6427682a9a feat: show create postgresql foreign table (#5143)
* feat: add show create table for pg in parser

* feat: implement show create table operation

* fix: adopt upstream changes
2024-12-20 14:12:19 +08:00
Ning Sun
55b0022676 chore: make nix compilation environment config more robust (#5183)
* chore: improve nix-shell support

* fix: add pkg-config

* ci: add a github action to ensure build on clean system

* ci: optimise dependencies of task

* ci: move clean build to nightly
2024-12-20 14:12:19 +08:00
Ruihang Xia
2d84cc8d87 refactor: remove unused symbols (#5193)
chore: remove unused symbols

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-12-20 14:12:19 +08:00
Yingwen
c030705b17 docs: fix grafana dashboard row (#5192) 2024-12-20 14:12:19 +08:00
Ruihang Xia
443c600bd0 fix: validate matcher op for __name__ in promql (#5191)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-12-20 14:12:19 +08:00
Lei, HUANG
39cadfe10b fix(sqlness): enforce order in union tests (#5190)
Add ORDER BY clause to subquery union tests

 Updated the SQL and result files for subquery union tests to include an ORDER BY clause, ensuring consistent result ordering. This change aligns with the test case from the DuckDB repository.
2024-12-20 14:12:19 +08:00
jeremyhi
9b5e4e80f7 feat: extract hints from http header (#5128)
* feat: extract hints from http header

* Update src/servers/src/http/hints.rs

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>

* chore: by comment

* refactor: get instead of loop

---------

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
2024-12-20 14:12:19 +08:00
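
Note: as a rough illustration of the header-hint extraction described in the commit above (the "get instead of loop" refactor), a minimal sketch follows; the header names and function signature here are hypothetical, not the ones added in #5128.

use http::HeaderMap;

// Hypothetical hint header names, for illustration only.
const HINT_KEYS: [&str; 2] = ["x-greptime-hint-ttl", "x-greptime-hint-append-mode"];

// Look up each known hint key directly instead of looping over every request header.
fn extract_hints(headers: &HeaderMap) -> Vec<(String, String)> {
    HINT_KEYS
        .iter()
        .filter_map(|key| {
            let value = headers.get(*key)?.to_str().ok()?;
            Some((key.to_string(), value.trim().to_string()))
        })
        .collect()
}
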
Yingwen
041a276b66 feat: do not remove time filters in ScanRegion (#5180)
* feat: do not remove time filters

* chore: remove `time_range` from parquet reader

* chore: print more message in the check script

* chore: fix unused error
2024-12-20 14:12:19 +08:00
Yingwen
614a25ddc5 feat: do not keep MemtableRefs in ScanInput (#5184) 2024-12-20 14:12:19 +08:00
dennis zhuang
4337e20010 feat: impl label_join and label_replace for promql (#5153)
* feat: impl label_join and label_replace for promql

* chore: style

* fix: dst_label is equal to src_label

* fix: forgot to sort the results

* fix: processing empty source label
2024-12-20 14:12:19 +08:00
Lanqing Yang
65c52cc698 fix: display inverted and fulltext index in show index (#5169) 2024-12-20 14:12:19 +08:00
Yohan Wal
50f31fd681 feat: introduce Buffer for non-continuous bytes (#5164)
* feat: introduce Buffer for non-continuous bytes

* Update src/mito2/src/cache/index.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* chore: apply review comments

* refactor: use opendal::Buffer

---------

Co-authored-by: Weny Xu <wenymedia@gmail.com>
2024-12-20 14:12:19 +08:00
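
Note: #5164 introduces a buffer over non-contiguous byte chunks (later switched to opendal::Buffer). The toy type below only illustrates the idea of reading across chunk boundaries without first copying everything into one contiguous Vec; it is not the actual Buffer API from the PR.

use bytes::Bytes;

// A list of byte chunks addressed by a single logical offset.
struct ChunkedBuffer {
    chunks: Vec<Bytes>,
}

impl ChunkedBuffer {
    fn len(&self) -> usize {
        self.chunks.iter().map(|c| c.len()).sum()
    }

    // Copy dst.len() bytes starting at `offset`, crossing chunk boundaries as needed.
    fn copy_to(&self, mut offset: usize, dst: &mut [u8]) {
        let mut written = 0;
        for chunk in &self.chunks {
            if offset >= chunk.len() {
                offset -= chunk.len();
                continue;
            }
            let n = (chunk.len() - offset).min(dst.len() - written);
            dst[written..written + n].copy_from_slice(&chunk[offset..offset + n]);
            written += n;
            offset = 0;
            if written == dst.len() {
                break;
            }
        }
    }
}
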
LFC
b5af5aaf8d refactor: produce BatchBuilder from a Batch to modify it again (#5186)
chore: pub some mods
2024-12-20 14:12:19 +08:00
Lei, HUANG
27693c7f1e perf: avoid holding memtable during compaction (#5157)
* perf/avoid-holding-memtable-during-compaction: Refactor Compaction Version Handling

 • Introduced CompactionVersion struct to encapsulate region version details for compaction, removing dependency on VersionRef.
 • Updated CompactionRequest and CompactionRegion to use CompactionVersion.
 • Modified open_compaction_region to construct CompactionVersion without memtables.
 • Adjusted WindowedCompactionPicker to work with CompactionVersion.
 • Enhanced flush logic in WriteBufferManager to improve memory usage checks and logging.

* reformat code

* chore: change log level

* reformat code

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
2024-12-20 14:12:19 +08:00
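
Note: the gist of the change above is that a compaction job keeps only what it needs (metadata and SST levels) instead of a full VersionRef, so memtables are not kept alive for the whole compaction. A minimal sketch with stand-in types, not the real mito2 definitions:

use std::sync::Arc;

// Hypothetical placeholder types, for illustration only.
struct RegionMetadata;
struct SstLevels;
struct Memtables;

// A full region version holds memtables as well as SSTs.
struct Version {
    metadata: Arc<RegionMetadata>,
    ssts: Arc<SstLevels>,
    memtables: Arc<Memtables>,
}

// A compaction-only view: enough for the job, without holding the memtables.
struct CompactionVersion {
    metadata: Arc<RegionMetadata>,
    ssts: Arc<SstLevels>,
}

impl From<&Version> for CompactionVersion {
    fn from(v: &Version) -> Self {
        Self {
            metadata: v.metadata.clone(),
            ssts: v.ssts.clone(),
        }
    }
}
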
discord9
a59fef9ffb test: sqlness upgrade compatibility tests (#5126)
* feat: simple version switch

* chore: remove debug print

* chore: add common folder

* tests: add drop table

* feat: pull versioned binary

* chore: don't use native-tls

* chore: rm outdated docs

* chore: new line

* fix: save old bin dir

* fix: switch version restart all node

* feat: use etcd

* fix: wait for election

* fix: normal sqlness

* refactor: hashmap for bin dir

* test: past 3 major version compat crate table

* refactor: allow using without setup etcd
2024-12-20 14:12:19 +08:00
Zhenchi
bcecd8ce52 feat(bloom-filter): add basic bloom filter creator (Part 1) (#5177)
* feat(bloom-filter): add a simple bloom filter creator (Part 1)

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: clippy

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: header

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* docs: add format comment

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-12-20 14:12:19 +08:00
Yingwen
ffdcb8c1ac fix: deletion between two put may not work in last_non_null mode (#5168)
* fix: deletion between rows with the same key may not work

* test: add sqlness test case

* chore: comments
2024-12-20 14:12:19 +08:00
Yingwen
554121ad79 chore: add aquamarine to dep lists (#5181) 2024-12-20 14:12:19 +08:00
Weny Xu
43c12b4f2c fix: correct set_region_role_state_gracefully behaviors (#5171)
* fix: reduce default max rows for fuzz testing

* chore: remove Postgres setup from fuzz test workflow

* chore(fuzz): increase resource limits for GreptimeDB cluster

* chore(fuzz): increase resource limits for kafka

* fix: correct `set_region_role_state_gracefully` behaviors

* chore: remove Postgres setup from fuzz test workflow

* chore(fuzz): reduce resource limits for GreptimeDB & kafka
2024-12-20 14:12:19 +08:00
discord9
7aa8c28fe4 test: flow rebuild (#5162)
* tests: rebuild flow

* tests: more rebuild

* tests: restart

* chore: drop clean
2024-12-20 14:12:19 +08:00
Ning Sun
34fbe7739e chore: add nix-shell configure for a minimal environment for development (#5175)
* chore: add nix-shell development environment

* chore: add rust-analyzer

* chore: use .envrc as a private file
2024-12-20 14:12:19 +08:00
ZonaHe
06d7bd99dd feat: update dashboard to v0.7.3 (#5172)
Co-authored-by: sunchanglong <sunchanglong@users.noreply.github.com>
2024-12-20 14:12:19 +08:00
Ruihang Xia
b71d842615 feat: introduce SKIPPING index (part 1) (#5155)
* skip index parser

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* wip: sqlness

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl show create part

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add empty line

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change keyword to SKIPPING INDEX

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename local variables

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-12-20 14:12:19 +08:00
Lei, HUANG
7f71693b8e chore: gauge for flush compaction (#5156)
* add metrics

* chore/bench-metrics: Add INFLIGHT_FLUSH_COUNT Metric to Flush Process

 • Introduced INFLIGHT_FLUSH_COUNT metric to track the number of ongoing flush operations.
 • Incremented INFLIGHT_FLUSH_COUNT in FlushScheduler to monitor active flushes.
 • Removed redundant increment of INFLIGHT_FLUSH_COUNT in RegionWorkerLoop to prevent double counting.

* chore/bench-metrics: Add Metrics for Compaction and Flush Operations

 • Introduced INFLIGHT_COMPACTION_COUNT and INFLIGHT_FLUSH_COUNT metrics to track the number of ongoing compaction and flush operations.
 • Incremented INFLIGHT_COMPACTION_COUNT when scheduling remote and local compaction jobs, and decremented it upon completion.
 • Added INFLIGHT_FLUSH_COUNT increment and decrement logic around flush tasks to monitor active flush operations.
 • Removed redundant metric updates in worker.rs and handle_compaction.rs to streamline metric handling.

* chore: add metrics for remote compaction jobs

* chore: format

* chore: also add dashboard
2024-12-20 14:12:19 +08:00
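
Note: the inflight gauges described above follow the usual prometheus pattern of incrementing when a job starts and decrementing exactly once when it finishes. A minimal sketch; the metric name and registration details are illustrative, not the project's actual identifiers.

use once_cell::sync::Lazy;
use prometheus::{register_int_gauge, IntGauge};

// Gauge tracking how many flush jobs are currently running.
static INFLIGHT_FLUSH_COUNT: Lazy<IntGauge> =
    Lazy::new(|| register_int_gauge!("inflight_flush_count", "ongoing flush jobs").unwrap());

async fn run_flush_task(flush: impl std::future::Future<Output = ()>) {
    INFLIGHT_FLUSH_COUNT.inc(); // one more flush in flight
    flush.await;                // the actual flush work
    INFLIGHT_FLUSH_COUNT.dec(); // decremented exactly once per task to avoid double counting
}
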
Lin Yihai
615ea1a171 feat: Add vector_scalar_mul function. (#5166) 2024-12-20 14:12:19 +08:00
shuiyisong
4e725d259d chore: remove unused dep (#5163)
* chore: remove unused dep

* chore: remove more unused dep
2024-12-20 14:12:19 +08:00
Niwaka
dc2252eb6d fix: support alter table ~ add ~ custom_type (#5165) 2024-12-20 14:12:19 +08:00
Yingwen
6d4cc2e070 ci: use 4xlarge for nightly build (#5158) 2024-12-20 14:12:19 +08:00
localhost
6066ce2c4a fix: loki write row len error (#5161) 2024-12-20 14:12:19 +08:00
Yingwen
b90d8f7dbd docs: Add index panels to standalone grafana dashboard (#5140)
* docs: Add index panels to standalone grafana dashboard

* docs: fix flush/compaction op
2024-12-20 14:12:19 +08:00
Yohan Wal
fdccf4ff84 refactor: cache inverted index with fixed-size page (#5114)
* feat: cache inverted index by page instead of file

* fix: add unit test and fix bugs

* chore: typo

* chore: ci

* fix: math

* chore: apply review comments

* chore: renames

* test: add unit test for index key calculation

* refactor: use ReadableSize

* feat: add config for inverted index page size

* chore: update config file

* refactor: handle multiple range read and fix some related bugs

* fix: add config

* test: turn to a fs reader to match behaviors of object store
2024-12-20 14:12:19 +08:00
localhost
8b1484c064 chore: pipeline dryrun api can now receive raw pipeline content (#5142)
* chore: pipeline dryrun api can now receive raw pipeline content

* chore: remove dryrun v1 and add test

* chore: change dryrun pipeline api body schema

* chore: remove useless struct PipelineInfo

* chore: update PipelineDryrunParams doc

* chore: increase code readability

* chore: add some comment for pipeline dryrun test

* Apply suggestions from code review

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>

* chore: format code

---------

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
2024-12-20 14:12:19 +08:00
Yingwen
576e20ac78 feat: collect reader metrics from prune reader (#5152) 2024-12-20 14:12:19 +08:00
localhost
10b3e3da0f chore: decide tag column in log api following table schema if table exists (#5138)
* chore: decide tag column in log api following table schema if table exists

* chore: add more test for greptime_identity pipeline

* chore: change pipeline get_table function signature

* chore: change identity_pipeline_inner tag_column_names type
2024-12-20 14:12:19 +08:00
Weny Xu
4a3ef2d718 feat(index): add file_size_hint for remote blob reader (#5147)
feat(index): add file_size_hint for remote blob reader
2024-12-20 14:12:19 +08:00
Yohan Wal
65eabb2a05 feat(fuzz): add alter table options for alter fuzzer (#5074)
* feat(fuzz): add set table options to alter fuzzer

* chore: clippy is happy, I'm sad

* chore: happy ci happy

* fix: unit test

* feat(fuzz): add unset table options to alter fuzzer

* fix: unit test

* feat(fuzz): add table option validator

* fix: make clippy happy

* chore: add comments

* chore: apply review comments

* fix: unit test

* feat(fuzz): add more ttl options

* fix: #5108

* chore: add comments

* chore: add comments
2024-12-20 14:12:19 +08:00
Weny Xu
bc5a57f51f feat: introduce PuffinMetadataCache (#5148)
* feat: introduce `PuffinMetadataCache`

* refactor: remove too_many_arguments

* chore: fmt toml
2024-12-20 14:12:19 +08:00
Weny Xu
f24b9d8814 feat: add prefetch support to InvertedIndexFooterReader for reduced I/O time (#5146)
* feat: add prefetch support to `InvertedIndeFooterReader`

* chore: correct struct name

* chore: apply suggestions from CR
2024-12-20 14:12:19 +08:00
Weny Xu
dd4d0a88ce feat: add prefetch support to PuffinFileFooterReader for reduced I/O time (#5145)
* feat: introduce `PuffinFileFooterReader`

* refactor: remove `SyncReader` trait and impl

* refactor: replace `FooterParser` with `PuffinFileFooterReader`

* chore: remove unused errors
2024-12-20 14:12:19 +08:00
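
Note: the prefetch idea in #5145/#5146 (helped by a file size hint such as the one added in #5147) is to read a generous tail of the file in one ranged read, so the footer length and usually the footer itself are obtained without a second round trip. A minimal sketch, assuming a hypothetical RangeReader trait and a footer whose length sits in the trailing 4 bytes; the real reader APIs may differ.

#[async_trait::async_trait]
trait RangeReader {
    async fn read(&self, offset: u64, len: u64) -> std::io::Result<Vec<u8>>;
}

const PREFETCH_SIZE: u64 = 64 * 1024;

async fn read_footer(reader: &dyn RangeReader, file_size: u64) -> std::io::Result<Vec<u8>> {
    // One ranged read for the tail of the file.
    let prefetch = PREFETCH_SIZE.min(file_size);
    let tail = reader.read(file_size - prefetch, prefetch).await?;

    // In this sketch the footer length is a little-endian u32 in the last 4 bytes.
    let n = tail.len();
    let footer_len = u32::from_le_bytes(tail[n - 4..].try_into().unwrap()) as u64;

    if footer_len + 4 <= prefetch {
        // Fast path: the prefetched tail already covers the whole footer.
        let start = n - 4 - footer_len as usize;
        Ok(tail[start..n - 4].to_vec())
    } else {
        // Slow path: one extra ranged read for an unusually large footer.
        reader.read(file_size - 4 - footer_len, footer_len).await
    }
}
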
Niwaka
3d2096fe9d feat: support push down IN filter (#5129)
* feat: support push down IN filter

* chore: move tests to prune.sql
2024-12-20 14:12:19 +08:00
Ruihang Xia
35715bb710 feat: implement v1/sql/parse endpoint to parse GreptimeDB's SQL dialect (#5144)
* derive ser/de

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl method

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove deserialize

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-12-20 14:12:19 +08:00
ZonaHe
08a3befa67 feat: update dashboard to v0.7.2 (#5141)
Co-authored-by: sunchanglong <sunchanglong@users.noreply.github.com>
2024-12-20 14:12:19 +08:00
Yohan Wal
ca1758d4e7 test: part of parser test migrated from duckdb (#5125)
* test: update test

* fix: fix test
2024-12-20 14:12:19 +08:00
Zhenchi
42bf818167 feat(vector): add scalar add function (#5119)
* refactor: extract implicit conversion helper functions of vector

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat(vector): add scalar add function

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix fmt

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-12-20 14:12:19 +08:00
Lei, HUANG
2c9b117224 perf: avoid cache during compaction (#5135)
* Revert "refactor: Avoid wrapping Option for CacheManagerRef (#4996)"

This reverts commit 42bf7e9965.

* fix: memory usage during log ingestion

* fix: fmt
2024-12-20 14:12:19 +08:00
dennis zhuang
3edf2317e1 feat: adjust WAL purge default configurations (#5107)
* feat: adjust WAL purge default configurations

* fix: config

* feat: change raft engine file_size default to 128Mib
2024-12-20 14:12:19 +08:00
jeremyhi
85d72a3cd0 chore: set store_key_prefix for all kvbackend (#5132) 2024-12-20 14:12:19 +08:00
discord9
928172bd82 chore: fix aws_lc not in depend tree check in CI (#5121)
* chore: fix aws_lc check in CI

* chore: update lock file
2024-12-20 14:12:19 +08:00
shuiyisong
e9f5bddeff chore: add /ready api for health checking (#5124)
* chore: add ready endpoint for health checking

* chore: add test
2024-12-20 14:12:19 +08:00
Yingwen
486755d795 chore: bump main branch version to 0.12 (#5133)
chore: bump version to v0.12.0
2024-12-20 14:12:19 +08:00
46 changed files with 949 additions and 1284 deletions

View File

@@ -5,7 +5,7 @@ meta:
[datanode]
[datanode.client]
timeout = "120s"
timeout = "60s"
datanode:
configData: |-
[runtime]
@@ -21,7 +21,7 @@ frontend:
global_rt_size = 4
[meta_client]
ddl_timeout = "120s"
ddl_timeout = "60s"
objectStorage:
s3:
bucket: default

View File

@@ -5,7 +5,7 @@ meta:
[datanode]
[datanode.client]
timeout = "120s"
timeout = "60s"
datanode:
configData: |-
[runtime]
@@ -17,7 +17,7 @@ frontend:
global_rt_size = 4
[meta_client]
ddl_timeout = "120s"
ddl_timeout = "60s"
objectStorage:
s3:
bucket: default

View File

@@ -11,7 +11,7 @@ meta:
[datanode]
[datanode.client]
timeout = "120s"
timeout = "60s"
datanode:
configData: |-
[runtime]
@@ -28,7 +28,7 @@ frontend:
global_rt_size = 4
[meta_client]
ddl_timeout = "120s"
ddl_timeout = "60s"
objectStorage:
s3:
bucket: default

View File

@@ -29,7 +29,7 @@ on:
linux_arm64_runner:
type: choice
description: The runner uses to build linux-arm64 artifacts
default: ec2-c6g.4xlarge-arm64
default: ec2-c6g.8xlarge-arm64
options:
- ec2-c6g.xlarge-arm64 # 4C8G
- ec2-c6g.2xlarge-arm64 # 8C16G

View File

@@ -27,7 +27,7 @@ on:
linux_arm64_runner:
type: choice
description: The runner uses to build linux-arm64 artifacts
default: ec2-c6g.4xlarge-arm64
default: ec2-c6g.8xlarge-arm64
options:
- ec2-c6g.xlarge-arm64 # 4C8G
- ec2-c6g.2xlarge-arm64 # 8C16G

View File

@@ -117,6 +117,7 @@ jobs:
cleanbuild-linux-nix:
runs-on: ubuntu-latest-8-cores
timeout-minutes: 60
needs: [coverage, fmt, clippy, check]
steps:
- uses: actions/checkout@v4
- uses: cachix/install-nix-action@v27

View File

@@ -91,7 +91,7 @@ env:
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nigthly-20230313;
NIGHTLY_RELEASE_PREFIX: nightly
# Note: The NEXT_RELEASE_VERSION should be modified manually by every formal release.
NEXT_RELEASE_VERSION: v0.12.0
NEXT_RELEASE_VERSION: v0.11.0
# Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
permissions:

Cargo.lock (generated, 169 lines changed)
View File

@@ -188,7 +188,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"common-base",
"common-decimal",
@@ -773,7 +773,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"api",
"async-trait",
@@ -896,6 +896,18 @@ dependencies = [
"rand",
]
[[package]]
name = "backon"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d67782c3f868daa71d3533538e98a8e13713231969def7536e8039606fc46bf0"
dependencies = [
"fastrand",
"futures-core",
"pin-project",
"tokio",
]
[[package]]
name = "backon"
version = "1.2.0"
@@ -1314,7 +1326,7 @@ dependencies = [
[[package]]
name = "cache"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"catalog",
"common-error",
@@ -1348,7 +1360,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"api",
"arrow",
@@ -1684,7 +1696,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "cli"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"async-trait",
"auth",
@@ -1727,7 +1739,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.12.0",
"substrait 0.11.1",
"table",
"tempfile",
"tokio",
@@ -1736,7 +1748,7 @@ dependencies = [
[[package]]
name = "client"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"api",
"arc-swap",
@@ -1763,7 +1775,7 @@ dependencies = [
"rand",
"serde_json",
"snafu 0.8.5",
"substrait 0.12.0",
"substrait 0.11.1",
"substrait 0.37.3",
"tokio",
"tokio-stream",
@@ -1804,7 +1816,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"async-trait",
"auth",
@@ -1864,7 +1876,7 @@ dependencies = [
"similar-asserts",
"snafu 0.8.5",
"store-api",
"substrait 0.12.0",
"substrait 0.11.1",
"table",
"temp-env",
"tempfile",
@@ -1916,7 +1928,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"anymap2",
"async-trait",
@@ -1938,11 +1950,11 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.12.0"
version = "0.11.1"
[[package]]
name = "common-config"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"common-base",
"common-error",
@@ -1965,7 +1977,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"arrow",
"arrow-schema",
@@ -2001,7 +2013,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"bigdecimal 0.4.5",
"common-error",
@@ -2014,7 +2026,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"snafu 0.8.5",
"strum 0.25.0",
@@ -2023,7 +2035,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"async-trait",
"common-error",
@@ -2033,7 +2045,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"api",
"approx 0.5.1",
@@ -2077,7 +2089,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"async-trait",
"common-runtime",
@@ -2094,7 +2106,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"api",
"arrow-flight",
@@ -2120,7 +2132,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"api",
"common-base",
@@ -2139,7 +2151,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"arc-swap",
"common-query",
@@ -2153,7 +2165,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"common-error",
"common-macro",
@@ -2166,7 +2178,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"anymap2",
"api",
@@ -2223,7 +2235,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2232,11 +2244,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "0.12.0"
version = "0.11.1"
[[package]]
name = "common-pprof"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"common-error",
"common-macro",
@@ -2248,11 +2260,11 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"async-stream",
"async-trait",
"backon",
"backon 1.2.0",
"common-base",
"common-error",
"common-macro",
@@ -2275,7 +2287,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"async-trait",
"common-procedure",
@@ -2283,7 +2295,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"api",
"async-trait",
@@ -2309,7 +2321,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"arc-swap",
"common-error",
@@ -2328,7 +2340,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -2356,7 +2368,7 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"atty",
"backtrace",
@@ -2384,7 +2396,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"client",
"common-query",
@@ -2396,7 +2408,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"arrow",
"chrono",
@@ -2414,7 +2426,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"build-data",
"const_format",
@@ -2424,7 +2436,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"common-base",
"common-error",
@@ -3223,7 +3235,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"api",
"arrow-flight",
@@ -3274,7 +3286,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.12.0",
"substrait 0.11.1",
"table",
"tokio",
"toml 0.8.19",
@@ -3283,7 +3295,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"arrow",
"arrow-array",
@@ -3907,7 +3919,7 @@ dependencies = [
[[package]]
name = "file-engine"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"api",
"async-trait",
@@ -4023,7 +4035,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"api",
"arrow",
@@ -4081,7 +4093,7 @@ dependencies = [
"snafu 0.8.5",
"store-api",
"strum 0.25.0",
"substrait 0.12.0",
"substrait 0.11.1",
"table",
"tokio",
"tonic 0.11.0",
@@ -4119,7 +4131,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"api",
"arc-swap",
@@ -5268,7 +5280,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -6117,7 +6129,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "log-query"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"chrono",
"common-error",
@@ -6128,7 +6140,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"async-stream",
"async-trait",
@@ -6472,7 +6484,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"api",
"async-trait",
@@ -6499,7 +6511,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"api",
"async-trait",
@@ -6578,7 +6590,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"api",
"aquamarine",
@@ -6672,7 +6684,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"api",
"aquamarine",
@@ -7409,7 +7421,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"anyhow",
"bytes",
@@ -7469,13 +7481,13 @@ checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9"
[[package]]
name = "opendal"
version = "0.50.2"
version = "0.49.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb28bb6c64e116ceaf8dd4e87099d3cfea4a58e85e62b104fef74c91afba0f44"
checksum = "9b04d09b9822c2f75a1d2fc513a2c1279c70e91e7407936fffdf6a6976ec530a"
dependencies = [
"anyhow",
"async-trait",
"backon",
"backon 0.4.4",
"base64 0.22.1",
"bytes",
"chrono",
@@ -7488,7 +7500,6 @@ dependencies = [
"md-5",
"once_cell",
"percent-encoding",
"prometheus",
"quick-xml 0.36.2",
"reqsign",
"reqwest",
@@ -7663,7 +7674,7 @@ dependencies = [
[[package]]
name = "operator"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"ahash 0.8.11",
"api",
@@ -7711,7 +7722,7 @@ dependencies = [
"sql",
"sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)",
"store-api",
"substrait 0.12.0",
"substrait 0.11.1",
"table",
"tokio",
"tokio-util",
@@ -7961,7 +7972,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"api",
"async-trait",
@@ -8247,7 +8258,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8409,7 +8420,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"auth",
"clap 4.5.19",
@@ -8697,7 +8708,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"ahash 0.8.11",
"async-trait",
@@ -8932,7 +8943,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"async-compression 0.4.13",
"async-trait",
@@ -9057,7 +9068,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9120,7 +9131,7 @@ dependencies = [
"sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)",
"statrs",
"store-api",
"substrait 0.12.0",
"substrait 0.11.1",
"table",
"tokio",
"tokio-stream",
@@ -9504,9 +9515,9 @@ dependencies = [
[[package]]
name = "reqsign"
version = "0.16.1"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb0075a66c8bfbf4cc8b70dca166e722e1f55a3ea9250ecbb85f4d92a5f64149"
checksum = "03dd4ba7c3901dd43e6b8c7446a760d45bc1ea4301002e1a6fa48f97c3a796fa"
dependencies = [
"anyhow",
"async-trait",
@@ -10604,7 +10615,7 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "script"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"api",
"arc-swap",
@@ -10896,7 +10907,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"ahash 0.8.11",
"api",
@@ -11007,7 +11018,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"api",
"arc-swap",
@@ -11361,7 +11372,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"api",
"chrono",
@@ -11425,7 +11436,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -11643,7 +11654,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"api",
"aquamarine",
@@ -11805,7 +11816,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"async-trait",
"bytes",
@@ -12004,7 +12015,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"api",
"async-trait",
@@ -12281,7 +12292,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"arbitrary",
"async-trait",
@@ -12324,7 +12335,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.12.0"
version = "0.11.1"
dependencies = [
"api",
"arrow-flight",
@@ -12388,7 +12399,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.12.0",
"substrait 0.11.1",
"table",
"tempfile",
"time",

View File

@@ -68,7 +68,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.12.0"
version = "0.11.1"
edition = "2021"
license = "Apache-2.0"

View File

@@ -70,7 +70,7 @@ Our core developers have been building time-series data platforms for years. Bas
* **Unified Processing of Metrics, Logs, and Events**
GreptimeDB unifies time series data processing by treating all data - whether metrics, logs, or events - as timestamped events with context. Users can analyze this data using either [SQL](https://docs.greptime.com/user-guide/query-data/sql) or [PromQL](https://docs.greptime.com/user-guide/query-data/promql) and leverage stream processing ([Flow](https://docs.greptime.com/user-guide/flow-computation/overview)) to enable continuous aggregation. [Read more](https://docs.greptime.com/user-guide/concepts/data-model).
GreptimeDB unifies time series data processing by treating all data - whether metrics, logs, or events - as timestamped events with context. Users can analyze this data using either [SQL](https://docs.greptime.com/user-guide/query-data/sql) or [PromQL](https://docs.greptime.com/user-guide/query-data/promql) and leverage stream processing ([Flow](https://docs.greptime.com/user-guide/continuous-aggregation/overview)) to enable continuous aggregation. [Read more](https://docs.greptime.com/user-guide/concepts/data-model).
* **Cloud-native Distributed Database**
@@ -173,7 +173,7 @@ Our official Grafana dashboard for monitoring GreptimeDB is available at [grafan
## Project Status
GreptimeDB is currently in Beta. We are targeting GA (General Availability) with v1.0 release by Early 2025.
GreptimeDB is currently in Beta. We are targeting GA (General Availability) with v1.0 release by Early 2025.
While in Beta, GreptimeDB is already:

View File

@@ -27,7 +27,7 @@ pub fn build_fs_backend(root: &str) -> Result<ObjectStore> {
DefaultLoggingInterceptor,
))
.layer(object_store::layers::TracingLayer)
.layer(object_store::layers::build_prometheus_metrics_layer(true))
.layer(object_store::layers::PrometheusMetricsLayer::new(true))
.finish();
Ok(object_store)
}

View File

@@ -89,7 +89,7 @@ pub fn build_s3_backend(
DefaultLoggingInterceptor,
))
.layer(object_store::layers::TracingLayer)
.layer(object_store::layers::build_prometheus_metrics_layer(true))
.layer(object_store::layers::PrometheusMetricsLayer::new(true))
.finish())
}

View File

@@ -544,7 +544,7 @@ mod tests {
use common_test_util::temp_dir::create_temp_dir;
use futures_util::future::BoxFuture;
use futures_util::FutureExt;
use object_store::{EntryMode, ObjectStore};
use object_store::ObjectStore;
use tokio::sync::mpsc;
use super::*;
@@ -578,11 +578,7 @@ mod tests {
) {
let dir = proc_path!(procedure_store, "{procedure_id}/");
let lister = object_store.list(&dir).await.unwrap();
let mut files_in_dir: Vec<_> = lister
.into_iter()
.filter(|x| x.metadata().mode() == EntryMode::FILE)
.map(|de| de.name().to_string())
.collect();
let mut files_in_dir: Vec<_> = lister.into_iter().map(|de| de.name().to_string()).collect();
files_in_dir.sort_unstable();
assert_eq!(files, files_in_dir);
}

View File

@@ -193,14 +193,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to build http client"))]
BuildHttpClient {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: reqwest::Error,
},
#[snafu(display("Missing required field: {}", name))]
MissingRequiredField {
name: String,
@@ -414,10 +406,9 @@ impl ErrorExt for Error {
| MissingKvBackend { .. }
| TomlFormat { .. } => StatusCode::InvalidArguments,
PayloadNotExist { .. }
| Unexpected { .. }
| WatchAsyncTaskChange { .. }
| BuildHttpClient { .. } => StatusCode::Unexpected,
PayloadNotExist { .. } | Unexpected { .. } | WatchAsyncTaskChange { .. } => {
StatusCode::Unexpected
}
AsyncTaskExecute { source, .. } => source.status_code(),

View File

@@ -32,7 +32,7 @@ use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder, O
use snafu::prelude::*;
use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, BuildHttpClientSnafu, CreateDirSnafu, Result};
use crate::error::{self, CreateDirSnafu, Result};
pub(crate) async fn new_raw_object_store(
store: &ObjectStoreConfig,
@@ -236,8 +236,7 @@ pub(crate) fn build_http_client(config: &HttpClientConfig) -> Result<HttpClient>
builder.timeout(config.timeout)
};
let client = http_builder.build().context(BuildHttpClientSnafu)?;
Ok(HttpClient::with(client))
HttpClient::build(http_builder).context(error::InitBackendSnafu)
}
struct PrintDetailedError;

View File

@@ -46,7 +46,7 @@ impl FileRegionManifest {
pub async fn store(&self, region_dir: &str, object_store: &ObjectStore) -> Result<()> {
let path = &region_manifest_path(region_dir);
let exist = object_store
.exists(path)
.is_exist(path)
.await
.context(CheckObjectSnafu { path })?;
ensure!(!exist, ManifestExistsSnafu { path });

View File

@@ -130,7 +130,7 @@ mod tests {
assert_eq!(region.metadata.primary_key, vec![1]);
assert!(object_store
.exists("create_region_dir/manifest/_file_manifest")
.is_exist("create_region_dir/manifest/_file_manifest")
.await
.unwrap());
@@ -198,13 +198,13 @@ mod tests {
.unwrap();
assert!(object_store
.exists("drop_region_dir/manifest/_file_manifest")
.is_exist("drop_region_dir/manifest/_file_manifest")
.await
.unwrap());
FileRegion::drop(&region, &object_store).await.unwrap();
assert!(!object_store
.exists("drop_region_dir/manifest/_file_manifest")
.is_exist("drop_region_dir/manifest/_file_manifest")
.await
.unwrap());

View File

@@ -15,15 +15,11 @@
use serde::{Deserialize, Serialize};
pub mod creator;
pub mod error;
pub mod reader;
mod error;
pub type Bytes = Vec<u8>;
pub type BytesRef<'a> = &'a [u8];
/// The seed used for the Bloom filter.
pub const SEED: u128 = 42;
/// The Meta information of the bloom filter stored in the file.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BloomFilterMeta {

View File

@@ -12,23 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod finalize_segment;
mod intermediate_codec;
use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use finalize_segment::FinalizedBloomFilterStorage;
use futures::{AsyncWrite, AsyncWriteExt, StreamExt};
use fastbloom::BloomFilter;
use futures::{AsyncWrite, AsyncWriteExt};
use snafu::ResultExt;
use crate::bloom_filter::error::{IoSnafu, Result, SerdeJsonSnafu};
use crate::bloom_filter::{BloomFilterMeta, BloomFilterSegmentLocation, Bytes, SEED};
use crate::external_provider::ExternalTempFileProvider;
use super::error::{IoSnafu, SerdeJsonSnafu};
use crate::bloom_filter::error::Result;
use crate::bloom_filter::{BloomFilterMeta, BloomFilterSegmentLocation, Bytes};
/// The seed used for the Bloom filter.
const SEED: u128 = 42;
/// The false positive rate of the Bloom filter.
pub const FALSE_POSITIVE_RATE: f64 = 0.01;
const FALSE_POSITIVE_RATE: f64 = 0.01;
/// `BloomFilterCreator` is responsible for creating and managing bloom filters
/// for a set of elements. It divides the rows into segments and creates
@@ -60,9 +58,6 @@ pub struct BloomFilterCreator {
/// Storage for finalized Bloom filters.
finalized_bloom_filters: FinalizedBloomFilterStorage,
/// Global memory usage of the bloom filter creator.
global_memory_usage: Arc<AtomicUsize>,
}
impl BloomFilterCreator {
@@ -71,12 +66,7 @@ impl BloomFilterCreator {
/// # PANICS
///
/// `rows_per_segment` <= 0
pub fn new(
rows_per_segment: usize,
intermediate_provider: Box<dyn ExternalTempFileProvider>,
global_memory_usage: Arc<AtomicUsize>,
global_memory_usage_threshold: Option<usize>,
) -> Self {
pub fn new(rows_per_segment: usize) -> Self {
assert!(
rows_per_segment > 0,
"rows_per_segment must be greater than 0"
@@ -87,67 +77,54 @@ impl BloomFilterCreator {
accumulated_row_count: 0,
cur_seg_distinct_elems: HashSet::default(),
cur_seg_distinct_elems_mem_usage: 0,
global_memory_usage: global_memory_usage.clone(),
finalized_bloom_filters: FinalizedBloomFilterStorage::new(
intermediate_provider,
global_memory_usage,
global_memory_usage_threshold,
),
finalized_bloom_filters: FinalizedBloomFilterStorage::default(),
}
}
/// Adds a row of elements to the bloom filter. If the number of accumulated rows
/// reaches `rows_per_segment`, it finalizes the current segment.
pub async fn push_row_elems(&mut self, elems: impl IntoIterator<Item = Bytes>) -> Result<()> {
pub fn push_row_elems(&mut self, elems: impl IntoIterator<Item = Bytes>) {
self.accumulated_row_count += 1;
let mut mem_diff = 0;
for elem in elems.into_iter() {
let len = elem.len();
let is_new = self.cur_seg_distinct_elems.insert(elem);
if is_new {
mem_diff += len;
self.cur_seg_distinct_elems_mem_usage += len;
}
}
self.cur_seg_distinct_elems_mem_usage += mem_diff;
self.global_memory_usage
.fetch_add(mem_diff, Ordering::Relaxed);
if self.accumulated_row_count % self.rows_per_segment == 0 {
self.finalize_segment().await?;
self.finalize_segment();
}
Ok(())
}
/// Finalizes any remaining segments and writes the bloom filters and metadata to the provided writer.
pub async fn finish(&mut self, mut writer: impl AsyncWrite + Unpin) -> Result<()> {
if !self.cur_seg_distinct_elems.is_empty() {
self.finalize_segment().await?;
self.finalize_segment();
}
let mut meta = BloomFilterMeta {
rows_per_segment: self.rows_per_segment,
seg_count: self.finalized_bloom_filters.len(),
row_count: self.accumulated_row_count,
..Default::default()
};
let mut segs = self.finalized_bloom_filters.drain().await?;
while let Some(segment) = segs.next().await {
let segment = segment?;
writer
.write_all(&segment.bloom_filter_bytes)
.await
.context(IoSnafu)?;
let mut buf = Vec::new();
for segment in self.finalized_bloom_filters.drain() {
let slice = segment.bloom_filter.as_slice();
buf.clear();
write_u64_slice(&mut buf, slice);
writer.write_all(&buf).await.context(IoSnafu)?;
let size = segment.bloom_filter_bytes.len();
let size = buf.len();
meta.bloom_filter_segments.push(BloomFilterSegmentLocation {
offset: meta.bloom_filter_segments_size as _,
size: size as _,
elem_count: segment.element_count,
});
meta.bloom_filter_segments_size += size;
meta.seg_count += 1;
}
let meta_bytes = serde_json::to_vec(&meta).context(SerdeJsonSnafu)?;
@@ -168,29 +145,91 @@ impl BloomFilterCreator {
self.cur_seg_distinct_elems_mem_usage + self.finalized_bloom_filters.memory_usage()
}
async fn finalize_segment(&mut self) -> Result<()> {
fn finalize_segment(&mut self) {
let elem_count = self.cur_seg_distinct_elems.len();
self.finalized_bloom_filters
.add(self.cur_seg_distinct_elems.drain(), elem_count)
.await?;
self.global_memory_usage
.fetch_sub(self.cur_seg_distinct_elems_mem_usage, Ordering::Relaxed);
.add(self.cur_seg_distinct_elems.drain(), elem_count);
self.cur_seg_distinct_elems_mem_usage = 0;
Ok(())
}
}
/// Storage for finalized Bloom filters.
///
/// TODO(zhongzc): Add support for storing intermediate bloom filters on disk to control memory usage.
#[derive(Debug, Default)]
struct FinalizedBloomFilterStorage {
/// Bloom filters that are stored in memory.
in_memory: Vec<FinalizedBloomFilterSegment>,
}
impl FinalizedBloomFilterStorage {
fn memory_usage(&self) -> usize {
self.in_memory.iter().map(|s| s.size).sum()
}
/// Adds a new finalized Bloom filter to the storage.
///
/// TODO(zhongzc): Add support for flushing to disk.
fn add(&mut self, elems: impl IntoIterator<Item = Bytes>, elem_count: usize) {
let mut bf = BloomFilter::with_false_pos(FALSE_POSITIVE_RATE)
.seed(&SEED)
.expected_items(elem_count);
for elem in elems.into_iter() {
bf.insert(&elem);
}
let cbf = FinalizedBloomFilterSegment::new(bf, elem_count);
self.in_memory.push(cbf);
}
fn len(&self) -> usize {
self.in_memory.len()
}
fn drain(&mut self) -> impl Iterator<Item = FinalizedBloomFilterSegment> + '_ {
self.in_memory.drain(..)
}
}
/// A finalized Bloom filter segment.
#[derive(Debug)]
struct FinalizedBloomFilterSegment {
/// The underlying Bloom filter.
bloom_filter: BloomFilter,
/// The number of elements in the Bloom filter.
element_count: usize,
/// The occupied memory size of the Bloom filter.
size: usize,
}
impl FinalizedBloomFilterSegment {
fn new(bloom_filter: BloomFilter, elem_count: usize) -> Self {
let memory_usage = std::mem::size_of_val(bloom_filter.as_slice());
Self {
bloom_filter,
element_count: elem_count,
size: memory_usage,
}
}
}
/// Writes a slice of `u64` to the buffer in little-endian order.
fn write_u64_slice(buf: &mut Vec<u8>, slice: &[u64]) {
buf.reserve(std::mem::size_of_val(slice));
for &x in slice {
buf.extend_from_slice(&x.to_le_bytes());
}
}
#[cfg(test)]
mod tests {
use fastbloom::BloomFilter;
use futures::io::Cursor;
use super::*;
use crate::external_provider::MockExternalTempFileProvider;
/// Converts a slice of bytes to a vector of `u64`.
pub fn u64_vec_from_bytes(bytes: &[u8]) -> Vec<u64> {
fn u64_vec_from_bytes(bytes: &[u8]) -> Vec<u64> {
bytes
.chunks_exact(std::mem::size_of::<u64>())
.map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
@@ -200,32 +239,18 @@ mod tests {
#[tokio::test]
async fn test_bloom_filter_creator() {
let mut writer = Cursor::new(Vec::new());
let mut creator = BloomFilterCreator::new(
2,
Box::new(MockExternalTempFileProvider::new()),
Arc::new(AtomicUsize::new(0)),
None,
);
let mut creator = BloomFilterCreator::new(2);
creator
.push_row_elems(vec![b"a".to_vec(), b"b".to_vec()])
.await
.unwrap();
creator.push_row_elems(vec![b"a".to_vec(), b"b".to_vec()]);
assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
assert!(creator.memory_usage() > 0);
creator
.push_row_elems(vec![b"c".to_vec(), b"d".to_vec()])
.await
.unwrap();
creator.push_row_elems(vec![b"c".to_vec(), b"d".to_vec()]);
// Finalize the first segment
assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0);
assert!(creator.cur_seg_distinct_elems_mem_usage == 0);
assert!(creator.memory_usage() > 0);
creator
.push_row_elems(vec![b"e".to_vec(), b"f".to_vec()])
.await
.unwrap();
creator.push_row_elems(vec![b"e".to_vec(), b"f".to_vec()]);
assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
assert!(creator.memory_usage() > 0);

View File

@@ -1,293 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use asynchronous_codec::{FramedRead, FramedWrite};
use fastbloom::BloomFilter;
use futures::stream::StreamExt;
use futures::{stream, AsyncWriteExt, Stream};
use snafu::ResultExt;
use super::intermediate_codec::IntermediateBloomFilterCodecV1;
use crate::bloom_filter::creator::{FALSE_POSITIVE_RATE, SEED};
use crate::bloom_filter::error::{IntermediateSnafu, IoSnafu, Result};
use crate::bloom_filter::Bytes;
use crate::external_provider::ExternalTempFileProvider;
/// The minimum memory usage threshold for flushing in-memory Bloom filters to disk.
const MIN_MEMORY_USAGE_THRESHOLD: usize = 1024 * 1024; // 1MB
/// Storage for finalized Bloom filters.
pub struct FinalizedBloomFilterStorage {
/// Bloom filters that are stored in memory.
in_memory: Vec<FinalizedBloomFilterSegment>,
/// Used to generate unique file IDs for intermediate Bloom filters.
intermediate_file_id_counter: usize,
/// Prefix for intermediate Bloom filter files.
intermediate_prefix: String,
/// The provider for intermediate Bloom filter files.
intermediate_provider: Box<dyn ExternalTempFileProvider>,
/// The memory usage of the in-memory Bloom filters.
memory_usage: usize,
/// The global memory usage provided by the user to track the
/// total memory usage of the creating Bloom filters.
global_memory_usage: Arc<AtomicUsize>,
/// The threshold of the global memory usage of the creating Bloom filters.
global_memory_usage_threshold: Option<usize>,
}
impl FinalizedBloomFilterStorage {
/// Creates a new `FinalizedBloomFilterStorage`.
pub fn new(
intermediate_provider: Box<dyn ExternalTempFileProvider>,
global_memory_usage: Arc<AtomicUsize>,
global_memory_usage_threshold: Option<usize>,
) -> Self {
let external_prefix = format!("intm-bloom-filters-{}", uuid::Uuid::new_v4());
Self {
in_memory: Vec::new(),
intermediate_file_id_counter: 0,
intermediate_prefix: external_prefix,
intermediate_provider,
memory_usage: 0,
global_memory_usage,
global_memory_usage_threshold,
}
}
/// Returns the memory usage of the storage.
pub fn memory_usage(&self) -> usize {
self.memory_usage
}
/// Adds a new finalized Bloom filter to the storage.
///
/// If the memory usage exceeds the threshold, flushes the in-memory Bloom filters to disk.
pub async fn add(
&mut self,
elems: impl IntoIterator<Item = Bytes>,
element_count: usize,
) -> Result<()> {
let mut bf = BloomFilter::with_false_pos(FALSE_POSITIVE_RATE)
.seed(&SEED)
.expected_items(element_count);
for elem in elems.into_iter() {
bf.insert(&elem);
}
let fbf = FinalizedBloomFilterSegment::from(bf, element_count);
// Update memory usage.
let memory_diff = fbf.bloom_filter_bytes.len();
self.memory_usage += memory_diff;
self.global_memory_usage
.fetch_add(memory_diff, Ordering::Relaxed);
// Add the finalized Bloom filter to the in-memory storage.
self.in_memory.push(fbf);
// Flush to disk if necessary.
// Do not flush if memory usage is too low.
if self.memory_usage < MIN_MEMORY_USAGE_THRESHOLD {
return Ok(());
}
// Check if the global memory usage exceeds the threshold and flush to disk if necessary.
if let Some(threshold) = self.global_memory_usage_threshold {
let global = self.global_memory_usage.load(Ordering::Relaxed);
if global > threshold {
self.flush_in_memory_to_disk().await?;
self.global_memory_usage
.fetch_sub(self.memory_usage, Ordering::Relaxed);
self.memory_usage = 0;
}
}
Ok(())
}
/// Drains the storage and returns a stream of finalized Bloom filter segments.
pub async fn drain(
&mut self,
) -> Result<Pin<Box<dyn Stream<Item = Result<FinalizedBloomFilterSegment>> + '_>>> {
// FAST PATH: memory only
if self.intermediate_file_id_counter == 0 {
return Ok(Box::pin(stream::iter(self.in_memory.drain(..).map(Ok))));
}
// SLOW PATH: memory + disk
let mut on_disk = self
.intermediate_provider
.read_all(&self.intermediate_prefix)
.await
.context(IntermediateSnafu)?;
on_disk.sort_unstable_by(|x, y| x.0.cmp(&y.0));
let streams = on_disk
.into_iter()
.map(|(_, reader)| FramedRead::new(reader, IntermediateBloomFilterCodecV1::default()));
let in_memory_stream = stream::iter(self.in_memory.drain(..)).map(Ok);
Ok(Box::pin(
stream::iter(streams).flatten().chain(in_memory_stream),
))
}
/// Flushes the in-memory Bloom filters to disk.
async fn flush_in_memory_to_disk(&mut self) -> Result<()> {
let file_id = self.intermediate_file_id_counter;
self.intermediate_file_id_counter += 1;
let file_id = format!("{:08}", file_id);
let mut writer = self
.intermediate_provider
.create(&self.intermediate_prefix, &file_id)
.await
.context(IntermediateSnafu)?;
let fw = FramedWrite::new(&mut writer, IntermediateBloomFilterCodecV1::default());
// `forward()` will flush and close the writer when the stream ends
if let Err(e) = stream::iter(self.in_memory.drain(..).map(Ok))
.forward(fw)
.await
{
writer.close().await.context(IoSnafu)?;
writer.flush().await.context(IoSnafu)?;
return Err(e);
}
Ok(())
}
}
/// A finalized Bloom filter segment.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FinalizedBloomFilterSegment {
/// The underlying Bloom filter bytes.
pub bloom_filter_bytes: Vec<u8>,
/// The number of elements in the Bloom filter.
pub element_count: usize,
}
impl FinalizedBloomFilterSegment {
fn from(bf: BloomFilter, elem_count: usize) -> Self {
let bf_slice = bf.as_slice();
let mut bloom_filter_bytes = Vec::with_capacity(std::mem::size_of_val(bf_slice));
for &x in bf_slice {
bloom_filter_bytes.extend_from_slice(&x.to_le_bytes());
}
Self {
bloom_filter_bytes,
element_count: elem_count,
}
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Mutex;
use futures::AsyncRead;
use tokio::io::duplex;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use super::*;
use crate::bloom_filter::creator::tests::u64_vec_from_bytes;
use crate::external_provider::MockExternalTempFileProvider;
#[tokio::test]
async fn test_finalized_bloom_filter_storage() {
let mut mock_provider = MockExternalTempFileProvider::new();
let mock_files: Arc<Mutex<HashMap<String, Box<dyn AsyncRead + Unpin + Send>>>> =
Arc::new(Mutex::new(HashMap::new()));
mock_provider.expect_create().returning({
let files = Arc::clone(&mock_files);
move |file_group, file_id| {
assert!(file_group.starts_with("intm-bloom-filters-"));
let mut files = files.lock().unwrap();
let (writer, reader) = duplex(2 * 1024 * 1024);
files.insert(file_id.to_string(), Box::new(reader.compat()));
Ok(Box::new(writer.compat_write()))
}
});
mock_provider.expect_read_all().returning({
let files = Arc::clone(&mock_files);
move |file_group| {
assert!(file_group.starts_with("intm-bloom-filters-"));
let mut files = files.lock().unwrap();
Ok(files.drain().collect::<Vec<_>>())
}
});
let global_memory_usage = Arc::new(AtomicUsize::new(0));
let global_memory_usage_threshold = Some(1024 * 1024); // 1MB
let provider = Box::new(mock_provider);
let mut storage = FinalizedBloomFilterStorage::new(
provider,
global_memory_usage.clone(),
global_memory_usage_threshold,
);
let elem_count = 2000;
let batch = 1000;
for i in 0..batch {
let elems = (elem_count * i..elem_count * (i + 1)).map(|x| x.to_string().into_bytes());
storage.add(elems, elem_count).await.unwrap();
}
// Flush happens.
assert!(storage.intermediate_file_id_counter > 0);
// Drain the storage.
let mut stream = storage.drain().await.unwrap();
let mut i = 0;
while let Some(segment) = stream.next().await {
let segment = segment.unwrap();
assert_eq!(segment.element_count, elem_count);
let v = u64_vec_from_bytes(&segment.bloom_filter_bytes);
// Check the correctness of the Bloom filter.
let bf = BloomFilter::from_vec(v)
.seed(&SEED)
.expected_items(segment.element_count);
for elem in (elem_count * i..elem_count * (i + 1)).map(|x| x.to_string().into_bytes()) {
assert!(bf.contains(&elem));
}
i += 1;
}
assert_eq!(i, batch);
}
}

View File

@@ -1,248 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use asynchronous_codec::{BytesMut, Decoder, Encoder};
use bytes::{Buf, BufMut};
use snafu::{ensure, ResultExt};
use crate::bloom_filter::creator::finalize_segment::FinalizedBloomFilterSegment;
use crate::bloom_filter::error::{Error, InvalidIntermediateMagicSnafu, IoSnafu, Result};
/// The magic number for the codec version 1 of the intermediate bloom filter.
const CODEC_V1_MAGIC: &[u8; 4] = b"bi01";
/// Codec of the intermediate finalized bloom filter segment.
///
/// # Format
///
/// [ magic ][ elem count ][ size ][ bloom filter ][ elem count ][ size ][ bloom filter ]...
/// [4] [8] [8] [size] [8] [8] [size]
#[derive(Debug, Default)]
pub struct IntermediateBloomFilterCodecV1 {
handled_header_magic: bool,
}
impl Encoder for IntermediateBloomFilterCodecV1 {
type Item<'a> = FinalizedBloomFilterSegment;
type Error = Error;
fn encode(&mut self, item: FinalizedBloomFilterSegment, dst: &mut BytesMut) -> Result<()> {
if !self.handled_header_magic {
dst.extend_from_slice(CODEC_V1_MAGIC);
self.handled_header_magic = true;
}
let segment_bytes = item.bloom_filter_bytes;
let elem_count = item.element_count;
dst.reserve(2 * std::mem::size_of::<u64>() + segment_bytes.len());
dst.put_u64_le(elem_count as u64);
dst.put_u64_le(segment_bytes.len() as u64);
dst.extend_from_slice(&segment_bytes);
Ok(())
}
}
impl Decoder for IntermediateBloomFilterCodecV1 {
type Item = FinalizedBloomFilterSegment;
type Error = Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
if !self.handled_header_magic {
let m_len = CODEC_V1_MAGIC.len();
if src.remaining() < m_len {
return Ok(None);
}
let magic_bytes = &src[..m_len];
ensure!(
magic_bytes == CODEC_V1_MAGIC,
InvalidIntermediateMagicSnafu {
invalid: magic_bytes,
}
);
self.handled_header_magic = true;
src.advance(m_len);
}
let s = &src[..];
let u64_size = std::mem::size_of::<u64>();
let n_size = u64_size * 2;
if s.len() < n_size {
return Ok(None);
}
let element_count = u64::from_le_bytes(s[0..u64_size].try_into().unwrap()) as usize;
let segment_size = u64::from_le_bytes(s[u64_size..n_size].try_into().unwrap()) as usize;
if s.len() < n_size + segment_size {
return Ok(None);
}
let bloom_filter_bytes = s[n_size..n_size + segment_size].to_vec();
src.advance(n_size + segment_size);
Ok(Some(FinalizedBloomFilterSegment {
element_count,
bloom_filter_bytes,
}))
}
}
/// Required for [`Encoder`] and [`Decoder`] implementations.
impl From<std::io::Error> for Error {
fn from(error: std::io::Error) -> Self {
Err::<(), std::io::Error>(error)
.context(IoSnafu)
.unwrap_err()
}
}
#[cfg(test)]
mod tests {
use asynchronous_codec::{FramedRead, FramedWrite};
use futures::io::Cursor;
use futures::{SinkExt, StreamExt};
use super::*;
use crate::bloom_filter::creator::finalize_segment::FinalizedBloomFilterSegment;
#[test]
fn test_intermediate_bloom_filter_codec_v1_basic() {
let mut encoder = IntermediateBloomFilterCodecV1::default();
let mut buf = BytesMut::new();
let item1 = FinalizedBloomFilterSegment {
element_count: 2,
bloom_filter_bytes: vec![1, 2, 3, 4],
};
let item2 = FinalizedBloomFilterSegment {
element_count: 3,
bloom_filter_bytes: vec![5, 6, 7, 8],
};
let item3 = FinalizedBloomFilterSegment {
element_count: 4,
bloom_filter_bytes: vec![9, 10, 11, 12],
};
encoder.encode(item1.clone(), &mut buf).unwrap();
encoder.encode(item2.clone(), &mut buf).unwrap();
encoder.encode(item3.clone(), &mut buf).unwrap();
let mut buf = buf.freeze().try_into_mut().unwrap();
let mut decoder = IntermediateBloomFilterCodecV1::default();
let decoded_item1 = decoder.decode(&mut buf).unwrap().unwrap();
let decoded_item2 = decoder.decode(&mut buf).unwrap().unwrap();
let decoded_item3 = decoder.decode(&mut buf).unwrap().unwrap();
assert_eq!(item1, decoded_item1);
assert_eq!(item2, decoded_item2);
assert_eq!(item3, decoded_item3);
}
#[tokio::test]
async fn test_intermediate_bloom_filter_codec_v1_frame_read_write() {
let item1 = FinalizedBloomFilterSegment {
element_count: 2,
bloom_filter_bytes: vec![1, 2, 3, 4],
};
let item2 = FinalizedBloomFilterSegment {
element_count: 3,
bloom_filter_bytes: vec![5, 6, 7, 8],
};
let item3 = FinalizedBloomFilterSegment {
element_count: 4,
bloom_filter_bytes: vec![9, 10, 11, 12],
};
let mut bytes = Cursor::new(vec![]);
let mut writer = FramedWrite::new(&mut bytes, IntermediateBloomFilterCodecV1::default());
writer.send(item1.clone()).await.unwrap();
writer.send(item2.clone()).await.unwrap();
writer.send(item3.clone()).await.unwrap();
writer.flush().await.unwrap();
writer.close().await.unwrap();
let bytes = bytes.into_inner();
let mut reader =
FramedRead::new(bytes.as_slice(), IntermediateBloomFilterCodecV1::default());
let decoded_item1 = reader.next().await.unwrap().unwrap();
let decoded_item2 = reader.next().await.unwrap().unwrap();
let decoded_item3 = reader.next().await.unwrap().unwrap();
assert!(reader.next().await.is_none());
assert_eq!(item1, decoded_item1);
assert_eq!(item2, decoded_item2);
assert_eq!(item3, decoded_item3);
}
#[tokio::test]
async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_only_magic() {
let bytes = CODEC_V1_MAGIC.to_vec();
let mut reader =
FramedRead::new(bytes.as_slice(), IntermediateBloomFilterCodecV1::default());
assert!(reader.next().await.is_none());
}
#[tokio::test]
async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_partial_magic() {
let bytes = CODEC_V1_MAGIC[..3].to_vec();
let mut reader =
FramedRead::new(bytes.as_slice(), IntermediateBloomFilterCodecV1::default());
let e = reader.next().await.unwrap();
assert!(e.is_err());
}
#[tokio::test]
async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_partial_item() {
let mut bytes = vec![];
bytes.extend_from_slice(CODEC_V1_MAGIC);
bytes.extend_from_slice(&2u64.to_le_bytes());
bytes.extend_from_slice(&4u64.to_le_bytes());
let mut reader =
FramedRead::new(bytes.as_slice(), IntermediateBloomFilterCodecV1::default());
let e = reader.next().await.unwrap();
assert!(e.is_err());
}
#[tokio::test]
async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_corrupted_magic() {
let mut bytes = vec![];
bytes.extend_from_slice(b"bi02");
bytes.extend_from_slice(&2u64.to_le_bytes());
bytes.extend_from_slice(&4u64.to_le_bytes());
bytes.extend_from_slice(&[1, 2, 3, 4]);
let mut reader =
FramedRead::new(bytes.as_slice(), IntermediateBloomFilterCodecV1::default());
let e = reader.next().await.unwrap();
assert!(e.is_err());
}
#[tokio::test]
async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_corrupted_length() {
let mut bytes = vec![];
bytes.extend_from_slice(CODEC_V1_MAGIC);
bytes.extend_from_slice(&2u64.to_le_bytes());
bytes.extend_from_slice(&4u64.to_le_bytes());
bytes.extend_from_slice(&[1, 2, 3]);
let mut reader =
FramedRead::new(bytes.as_slice(), IntermediateBloomFilterCodecV1::default());
let e = reader.next().await.unwrap();
assert!(e.is_err());
}
}
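
A minimal, self-contained sketch of the V1 frame layout documented above ([ magic ][ elem count ][ size ][ bloom filter ]...), using only the standard library; `encode_frame` is an illustrative helper, not part of the codec:

fn encode_frame(dst: &mut Vec<u8>, element_count: u64, bloom_filter_bytes: &[u8]) {
    // Each frame stores the element count and payload size as little-endian u64s,
    // followed by the raw bloom filter bytes.
    dst.extend_from_slice(&element_count.to_le_bytes());
    dst.extend_from_slice(&(bloom_filter_bytes.len() as u64).to_le_bytes());
    dst.extend_from_slice(bloom_filter_bytes);
}

fn main() {
    let mut buf = Vec::new();
    buf.extend_from_slice(b"bi01"); // the 4-byte magic is written once per stream
    encode_frame(&mut buf, 2, &[1, 2, 3, 4]);
    encode_frame(&mut buf, 3, &[5, 6, 7, 8]);
    // 4-byte magic + two frames of (8 + 8 + payload) bytes.
    assert_eq!(buf.len(), 4 + (8 + 8 + 4) + (8 + 8 + 4));
}

Feeding such a buffer to IntermediateBloomFilterCodecV1 should decode the same two segments, which is essentially what the tests above exercise.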

View File

@@ -39,43 +39,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to deserialize json"))]
DeserializeJson {
#[snafu(source)]
error: serde_json::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Intermediate error"))]
Intermediate {
source: crate::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("File size too small for bloom filter"))]
FileSizeTooSmall {
size: u64,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unexpected bloom filter meta size"))]
UnexpectedMetaSize {
max_meta_size: u64,
actual_meta_size: u64,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid intermediate magic"))]
InvalidIntermediateMagic {
invalid: Vec<u8>,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("External error"))]
External {
source: BoxedError,
@@ -89,14 +52,8 @@ impl ErrorExt for Error {
use Error::*;
match self {
Io { .. }
| SerdeJson { .. }
| FileSizeTooSmall { .. }
| UnexpectedMetaSize { .. }
| DeserializeJson { .. }
| InvalidIntermediateMagic { .. } => StatusCode::Unexpected,
Io { .. } | Self::SerdeJson { .. } => StatusCode::Unexpected,
Intermediate { source, .. } => source.status_code(),
External { source, .. } => source.status_code(),
}
}

View File

@@ -1,265 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Range;
use async_trait::async_trait;
use bytes::Bytes;
use common_base::range_read::RangeReader;
use fastbloom::BloomFilter;
use snafu::{ensure, ResultExt};
use crate::bloom_filter::error::{
DeserializeJsonSnafu, FileSizeTooSmallSnafu, IoSnafu, Result, UnexpectedMetaSizeSnafu,
};
use crate::bloom_filter::{BloomFilterMeta, BloomFilterSegmentLocation, SEED};
/// Minimum size of the bloom filter file, which is the size of the trailing length field of the bloom filter meta.
const BLOOM_META_LEN_SIZE: u64 = 4;
/// Default prefetch size of bloom filter meta.
pub const DEFAULT_PREFETCH_SIZE: u64 = 1024; // 1KiB
/// `BloomFilterReader` reads the bloom filter from the file.
#[async_trait]
pub trait BloomFilterReader {
/// Reads a range of bytes from the file.
async fn range_read(&mut self, offset: u64, size: u32) -> Result<Bytes>;
/// Reads a bunch of ranges from the file.
async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>>;
/// Reads the meta information of the bloom filter.
async fn metadata(&mut self) -> Result<BloomFilterMeta>;
/// Reads a bloom filter with the given location.
async fn bloom_filter(&mut self, loc: &BloomFilterSegmentLocation) -> Result<BloomFilter> {
let bytes = self.range_read(loc.offset, loc.size as _).await?;
let vec = bytes
.chunks_exact(std::mem::size_of::<u64>())
.map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
.collect();
let bm = BloomFilter::from_vec(vec)
.seed(&SEED)
.expected_items(loc.elem_count);
Ok(bm)
}
}
/// `BloomFilterReaderImpl` reads the bloom filter from the file.
pub struct BloomFilterReaderImpl<R: RangeReader> {
/// The underlying reader.
reader: R,
}
impl<R: RangeReader> BloomFilterReaderImpl<R> {
/// Creates a new `BloomFilterReaderImpl` with the given reader.
pub fn new(reader: R) -> Self {
Self { reader }
}
}
#[async_trait]
impl<R: RangeReader> BloomFilterReader for BloomFilterReaderImpl<R> {
async fn range_read(&mut self, offset: u64, size: u32) -> Result<Bytes> {
self.reader
.read(offset..offset + size as u64)
.await
.context(IoSnafu)
}
async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
self.reader.read_vec(ranges).await.context(IoSnafu)
}
async fn metadata(&mut self) -> Result<BloomFilterMeta> {
let metadata = self.reader.metadata().await.context(IoSnafu)?;
let file_size = metadata.content_length;
let mut meta_reader =
BloomFilterMetaReader::new(&mut self.reader, file_size, Some(DEFAULT_PREFETCH_SIZE));
meta_reader.metadata().await
}
}
/// `BloomFilterMetaReader` reads the metadata of the bloom filter.
struct BloomFilterMetaReader<R: RangeReader> {
reader: R,
file_size: u64,
prefetch_size: u64,
}
impl<R: RangeReader> BloomFilterMetaReader<R> {
pub fn new(reader: R, file_size: u64, prefetch_size: Option<u64>) -> Self {
Self {
reader,
file_size,
prefetch_size: prefetch_size
.unwrap_or(BLOOM_META_LEN_SIZE)
.max(BLOOM_META_LEN_SIZE),
}
}
/// Reads the metadata of the bloom filter.
///
/// It will first prefetch some bytes from the end of the file,
/// then parse the metadata from the prefetched bytes.
pub async fn metadata(&mut self) -> Result<BloomFilterMeta> {
ensure!(
self.file_size >= BLOOM_META_LEN_SIZE,
FileSizeTooSmallSnafu {
size: self.file_size,
}
);
let meta_start = self.file_size.saturating_sub(self.prefetch_size);
let suffix = self
.reader
.read(meta_start..self.file_size)
.await
.context(IoSnafu)?;
let suffix_len = suffix.len();
let length = u32::from_le_bytes(Self::read_tailing_four_bytes(&suffix)?) as u64;
self.validate_meta_size(length)?;
if length > suffix_len as u64 - BLOOM_META_LEN_SIZE {
let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE;
let meta = self
.reader
.read(metadata_start..self.file_size - BLOOM_META_LEN_SIZE)
.await
.context(IoSnafu)?;
serde_json::from_slice(&meta).context(DeserializeJsonSnafu)
} else {
let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE - meta_start;
let meta = &suffix[metadata_start as usize..suffix_len - BLOOM_META_LEN_SIZE as usize];
serde_json::from_slice(meta).context(DeserializeJsonSnafu)
}
}
fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> {
let suffix_len = suffix.len();
ensure!(
suffix_len >= 4,
FileSizeTooSmallSnafu {
size: suffix_len as u64
}
);
let mut bytes = [0; 4];
bytes.copy_from_slice(&suffix[suffix_len - 4..suffix_len]);
Ok(bytes)
}
fn validate_meta_size(&self, length: u64) -> Result<()> {
let max_meta_size = self.file_size - BLOOM_META_LEN_SIZE;
ensure!(
length <= max_meta_size,
UnexpectedMetaSizeSnafu {
max_meta_size,
actual_meta_size: length,
}
);
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use futures::io::Cursor;
use super::*;
use crate::bloom_filter::creator::BloomFilterCreator;
use crate::external_provider::MockExternalTempFileProvider;
async fn mock_bloom_filter_bytes() -> Vec<u8> {
let mut writer = Cursor::new(vec![]);
let mut creator = BloomFilterCreator::new(
2,
Box::new(MockExternalTempFileProvider::new()),
Arc::new(AtomicUsize::new(0)),
None,
);
creator
.push_row_elems(vec![b"a".to_vec(), b"b".to_vec()])
.await
.unwrap();
creator
.push_row_elems(vec![b"c".to_vec(), b"d".to_vec()])
.await
.unwrap();
creator
.push_row_elems(vec![b"e".to_vec(), b"f".to_vec()])
.await
.unwrap();
creator.finish(&mut writer).await.unwrap();
writer.into_inner()
}
#[tokio::test]
async fn test_bloom_filter_meta_reader() {
let bytes = mock_bloom_filter_bytes().await;
let file_size = bytes.len() as u64;
for prefetch in [0u64, file_size / 2, file_size, file_size + 10] {
let mut reader =
BloomFilterMetaReader::new(bytes.clone(), file_size as _, Some(prefetch));
let meta = reader.metadata().await.unwrap();
assert_eq!(meta.rows_per_segment, 2);
assert_eq!(meta.seg_count, 2);
assert_eq!(meta.row_count, 3);
assert_eq!(meta.bloom_filter_segments.len(), 2);
assert_eq!(meta.bloom_filter_segments[0].offset, 0);
assert_eq!(meta.bloom_filter_segments[0].elem_count, 4);
assert_eq!(
meta.bloom_filter_segments[1].offset,
meta.bloom_filter_segments[0].size
);
assert_eq!(meta.bloom_filter_segments[1].elem_count, 2);
}
}
#[tokio::test]
async fn test_bloom_filter_reader() {
let bytes = mock_bloom_filter_bytes().await;
let mut reader = BloomFilterReaderImpl::new(bytes);
let meta = reader.metadata().await.unwrap();
assert_eq!(meta.bloom_filter_segments.len(), 2);
let bf = reader
.bloom_filter(&meta.bloom_filter_segments[0])
.await
.unwrap();
assert!(bf.contains(&b"a"));
assert!(bf.contains(&b"b"));
assert!(bf.contains(&b"c"));
assert!(bf.contains(&b"d"));
let bf = reader
.bloom_filter(&meta.bloom_filter_segments[1])
.await
.unwrap();
assert!(bf.contains(&b"e"));
assert!(bf.contains(&b"f"));
}
}
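
The metadata path above relies on a footer layout of [ bloom filter segments ][ meta JSON ][ meta length: u32 LE ]; the prefetched suffix either already covers the meta or triggers a second targeted read. A small pure-std sketch of the offset arithmetic (the helper name is made up for illustration):

/// Returns the byte range of the JSON meta, given the total file size and the
/// value of the trailing 4-byte length field.
fn meta_range(file_size: u64, meta_len: u32) -> std::ops::Range<u64> {
    const BLOOM_META_LEN_SIZE: u64 = 4;
    let meta_end = file_size - BLOOM_META_LEN_SIZE;
    meta_end - meta_len as u64..meta_end
}

fn main() {
    // A 100-byte file whose meta is 30 bytes long: the meta occupies bytes
    // 66..96 and the length field occupies the final bytes 96..100.
    assert_eq!(meta_range(100, 30), 66..96);
}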

View File

@@ -1,48 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("External error"))]
External {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
External { source, .. } => source.status_code(),
}
}
fn as_any(&self) -> &dyn Any {
self
}
}
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod external_provider;
pub mod external_sort;
mod intermediate_rw;
mod merge_stream;

View File

@@ -15,24 +15,25 @@
use async_trait::async_trait;
use futures::{AsyncRead, AsyncWrite};
use crate::error::Error;
use crate::inverted_index::error::Result;
pub type Writer = Box<dyn AsyncWrite + Unpin + Send>;
pub type Reader = Box<dyn AsyncRead + Unpin + Send>;
/// Trait for managing intermediate files to control memory usage for a particular index.
/// Trait for managing intermediate files during external sorting for a particular index.
#[mockall::automock]
#[async_trait]
pub trait ExternalTempFileProvider: Send + Sync {
/// Creates and opens a new intermediate file associated with a specific `file_group` for writing.
/// Creates and opens a new intermediate file associated with a specific index for writing.
/// The implementation should ensure that the file does not already exist.
///
/// - `file_group`: a unique identifier for the group of files
/// - `index_name`: the name of the index for which the file will be associated
/// - `file_id`: a unique identifier for the new file
async fn create(&self, file_group: &str, file_id: &str) -> Result<Writer, Error>;
async fn create(
&self,
index_name: &str,
file_id: &str,
) -> Result<Box<dyn AsyncWrite + Unpin + Send>>;
/// Retrieves all intermediate files and their associated file identifiers for a specific `file_group`.
/// Retrieves all intermediate files associated with a specific index for an external sorting operation.
///
/// `file_group` is a unique identifier for the group of files.
async fn read_all(&self, file_group: &str) -> Result<Vec<(String, Reader)>, Error>;
/// `index_name`: the name of the index to retrieve intermediate files for
async fn read_all(&self, index_name: &str) -> Result<Vec<Box<dyn AsyncRead + Unpin + Send>>>;
}
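
A hedged usage sketch of the restored trait, mirroring the unit tests further below: spill a buffer through `create`, then stream everything back with `read_all`. The helper is hypothetical and assumes the module path shown above plus the `futures` extension traits:

use futures::{AsyncReadExt, AsyncWriteExt};
use index::inverted_index::create::sort::external_provider::ExternalTempFileProvider;

/// Illustrative only: writes one intermediate file for `index_name` and reads
/// every intermediate file of that index back into memory.
async fn spill_and_reload(
    provider: &dyn ExternalTempFileProvider,
    index_name: &str,
) -> Vec<Vec<u8>> {
    let mut writer = provider.create(index_name, "000000000001").await.unwrap();
    writer.write_all(b"sorted run #1").await.unwrap();
    writer.close().await.unwrap();

    let mut contents = Vec::new();
    for mut reader in provider.read_all(index_name).await.unwrap() {
        let mut buf = Vec::new();
        reader.read_to_end(&mut buf).await.unwrap();
        contents.push(buf);
    }
    contents
}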

View File

@@ -23,16 +23,15 @@ use async_trait::async_trait;
use common_base::BitVec;
use common_telemetry::{debug, error};
use futures::stream;
use snafu::ResultExt;
use crate::external_provider::ExternalTempFileProvider;
use crate::inverted_index::create::sort::external_provider::ExternalTempFileProvider;
use crate::inverted_index::create::sort::intermediate_rw::{
IntermediateReader, IntermediateWriter,
};
use crate::inverted_index::create::sort::merge_stream::MergeSortedStream;
use crate::inverted_index::create::sort::{SortOutput, SortedStream, Sorter};
use crate::inverted_index::create::sort_create::SorterFactory;
use crate::inverted_index::error::{IntermediateSnafu, Result};
use crate::inverted_index::error::Result;
use crate::inverted_index::{Bytes, BytesRef};
/// `ExternalSorter` manages the sorting of data using both in-memory structures and external files.
@@ -108,11 +107,7 @@ impl Sorter for ExternalSorter {
/// Finalizes the sorting operation, merging data from both in-memory buffer and external files
/// into a sorted stream
async fn output(&mut self) -> Result<SortOutput> {
let readers = self
.temp_file_provider
.read_all(&self.index_name)
.await
.context(IntermediateSnafu)?;
let readers = self.temp_file_provider.read_all(&self.index_name).await?;
// TODO(zhongzc): k-way merge instead of 2-way merge
@@ -127,7 +122,7 @@ impl Sorter for ExternalSorter {
Ok((value, bitmap))
}),
)));
for (_, reader) in readers {
for reader in readers {
tree_nodes.push_back(IntermediateReader::new(reader).into_stream().await?);
}
@@ -246,11 +241,7 @@ impl ExternalSorter {
let file_id = &format!("{:012}", self.total_row_count);
let index_name = &self.index_name;
let writer = self
.temp_file_provider
.create(index_name, file_id)
.await
.context(IntermediateSnafu)?;
let writer = self.temp_file_provider.create(index_name, file_id).await?;
let values = mem::take(&mut self.values_buffer);
self.global_memory_usage
@@ -311,7 +302,7 @@ mod tests {
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use super::*;
use crate::external_provider::MockExternalTempFileProvider;
use crate::inverted_index::create::sort::external_provider::MockExternalTempFileProvider;
async fn test_external_sorter(
current_memory_usage_threshold: Option<usize>,
@@ -341,7 +332,7 @@ mod tests {
move |index_name| {
assert_eq!(index_name, "test");
let mut files = files.lock().unwrap();
Ok(files.drain().collect::<Vec<_>>())
Ok(files.drain().map(|f| f.1).collect::<Vec<_>>())
}
});
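
The output path above merges the in-memory buffer with the spilled intermediate files by building a tree of pairwise merges over sorted streams (see the `MergeSortedStream` usage and the k-way-merge TODO). A pure-std sketch of the underlying 2-way merge over sorted sequences, standing in for the async version that merges streams of (value, bitmap) pairs:

fn merge_sorted(a: Vec<u32>, b: Vec<u32>) -> Vec<u32> {
    let (mut a, mut b) = (a.into_iter().peekable(), b.into_iter().peekable());
    let mut out = Vec::new();
    loop {
        match (a.peek(), b.peek()) {
            // Take from whichever side currently holds the smaller head.
            (Some(x), Some(y)) if x <= y => out.push(a.next().unwrap()),
            (Some(_), Some(_)) => out.push(b.next().unwrap()),
            (Some(_), None) => out.push(a.next().unwrap()),
            (None, Some(_)) => out.push(b.next().unwrap()),
            (None, None) => break,
        }
    }
    out
}

fn main() {
    assert_eq!(merge_sorted(vec![1, 3, 5], vec![2, 3, 6]), vec![1, 2, 3, 3, 5, 6]);
}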

View File

@@ -213,13 +213,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Intermediate error"))]
Intermediate {
source: crate::error::Error,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -252,7 +245,6 @@ impl ErrorExt for Error {
| InconsistentRowCount { .. }
| IndexNotFound { .. } => StatusCode::InvalidArguments,
Intermediate { source, .. } => source.status_code(),
External { source, .. } => source.status_code(),
}
}

View File

@@ -16,7 +16,5 @@
#![feature(assert_matches)]
pub mod bloom_filter;
pub mod error;
pub mod external_provider;
pub mod fulltext_index;
pub mod inverted_index;

View File

@@ -313,12 +313,12 @@ mod test {
let region_dir = "test_metric_region";
// assert metadata region's dir
let metadata_region_dir = join_dir(region_dir, METADATA_REGION_SUBDIR);
let exist = object_store.exists(&metadata_region_dir).await.unwrap();
let exist = object_store.is_exist(&metadata_region_dir).await.unwrap();
assert!(exist);
// assert data region's dir
let data_region_dir = join_dir(region_dir, DATA_REGION_SUBDIR);
let exist = object_store.exists(&data_region_dir).await.unwrap();
let exist = object_store.is_exist(&data_region_dir).await.unwrap();
assert!(exist);
// check mito engine

View File

@@ -286,7 +286,7 @@ impl FileCache {
}
async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
if self.local_store.exists(file_path).await? {
if self.local_store.is_exist(file_path).await? {
Ok(Some(self.local_store.reader(file_path).await?))
} else {
Ok(None)
@@ -480,7 +480,7 @@ mod tests {
cache.memory_index.run_pending_tasks().await;
// The file does not exist either.
assert!(!local_store.exists(&file_path).await.unwrap());
assert!(!local_store.is_exist(&file_path).await.unwrap());
assert_eq!(0, cache.memory_index.weighted_size());
}

View File

@@ -192,12 +192,12 @@ async fn test_engine_create_with_custom_store() {
assert!(object_store_manager
.find("Gcs")
.unwrap()
.exists(region_dir)
.is_exist(region_dir)
.await
.unwrap());
assert!(!object_store_manager
.default_object_store()
.exists(region_dir)
.is_exist(region_dir)
.await
.unwrap());
}

View File

@@ -71,7 +71,7 @@ async fn test_engine_drop_region() {
assert!(!env
.get_object_store()
.unwrap()
.exists(&join_path(&region_dir, DROPPING_MARKER_FILE))
.is_exist(&join_path(&region_dir, DROPPING_MARKER_FILE))
.await
.unwrap());
@@ -93,7 +93,7 @@ async fn test_engine_drop_region() {
listener.wait().await;
let object_store = env.get_object_store().unwrap();
assert!(!object_store.exists(&region_dir).await.unwrap());
assert!(!object_store.is_exist(&region_dir).await.unwrap());
}
#[tokio::test]
@@ -167,13 +167,13 @@ async fn test_engine_drop_region_for_custom_store() {
assert!(object_store_manager
.find("Gcs")
.unwrap()
.exists(&custom_region_dir)
.is_exist(&custom_region_dir)
.await
.unwrap());
assert!(object_store_manager
.find("default")
.unwrap()
.exists(&global_region_dir)
.is_exist(&global_region_dir)
.await
.unwrap());
@@ -190,13 +190,13 @@ async fn test_engine_drop_region_for_custom_store() {
assert!(!object_store_manager
.find("Gcs")
.unwrap()
.exists(&custom_region_dir)
.is_exist(&custom_region_dir)
.await
.unwrap());
assert!(object_store_manager
.find("default")
.unwrap()
.exists(&global_region_dir)
.is_exist(&global_region_dir)
.await
.unwrap());
}

View File

@@ -228,13 +228,13 @@ async fn test_engine_region_open_with_custom_store() {
let object_store_manager = env.get_object_store_manager().unwrap();
assert!(!object_store_manager
.default_object_store()
.exists(region.access_layer.region_dir())
.is_exist(region.access_layer.region_dir())
.await
.unwrap());
assert!(object_store_manager
.find("Gcs")
.unwrap()
.exists(region.access_layer.region_dir())
.is_exist(region.access_layer.region_dir())
.await
.unwrap());
}

View File

@@ -84,7 +84,6 @@ async fn manager_without_checkpoint() {
// check files
let mut expected = vec![
"/",
"00000000000000000010.json",
"00000000000000000009.json",
"00000000000000000008.json",
@@ -131,7 +130,6 @@ async fn manager_with_checkpoint_distance_1() {
// check files
let mut expected = vec![
"/",
"00000000000000000009.checkpoint",
"00000000000000000010.checkpoint",
"00000000000000000010.json",

View File

@@ -185,7 +185,7 @@ mod tests {
scheduler.stop(true).await.unwrap();
assert!(!object_store.exists(&path).await.unwrap());
assert!(!object_store.is_exist(&path).await.unwrap());
}
#[tokio::test]
@@ -247,7 +247,7 @@ mod tests {
scheduler.stop(true).await.unwrap();
assert!(!object_store.exists(&path).await.unwrap());
assert!(!object_store.exists(&index_path).await.unwrap());
assert!(!object_store.is_exist(&path).await.unwrap());
assert!(!object_store.is_exist(&index_path).await.unwrap());
}
}

View File

@@ -104,28 +104,16 @@ impl IntermediateLocation {
&self.files_dir
}
/// Returns the path of the directory for intermediate files associated with the `file_group`:
/// `__intm/{region_id}/{sst_file_id}/{uuid}/{file_group}/`
pub fn file_group_path(&self, file_group: &str) -> String {
util::join_path(&self.files_dir, &format!("{file_group}/"))
/// Returns the path of the directory for intermediate files associated with a column:
/// `__intm/{region_id}/{sst_file_id}/{uuid}/{column_id}/`
pub fn column_path(&self, column_id: &str) -> String {
util::join_path(&self.files_dir, &format!("{column_id}/"))
}
/// Returns the path of the intermediate file with the given `file_group` and `im_file_id`:
/// `__intm/{region_id}/{sst_file_id}/{uuid}/{file_group}/{im_file_id}.im`
pub fn file_path(&self, file_group: &str, im_file_id: &str) -> String {
util::join_path(
&self.file_group_path(file_group),
&format!("{im_file_id}.im"),
)
}
/// Returns the intermediate file id from the path.
pub fn im_file_id_from_path(&self, path: &str) -> String {
path.rsplit('/')
.next()
.and_then(|s| s.strip_suffix(".im"))
.unwrap_or_default()
.to_string()
/// Returns the path of the intermediate file with the given id for a column:
/// `__intm/{region_id}/{sst_file_id}/{uuid}/{column_id}/{im_file_id}.im`
pub fn file_path(&self, column_id: &str, im_file_id: &str) -> String {
util::join_path(&self.column_path(column_id), &format!("{im_file_id}.im"))
}
}
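
For concreteness, the directory scheme documented above can be illustrated with plain string formatting; the identifiers below are made up, and `join_path` is replaced by a simple `format!`:

fn column_path(files_dir: &str, column_id: &str) -> String {
    format!("{files_dir}{column_id}/")
}

fn file_path(files_dir: &str, column_id: &str, im_file_id: &str) -> String {
    format!("{}{im_file_id}.im", column_path(files_dir, column_id))
}

fn main() {
    // files_dir plays the role of `__intm/{region_id}/{sst_file_id}/{uuid}/`.
    let files_dir = "__intm/0/sst-0000/uuid-1234/";
    assert_eq!(
        file_path(files_dir, "1", "000000000010"),
        "__intm/0/sst-0000/uuid-1234/1/000000000010.im"
    );
}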
@@ -173,20 +161,17 @@ mod tests {
let uuid = location.files_dir.split('/').nth(3).unwrap();
let file_group = "1";
let column_id = "1";
assert_eq!(
location.file_group_path(file_group),
format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{file_group}/")
location.column_path(column_id),
format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{column_id}/")
);
let im_file_id = "000000000010";
let file_path = location.file_path(file_group, im_file_id);
assert_eq!(
file_path,
format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{file_group}/{im_file_id}.im")
location.file_path(column_id, im_file_id),
format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{column_id}/{im_file_id}.im")
);
assert_eq!(location.im_file_id_from_path(&file_path), im_file_id);
}
#[tokio::test]

View File

@@ -16,9 +16,9 @@ use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_telemetry::warn;
use futures::{AsyncRead, AsyncWrite};
use index::error as index_error;
use index::error::Result as IndexResult;
use index::external_provider::ExternalTempFileProvider;
use index::inverted_index::create::sort::external_provider::ExternalTempFileProvider;
use index::inverted_index::error as index_error;
use index::inverted_index::error::Result as IndexResult;
use snafu::ResultExt;
use crate::error::Result;
@@ -42,10 +42,10 @@ pub(crate) struct TempFileProvider {
impl ExternalTempFileProvider for TempFileProvider {
async fn create(
&self,
file_group: &str,
column_id: &str,
file_id: &str,
) -> IndexResult<Box<dyn AsyncWrite + Unpin + Send>> {
let path = self.location.file_path(file_group, file_id);
let path = self.location.file_path(column_id, file_id);
let writer = self
.manager
.store()
@@ -63,13 +63,13 @@ impl ExternalTempFileProvider for TempFileProvider {
async fn read_all(
&self,
file_group: &str,
) -> IndexResult<Vec<(String, Box<dyn AsyncRead + Unpin + Send>)>> {
let file_group_path = self.location.file_group_path(file_group);
column_id: &str,
) -> IndexResult<Vec<Box<dyn AsyncRead + Unpin + Send>>> {
let column_path = self.location.column_path(column_id);
let entries = self
.manager
.store()
.list(&file_group_path)
.list(&column_path)
.await
.map_err(BoxedError::new)
.context(index_error::ExternalSnafu)?;
@@ -81,8 +81,6 @@ impl ExternalTempFileProvider for TempFileProvider {
continue;
}
let im_file_id = self.location.im_file_id_from_path(entry.path());
let reader = self
.manager
.store()
@@ -95,7 +93,7 @@ impl ExternalTempFileProvider for TempFileProvider {
.await
.map_err(BoxedError::new)
.context(index_error::ExternalSnafu)?;
readers.push((im_file_id, Box::new(reader) as _));
readers.push(Box::new(reader) as _);
}
Ok(readers)
@@ -135,36 +133,36 @@ mod tests {
let store = IntermediateManager::init_fs(path).await.unwrap();
let provider = TempFileProvider::new(location.clone(), store);
let file_group = "tag0";
let column_name = "tag0";
let file_id = "0000000010";
let mut writer = provider.create(file_group, file_id).await.unwrap();
let mut writer = provider.create(column_name, file_id).await.unwrap();
writer.write_all(b"hello").await.unwrap();
writer.flush().await.unwrap();
writer.close().await.unwrap();
let file_id = "0000000100";
let mut writer = provider.create(file_group, file_id).await.unwrap();
let mut writer = provider.create(column_name, file_id).await.unwrap();
writer.write_all(b"world").await.unwrap();
writer.flush().await.unwrap();
writer.close().await.unwrap();
let file_group = "tag1";
let column_name = "tag1";
let file_id = "0000000010";
let mut writer = provider.create(file_group, file_id).await.unwrap();
let mut writer = provider.create(column_name, file_id).await.unwrap();
writer.write_all(b"foo").await.unwrap();
writer.flush().await.unwrap();
writer.close().await.unwrap();
let readers = provider.read_all("tag0").await.unwrap();
assert_eq!(readers.len(), 2);
for (_, mut reader) in readers {
for mut reader in readers {
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
assert!(matches!(buf.as_slice(), b"hello" | b"world"));
}
let readers = provider.read_all("tag1").await.unwrap();
assert_eq!(readers.len(), 1);
let mut reader = readers.into_iter().map(|x| x.1).next().unwrap();
let mut reader = readers.into_iter().next().unwrap();
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"foo");

View File

@@ -51,7 +51,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Check if this region is pending drop. And clean the entire dir if so.
if !self.dropping_regions.is_region_exists(region_id)
&& object_store
.exists(&join_path(&request.region_dir, DROPPING_MARKER_FILE))
.is_exist(&join_path(&request.region_dir, DROPPING_MARKER_FILE))
.await
.context(OpenDalSnafu)?
{

View File

@@ -17,9 +17,8 @@ futures.workspace = true
lazy_static.workspace = true
md5 = "0.7"
moka = { workspace = true, features = ["future"] }
opendal = { version = "0.50", features = [
opendal = { version = "0.49", features = [
"layers-tracing",
"layers-prometheus",
"services-azblob",
"services-fs",
"services-gcs",

View File

@@ -13,37 +13,8 @@
// limitations under the License.
mod lru_cache;
mod prometheus;
pub use lru_cache::*;
pub use opendal::layers::*;
pub use prometheus::build_prometheus_metrics_layer;
mod prometheus {
use std::sync::{Mutex, OnceLock};
use opendal::layers::PrometheusLayer;
static PROMETHEUS_LAYER: OnceLock<Mutex<PrometheusLayer>> = OnceLock::new();
pub fn build_prometheus_metrics_layer(with_path_label: bool) -> PrometheusLayer {
PROMETHEUS_LAYER
.get_or_init(|| {
// This logical tries to extract parent path from the object storage operation
// the function also relies on assumption that the region path is built from
// pattern `<data|index>/catalog/schema/table_id/....`
//
// We'll get the data/catalog/schema from path.
let path_level = if with_path_label { 3 } else { 0 };
let layer = PrometheusLayer::builder()
.path_label(path_level)
.register_default()
.unwrap();
Mutex::new(layer)
})
.lock()
.unwrap()
.clone()
}
}
pub use prometheus::PrometheusMetricsLayer;

View File

@@ -156,12 +156,9 @@ impl<C: Access> ReadCache<C> {
let size = entry.metadata().content_length();
OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64);
// ignore root path
if entry.path() != "/" {
self.mem_cache
.insert(read_key.to_string(), ReadResult::Success(size as u32))
.await;
}
self.mem_cache
.insert(read_key.to_string(), ReadResult::Success(size as u32))
.await;
}
Ok(self.cache_stat().await)

View File

@@ -0,0 +1,584 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Code originally from <https://github.com/apache/incubator-opendal/blob/main/core/src/layers/prometheus.rs>, with a tiny change to avoid crashes in a multi-threaded environment.
use std::fmt::{Debug, Formatter};
use common_telemetry::debug;
use lazy_static::lazy_static;
use opendal::raw::*;
use opendal::{Buffer, ErrorKind};
use prometheus::{
exponential_buckets, histogram_opts, register_histogram_vec, register_int_counter_vec,
Histogram, HistogramTimer, HistogramVec, IntCounterVec,
};
use crate::util::extract_parent_path;
type Result<T> = std::result::Result<T, opendal::Error>;
lazy_static! {
static ref REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec!(
"opendal_requests_total",
"Total times of all kinds of operation being called",
&["scheme", "operation", "path"],
)
.unwrap();
static ref REQUESTS_DURATION_SECONDS: HistogramVec = register_histogram_vec!(
histogram_opts!(
"opendal_requests_duration_seconds",
"Histogram of the time spent on specific operation",
exponential_buckets(0.01, 2.0, 16).unwrap()
),
&["scheme", "operation", "path"]
)
.unwrap();
static ref BYTES_TOTAL: HistogramVec = register_histogram_vec!(
histogram_opts!(
"opendal_bytes_total",
"Total size of sync or async Read/Write",
exponential_buckets(0.01, 2.0, 16).unwrap()
),
&["scheme", "operation", "path"]
)
.unwrap();
}
#[inline]
fn increment_errors_total(op: Operation, kind: ErrorKind) {
debug!(
"Prometheus statistics metrics error, operation {} error {}",
op.into_static(),
kind.into_static()
);
}
/// Please refer to [prometheus](https://docs.rs/prometheus) for every operation.
///
/// # Prometheus Metrics
///
/// In this section, we will introduce three metrics that are currently being exported by opendal. These metrics are essential for understanding the behavior and performance of opendal.
///
///
/// | Metric Name | Type | Description | Labels |
/// |-----------------------------------|-----------|------------------------------------------------------|---------------------|
/// | opendal_requests_total | Counter | Total times of all kinds of operation being called | scheme, operation |
/// | opendal_requests_duration_seconds | Histogram | Histogram of the time spent on specific operation | scheme, operation |
/// | opendal_bytes_total | Histogram | Total size of sync or async Read/Write | scheme, operation |
///
/// For a more detailed explanation of these metrics and how they are used, please refer to the [Prometheus documentation](https://prometheus.io/docs/introduction/overview/).
///
/// # Histogram Configuration
///
/// The metric buckets for these histograms are automatically generated based on the `exponential_buckets(0.01, 2.0, 16)` configuration.
#[derive(Default, Debug, Clone)]
pub struct PrometheusMetricsLayer {
pub path_label: bool,
}
impl PrometheusMetricsLayer {
pub fn new(path_label: bool) -> Self {
Self { path_label }
}
}
impl<A: Access> Layer<A> for PrometheusMetricsLayer {
type LayeredAccess = PrometheusAccess<A>;
fn layer(&self, inner: A) -> Self::LayeredAccess {
let meta = inner.info();
let scheme = meta.scheme();
PrometheusAccess {
inner,
scheme: scheme.to_string(),
path_label: self.path_label,
}
}
}
#[derive(Clone)]
pub struct PrometheusAccess<A: Access> {
inner: A,
scheme: String,
path_label: bool,
}
impl<A: Access> PrometheusAccess<A> {
fn get_path_label<'a>(&self, path: &'a str) -> &'a str {
if self.path_label {
extract_parent_path(path)
} else {
""
}
}
}
impl<A: Access> Debug for PrometheusAccess<A> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PrometheusAccessor")
.field("inner", &self.inner)
.finish_non_exhaustive()
}
}
impl<A: Access> LayeredAccess for PrometheusAccess<A> {
type Inner = A;
type Reader = PrometheusMetricWrapper<A::Reader>;
type BlockingReader = PrometheusMetricWrapper<A::BlockingReader>;
type Writer = PrometheusMetricWrapper<A::Writer>;
type BlockingWriter = PrometheusMetricWrapper<A::BlockingWriter>;
type Lister = A::Lister;
type BlockingLister = A::BlockingLister;
fn inner(&self) -> &Self::Inner {
&self.inner
}
async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::CreateDir.into_static(), path_label])
.inc();
let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::CreateDir.into_static(), path_label])
.start_timer();
let create_res = self.inner.create_dir(path, args).await;
timer.observe_duration();
create_res.inspect_err(|e| {
increment_errors_total(Operation::CreateDir, e.kind());
})
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::Read.into_static(), path_label])
.inc();
let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::Read.into_static(), path_label])
.start_timer();
let (rp, r) = self.inner.read(path, args).await.inspect_err(|e| {
increment_errors_total(Operation::Read, e.kind());
})?;
Ok((
rp,
PrometheusMetricWrapper::new(
r,
Operation::Read,
BYTES_TOTAL.with_label_values(&[
&self.scheme,
Operation::Read.into_static(),
path_label,
]),
timer,
),
))
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::Write.into_static(), path_label])
.inc();
let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::Write.into_static(), path_label])
.start_timer();
let (rp, r) = self.inner.write(path, args).await.inspect_err(|e| {
increment_errors_total(Operation::Write, e.kind());
})?;
Ok((
rp,
PrometheusMetricWrapper::new(
r,
Operation::Write,
BYTES_TOTAL.with_label_values(&[
&self.scheme,
Operation::Write.into_static(),
path_label,
]),
timer,
),
))
}
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::Stat.into_static(), path_label])
.inc();
let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::Stat.into_static(), path_label])
.start_timer();
let stat_res = self.inner.stat(path, args).await;
timer.observe_duration();
stat_res.inspect_err(|e| {
increment_errors_total(Operation::Stat, e.kind());
})
}
async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::Delete.into_static(), path_label])
.inc();
let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::Delete.into_static(), path_label])
.start_timer();
let delete_res = self.inner.delete(path, args).await;
timer.observe_duration();
delete_res.inspect_err(|e| {
increment_errors_total(Operation::Delete, e.kind());
})
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::List.into_static(), path_label])
.inc();
let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::List.into_static(), path_label])
.start_timer();
let list_res = self.inner.list(path, args).await;
timer.observe_duration();
list_res.inspect_err(|e| {
increment_errors_total(Operation::List, e.kind());
})
}
async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::Batch.into_static(), ""])
.inc();
let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::Batch.into_static(), ""])
.start_timer();
let result = self.inner.batch(args).await;
timer.observe_duration();
result.inspect_err(|e| {
increment_errors_total(Operation::Batch, e.kind());
})
}
async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::Presign.into_static(), path_label])
.inc();
let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[&self.scheme, Operation::Presign.into_static(), path_label])
.start_timer();
let result = self.inner.presign(path, args).await;
timer.observe_duration();
result.inspect_err(|e| {
increment_errors_total(Operation::Presign, e.kind());
})
}
fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[
&self.scheme,
Operation::BlockingCreateDir.into_static(),
path_label,
])
.inc();
let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[
&self.scheme,
Operation::BlockingCreateDir.into_static(),
path_label,
])
.start_timer();
let result = self.inner.blocking_create_dir(path, args);
timer.observe_duration();
result.inspect_err(|e| {
increment_errors_total(Operation::BlockingCreateDir, e.kind());
})
}
fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[
&self.scheme,
Operation::BlockingRead.into_static(),
path_label,
])
.inc();
let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[
&self.scheme,
Operation::BlockingRead.into_static(),
path_label,
])
.start_timer();
self.inner
.blocking_read(path, args)
.map(|(rp, r)| {
(
rp,
PrometheusMetricWrapper::new(
r,
Operation::BlockingRead,
BYTES_TOTAL.with_label_values(&[
&self.scheme,
Operation::BlockingRead.into_static(),
path_label,
]),
timer,
),
)
})
.inspect_err(|e| {
increment_errors_total(Operation::BlockingRead, e.kind());
})
}
fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[
&self.scheme,
Operation::BlockingWrite.into_static(),
path_label,
])
.inc();
let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[
&self.scheme,
Operation::BlockingWrite.into_static(),
path_label,
])
.start_timer();
self.inner
.blocking_write(path, args)
.map(|(rp, r)| {
(
rp,
PrometheusMetricWrapper::new(
r,
Operation::BlockingWrite,
BYTES_TOTAL.with_label_values(&[
&self.scheme,
Operation::BlockingWrite.into_static(),
path_label,
]),
timer,
),
)
})
.inspect_err(|e| {
increment_errors_total(Operation::BlockingWrite, e.kind());
})
}
fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[
&self.scheme,
Operation::BlockingStat.into_static(),
path_label,
])
.inc();
let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[
&self.scheme,
Operation::BlockingStat.into_static(),
path_label,
])
.start_timer();
let result = self.inner.blocking_stat(path, args);
timer.observe_duration();
result.inspect_err(|e| {
increment_errors_total(Operation::BlockingStat, e.kind());
})
}
fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[
&self.scheme,
Operation::BlockingDelete.into_static(),
path_label,
])
.inc();
let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[
&self.scheme,
Operation::BlockingDelete.into_static(),
path_label,
])
.start_timer();
let result = self.inner.blocking_delete(path, args);
timer.observe_duration();
result.inspect_err(|e| {
increment_errors_total(Operation::BlockingDelete, e.kind());
})
}
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[
&self.scheme,
Operation::BlockingList.into_static(),
path_label,
])
.inc();
let timer = REQUESTS_DURATION_SECONDS
.with_label_values(&[
&self.scheme,
Operation::BlockingList.into_static(),
path_label,
])
.start_timer();
let result = self.inner.blocking_list(path, args);
timer.observe_duration();
result.inspect_err(|e| {
increment_errors_total(Operation::BlockingList, e.kind());
})
}
}
pub struct PrometheusMetricWrapper<R> {
inner: R,
op: Operation,
bytes_counter: Histogram,
_requests_duration_timer: HistogramTimer,
bytes: u64,
}
impl<R> Drop for PrometheusMetricWrapper<R> {
fn drop(&mut self) {
self.bytes_counter.observe(self.bytes as f64);
}
}
impl<R> PrometheusMetricWrapper<R> {
fn new(
inner: R,
op: Operation,
bytes_counter: Histogram,
requests_duration_timer: HistogramTimer,
) -> Self {
Self {
inner,
op,
bytes_counter,
_requests_duration_timer: requests_duration_timer,
bytes: 0,
}
}
}
impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
async fn read(&mut self) -> Result<Buffer> {
self.inner.read().await.inspect_err(|err| {
increment_errors_total(self.op, err.kind());
})
}
}
impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
fn read(&mut self) -> opendal::Result<Buffer> {
self.inner.read().inspect_err(|err| {
increment_errors_total(self.op, err.kind());
})
}
}
impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
async fn write(&mut self, bs: Buffer) -> Result<()> {
let bytes = bs.len();
match self.inner.write(bs).await {
Ok(_) => {
self.bytes += bytes as u64;
Ok(())
}
Err(err) => {
increment_errors_total(self.op, err.kind());
Err(err)
}
}
}
async fn close(&mut self) -> Result<()> {
self.inner.close().await.inspect_err(|err| {
increment_errors_total(self.op, err.kind());
})
}
async fn abort(&mut self) -> Result<()> {
self.inner.close().await.inspect_err(|err| {
increment_errors_total(self.op, err.kind());
})
}
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<()> {
let bytes = bs.len();
self.inner
.write(bs)
.map(|_| {
self.bytes += bytes as u64;
})
.inspect_err(|err| {
increment_errors_total(self.op, err.kind());
})
}
fn close(&mut self) -> Result<()> {
self.inner.close().inspect_err(|err| {
increment_errors_total(self.op, err.kind());
})
}
}
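
A hedged end-to-end sketch of wiring this layer into an opendal 0.49 operator. The crate/module path (`object_store::layers::PrometheusMetricsLayer`), the tokio and prometheus dependencies, and the filesystem root are assumptions for illustration; the production wiring is `with_instrument_layers` in the util.rs diff below.

use object_store::layers::PrometheusMetricsLayer;
use opendal::services::Fs;
use opendal::Operator;

#[tokio::main]
async fn main() -> opendal::Result<()> {
    // Local filesystem backend; the root directory is arbitrary for the demo.
    let mut builder = Fs::default();
    builder.root("/tmp/object-store-demo");

    let store = Operator::new(builder)?
        .layer(PrometheusMetricsLayer::new(true)) // `true` enables the path label
        .finish();

    store.write("data/greptime/public/1/demo.txt", "hello").await?;
    let _ = store.read("data/greptime/public/1/demo.txt").await?;

    // The requests above are now reflected in opendal_requests_total,
    // opendal_requests_duration_seconds and opendal_bytes_total in the
    // default prometheus registry.
    println!("metric families: {}", prometheus::gather().len());
    Ok(())
}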

View File

@@ -15,12 +15,19 @@
use std::fmt::Display;
use common_telemetry::{debug, error, trace};
use futures::TryStreamExt;
use opendal::layers::{LoggingInterceptor, LoggingLayer, TracingLayer};
use opendal::raw::{AccessorInfo, Operation};
use opendal::ErrorKind;
use opendal::{Entry, ErrorKind, Lister};
use crate::layers::PrometheusMetricsLayer;
use crate::ObjectStore;
/// Collect all entries from the [Lister].
pub async fn collect(stream: Lister) -> Result<Vec<Entry>, opendal::Error> {
stream.try_collect::<Vec<_>>().await
}
/// Join two paths and normalize the output dir.
///
/// The output dir always ends with `/`, e.g.
@@ -120,12 +127,26 @@ pub fn normalize_path(path: &str) -> String {
p
}
// This logic tries to extract the parent path from the object storage operation.
// It relies on the assumption that the region path is built from the
// pattern `<data|index>/catalog/schema/table_id/....`
//
// This implementation extracts at most 3 levels of the parent path.
pub(crate) fn extract_parent_path(path: &str) -> &str {
// split the path into `catalog`, `schema` and others
path.char_indices()
.filter(|&(_, c)| c == '/')
// we get the data/catalog/schema from path, split at the 3rd /
.nth(2)
.map_or(path, |(i, _)| &path[..i])
}
/// Attaches instrument layers to the object store.
pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> ObjectStore {
object_store
.layer(LoggingLayer::new(DefaultLoggingInterceptor))
.layer(TracingLayer)
.layer(crate::layers::build_prometheus_metrics_layer(path_label))
.layer(PrometheusMetricsLayer::new(path_label))
}
static LOGGING_TARGET: &str = "opendal::services";
@@ -242,4 +263,28 @@ mod tests {
assert_eq!("/abc", join_path("//", "/abc"));
assert_eq!("abc/def", join_path("abc/", "//def"));
}
#[test]
fn test_path_extraction() {
assert_eq!(
"data/greptime/public",
extract_parent_path("data/greptime/public/1024/1024_0000000000/")
);
assert_eq!(
"data/greptime/public",
extract_parent_path("data/greptime/public/1/")
);
assert_eq!(
"data/greptime/public",
extract_parent_path("data/greptime/public")
);
assert_eq!("data/greptime/", extract_parent_path("data/greptime/"));
assert_eq!("data/", extract_parent_path("data/"));
assert_eq!("/", extract_parent_path("/"));
}
}

View File

@@ -65,38 +65,23 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> {
store.write(p3, "Hello, object3!").await?;
// List objects
let entries = store
.list("/")
.await?
.into_iter()
.filter(|x| x.metadata().mode() == EntryMode::FILE)
.collect::<Vec<_>>();
let entries = store.list("/").await?;
assert_eq!(3, entries.len());
store.delete(p1).await?;
store.delete(p3).await?;
// List objects again
// Only o2 and root exist
let entries = store
.list("/")
.await?
.into_iter()
.filter(|x| x.metadata().mode() == EntryMode::FILE)
.collect::<Vec<_>>();
// Only o2 exists
let entries = store.list("/").await?;
assert_eq!(1, entries.len());
assert_eq!(p2, entries[0].path());
assert_eq!(p2, entries.first().unwrap().path());
let content = store.read(p2).await?;
assert_eq!("Hello, object2!", String::from_utf8(content.to_vec())?);
store.delete(p2).await?;
let entries = store
.list("/")
.await?
.into_iter()
.filter(|x| x.metadata().mode() == EntryMode::FILE)
.collect::<Vec<_>>();
let entries = store.list("/").await?;
assert!(entries.is_empty());
assert!(store.read(p1).await.is_err());
@@ -267,7 +252,7 @@ async fn test_file_backend_with_lru_cache() -> Result<()> {
async fn assert_lru_cache<C: Access>(cache_layer: &LruCacheLayer<C>, file_names: &[&str]) {
for file_name in file_names {
assert!(cache_layer.contains_file(file_name).await, "{file_name}");
assert!(cache_layer.contains_file(file_name).await);
}
}
@@ -279,9 +264,7 @@ async fn assert_cache_files<C: Access>(
let (_, mut lister) = store.list("/", OpList::default()).await?;
let mut objects = vec![];
while let Some(e) = lister.next().await? {
if e.mode() == EntryMode::FILE {
objects.push(e);
}
objects.push(e);
}
// compare the cache file with the expected cache file; ignore orders
@@ -349,9 +332,9 @@ async fn test_object_store_cache_policy() -> Result<()> {
assert_cache_files(
&cache_store,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-",
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-14",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-14",
],
&["Hello, object1!", "object2!", "Hello, object2!"],
)
@@ -359,9 +342,9 @@ async fn test_object_store_cache_policy() -> Result<()> {
assert_lru_cache(
&cache_layer,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-",
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-14",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-14",
],
)
.await;
@@ -372,13 +355,13 @@ async fn test_object_store_cache_policy() -> Result<()> {
assert_eq!(cache_layer.read_cache_stat().await, (1, 15));
assert_cache_files(
&cache_store,
&["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-"],
&["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14"],
&["Hello, object1!"],
)
.await?;
assert_lru_cache(
&cache_layer,
&["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-"],
&["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14"],
)
.await;
@@ -405,8 +388,8 @@ async fn test_object_store_cache_policy() -> Result<()> {
assert_cache_files(
&cache_store,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
],
&["Hello, object1!", "Hello, object3!", "Hello"],
@@ -415,8 +398,8 @@ async fn test_object_store_cache_policy() -> Result<()> {
assert_lru_cache(
&cache_layer,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
],
)
@@ -433,7 +416,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
&cache_store,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
],
&["ello, object1!", "Hello, object3!", "Hello"],
@@ -443,7 +426,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
&cache_layer,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
],
)
@@ -465,7 +448,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
&cache_layer,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
],
)

View File

@@ -141,10 +141,10 @@ macro_rules! make_get_from_env_helper {
}
make_get_from_env_helper!(GT_FUZZ_INPUT_MAX_ALTER_ACTIONS, 256);
make_get_from_env_helper!(GT_FUZZ_INPUT_MAX_INSERT_ACTIONS, 4);
make_get_from_env_helper!(GT_FUZZ_INPUT_MAX_INSERT_ACTIONS, 8);
make_get_from_env_helper!(GT_FUZZ_INPUT_MAX_ROWS, 512);
make_get_from_env_helper!(GT_FUZZ_INPUT_MAX_TABLES, 32);
make_get_from_env_helper!(GT_FUZZ_INPUT_MAX_COLUMNS, 16);
make_get_from_env_helper!(GT_FUZZ_INPUT_MAX_TABLES, 64);
make_get_from_env_helper!(GT_FUZZ_INPUT_MAX_COLUMNS, 32);
/// Retrieves a value from the environment variables
/// or returns a default value if the environment variable is not set.
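
A minimal pure-std sketch of what such a helper boils down to; the function name and parsing behavior here are assumptions for illustration, while the real crate generates its helpers via the macro above:

fn get_from_env_or<T: std::str::FromStr>(var: &str, default: T) -> T {
    std::env::var(var)
        .ok()
        .and_then(|v| v.parse::<T>().ok())
        .unwrap_or(default)
}

fn main() {
    // Falls back to 8 unless GT_FUZZ_INPUT_MAX_INSERT_ACTIONS is set to a valid number.
    let max_insert_actions: usize = get_from_env_or("GT_FUZZ_INPUT_MAX_INSERT_ACTIONS", 8);
    println!("max insert actions: {max_insert_actions}");
}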