Compare commits


27 Commits

Author SHA1 Message Date
Weny Xu
904d560175 feat(promql-planner): introduce vector matching binary operation (#5578)
* feat(promql-planner): support vector matching for binary operation

* test: add sqlness tests
2025-02-27 07:39:19 +00:00
Lei, HUANG
765d1277ee fix(metasrv): clean expired nodes in memory (#5592)
* fix/frontend-node-state: Refactor NodeInfoKey and Context Handling in Meta Server

 • Removed unused cluster_id from NodeInfoKey struct.
 • Updated HeartbeatHandlerGroup to return Context alongside HeartbeatResponse.
 • Added current_node_info to Context for tracking node information.
 • Implemented on_node_disconnect in Context to handle node disconnection events, specifically for Frontend roles.
 • Adjusted register_pusher function to return PusherId directly.
 • Updated tests to accommodate changes in Context structure.

* fix/frontend-node-state: Refactor Heartbeat Handler Context Management

Refactored the HeartbeatHandlerGroup::handle method to use a mutable reference for Context instead of passing it by value. This change simplifies the
context management by eliminating the need to return the context with the response. Updated the Metasrv implementation to align with this new context
handling approach, improving code clarity and reducing unnecessary context cloning.
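The by-value versus mutable-reference trade-off described above can be sketched roughly as follows. The `HeartbeatHandlerGroup`, `Context`, and `HeartbeatResponse` names follow the commit text; the simplified fields and the body of `handle` are assumptions for illustration only:

```rust
use std::collections::HashMap;

/// Simplified stand-in for the meta server's per-connection context.
#[derive(Clone, Default)]
struct Context {
    metadata: HashMap<String, String>,
}

struct HeartbeatResponse {
    acked: bool,
}

struct HeartbeatHandlerGroup;

impl HeartbeatHandlerGroup {
    // Taking `&mut Context` lets the handler mutate state in place, so the
    // context no longer has to be returned alongside the response.
    fn handle(&self, ctx: &mut Context) -> HeartbeatResponse {
        ctx.metadata
            .insert("last_role".to_string(), "frontend".to_string());
        HeartbeatResponse { acked: true }
    }
}

fn main() {
    let group = HeartbeatHandlerGroup;
    let mut ctx = Context::default();
    let resp = group.handle(&mut ctx);
    assert!(resp.acked);
    assert_eq!(
        ctx.metadata.get("last_role").map(String::as_str),
        Some("frontend")
    );
    println!("context mutated in place; {} entries", ctx.metadata.len());
}
```

A later commit in this same series moves back to passing `Context` by value and cloning it, trading a cheap clone for simpler sharing across async tasks.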

* revert: clean cluster info on disconnect

* fix/frontend-node-state: Add Frontend Expiry Listener and Update NodeInfoKey Conversion

 • Introduced FrontendExpiryListener to manage the expiration of frontend nodes, including its integration with leadership change notifications.
 • Modified NodeInfoKey conversion to use references, enhancing efficiency and consistency across the codebase.
 • Updated collect_cluster_info_handler and metasrv to incorporate the new listener and conversion changes.
 • Added frontend_expiry module to the project structure for better organization and maintainability.

* chore: add config for node expiry

* add some doc

* fix: clippy

* fix/frontend-node-state:
 ### Refactor Node Expiry Handling
 - **Configuration Update**: Removed `node_expiry_tick` from `metasrv.example.toml` and `MetasrvOptions` in `metasrv.rs`.
 - **Module Renaming**: Renamed `frontend_expiry.rs` to `node_expiry_listener.rs` and updated references in `lib.rs`.
 - **Code Refactoring**: Replaced `FrontendExpiryListener` with `NodeExpiryListener` in `node_expiry_listener.rs` and `metasrv.rs`, removing the tick interval and adjusting logic to use a fixed 60-second interval for node expiry checks.
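A minimal sketch of the expiry check such a listener might run on its fixed interval. Only the `NodeExpiryListener` name and the 60-second interval come from the commit; the data layout and the `expired_nodes` helper are assumptions:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Return the ids of nodes whose last heartbeat is older than `ttl`.
fn expired_nodes(
    now: Instant,
    last_heartbeat: &HashMap<u64, Instant>,
    ttl: Duration,
) -> Vec<u64> {
    let mut ids: Vec<u64> = last_heartbeat
        .iter()
        .filter(|(_, seen)| now.duration_since(**seen) > ttl)
        .map(|(id, _)| *id)
        .collect();
    ids.sort_unstable();
    ids
}

fn main() {
    let now = Instant::now();
    let mut seen = HashMap::new();
    seen.insert(1u64, now - Duration::from_secs(30)); // fresh
    seen.insert(2u64, now - Duration::from_secs(120)); // stale
    // The commit uses a fixed 60-second check interval; reuse it as the TTL here.
    let expired = expired_nodes(now, &seen, Duration::from_secs(60));
    assert_eq!(expired, vec![2]);
    println!("expired: {:?}", expired);
}
```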

* fix/frontend-node-state:
 Improve logging in `node_expiry_listener.rs`

 - Enhanced warning message to include peer information when an unrecognized node info key is encountered in `node_expiry_listener.rs`.

* docs: update config docs

* fix/frontend-node-state:
 **Refactor Context Handling in Heartbeat Services**

 - Updated `HeartbeatHandlerGroup` in `handler.rs` to pass `Context` by value instead of by mutable reference, allowing for more flexible context
 management.
 - Modified `Metasrv` implementation in `heartbeat.rs` to clone `Context` when passing to `handle` method, ensuring thread safety and consistency in
 asynchronous operations.
2025-02-27 06:16:36 +00:00
discord9
ccf42a9d97 fix: flow heartbeat retry (#5600)
* fix: flow heartbeat retry

* fix?: not sure if fixed

* chore: per review
2025-02-27 03:58:21 +00:00
Weny Xu
71e2fb895f feat: introduce prom_round fn (#5604)
* feat: introduce `prom_round` fn

* test: add sqlness tests
2025-02-27 03:30:15 +00:00
Ruihang Xia
c9671fd669 feat(promql): implement subquery (#5606)
* feat: initial implement for promql subquery

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl and test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-02-27 03:28:04 +00:00
Ruihang Xia
b5efc75aab feat(promql): ignore invalid input in histogram plan (#5607)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-02-27 03:18:20 +00:00
Weny Xu
c1d18d9980 fix(prom): preserve the order of series in PromQueryResult (#5601)
fix(prom): keep the order of tags
2025-02-26 13:40:09 +00:00
Lei, HUANG
5d9faaaf39 fix(metasrv): reject ddl when metasrv is follower (#5599)
* fix/reject-ddl-in-follower-metasrv:
 Add leader check and logging for gRPC requests in `procedure.rs`

 - Implemented leader verification for `query_procedure_state`, `ddl`, and `procedure_details` gRPC requests in `procedure.rs`.
 - Added logging with `warn` for requests reaching a non-leader node.
 - Introduced `ResponseHeader` and `Error::is_not_leader()` to handle non-leader responses.
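The reject-on-follower check described above can be sketched as a small guard. The `ResponseHeader` and not-leader error names mirror the commit text; the field shapes and `check_leader` helper are illustrative assumptions:

```rust
/// Simplified error carried in a response header when a follower
/// receives a request that only the leader may serve.
#[derive(Debug, PartialEq)]
enum HeaderError {
    NotLeader,
}

struct ResponseHeader {
    error: Option<HeaderError>,
}

struct MetaState {
    is_leader: bool,
}

// A follower rejects DDL/procedure requests instead of serving stale state;
// the caller turns the `Err` header into a failed gRPC response.
fn check_leader(state: &MetaState) -> Result<(), ResponseHeader> {
    if state.is_leader {
        Ok(())
    } else {
        Err(ResponseHeader {
            error: Some(HeaderError::NotLeader),
        })
    }
}

fn main() {
    assert!(check_leader(&MetaState { is_leader: true }).is_ok());
    let rejected = check_leader(&MetaState { is_leader: false }).unwrap_err();
    assert_eq!(rejected.error, Some(HeaderError::NotLeader));
    println!("follower request rejected as expected");
}
```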

* fix/reject-ddl-in-follower-metasrv:
 Improve leader address handling in `heartbeat.rs`

 - Refactor leader address retrieval by renaming `leader` to `leader_addr` for clarity.
 - Update `make_client` function to use a reference to `leader_addr`.
 - Enhance logging to include the leader address in the success message for creating a heartbeat stream.

* fmt

* fix/reject-ddl-in-follower-metasrv:
 **Enhance Leader Check in `procedure.rs`**

 - Updated the leader verification logic in `procedure.rs` to return a failed `MigrateRegionResponse` when the server is not the leader.
 - Added logging to warn when a migrate request is received by a non-leader server.
2025-02-26 08:10:40 +00:00
ZonaHe
538875abee feat: update dashboard to v0.7.11 (#5597)
Co-authored-by: sunchanglong <sunchanglong@users.noreply.github.com>
2025-02-26 07:57:59 +00:00
jeremyhi
5ed09c4584 fix: all heartbeat channels need to check leader (#5593) 2025-02-25 10:45:30 +00:00
Yingwen
3f6a41eac5 fix: update show create table output for fulltext index (#5591)
* fix: update fulltext index syntax in show create table

* test: update fulltext sqlness result
2025-02-25 09:36:27 +00:00
yihong
ff0dcf12c5 perf: close issue 4974 by not deleting columns when dropping a logical region, about 100 times faster (#5561)
* perf: do not delete columns when drop logical region in drop database

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* fix: make ci happy

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* fix: address review comments

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* fix: address some comments

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* fix: drop stupid comments by copilot

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* chore: minor refactor

* chore: minor refactor

* chore: update greptime-proto

---------

Signed-off-by: yihong0618 <zouzou0208@gmail.com>
Co-authored-by: WenyXu <wenymedia@gmail.com>
2025-02-25 09:00:49 +00:00
Yingwen
5b1fca825a fix: remove cached and uploaded files on failure (#5590) 2025-02-25 08:51:37 +00:00
Ruihang Xia
7bd108e2be feat: impl hll_state, hll_merge and hll_calc for incremental distinct counting (#5579)
* basic impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* more tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sqlness test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update with more test and logs

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl merge fn

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename function names

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-02-24 19:07:37 +00:00
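The state/merge/calc split named in the commit title above follows the usual partial-aggregation pattern: build a sketch per partition, merge the sketches, then finalize into a count. The sketch below illustrates that pattern with exact `HashSet` states standing in for real HLL registers; only the three function names mirror the commit, everything else is illustrative:

```rust
use std::collections::HashSet;

// `state`: build a partial aggregate per partition.
fn hll_state(values: &[&str]) -> HashSet<String> {
    values.iter().map(|v| v.to_string()).collect()
}

// `merge`: combine partial states (a real HLL merges registers;
// the exact stand-in takes a set union).
fn hll_merge(states: Vec<HashSet<String>>) -> HashSet<String> {
    let mut merged = HashSet::new();
    for s in states {
        merged.extend(s);
    }
    merged
}

// `calc`: finalize the merged state into a distinct count.
fn hll_calc(state: &HashSet<String>) -> usize {
    state.len()
}

fn main() {
    let p1 = hll_state(&["a", "b", "c"]);
    let p2 = hll_state(&["b", "c", "d"]);
    let total = hll_calc(&hll_merge(vec![p1, p2]));
    assert_eq!(total, 4); // a, b, c, d
    println!("distinct: {}", total);
}
```

The point of the split is that `state` and `merge` outputs can be stored and re-merged later, which is what makes the distinct count incremental.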
Weny Xu
286f225e50 fix: correct inverted_indexed_column_ids behavior (#5586)
* fix: correct `inverted_indexed_column_ids`

* fix: fix unit tests
2025-02-23 07:17:38 +00:00
Ruihang Xia
4f988b5ba9 feat: remove default inverted index for physical table (#5583)
* feat: remove default inverted index for physical table

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-02-22 06:48:05 +00:00
Ruihang Xia
500d0852eb fix: avoid run labeler job concurrently (#5584)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-02-22 05:18:26 +00:00
Zhenchi
8d05fb3503 feat: unify puffin name passed to stager (#5564)
* feat: purge a given puffin file in staging area

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* polish log

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* ttl set to 2d

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: expose staging_ttl to index config

* feat: unify puffin name passed to stager

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix test

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fallback to remote index

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Co-authored-by: evenyag <realevenyag@gmail.com>
2025-02-21 09:27:03 +00:00
Ruihang Xia
d7b6718be0 feat: run sqlness in parallel (#5499)
* define server mode

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* bump sqlness

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* all good

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor: Move config generation logic from Env to ServerMode

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* finalize

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change license header

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename variables

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* override parallelism

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename more variables

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-02-21 07:05:19 +00:00
Ruihang Xia
6f0783e17e fix: broken link in AUTHOR.md (#5581) 2025-02-21 07:01:41 +00:00
Ruihang Xia
d69e93b91a feat: support to generate json output for explain analyze in http api (#5567)
* impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* integration test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/servers/src/http/hints.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* refactor: with FORMAT option for explain format

* lift some well-known metrics

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Ning Sun <sunning@greptime.com>
2025-02-21 05:13:09 +00:00
Ruihang Xia
76083892cd feat: support UNNEST (#5580)
* feat: support UNNEST

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy and sqlness

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-02-21 04:53:56 +00:00
Ruihang Xia
7981c06989 feat: implement uddsketch function to calculate percentile (#5574)
* basic impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* more tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sqlness test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update with more test and logs

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-02-20 18:59:20 +00:00
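A UDDSketch answers percentile queries from log-spaced buckets with bounded relative error. The toy version below captures that idea; its parameters and structure are illustrative only and do not reflect the actual `uddsketch` crate API used by the commit:

```rust
use std::collections::BTreeMap;

/// Toy log-bucketed sketch: a value x maps to bucket ceil(ln(x)/ln(gamma)),
/// so reporting a bucket's log-midpoint bounds the relative error.
struct ToySketch {
    gamma: f64,
    buckets: BTreeMap<i64, u64>,
    count: u64,
}

impl ToySketch {
    fn new(alpha: f64) -> Self {
        ToySketch {
            gamma: (1.0 + alpha) / (1.0 - alpha),
            buckets: BTreeMap::new(),
            count: 0,
        }
    }

    fn insert(&mut self, value: f64) {
        assert!(value > 0.0, "toy version handles positive values only");
        let key = (value.ln() / self.gamma.ln()).ceil() as i64;
        *self.buckets.entry(key).or_insert(0) += 1;
        self.count += 1;
    }

    fn percentile(&self, q: f64) -> f64 {
        let rank = (q * self.count as f64).ceil() as u64;
        let mut seen = 0;
        for (key, n) in &self.buckets {
            seen += n;
            if seen >= rank {
                // Bucket midpoint in log space approximates its members.
                return self.gamma.powf(*key as f64 - 0.5);
            }
        }
        f64::NAN
    }
}

fn main() {
    let mut sk = ToySketch::new(0.01);
    for i in 1..=1000 {
        sk.insert(i as f64);
    }
    let p50 = sk.percentile(0.5);
    assert!((p50 - 500.0).abs() / 500.0 < 0.05, "p50 ~= 500, got {p50}");
    println!("approximate p50: {p50:.1}");
}
```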
beryl678
97bb1519f8 docs: revise the author list (#5575) 2025-02-20 18:04:23 +00:00
Weny Xu
1d8c9c1843 feat: enable gzip for prometheus query handlers and ignore NaN values in prometheus response (#5576)
* feat: enable gzip for prometheus query handlers and ignore nan values in prometheus response

* Apply suggestions from code review

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>

---------

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
2025-02-20 11:34:32 +00:00
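Ignoring NaN values before serializing a Prometheus response, as the commit above describes, can be sketched as a simple filter. The `(timestamp, value)` sample shape here is an assumption, not the server's actual type:

```rust
/// Remove samples whose value is NaN so they never reach the JSON response.
fn drop_nan_samples(samples: Vec<(i64, f64)>) -> Vec<(i64, f64)> {
    samples.into_iter().filter(|(_, v)| !v.is_nan()).collect()
}

fn main() {
    let samples = vec![(1000, 1.5), (2000, f64::NAN), (3000, 2.5)];
    let kept = drop_nan_samples(samples);
    assert_eq!(kept, vec![(1000, 1.5), (3000, 2.5)]);
    println!("kept {} samples", kept.len());
}
```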
jeremyhi
71007e200c feat: remap flow route address (#5565)
* feat: remap flow peers

* refactor: not stream

* feat: remap flownode addr on FlowRoute and TableFlow

* fix: unit test

* Update src/meta-srv/src/handler/remap_flow_peer_handler.rs

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>

* chore: by comment

* Update src/meta-srv/src/handler/remap_flow_peer_handler.rs

* Update src/common/meta/src/key/flow/table_flow.rs

* Update src/common/meta/src/key/flow/flow_route.rs

* chore: remove duplicate field

---------

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
2025-02-20 08:21:32 +00:00
jeremyhi
a0ff9e751e feat: flow type on creating procedure (#5572)
feat: flow type on creating
2025-02-20 08:12:02 +00:00
154 changed files with 4699 additions and 1192 deletions


@@ -24,9 +24,4 @@ runs:
--set auth.rbac.token.enabled=false \
--set persistence.size=2Gi \
--create-namespace \
--set global.security.allowInsecureImages=true \
--set image.registry=docker.io \
--set image.repository=greptime/etcd \
--set image.tag=3.6.1-debian-12-r3 \
--version 12.0.8 \
-n ${{ inputs.namespace }}


@@ -51,7 +51,7 @@ runs:
run: |
helm upgrade \
--install my-greptimedb \
--set meta.backendStorage.etcd.endpoints=${{ inputs.etcd-endpoints }} \
--set meta.etcdEndpoints=${{ inputs.etcd-endpoints }} \
--set meta.enableRegionFailover=${{ inputs.enable-region-failover }} \
--set image.registry=${{ inputs.image-registry }} \
--set image.repository=${{ inputs.image-repository }} \


@@ -23,8 +23,4 @@ runs:
--set listeners.controller.protocol=PLAINTEXT \
--set listeners.client.protocol=PLAINTEXT \
--create-namespace \
--set image.registry=docker.io \
--set image.repository=greptime/kafka \
--set image.tag=3.9.0-debian-12-r1 \
--version 31.0.0 \
-n ${{ inputs.namespace }}


@@ -1,34 +0,0 @@
#!/bin/bash
# This script is used to pull the test dependency images that are stored in public ECR one by one to avoid rate limiting.
set -e
MAX_RETRIES=3
IMAGES=(
"greptime/zookeeper:3.7"
"greptime/kafka:3.9.0-debian-12-r1"
"greptime/etcd:3.6.1-debian-12-r3"
"greptime/minio:2024"
"greptime/mysql:5.7"
)
for image in "${IMAGES[@]}"; do
  for ((attempt=1; attempt<=MAX_RETRIES; attempt++)); do
    if docker pull "$image"; then
      # Successfully pulled the image.
      break
    else
      # Back off linearly between retries to avoid rate limiting.
      if [ $attempt -lt $MAX_RETRIES ]; then
        sleep_seconds=$((attempt * 5))
        echo "Attempt $attempt failed for $image, waiting $sleep_seconds seconds"
        sleep $sleep_seconds # 5s, 10s delays
      else
        echo "Failed to pull $image after $MAX_RETRIES attempts"
        exit 1
      fi
    fi
  done
done


@@ -14,7 +14,7 @@ name: Build API docs
jobs:
apidoc:
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v4
with:


@@ -16,11 +16,11 @@ on:
description: The runner uses to build linux-amd64 artifacts
default: ec2-c6i.4xlarge-amd64
options:
- ubuntu-22.04
- ubuntu-22.04-8-cores
- ubuntu-22.04-16-cores
- ubuntu-22.04-32-cores
- ubuntu-22.04-64-cores
- ubuntu-20.04
- ubuntu-20.04-8-cores
- ubuntu-20.04-16-cores
- ubuntu-20.04-32-cores
- ubuntu-20.04-64-cores
- ec2-c6i.xlarge-amd64 # 4C8G
- ec2-c6i.2xlarge-amd64 # 8C16G
- ec2-c6i.4xlarge-amd64 # 16C32G
@@ -83,7 +83,7 @@ jobs:
allocate-runners:
name: Allocate runners
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
outputs:
linux-amd64-runner: ${{ steps.start-linux-amd64-runner.outputs.label }}
linux-arm64-runner: ${{ steps.start-linux-arm64-runner.outputs.label }}
@@ -218,7 +218,7 @@ jobs:
build-linux-amd64-artifacts,
build-linux-arm64-artifacts,
]
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
outputs:
build-result: ${{ steps.set-build-result.outputs.build-result }}
steps:
@@ -251,7 +251,7 @@ jobs:
allocate-runners,
release-images-to-dockerhub,
]
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
continue-on-error: true
steps:
- uses: actions/checkout@v4
@@ -283,7 +283,7 @@ jobs:
name: Stop linux-amd64 runner
# Only run this job when the runner is allocated.
if: ${{ always() }}
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
needs: [
allocate-runners,
build-linux-amd64-artifacts,
@@ -309,7 +309,7 @@ jobs:
name: Stop linux-arm64 runner
# Only run this job when the runner is allocated.
if: ${{ always() }}
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
needs: [
allocate-runners,
build-linux-arm64-artifacts,
@@ -337,7 +337,7 @@ jobs:
needs: [
release-images-to-dockerhub
]
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
permissions:
issues: write


@@ -21,13 +21,14 @@ concurrency:
cancel-in-progress: true
jobs:
check-docs:
name: Check docs
runs-on: ubuntu-22.04
check-typos-and-docs:
name: Check typos and docs
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v4
with:
persist-credentials: false
- uses: crate-ci/typos@master
- name: Check the config docs
run: |
make config-docs && \
@@ -35,7 +36,7 @@ jobs:
|| (echo "'config/config.md' is not up-to-date, please run 'make config-docs'." && exit 1)
license-header-check:
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
name: Check License Header
steps:
- uses: actions/checkout@v4
@@ -48,7 +49,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ ubuntu-22.04 ]
os: [ ubuntu-20.04 ]
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
@@ -71,7 +72,7 @@ jobs:
toml:
name: Toml Check
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
@@ -88,7 +89,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ ubuntu-22.04 ]
os: [ ubuntu-20.04 ]
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
@@ -247,7 +248,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ ubuntu-22.04 ]
os: [ ubuntu-20.04 ]
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
@@ -567,7 +568,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ ubuntu-22.04 ]
os: [ ubuntu-20.04 ]
mode:
- name: "Basic"
opts: ""
@@ -586,8 +587,7 @@ jobs:
- if: matrix.mode.kafka
name: Setup kafka server
working-directory: tests-integration/fixtures
run: ../../.github/scripts/pull-test-deps-images.sh && docker compose up -d --wait kafka
run: docker compose up -d --wait kafka
- name: Download pre-built binaries
uses: actions/download-artifact@v4
with:
@@ -607,7 +607,7 @@ jobs:
fmt:
name: Rustfmt
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
@@ -624,7 +624,7 @@ jobs:
clippy:
name: Clippy
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
@@ -685,8 +685,7 @@ jobs:
uses: taiki-e/install-action@nextest
- name: Setup external services
working-directory: tests-integration/fixtures
run: ../../.github/scripts/pull-test-deps-images.sh && docker compose up -d --wait
run: docker compose up -d --wait
- name: Run nextest cases
run: cargo nextest run --workspace -F dashboard -F pg_kvbackend
env:
@@ -711,7 +710,7 @@ jobs:
coverage:
if: github.event_name == 'merge_group'
runs-on: ubuntu-22.04-8-cores
runs-on: ubuntu-20.04-8-cores
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
@@ -738,8 +737,7 @@ jobs:
uses: taiki-e/install-action@cargo-llvm-cov
- name: Setup external services
working-directory: tests-integration/fixtures
run: ../../.github/scripts/pull-test-deps-images.sh && docker compose up -d --wait
run: docker compose up -d --wait
- name: Run nextest cases
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F dashboard -F pg_kvbackend
env:
@@ -772,7 +770,7 @@ jobs:
# compat:
# name: Compatibility Test
# needs: build
# runs-on: ubuntu-22.04
# runs-on: ubuntu-20.04
# timeout-minutes: 60
# steps:
# - uses: actions/checkout@v4

View File

@@ -3,9 +3,13 @@ on:
pull_request_target:
types: [opened, edited]
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true
jobs:
docbot:
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
permissions:
pull-requests: write
contents: read


@@ -31,7 +31,7 @@ name: CI
jobs:
typos:
name: Spell Check with Typos
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v4
with:
@@ -39,7 +39,7 @@ jobs:
- uses: crate-ci/typos@master
license-header-check:
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
name: Check License Header
steps:
- uses: actions/checkout@v4
@@ -49,29 +49,29 @@ jobs:
check:
name: Check
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
steps:
- run: 'echo "No action required"'
fmt:
name: Rustfmt
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
steps:
- run: 'echo "No action required"'
clippy:
name: Clippy
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
steps:
- run: 'echo "No action required"'
coverage:
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
steps:
- run: 'echo "No action required"'
test:
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
steps:
- run: 'echo "No action required"'
@@ -80,7 +80,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ ubuntu-22.04 ]
os: [ ubuntu-20.04 ]
mode:
- name: "Basic"
- name: "Remote WAL"


@@ -14,11 +14,11 @@ on:
description: The runner uses to build linux-amd64 artifacts
default: ec2-c6i.4xlarge-amd64
options:
- ubuntu-22.04
- ubuntu-22.04-8-cores
- ubuntu-22.04-16-cores
- ubuntu-22.04-32-cores
- ubuntu-22.04-64-cores
- ubuntu-20.04
- ubuntu-20.04-8-cores
- ubuntu-20.04-16-cores
- ubuntu-20.04-32-cores
- ubuntu-20.04-64-cores
- ec2-c6i.xlarge-amd64 # 4C8G
- ec2-c6i.2xlarge-amd64 # 8C16G
- ec2-c6i.4xlarge-amd64 # 16C32G
@@ -70,7 +70,7 @@ jobs:
allocate-runners:
name: Allocate runners
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
outputs:
linux-amd64-runner: ${{ steps.start-linux-amd64-runner.outputs.label }}
linux-arm64-runner: ${{ steps.start-linux-arm64-runner.outputs.label }}
@@ -182,7 +182,7 @@ jobs:
build-linux-amd64-artifacts,
build-linux-arm64-artifacts,
]
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
outputs:
nightly-build-result: ${{ steps.set-nightly-build-result.outputs.nightly-build-result }}
steps:
@@ -214,7 +214,7 @@ jobs:
allocate-runners,
release-images-to-dockerhub,
]
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
# When we push to ACR, it's easy to fail due to some unknown network issues.
# However, we don't want to fail the whole workflow because of this.
# The ACR have daily sync with DockerHub, so don't worry about the image not being updated.
@@ -249,7 +249,7 @@ jobs:
name: Stop linux-amd64 runner
# Only run this job when the runner is allocated.
if: ${{ always() }}
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
needs: [
allocate-runners,
build-linux-amd64-artifacts,
@@ -275,7 +275,7 @@ jobs:
name: Stop linux-arm64 runner
# Only run this job when the runner is allocated.
if: ${{ always() }}
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
needs: [
allocate-runners,
build-linux-arm64-artifacts,
@@ -303,7 +303,7 @@ jobs:
needs: [
release-images-to-dockerhub
]
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
permissions:
issues: write
env:


@@ -133,7 +133,7 @@ jobs:
name: Check status
needs: [sqlness-test, sqlness-windows, test-on-windows]
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
outputs:
check-result: ${{ steps.set-check-result.outputs.check-result }}
steps:
@@ -146,7 +146,7 @@ jobs:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' && always() }} # Not requiring successful dependent jobs, always run.
name: Send notification to Greptime team
needs: [check-status]
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL_DEVELOP_CHANNEL }}
steps:


@@ -29,7 +29,7 @@ jobs:
release-dev-builder-images:
name: Release dev builder images
if: ${{ inputs.release_dev_builder_ubuntu_image || inputs.release_dev_builder_centos_image || inputs.release_dev_builder_android_image }} # Only manually trigger this job.
runs-on: ubuntu-22.04-16-cores
runs-on: ubuntu-20.04-16-cores
outputs:
version: ${{ steps.set-version.outputs.version }}
steps:
@@ -63,7 +63,7 @@ jobs:
release-dev-builder-images-ecr:
name: Release dev builder images to AWS ECR
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
needs: [
release-dev-builder-images
]
@@ -148,7 +148,7 @@ jobs:
release-dev-builder-images-cn: # Note: Be careful issue: https://github.com/containers/skopeo/issues/1874 and we decide to use the latest stable skopeo container.
name: Release dev builder images to CN region
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
needs: [
release-dev-builder-images
]


@@ -18,11 +18,11 @@ on:
description: The runner uses to build linux-amd64 artifacts
default: ec2-c6i.4xlarge-amd64
options:
- ubuntu-22.04
- ubuntu-22.04-8-cores
- ubuntu-22.04-16-cores
- ubuntu-22.04-32-cores
- ubuntu-22.04-64-cores
- ubuntu-20.04
- ubuntu-20.04-8-cores
- ubuntu-20.04-16-cores
- ubuntu-20.04-32-cores
- ubuntu-20.04-64-cores
- ec2-c6i.xlarge-amd64 # 4C8G
- ec2-c6i.2xlarge-amd64 # 8C16G
- ec2-c6i.4xlarge-amd64 # 16C32G
@@ -97,7 +97,7 @@ jobs:
allocate-runners:
name: Allocate runners
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
outputs:
linux-amd64-runner: ${{ steps.start-linux-amd64-runner.outputs.label }}
linux-arm64-runner: ${{ steps.start-linux-arm64-runner.outputs.label }}
@@ -335,7 +335,7 @@ jobs:
build-windows-artifacts,
release-images-to-dockerhub,
]
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
# When we push to ACR, it's easy to fail due to some unknown network issues.
# However, we don't want to fail the whole workflow because of this.
# The ACR have daily sync with DockerHub, so don't worry about the image not being updated.
@@ -377,7 +377,7 @@ jobs:
build-windows-artifacts,
release-images-to-dockerhub,
]
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v4
with:
@@ -396,7 +396,7 @@ jobs:
name: Stop linux-amd64 runner
# Only run this job when the runner is allocated.
if: ${{ always() }}
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
needs: [
allocate-runners,
build-linux-amd64-artifacts,
@@ -422,7 +422,7 @@ jobs:
name: Stop linux-arm64 runner
# Only run this job when the runner is allocated.
if: ${{ always() }}
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
needs: [
allocate-runners,
build-linux-arm64-artifacts,
@@ -448,7 +448,7 @@ jobs:
name: Bump doc version
if: ${{ github.event_name == 'push' || github.event_name == 'schedule' }}
needs: [allocate-runners]
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
# Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
permissions:
issues: write # Allows the action to create issues for cyborg.
@@ -475,7 +475,7 @@ jobs:
build-macos-artifacts,
build-windows-artifacts,
]
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
# Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
permissions:
issues: write # Allows the action to create issues for cyborg.


@@ -7,9 +7,13 @@ on:
- reopened
- edited
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true
jobs:
check:
runs-on: ubuntu-22.04
runs-on: ubuntu-20.04
timeout-minutes: 10
steps:
- uses: actions/checkout@v4


@@ -3,30 +3,28 @@
## Individual Committers (in alphabetical order)
* [CookiePieWw](https://github.com/CookiePieWw)
* [KKould](https://github.com/KKould)
* [NiwakaDev](https://github.com/NiwakaDev)
* [etolbakov](https://github.com/etolbakov)
* [irenjj](https://github.com/irenjj)
* [tisonkun](https://github.com/tisonkun)
* [KKould](https://github.com/KKould)
* [Lanqing Yang](https://github.com/lyang24)
* [NiwakaDev](https://github.com/NiwakaDev)
* [tisonkun](https://github.com/tisonkun)
## Team Members (in alphabetical order)
* [Breeze-P](https://github.com/Breeze-P)
* [GrepTime](https://github.com/GrepTime)
* [MichaelScofield](https://github.com/MichaelScofield)
* [Wenjie0329](https://github.com/Wenjie0329)
* [WenyXu](https://github.com/WenyXu)
* [ZonaHex](https://github.com/ZonaHex)
* [apdong2022](https://github.com/apdong2022)
* [beryl678](https://github.com/beryl678)
* [Breeze-P](https://github.com/Breeze-P)
* [daviderli614](https://github.com/daviderli614)
* [discord9](https://github.com/discord9)
* [evenyag](https://github.com/evenyag)
* [fengjiachun](https://github.com/fengjiachun)
* [fengys1996](https://github.com/fengys1996)
* [GrepTime](https://github.com/GrepTime)
* [holalengyu](https://github.com/holalengyu)
* [killme2008](https://github.com/killme2008)
* [MichaelScofield](https://github.com/MichaelScofield)
* [nicecui](https://github.com/nicecui)
* [paomian](https://github.com/paomian)
* [shuiyisong](https://github.com/shuiyisong)
@@ -34,11 +32,14 @@
* [sunng87](https://github.com/sunng87)
* [v0y4g3r](https://github.com/v0y4g3r)
* [waynexia](https://github.com/waynexia)
* [Wenjie0329](https://github.com/Wenjie0329)
* [WenyXu](https://github.com/WenyXu)
* [xtang](https://github.com/xtang)
* [zhaoyingnan01](https://github.com/zhaoyingnan01)
* [zhongzc](https://github.com/zhongzc)
* [ZonaHex](https://github.com/ZonaHex)
* [zyy17](https://github.com/zyy17)
## All Contributors
[![All Contributors](https://contrib.rocks/image?repo=GreptimeTeam/greptimedb)](https://github.com/GreptimeTeam/greptimedb/graphs/contributors)
To see the full list of contributors, please visit our [Contributors page](https://github.com/GreptimeTeam/greptimedb/graphs/contributors)

Cargo.lock (generated, 221 changed lines)

@@ -185,7 +185,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"common-base",
"common-decimal",
@@ -432,7 +432,7 @@ dependencies = [
"arrow-schema",
"chrono",
"half",
"indexmap 2.6.0",
"indexmap 2.7.1",
"lexical-core",
"num",
"serde",
@@ -710,7 +710,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"api",
"async-trait",
@@ -1324,7 +1324,7 @@ dependencies = [
[[package]]
name = "cache"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"catalog",
"common-error",
@@ -1348,7 +1348,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"api",
"arrow",
@@ -1475,7 +1475,7 @@ version = "0.13.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6026d8cd82ada8bbcfe337805dd1eb6afdc9e80fa4d57e977b3a36315e0c5525"
dependencies = [
"indexmap 2.6.0",
"indexmap 2.7.1",
"lazy_static",
"num-traits",
"regex",
@@ -1661,7 +1661,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "cli"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"async-trait",
"auth",
@@ -1703,7 +1703,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.12.1",
"substrait 0.12.0",
"table",
"tempfile",
"tokio",
@@ -1712,7 +1712,7 @@ dependencies = [
[[package]]
name = "client"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"api",
"arc-swap",
@@ -1739,7 +1739,7 @@ dependencies = [
"rand",
"serde_json",
"snafu 0.8.5",
"substrait 0.12.1",
"substrait 0.12.0",
"substrait 0.37.3",
"tokio",
"tokio-stream",
@@ -1780,7 +1780,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"async-trait",
"auth",
@@ -1841,7 +1841,7 @@ dependencies = [
"similar-asserts",
"snafu 0.8.5",
"store-api",
"substrait 0.12.1",
"substrait 0.12.0",
"table",
"temp-env",
"tempfile",
@@ -1887,7 +1887,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"anymap2",
"async-trait",
@@ -1909,11 +1909,11 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.12.1"
version = "0.12.0"
[[package]]
name = "common-config"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"common-base",
"common-error",
@@ -1938,7 +1938,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"arrow",
"arrow-schema",
@@ -1974,7 +1974,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"bigdecimal 0.4.5",
"common-error",
@@ -1987,7 +1987,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"http 1.1.0",
"snafu 0.8.5",
@@ -1997,7 +1997,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"async-trait",
"common-error",
@@ -2007,12 +2007,14 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"ahash 0.8.11",
"api",
"approx 0.5.1",
"arc-swap",
"async-trait",
"bincode",
"common-base",
"common-catalog",
"common-error",
@@ -2030,6 +2032,7 @@ dependencies = [
"geo-types",
"geohash",
"h3o",
"hyperloglogplus",
"jsonb",
"nalgebra 0.33.2",
"num",
@@ -2046,12 +2049,13 @@ dependencies = [
"store-api",
"table",
"tokio",
"uddsketch",
"wkt",
]
[[package]]
name = "common-greptimedb-telemetry"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"async-trait",
"common-runtime",
@@ -2068,7 +2072,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"api",
"arrow-flight",
@@ -2096,7 +2100,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"api",
"common-base",
@@ -2115,7 +2119,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"arc-swap",
"common-query",
@@ -2129,7 +2133,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"common-error",
"common-macro",
@@ -2142,7 +2146,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"anymap2",
"api",
@@ -2202,7 +2206,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2211,11 +2215,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "0.12.1"
version = "0.12.0"
[[package]]
name = "common-pprof"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"common-error",
"common-macro",
@@ -2227,7 +2231,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"async-stream",
"async-trait",
@@ -2254,7 +2258,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"async-trait",
"common-procedure",
@@ -2262,7 +2266,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"api",
"async-trait",
@@ -2288,7 +2292,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"arc-swap",
"common-error",
@@ -2307,7 +2311,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -2337,7 +2341,7 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"atty",
"backtrace",
@@ -2365,7 +2369,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"client",
"common-query",
@@ -2377,7 +2381,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"arrow",
"chrono",
@@ -2395,7 +2399,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"build-data",
"const_format",
@@ -2405,7 +2409,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"common-base",
"common-error",
@@ -2972,7 +2976,7 @@ dependencies = [
"chrono",
"half",
"hashbrown 0.14.5",
"indexmap 2.6.0",
"indexmap 2.7.1",
"libc",
"object_store",
"parquet",
@@ -3032,7 +3036,7 @@ dependencies = [
"datafusion-functions-aggregate-common",
"datafusion-functions-window-common",
"datafusion-physical-expr-common",
"indexmap 2.6.0",
"indexmap 2.7.1",
"paste",
"recursive",
"serde_json",
@@ -3154,7 +3158,7 @@ dependencies = [
"datafusion-physical-expr-common",
"datafusion-physical-plan",
"half",
"indexmap 2.6.0",
"indexmap 2.7.1",
"log",
"parking_lot 0.12.3",
"paste",
@@ -3205,7 +3209,7 @@ dependencies = [
"datafusion-common",
"datafusion-expr",
"datafusion-physical-expr",
"indexmap 2.6.0",
"indexmap 2.7.1",
"itertools 0.13.0",
"log",
"recursive",
@@ -3230,7 +3234,7 @@ dependencies = [
"datafusion-physical-expr-common",
"half",
"hashbrown 0.14.5",
"indexmap 2.6.0",
"indexmap 2.7.1",
"itertools 0.13.0",
"log",
"paste",
@@ -3289,7 +3293,7 @@ dependencies = [
"futures",
"half",
"hashbrown 0.14.5",
"indexmap 2.6.0",
"indexmap 2.7.1",
"itertools 0.13.0",
"log",
"once_cell",
@@ -3309,7 +3313,7 @@ dependencies = [
"arrow-schema",
"datafusion-common",
"datafusion-expr",
"indexmap 2.6.0",
"indexmap 2.7.1",
"log",
"recursive",
"regex",
@@ -3336,7 +3340,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"api",
"arrow-flight",
@@ -3388,7 +3392,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.12.1",
"substrait 0.12.0",
"table",
"tokio",
"toml 0.8.19",
@@ -3397,7 +3401,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"arrow",
"arrow-array",
@@ -4041,7 +4045,7 @@ dependencies = [
[[package]]
name = "file-engine"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"api",
"async-trait",
@@ -4151,7 +4155,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"api",
"arrow",
@@ -4212,7 +4216,7 @@ dependencies = [
"snafu 0.8.5",
"store-api",
"strum 0.25.0",
"substrait 0.12.1",
"substrait 0.12.0",
"table",
"tokio",
"tonic 0.12.3",
@@ -4267,7 +4271,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"api",
"arc-swap",
@@ -4695,7 +4699,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a25adc8a01340231121646d8f0a29d0e92f45461#a25adc8a01340231121646d8f0a29d0e92f45461"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=072ce580502e015df1a6b03a185b60309a7c2a7a#072ce580502e015df1a6b03a185b60309a7c2a7a"
dependencies = [
"prost 0.13.3",
"serde",
@@ -4718,7 +4722,7 @@ dependencies = [
"futures-sink",
"futures-util",
"http 0.2.12",
"indexmap 2.6.0",
"indexmap 2.7.1",
"slab",
"tokio",
"tokio-util",
@@ -4737,7 +4741,7 @@ dependencies = [
"futures-core",
"futures-sink",
"http 1.1.0",
"indexmap 2.6.0",
"indexmap 2.7.1",
"slab",
"tokio",
"tokio-util",
@@ -5287,6 +5291,15 @@ dependencies = [
"tracing",
]
[[package]]
name = "hyperloglogplus"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "621debdf94dcac33e50475fdd76d34d5ea9c0362a834b9db08c3024696c1fbe3"
dependencies = [
"serde",
]
[[package]]
name = "i_float"
version = "1.3.1"
@@ -5526,7 +5539,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -5575,9 +5588,9 @@ dependencies = [
[[package]]
name = "indexmap"
version = "2.6.0"
version = "2.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da"
checksum = "8c9c992b02b5b4c94ea26e32fe5bccb7aa7d9f390ab5c1221ff895bc7ea8b652"
dependencies = [
"equivalent",
"hashbrown 0.15.2",
@@ -5591,7 +5604,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "232929e1d75fe899576a3d5c7416ad0d88dbfbb3c3d6aa00873a7408a50ddb88"
dependencies = [
"ahash 0.8.11",
"indexmap 2.6.0",
"indexmap 2.7.1",
"is-terminal",
"itoa",
"log",
@@ -5938,7 +5951,7 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ee7893dab2e44ae5f9d0173f26ff4aa327c10b01b06a72b52dd9405b628640d"
dependencies = [
"indexmap 2.6.0",
"indexmap 2.7.1",
]
[[package]]
@@ -6318,7 +6331,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "log-query"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"chrono",
"common-error",
@@ -6330,7 +6343,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"async-stream",
"async-trait",
@@ -6421,7 +6434,7 @@ dependencies = [
"cactus",
"cfgrammar",
"filetime",
"indexmap 2.6.0",
"indexmap 2.7.1",
"lazy_static",
"lrtable",
"num-traits",
@@ -6623,7 +6636,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"api",
"async-trait",
@@ -6650,7 +6663,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"api",
"async-trait",
@@ -6736,7 +6749,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"api",
"aquamarine",
@@ -6834,7 +6847,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"api",
"aquamarine",
@@ -7531,7 +7544,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"anyhow",
"bytes",
@@ -7662,7 +7675,7 @@ checksum = "1e32339a5dc40459130b3bd269e9892439f55b33e772d2a9d402a789baaf4e8a"
dependencies = [
"futures-core",
"futures-sink",
"indexmap 2.6.0",
"indexmap 2.7.1",
"js-sys",
"once_cell",
"pin-project-lite",
@@ -7780,7 +7793,7 @@ dependencies = [
[[package]]
name = "operator"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -7828,7 +7841,7 @@ dependencies = [
"sql",
"sqlparser 0.52.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=71dd86058d2af97b9925093d40c4e03360403170)",
"store-api",
"substrait 0.12.1",
"substrait 0.12.0",
"table",
"tokio",
"tokio-util",
@@ -8065,7 +8078,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"api",
"async-trait",
@@ -8234,7 +8247,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db"
dependencies = [
"fixedbitset",
"indexmap 2.6.0",
"indexmap 2.7.1",
]
[[package]]
@@ -8333,7 +8346,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8473,7 +8486,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"auth",
"clap 4.5.19",
@@ -8735,7 +8748,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"ahash 0.8.11",
"async-trait",
@@ -8980,7 +8993,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"async-compression 0.4.13",
"async-trait",
@@ -9021,7 +9034,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9086,7 +9099,7 @@ dependencies = [
"sqlparser 0.52.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=71dd86058d2af97b9925093d40c4e03360403170)",
"statrs",
"store-api",
"substrait 0.12.1",
"substrait 0.12.0",
"table",
"tokio",
"tokio-stream",
@@ -10325,7 +10338,7 @@ version = "1.0.137"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "930cfb6e6abf99298aaad7d29abbef7a9999a9a8806a40088f55f0dcec03146b"
dependencies = [
"indexmap 2.6.0",
"indexmap 2.7.1",
"itoa",
"memchr",
"ryu",
@@ -10396,7 +10409,7 @@ dependencies = [
"chrono",
"hex",
"indexmap 1.9.3",
"indexmap 2.6.0",
"indexmap 2.7.1",
"serde",
"serde_derive",
"serde_json",
@@ -10422,7 +10435,7 @@ version = "0.9.34+deprecated"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47"
dependencies = [
"indexmap 2.6.0",
"indexmap 2.7.1",
"itoa",
"ryu",
"serde",
@@ -10431,7 +10444,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -10483,6 +10496,7 @@ dependencies = [
"humantime",
"humantime-serde",
"hyper 1.4.1",
"indexmap 2.7.1",
"influxdb_line_protocol",
"itertools 0.10.5",
"json5",
@@ -10547,7 +10561,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"api",
"arc-swap",
@@ -10856,7 +10870,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"api",
"chrono",
@@ -10893,12 +10907,12 @@ dependencies = [
[[package]]
name = "sqlness"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "308a7338f2211813d6e9da117e9b9b7aee5d072872d11a934002fd2bd4ab5276"
source = "git+https://github.com/CeresDB/sqlness.git?rev=bb91f31ff58993e07ea89845791235138283a24c#bb91f31ff58993e07ea89845791235138283a24c"
dependencies = [
"async-trait",
"derive_builder 0.11.2",
"duration-str",
"futures",
"minijinja",
"prettydiff",
"regex",
@@ -10910,7 +10924,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -10924,6 +10938,7 @@ dependencies = [
"hex",
"local-ip-address",
"mysql",
"num_cpus",
"reqwest",
"serde",
"serde_json",
@@ -11023,7 +11038,7 @@ dependencies = [
"futures-util",
"hashbrown 0.15.2",
"hashlink",
"indexmap 2.6.0",
"indexmap 2.7.1",
"log",
"memchr",
"once_cell",
@@ -11226,7 +11241,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"api",
"aquamarine",
@@ -11356,7 +11371,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"async-trait",
"bytes",
@@ -11537,7 +11552,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"api",
"async-trait",
@@ -11788,7 +11803,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"arbitrary",
"async-trait",
@@ -11832,7 +11847,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.12.1"
version = "0.12.0"
dependencies = [
"api",
"arrow-flight",
@@ -11898,7 +11913,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.12.1",
"substrait 0.12.0",
"table",
"tempfile",
"time",
@@ -12319,7 +12334,7 @@ version = "0.19.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421"
dependencies = [
"indexmap 2.6.0",
"indexmap 2.7.1",
"toml_datetime",
"winnow 0.5.40",
]
@@ -12330,7 +12345,7 @@ version = "0.22.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ae48d6208a266e853d946088ed816055e556cc6028c5e8e2b84d9fa5dd7c7f5"
dependencies = [
"indexmap 2.6.0",
"indexmap 2.7.1",
"serde",
"serde_spanned",
"toml_datetime",
@@ -12468,7 +12483,7 @@ dependencies = [
"futures-core",
"futures-util",
"hdrhistogram",
"indexmap 2.6.0",
"indexmap 2.7.1",
"pin-project-lite",
"slab",
"sync_wrapper 1.0.1",
@@ -12956,6 +12971,14 @@ version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971"
[[package]]
name = "uddsketch"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/timescaledb-toolkit.git?rev=84828fe8fb494a6a61412a3da96517fc80f7bb20#84828fe8fb494a6a61412a3da96517fc80f7bb20"
dependencies = [
"serde",
]
[[package]]
name = "unescaper"
version = "0.1.5"

View File

@@ -67,7 +67,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.12.1"
version = "0.12.0"
edition = "2021"
license = "Apache-2.0"
@@ -129,7 +129,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a25adc8a01340231121646d8f0a29d0e92f45461" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "072ce580502e015df1a6b03a185b60309a7c2a7a" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -319,6 +319,7 @@
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
| `use_memory_store` | Bool | `false` | Store data in memory. |
| `enable_region_failover` | Bool | `false` | Whether to enable region failover.<br/>This feature is only available on GreptimeDB running on cluster mode and<br/>- Using Remote WAL<br/>- Using shared storage (e.g., s3). |
| `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. |
| `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |

View File

@@ -50,6 +50,9 @@ use_memory_store = false
## - Using shared storage (e.g., s3).
enable_region_failover = false
## Max allowed idle time before removing node info from metasrv memory.
node_max_idle_time = "24hours"
## Whether to enable greptimedb telemetry. Enabled by default.
#+ enable_telemetry = true
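
The `node_max_idle_time` setting above feeds the node-expiry fix from commit 765d1277ee: node info whose last activity is older than the limit is dropped from metasrv memory. A minimal stdlib-only sketch of that expiry rule (the `NodeInfo` struct and `expired` helper here are hypothetical stand-ins, not the metasrv types):

```rust
use std::time::{Duration, Instant};

// Hypothetical stand-in for a metasrv node record.
struct NodeInfo {
    last_activity: Instant,
}

// A node is considered expired once it has been idle longer than the limit.
fn expired(node: &NodeInfo, now: Instant, max_idle: Duration) -> bool {
    now.duration_since(node.last_activity) > max_idle
}

fn main() {
    let max_idle = Duration::from_secs(24 * 60 * 60); // "24hours"
    let now = Instant::now();
    let fresh = NodeInfo { last_activity: now };
    // A node that just heartbeated is kept.
    assert!(!expired(&fresh, now, max_idle));
    println!("fresh node kept");
}
```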

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
@@ -243,17 +244,14 @@ impl InformationSchemaTablesBuilder {
// TODO(dennis): `region_stats` API is not stable in distributed cluster because of network issue etc.
// But we don't want statements such as `show tables` to fail,
// so we use `unwrap_or_else` here instead of the `?` operator.
let region_stats = {
let mut x = information_extension
.region_stats()
.await
.unwrap_or_else(|e| {
error!(e; "Failed to find region stats in information_schema, fallback to all empty");
vec![]
});
x.sort_unstable_by_key(|x| x.id);
x
};
let region_stats = information_extension
.region_stats()
.await
.map_err(|e| {
error!(e; "Failed to call region_stats");
e
})
.unwrap_or_else(|_| vec![]);
for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);
@@ -264,16 +262,16 @@ impl InformationSchemaTablesBuilder {
// TODO(dennis): make it working for metric engine
let table_region_stats =
if table_info.meta.engine == MITO_ENGINE || table_info.is_physical_table() {
table_info
let region_ids = table_info
.meta
.region_numbers
.iter()
.map(|n| RegionId::new(table_info.ident.table_id, *n))
.flat_map(|region_id| {
region_stats
.binary_search_by_key(&region_id, |x| x.id)
.map(|i| &region_stats[i])
})
.collect::<HashSet<_>>();
region_stats
.iter()
.filter(|stat| region_ids.contains(&stat.id))
.collect::<Vec<_>>()
} else {
vec![]

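
The change above replaces a sort-plus-binary-search lookup with a `HashSet` membership filter: collect the table's region ids into a set once, then keep only the stats whose id is in it. A self-contained sketch of that pattern (the `RegionId`/`RegionStat` types here are toy stand-ins, not the store-api definitions):

```rust
use std::collections::HashSet;

// Toy stand-ins for the real RegionId / RegionStat types.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct RegionId(u64);

#[derive(Debug)]
struct RegionStat {
    id: RegionId,
    rows: u64,
}

// Collect the table's region ids into a set, then filter the stats by
// membership -- no sorting or binary search needed.
fn stats_for_table<'a>(
    table_regions: &[RegionId],
    region_stats: &'a [RegionStat],
) -> Vec<&'a RegionStat> {
    let region_ids: HashSet<_> = table_regions.iter().copied().collect();
    region_stats
        .iter()
        .filter(|stat| region_ids.contains(&stat.id))
        .collect()
}

fn main() {
    let stats = vec![
        RegionStat { id: RegionId(1), rows: 10 },
        RegionStat { id: RegionId(2), rows: 20 },
        RegionStat { id: RegionId(3), rows: 30 },
    ];
    let picked = stats_for_table(&[RegionId(1), RegionId(3)], &stats);
    assert_eq!(picked.len(), 2);
    assert_eq!(picked[0].rows, 10);
    println!("picked {} stats", picked.len());
}
```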
View File

@@ -12,9 +12,11 @@ default = ["geo"]
geo = ["geohash", "h3o", "s2", "wkt", "geo-types", "dep:geo"]
[dependencies]
ahash = "0.8"
api.workspace = true
arc-swap = "1.0"
async-trait.workspace = true
bincode = "1.3"
common-base.workspace = true
common-catalog.workspace = true
common-error.workspace = true
@@ -32,6 +34,7 @@ geo = { version = "0.29", optional = true }
geo-types = { version = "0.7", optional = true }
geohash = { version = "0.13", optional = true }
h3o = { version = "0.6", optional = true }
hyperloglogplus = "0.4"
jsonb.workspace = true
nalgebra.workspace = true
num = "0.4"
@@ -47,6 +50,7 @@ sql.workspace = true
statrs = "0.16"
store-api.workspace = true
table.workspace = true
uddsketch = { git = "https://github.com/GreptimeTeam/timescaledb-toolkit.git", rev = "84828fe8fb494a6a61412a3da96517fc80f7bb20" }
wkt = { version = "0.11", optional = true }
[dev-dependencies]

View File

@@ -0,0 +1,20 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod hll;
mod uddsketch_state;
pub(crate) use hll::HllStateType;
pub use hll::{HllState, HLL_MERGE_NAME, HLL_NAME};
pub use uddsketch_state::{UddSketchState, UDDSKETCH_STATE_NAME};

View File

@@ -0,0 +1,319 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_query::prelude::*;
use common_telemetry::trace;
use datafusion::arrow::array::ArrayRef;
use datafusion::common::cast::{as_binary_array, as_string_array};
use datafusion::common::not_impl_err;
use datafusion::error::{DataFusionError, Result as DfResult};
use datafusion::logical_expr::function::AccumulatorArgs;
use datafusion::logical_expr::{Accumulator as DfAccumulator, AggregateUDF};
use datafusion::prelude::create_udaf;
use datatypes::arrow::datatypes::DataType;
use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
use crate::utils::FixedRandomState;
pub const HLL_NAME: &str = "hll";
pub const HLL_MERGE_NAME: &str = "hll_merge";
const DEFAULT_PRECISION: u8 = 14;
pub(crate) type HllStateType = HyperLogLogPlus<String, FixedRandomState>;
pub struct HllState {
hll: HllStateType,
}
impl std::fmt::Debug for HllState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "HllState<Opaque>")
}
}
impl Default for HllState {
fn default() -> Self {
Self::new()
}
}
impl HllState {
pub fn new() -> Self {
Self {
// Safety: the DEFAULT_PRECISION is fixed and valid
hll: HllStateType::new(DEFAULT_PRECISION, FixedRandomState::new()).unwrap(),
}
}
/// Create a UDF for the `hll` function.
///
/// `hll` accepts a string column and aggregates the
/// values into a HyperLogLog state.
pub fn state_udf_impl() -> AggregateUDF {
create_udaf(
HLL_NAME,
vec![DataType::Utf8],
Arc::new(DataType::Binary),
Volatility::Immutable,
Arc::new(Self::create_accumulator),
Arc::new(vec![DataType::Binary]),
)
}
/// Create a UDF for the `hll_merge` function.
///
/// `hll_merge` accepts a binary column of states generated by `hll`
/// and merges them into a single state.
pub fn merge_udf_impl() -> AggregateUDF {
create_udaf(
HLL_MERGE_NAME,
vec![DataType::Binary],
Arc::new(DataType::Binary),
Volatility::Immutable,
Arc::new(Self::create_merge_accumulator),
Arc::new(vec![DataType::Binary]),
)
}
fn update(&mut self, value: &str) {
self.hll.insert(value);
}
fn merge(&mut self, raw: &[u8]) {
if let Ok(serialized) = bincode::deserialize::<HllStateType>(raw) {
if let Ok(()) = self.hll.merge(&serialized) {
return;
}
}
trace!("Warning: Failed to merge HyperLogLog from {:?}", raw);
}
fn create_accumulator(acc_args: AccumulatorArgs) -> DfResult<Box<dyn DfAccumulator>> {
let data_type = acc_args.exprs[0].data_type(acc_args.schema)?;
match data_type {
DataType::Utf8 => Ok(Box::new(HllState::new())),
other => not_impl_err!("{HLL_NAME} does not support data type: {other}"),
}
}
fn create_merge_accumulator(acc_args: AccumulatorArgs) -> DfResult<Box<dyn DfAccumulator>> {
let data_type = acc_args.exprs[0].data_type(acc_args.schema)?;
match data_type {
DataType::Binary => Ok(Box::new(HllState::new())),
other => not_impl_err!("{HLL_MERGE_NAME} does not support data type: {other}"),
}
}
}
impl DfAccumulator for HllState {
fn update_batch(&mut self, values: &[ArrayRef]) -> DfResult<()> {
let array = &values[0];
match array.data_type() {
DataType::Utf8 => {
let string_array = as_string_array(array)?;
for value in string_array.iter().flatten() {
self.update(value);
}
}
DataType::Binary => {
let binary_array = as_binary_array(array)?;
for v in binary_array.iter().flatten() {
self.merge(v);
}
}
_ => {
return not_impl_err!(
"HLL functions do not support data type: {}",
array.data_type()
)
}
}
Ok(())
}
fn evaluate(&mut self) -> DfResult<ScalarValue> {
Ok(ScalarValue::Binary(Some(
bincode::serialize(&self.hll).map_err(|e| {
DataFusionError::Internal(format!("Failed to serialize HyperLogLog: {}", e))
})?,
)))
}
fn size(&self) -> usize {
std::mem::size_of_val(&self.hll)
}
fn state(&mut self) -> DfResult<Vec<ScalarValue>> {
Ok(vec![ScalarValue::Binary(Some(
bincode::serialize(&self.hll).map_err(|e| {
DataFusionError::Internal(format!("Failed to serialize HyperLogLog: {}", e))
})?,
))])
}
fn merge_batch(&mut self, states: &[ArrayRef]) -> DfResult<()> {
let array = &states[0];
let binary_array = as_binary_array(array)?;
for v in binary_array.iter().flatten() {
self.merge(v);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use datafusion::arrow::array::{BinaryArray, StringArray};
use super::*;
#[test]
fn test_hll_basic() {
let mut state = HllState::new();
state.update("1");
state.update("2");
state.update("3");
let result = state.evaluate().unwrap();
if let ScalarValue::Binary(Some(bytes)) = result {
let mut hll: HllStateType = bincode::deserialize(&bytes).unwrap();
assert_eq!(hll.count().trunc() as u32, 3);
} else {
panic!("Expected binary scalar value");
}
}
#[test]
fn test_hll_roundtrip() {
let mut state = HllState::new();
state.update("1");
state.update("2");
// Serialize
let serialized = state.evaluate().unwrap();
// Create new state and merge the serialized data
let mut new_state = HllState::new();
if let ScalarValue::Binary(Some(bytes)) = &serialized {
new_state.merge(bytes);
// Verify the merged state matches original
let result = new_state.evaluate().unwrap();
if let ScalarValue::Binary(Some(new_bytes)) = result {
let mut original: HllStateType = bincode::deserialize(bytes).unwrap();
let mut merged: HllStateType = bincode::deserialize(&new_bytes).unwrap();
assert_eq!(original.count(), merged.count());
} else {
panic!("Expected binary scalar value");
}
} else {
panic!("Expected binary scalar value");
}
}
#[test]
fn test_hll_batch_update() {
let mut state = HllState::new();
// Test string values
let str_values = vec!["a", "b", "c", "d", "e", "f", "g", "h", "i"];
let str_array = Arc::new(StringArray::from(str_values)) as ArrayRef;
state.update_batch(&[str_array]).unwrap();
let result = state.evaluate().unwrap();
if let ScalarValue::Binary(Some(bytes)) = result {
let mut hll: HllStateType = bincode::deserialize(&bytes).unwrap();
assert_eq!(hll.count().trunc() as u32, 9);
} else {
panic!("Expected binary scalar value");
}
}
#[test]
fn test_hll_merge_batch() {
let mut state1 = HllState::new();
state1.update("1");
let state1_binary = state1.evaluate().unwrap();
let mut state2 = HllState::new();
state2.update("2");
let state2_binary = state2.evaluate().unwrap();
let mut merged_state = HllState::new();
if let (ScalarValue::Binary(Some(bytes1)), ScalarValue::Binary(Some(bytes2))) =
(&state1_binary, &state2_binary)
{
let binary_array = Arc::new(BinaryArray::from(vec![
bytes1.as_slice(),
bytes2.as_slice(),
])) as ArrayRef;
merged_state.merge_batch(&[binary_array]).unwrap();
let result = merged_state.evaluate().unwrap();
if let ScalarValue::Binary(Some(bytes)) = result {
let mut hll: HllStateType = bincode::deserialize(&bytes).unwrap();
assert_eq!(hll.count().trunc() as u32, 2);
} else {
panic!("Expected binary scalar value");
}
} else {
panic!("Expected binary scalar values");
}
}
#[test]
fn test_hll_merge_function() {
// Create two HLL states with different values
let mut state1 = HllState::new();
state1.update("1");
state1.update("2");
let state1_binary = state1.evaluate().unwrap();
let mut state2 = HllState::new();
state2.update("2");
state2.update("3");
let state2_binary = state2.evaluate().unwrap();
// Create a merge state and merge both states
let mut merge_state = HllState::new();
if let (ScalarValue::Binary(Some(bytes1)), ScalarValue::Binary(Some(bytes2))) =
(&state1_binary, &state2_binary)
{
let binary_array = Arc::new(BinaryArray::from(vec![
bytes1.as_slice(),
bytes2.as_slice(),
])) as ArrayRef;
merge_state.update_batch(&[binary_array]).unwrap();
let result = merge_state.evaluate().unwrap();
if let ScalarValue::Binary(Some(bytes)) = result {
let mut hll: HllStateType = bincode::deserialize(&bytes).unwrap();
// Should have 3 unique values: "1", "2", "3"
assert_eq!(hll.count().trunc() as u32, 3);
} else {
panic!("Expected binary scalar value");
}
} else {
panic!("Expected binary scalar values");
}
}
}

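
The accumulator above follows DataFusion's partial-aggregation lifecycle: `update_batch` folds raw values into a state, `state`/`evaluate` serialize it to an opaque binary blob, and `merge_batch` absorbs blobs from other accumulators. A toy stdlib-only sketch of that lifecycle, using an exact-distinct `HashSet` in place of `HyperLogLogPlus` and newline-joined bytes in place of bincode (both substitutions are assumptions made to keep the example self-contained):

```rust
use std::collections::HashSet;

// Exact-distinct stand-in for HllStateType.
#[derive(Default)]
struct DistinctState {
    values: HashSet<String>,
}

impl DistinctState {
    // Analogue of update_batch: fold a raw value into the state.
    fn update(&mut self, value: &str) {
        self.values.insert(value.to_string());
    }

    // Analogue of state()/evaluate(): emit an opaque binary blob.
    fn serialize(&self) -> Vec<u8> {
        let mut sorted: Vec<_> = self.values.iter().cloned().collect();
        sorted.sort();
        sorted.join("\n").into_bytes()
    }

    // Analogue of merge_batch: absorb a blob from another accumulator,
    // silently skipping undecodable input like the real merge() does.
    fn merge(&mut self, raw: &[u8]) {
        if let Ok(text) = std::str::from_utf8(raw) {
            for v in text.split('\n').filter(|v| !v.is_empty()) {
                self.values.insert(v.to_string());
            }
        }
    }

    fn count(&self) -> usize {
        self.values.len()
    }
}

fn main() {
    let mut a = DistinctState::default();
    a.update("1");
    a.update("2");
    let mut b = DistinctState::default();
    b.update("2");
    b.update("3");

    // Partial states travel as binary between partial and final
    // aggregation, exactly like the bincode blobs in the real code.
    let mut merged = DistinctState::default();
    merged.merge(&a.serialize());
    merged.merge(&b.serialize());
    assert_eq!(merged.count(), 3); // "1", "2", "3"
    println!("distinct = {}", merged.count());
}
```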
View File

@@ -0,0 +1,307 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_query::prelude::*;
use common_telemetry::trace;
use datafusion::common::cast::{as_binary_array, as_primitive_array};
use datafusion::common::not_impl_err;
use datafusion::error::{DataFusionError, Result as DfResult};
use datafusion::logical_expr::function::AccumulatorArgs;
use datafusion::logical_expr::{Accumulator as DfAccumulator, AggregateUDF};
use datafusion::physical_plan::expressions::Literal;
use datafusion::prelude::create_udaf;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::datatypes::{DataType, Float64Type};
use uddsketch::{SketchHashKey, UDDSketch};
pub const UDDSKETCH_STATE_NAME: &str = "uddsketch_state";
#[derive(Debug)]
pub struct UddSketchState {
uddsketch: UDDSketch,
}
impl UddSketchState {
pub fn new(bucket_size: u64, error_rate: f64) -> Self {
Self {
uddsketch: UDDSketch::new(bucket_size, error_rate),
}
}
pub fn udf_impl() -> AggregateUDF {
create_udaf(
UDDSKETCH_STATE_NAME,
vec![DataType::Int64, DataType::Float64, DataType::Float64],
Arc::new(DataType::Binary),
Volatility::Immutable,
Arc::new(|args| {
let (bucket_size, error_rate) = downcast_accumulator_args(args)?;
Ok(Box::new(UddSketchState::new(bucket_size, error_rate)))
}),
Arc::new(vec![DataType::Binary]),
)
}
fn update(&mut self, value: f64) {
self.uddsketch.add_value(value);
}
fn merge(&mut self, raw: &[u8]) {
if let Ok(uddsketch) = bincode::deserialize::<UDDSketch>(raw) {
if uddsketch.count() != 0 {
self.uddsketch.merge_sketch(&uddsketch);
}
} else {
trace!("Warning: Failed to deserialize UDDSketch from {:?}", raw);
}
}
}
fn downcast_accumulator_args(args: AccumulatorArgs) -> DfResult<(u64, f64)> {
let bucket_size = match args.exprs[0]
.as_any()
.downcast_ref::<Literal>()
.map(|lit| lit.value())
{
Some(ScalarValue::Int64(Some(value))) => *value as u64,
_ => {
return not_impl_err!(
"{} not supported for bucket size: {}",
UDDSKETCH_STATE_NAME,
&args.exprs[0]
)
}
};
let error_rate = match args.exprs[1]
.as_any()
.downcast_ref::<Literal>()
.map(|lit| lit.value())
{
Some(ScalarValue::Float64(Some(value))) => *value,
_ => {
return not_impl_err!(
"{} not supported for error rate: {}",
UDDSKETCH_STATE_NAME,
&args.exprs[1]
)
}
};
Ok((bucket_size, error_rate))
}
impl DfAccumulator for UddSketchState {
fn update_batch(&mut self, values: &[ArrayRef]) -> DfResult<()> {
let array = &values[2]; // the third column is the data value
let f64_array = as_primitive_array::<Float64Type>(array)?;
for v in f64_array.iter().flatten() {
self.update(v);
}
Ok(())
}
fn evaluate(&mut self) -> DfResult<ScalarValue> {
Ok(ScalarValue::Binary(Some(
bincode::serialize(&self.uddsketch).map_err(|e| {
DataFusionError::Internal(format!("Failed to serialize UDDSketch: {}", e))
})?,
)))
}
fn size(&self) -> usize {
// Base size of UDDSketch struct fields
let mut total_size = std::mem::size_of::<f64>() * 3 + // alpha, gamma, values_sum
std::mem::size_of::<u32>() + // compactions
std::mem::size_of::<u64>() * 2; // max_buckets, num_values
// Size of buckets (SketchHashMap)
// Each bucket entry contains:
// - SketchHashKey (enum with i64/Zero/Invalid variants)
// - SketchHashEntry (count: u64, next: SketchHashKey)
let bucket_entry_size = std::mem::size_of::<SketchHashKey>() + // key
std::mem::size_of::<u64>() + // count
std::mem::size_of::<SketchHashKey>(); // next
total_size += self.uddsketch.current_buckets_count() * bucket_entry_size;
total_size
}
fn state(&mut self) -> DfResult<Vec<ScalarValue>> {
Ok(vec![ScalarValue::Binary(Some(
bincode::serialize(&self.uddsketch).map_err(|e| {
DataFusionError::Internal(format!("Failed to serialize UDDSketch: {}", e))
})?,
))])
}
fn merge_batch(&mut self, states: &[ArrayRef]) -> DfResult<()> {
let array = &states[0];
let binary_array = as_binary_array(array)?;
for v in binary_array.iter().flatten() {
self.merge(v);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use datafusion::arrow::array::{BinaryArray, Float64Array};
use super::*;
#[test]
fn test_uddsketch_state_basic() {
let mut state = UddSketchState::new(10, 0.01);
state.update(1.0);
state.update(2.0);
state.update(3.0);
let result = state.evaluate().unwrap();
if let ScalarValue::Binary(Some(bytes)) = result {
let deserialized: UDDSketch = bincode::deserialize(&bytes).unwrap();
assert_eq!(deserialized.count(), 3);
} else {
panic!("Expected binary scalar value");
}
}
#[test]
fn test_uddsketch_state_roundtrip() {
let mut state = UddSketchState::new(10, 0.01);
state.update(1.0);
state.update(2.0);
// Serialize
let serialized = state.evaluate().unwrap();
// Create new state and merge the serialized data
let mut new_state = UddSketchState::new(10, 0.01);
if let ScalarValue::Binary(Some(bytes)) = &serialized {
new_state.merge(bytes);
// Verify the merged state matches original by comparing deserialized values
let original_sketch: UDDSketch = bincode::deserialize(bytes).unwrap();
let new_result = new_state.evaluate().unwrap();
if let ScalarValue::Binary(Some(new_bytes)) = new_result {
let new_sketch: UDDSketch = bincode::deserialize(&new_bytes).unwrap();
assert_eq!(original_sketch.count(), new_sketch.count());
assert_eq!(original_sketch.sum(), new_sketch.sum());
assert_eq!(original_sketch.mean(), new_sketch.mean());
assert_eq!(original_sketch.max_error(), new_sketch.max_error());
// Compare a few quantiles to ensure statistical equivalence
for q in [0.1, 0.5, 0.9].iter() {
assert!(
(original_sketch.estimate_quantile(*q) - new_sketch.estimate_quantile(*q))
.abs()
< 1e-10,
"Quantile {} mismatch: original={}, new={}",
q,
original_sketch.estimate_quantile(*q),
new_sketch.estimate_quantile(*q)
);
}
} else {
panic!("Expected binary scalar value");
}
} else {
panic!("Expected binary scalar value");
}
}
#[test]
fn test_uddsketch_state_batch_update() {
let mut state = UddSketchState::new(10, 0.01);
let values = vec![1.0f64, 2.0, 3.0];
let array = Arc::new(Float64Array::from(values)) as ArrayRef;
state
.update_batch(&[array.clone(), array.clone(), array])
.unwrap();
let result = state.evaluate().unwrap();
if let ScalarValue::Binary(Some(bytes)) = result {
let deserialized: UDDSketch = bincode::deserialize(&bytes).unwrap();
assert_eq!(deserialized.count(), 3);
} else {
panic!("Expected binary scalar value");
}
}
#[test]
fn test_uddsketch_state_merge_batch() {
let mut state1 = UddSketchState::new(10, 0.01);
state1.update(1.0);
let state1_binary = state1.evaluate().unwrap();
let mut state2 = UddSketchState::new(10, 0.01);
state2.update(2.0);
let state2_binary = state2.evaluate().unwrap();
let mut merged_state = UddSketchState::new(10, 0.01);
if let (ScalarValue::Binary(Some(bytes1)), ScalarValue::Binary(Some(bytes2))) =
(&state1_binary, &state2_binary)
{
let binary_array = Arc::new(BinaryArray::from(vec![
bytes1.as_slice(),
bytes2.as_slice(),
])) as ArrayRef;
merged_state.merge_batch(&[binary_array]).unwrap();
let result = merged_state.evaluate().unwrap();
if let ScalarValue::Binary(Some(bytes)) = result {
let deserialized: UDDSketch = bincode::deserialize(&bytes).unwrap();
assert_eq!(deserialized.count(), 2);
} else {
panic!("Expected binary scalar value");
}
} else {
panic!("Expected binary scalar values");
}
}
#[test]
fn test_uddsketch_state_size() {
let mut state = UddSketchState::new(10, 0.01);
let initial_size = state.size();
// Add some values to create buckets
state.update(1.0);
state.update(2.0);
state.update(3.0);
let size_with_values = state.size();
assert!(
size_with_values > initial_size,
"Size should increase after adding values: initial={}, with_values={}",
initial_size,
size_with_values
);
// Verify size increases with more buckets
state.update(10.0); // This should create a new bucket
assert!(
state.size() > size_with_values,
"Size should increase after adding new bucket: prev={}, new={}",
size_with_values,
state.size()
);
}
}
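
The accumulator above follows DataFusion's two-phase aggregation contract: `state()` serializes the partial sketch to bytes, and `merge_batch()` folds serialized partials from other partitions into one accumulator. A minimal stdlib-only sketch of that round-trip, with a hypothetical `ToyState` (count/sum) standing in for `UDDSketch` and raw little-endian bytes standing in for bincode:

```rust
/// Toy partial-aggregation state standing in for UDDSketch.
#[derive(Debug, PartialEq)]
struct ToyState {
    count: u64,
    sum: f64,
}

impl ToyState {
    fn new() -> Self {
        Self { count: 0, sum: 0.0 }
    }

    /// Phase 1: each partition updates its own local state.
    fn update(&mut self, v: f64) {
        self.count += 1;
        self.sum += v;
    }

    /// `state()` analogue: serialize the partial state to bytes.
    fn to_bytes(&self) -> Vec<u8> {
        let mut out = self.count.to_le_bytes().to_vec();
        out.extend_from_slice(&self.sum.to_le_bytes());
        out
    }

    /// `merge_batch()` analogue: fold one serialized partial into self.
    fn merge_bytes(&mut self, raw: &[u8]) {
        let count = u64::from_le_bytes(raw[0..8].try_into().unwrap());
        let sum = f64::from_le_bytes(raw[8..16].try_into().unwrap());
        self.count += count;
        self.sum += sum;
    }
}
```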

View File

@@ -22,10 +22,12 @@ use crate::function::{AsyncFunctionRef, FunctionRef};
use crate::scalars::aggregate::{AggregateFunctionMetaRef, AggregateFunctions};
use crate::scalars::date::DateFunction;
use crate::scalars::expression::ExpressionFunction;
use crate::scalars::hll_count::HllCalcFunction;
use crate::scalars::json::JsonFunction;
use crate::scalars::matches::MatchesFunction;
use crate::scalars::math::MathFunction;
use crate::scalars::timestamp::TimestampFunction;
use crate::scalars::uddsketch_calc::UddSketchCalcFunction;
use crate::scalars::vector::VectorFunction;
use crate::system::SystemFunction;
use crate::table::TableFunction;
@@ -105,6 +107,8 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
TimestampFunction::register(&function_registry);
DateFunction::register(&function_registry);
ExpressionFunction::register(&function_registry);
UddSketchCalcFunction::register(&function_registry);
HllCalcFunction::register(&function_registry);
// Aggregate functions
AggregateFunctions::register(&function_registry);

View File

@@ -21,6 +21,7 @@ pub mod scalars;
mod system;
mod table;
pub mod aggr;
pub mod function;
pub mod function_registry;
pub mod handlers;

View File

@@ -22,7 +22,9 @@ pub mod matches;
pub mod math;
pub mod vector;
pub(crate) mod hll_count;
#[cfg(test)]
pub(crate) mod test;
pub(crate) mod timestamp;
pub(crate) mod uddsketch_calc;
pub mod udf;

View File

@@ -0,0 +1,175 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Implementation of the scalar function `hll_count`.
use std::fmt;
use std::fmt::Display;
use std::sync::Arc;
use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, Volatility};
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::Vector;
use datatypes::scalars::{ScalarVector, ScalarVectorBuilder};
use datatypes::vectors::{BinaryVector, MutableVector, UInt64VectorBuilder, VectorRef};
use hyperloglogplus::HyperLogLog;
use snafu::OptionExt;
use crate::aggr::HllStateType;
use crate::function::{Function, FunctionContext};
use crate::function_registry::FunctionRegistry;
const NAME: &str = "hll_count";
/// HllCalcFunction implements the scalar function `hll_count`.
///
/// It accepts one argument:
/// 1. The serialized HyperLogLogPlus state, as produced by the aggregator (binary).
///
/// For each row, it deserializes the sketch and returns the estimated cardinality.
#[derive(Debug, Default)]
pub struct HllCalcFunction;
impl HllCalcFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(HllCalcFunction));
}
}
impl Display for HllCalcFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
impl Function for HllCalcFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::uint64_datatype())
}
fn signature(&self) -> Signature {
// Only argument: HyperLogLogPlus state (binary)
Signature::exact(
vec![ConcreteDataType::binary_datatype()],
Volatility::Immutable,
)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
if columns.len() != 1 {
return InvalidFuncArgsSnafu {
err_msg: format!("hll_count expects 1 argument, got {}", columns.len()),
}
.fail();
}
let hll_vec = columns[0]
.as_any()
.downcast_ref::<BinaryVector>()
.with_context(|| DowncastVectorSnafu {
err_msg: format!("expect BinaryVector, got {}", columns[0].vector_type_name()),
})?;
let len = hll_vec.len();
let mut builder = UInt64VectorBuilder::with_capacity(len);
for i in 0..len {
let hll_opt = hll_vec.get_data(i);
if hll_opt.is_none() {
builder.push_null();
continue;
}
let hll_bytes = hll_opt.unwrap();
// Deserialize the HyperLogLogPlus from its bincode representation
let mut hll: HllStateType = match bincode::deserialize(hll_bytes) {
Ok(h) => h,
Err(e) => {
common_telemetry::trace!("Failed to deserialize HyperLogLogPlus: {}", e);
builder.push_null();
continue;
}
};
builder.push(Some(hll.count().round() as u64));
}
Ok(builder.to_vector())
}
}
#[cfg(test)]
mod tests {
use datatypes::vectors::BinaryVector;
use super::*;
use crate::utils::FixedRandomState;
#[test]
fn test_hll_count_function() {
let function = HllCalcFunction;
assert_eq!("hll_count", function.name());
assert_eq!(
ConcreteDataType::uint64_datatype(),
function
.return_type(&[ConcreteDataType::uint64_datatype()])
.unwrap()
);
// Create a test HLL
let mut hll = HllStateType::new(14, FixedRandomState::new()).unwrap();
for i in 1..=10 {
hll.insert(&i.to_string());
}
let serialized_bytes = bincode::serialize(&hll).unwrap();
let args: Vec<VectorRef> = vec![Arc::new(BinaryVector::from(vec![Some(serialized_bytes)]))];
let result = function.eval(FunctionContext::default(), &args).unwrap();
assert_eq!(result.len(), 1);
// Test cardinality estimate
if let datatypes::value::Value::UInt64(v) = result.get(0) {
assert_eq!(v, 10);
} else {
panic!("Expected uint64 value");
}
}
#[test]
fn test_hll_count_function_errors() {
let function = HllCalcFunction;
// Test with invalid number of arguments
let args: Vec<VectorRef> = vec![];
let result = function.eval(FunctionContext::default(), &args);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("hll_count expects 1 argument"));
// Test with invalid binary data
let args: Vec<VectorRef> = vec![Arc::new(BinaryVector::from(vec![Some(vec![1, 2, 3])]))]; // Invalid binary data
let result = function.eval(FunctionContext::default(), &args).unwrap();
assert_eq!(result.len(), 1);
assert!(matches!(result.get(0), datatypes::value::Value::Null));
}
}

View File

@@ -0,0 +1,211 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Implementation of the scalar function `uddsketch_calc`.
use std::fmt;
use std::fmt::Display;
use std::sync::Arc;
use common_query::error::{DowncastVectorSnafu, InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, Volatility};
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::Vector;
use datatypes::scalars::{ScalarVector, ScalarVectorBuilder};
use datatypes::vectors::{BinaryVector, Float64VectorBuilder, MutableVector, VectorRef};
use snafu::OptionExt;
use uddsketch::UDDSketch;
use crate::function::{Function, FunctionContext};
use crate::function_registry::FunctionRegistry;
const NAME: &str = "uddsketch_calc";
/// UddSketchCalcFunction implements the scalar function `uddsketch_calc`.
///
/// It accepts two arguments:
/// 1. A percentile (as f64) for which to compute the estimated quantile (e.g. 0.95 for p95).
/// 2. The serialized UDDSketch state, as produced by the aggregator (binary).
///
/// For each row, it deserializes the sketch and returns the computed quantile value.
#[derive(Debug, Default)]
pub struct UddSketchCalcFunction;
impl UddSketchCalcFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(UddSketchCalcFunction));
}
}
impl Display for UddSketchCalcFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
impl Function for UddSketchCalcFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::float64_datatype())
}
fn signature(&self) -> Signature {
// First argument: percentile (float64)
// Second argument: UDDSketch state (binary)
Signature::exact(
vec![
ConcreteDataType::float64_datatype(),
ConcreteDataType::binary_datatype(),
],
Volatility::Immutable,
)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
if columns.len() != 2 {
return InvalidFuncArgsSnafu {
err_msg: format!("uddsketch_calc expects 2 arguments, got {}", columns.len()),
}
.fail();
}
let perc_vec = &columns[0];
let sketch_vec = columns[1]
.as_any()
.downcast_ref::<BinaryVector>()
.with_context(|| DowncastVectorSnafu {
err_msg: format!("expect BinaryVector, got {}", columns[1].vector_type_name()),
})?;
let len = sketch_vec.len();
let mut builder = Float64VectorBuilder::with_capacity(len);
for i in 0..len {
let perc_opt = perc_vec.get(i).as_f64_lossy();
let sketch_opt = sketch_vec.get_data(i);
if sketch_opt.is_none() || perc_opt.is_none() {
builder.push_null();
continue;
}
let sketch_bytes = sketch_opt.unwrap();
let perc = perc_opt.unwrap();
// Deserialize the UDDSketch from its bincode representation
let sketch: UDDSketch = match bincode::deserialize(sketch_bytes) {
Ok(s) => s,
Err(e) => {
common_telemetry::trace!("Failed to deserialize UDDSketch: {}", e);
builder.push_null();
continue;
}
};
// Compute the estimated quantile from the sketch
let result = sketch.estimate_quantile(perc);
builder.push(Some(result));
}
Ok(builder.to_vector())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datatypes::vectors::{BinaryVector, Float64Vector};
use super::*;
#[test]
fn test_uddsketch_calc_function() {
let function = UddSketchCalcFunction;
assert_eq!("uddsketch_calc", function.name());
assert_eq!(
ConcreteDataType::float64_datatype(),
function
.return_type(&[ConcreteDataType::float64_datatype()])
.unwrap()
);
// Create a test sketch
let mut sketch = UDDSketch::new(128, 0.01);
sketch.add_value(10.0);
sketch.add_value(20.0);
sketch.add_value(30.0);
sketch.add_value(40.0);
sketch.add_value(50.0);
sketch.add_value(60.0);
sketch.add_value(70.0);
sketch.add_value(80.0);
sketch.add_value(90.0);
sketch.add_value(100.0);
// Get expected values directly from the sketch
let expected_p50 = sketch.estimate_quantile(0.5);
let expected_p90 = sketch.estimate_quantile(0.9);
let expected_p95 = sketch.estimate_quantile(0.95);
let serialized = bincode::serialize(&sketch).unwrap();
let percentiles = vec![0.5, 0.9, 0.95];
let args: Vec<VectorRef> = vec![
Arc::new(Float64Vector::from_vec(percentiles.clone())),
Arc::new(BinaryVector::from(vec![Some(serialized.clone()); 3])),
];
let result = function.eval(FunctionContext::default(), &args).unwrap();
assert_eq!(result.len(), 3);
// Test median (p50)
assert!(
matches!(result.get(0), datatypes::value::Value::Float64(v) if (v - expected_p50).abs() < 1e-10)
);
// Test p90
assert!(
matches!(result.get(1), datatypes::value::Value::Float64(v) if (v - expected_p90).abs() < 1e-10)
);
// Test p95
assert!(
matches!(result.get(2), datatypes::value::Value::Float64(v) if (v - expected_p95).abs() < 1e-10)
);
}
#[test]
fn test_uddsketch_calc_function_errors() {
let function = UddSketchCalcFunction;
// Test with invalid number of arguments
let args: Vec<VectorRef> = vec![Arc::new(Float64Vector::from_vec(vec![0.95]))];
let result = function.eval(FunctionContext::default(), &args);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("uddsketch_calc expects 2 arguments"));
// Test with invalid binary data
let args: Vec<VectorRef> = vec![
Arc::new(Float64Vector::from_vec(vec![0.95])),
Arc::new(BinaryVector::from(vec![Some(vec![1, 2, 3])])), // Invalid binary data
];
let result = function.eval(FunctionContext::default(), &args).unwrap();
assert_eq!(result.len(), 1);
assert!(matches!(result.get(0), datatypes::value::Value::Null));
}
}
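
For intuition about what `estimate_quantile` approximates: the sketch answers nearest-rank quantile queries within a relative error bound, without retaining the raw samples. A hypothetical `exact_quantile` helper (not part of the codebase), computing the exact nearest-rank answer by sorting, shows the reference behavior the sketch trades memory against:

```rust
/// Exact nearest-rank quantile over the full sample; the value that
/// `UDDSketch::estimate_quantile` approximates within its error bound.
fn exact_quantile(values: &mut [f64], q: f64) -> Option<f64> {
    if values.is_empty() || !(0.0..=1.0).contains(&q) {
        return None;
    }
    values.sort_by(|a, b| a.partial_cmp(b).unwrap());
    // Nearest-rank: the ceil(q * n)-th smallest element (1-based).
    let idx = ((q * values.len() as f64).ceil() as usize).saturating_sub(1);
    Some(values[idx])
}
```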

View File

@@ -12,6 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::hash::BuildHasher;
use ahash::RandomState;
use serde::{Deserialize, Serialize};
/// Escapes special characters in the provided pattern string for `LIKE`.
///
/// Specifically, it prefixes the backslash (`\`), percent (`%`), and underscore (`_`)
@@ -32,6 +37,71 @@ pub fn escape_like_pattern(pattern: &str) -> String {
})
.collect::<String>()
}
/// A random state with fixed seeds.
///
/// This is used to ensure that the hash values are consistent across
/// different processes, and easy to serialize and deserialize.
#[derive(Debug)]
pub struct FixedRandomState {
state: RandomState,
}
impl FixedRandomState {
// some random seeds
const RANDOM_SEED_0: u64 = 0x517cc1b727220a95;
const RANDOM_SEED_1: u64 = 0x428a2f98d728ae22;
const RANDOM_SEED_2: u64 = 0x7137449123ef65cd;
const RANDOM_SEED_3: u64 = 0xb5c0fbcfec4d3b2f;
pub fn new() -> Self {
Self {
state: ahash::RandomState::with_seeds(
Self::RANDOM_SEED_0,
Self::RANDOM_SEED_1,
Self::RANDOM_SEED_2,
Self::RANDOM_SEED_3,
),
}
}
}
impl Default for FixedRandomState {
fn default() -> Self {
Self::new()
}
}
impl BuildHasher for FixedRandomState {
type Hasher = ahash::AHasher;
fn build_hasher(&self) -> Self::Hasher {
self.state.build_hasher()
}
fn hash_one<T: std::hash::Hash>(&self, x: T) -> u64 {
self.state.hash_one(x)
}
}
impl Serialize for FixedRandomState {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_unit()
}
}
impl<'de> Deserialize<'de> for FixedRandomState {
fn deserialize<D>(_deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
Ok(Self::new())
}
}
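
`FixedRandomState` pins ahash's four seeds so a given key hashes to the same value in every process, which is what keeps serialized HLL states mergeable across nodes. The same property can be sketched with the standard library alone (a stand-in for illustration, not the actual implementation):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{BuildHasher, BuildHasherDefault, Hash, Hasher};

/// A BuildHasher with no per-process randomness: every instance produced
/// by the same binary hashes a given key to the same value, unlike the
/// randomly seeded `RandomState` used by HashMap by default.
type FixedState = BuildHasherDefault<DefaultHasher>;

fn hash_one_fixed<T: Hash>(x: &T) -> u64 {
    let mut hasher = FixedState::default().build_hasher();
    x.hash(&mut hasher);
    hasher.finish()
}
```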
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -16,7 +16,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use futures::future::BoxFuture;
use futures::TryStreamExt;
use moka::future::Cache;
use moka::ops::compute::Op;
use table::metadata::TableId;
@@ -54,9 +53,13 @@ fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId,
Box::pin(async move {
table_flow_manager
.flows(table_id)
.map_ok(|(key, value)| (key.flownode_id(), value.peer))
.try_collect::<HashMap<_, _>>()
.await
.map(|flows| {
flows
.into_iter()
.map(|(key, value)| (key.flownode_id(), value.peer))
.collect::<HashMap<_, _>>()
})
// We must cache the `HashMap` even if it's empty,
// to avoid querying the remote storage again on the next request;
// If the value is added to the remote storage,

View File

@@ -57,12 +57,10 @@ pub trait ClusterInfo {
}
/// The key of [NodeInfo] in the storage. The format is `__meta_cluster_node_info-{cluster_id}-{role}-{node_id}`.
///
/// This key cannot be used to describe the `Metasrv` itself, because the `Metasrv` does not have
/// a `cluster_id`; it serves multiple clusters.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct NodeInfoKey {
/// The cluster id.
// todo(hl): remove cluster_id as it is not assigned anywhere.
pub cluster_id: ClusterId,
/// The role of the node. It can be `[Role::Datanode]` or `[Role::Frontend]`.
pub role: Role,
@@ -232,8 +230,8 @@ impl TryFrom<Vec<u8>> for NodeInfoKey {
}
}
impl From<NodeInfoKey> for Vec<u8> {
fn from(key: NodeInfoKey) -> Self {
impl From<&NodeInfoKey> for Vec<u8> {
fn from(key: &NodeInfoKey) -> Self {
format!(
"{}-{}-{}-{}",
CLUSTER_NODE_INFO_PREFIX,
@@ -315,7 +313,7 @@ mod tests {
node_id: 2,
};
let key_bytes: Vec<u8> = key.into();
let key_bytes: Vec<u8> = (&key).into();
let new_key: NodeInfoKey = key_bytes.try_into().unwrap();
assert_eq!(1, new_key.cluster_id);

View File

@@ -15,6 +15,7 @@
mod metadata;
use std::collections::BTreeMap;
use std::fmt;
use api::v1::flow::flow_request::Body as PbFlowRequest;
use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader};
@@ -28,7 +29,6 @@ use common_procedure::{
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
use futures::future::join_all;
use futures::TryStreamExt;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
@@ -77,6 +77,7 @@ impl CreateFlowProcedure {
query_context,
state: CreateFlowState::Prepare,
prev_flow_info_value: None,
flow_type: None,
},
}
}
@@ -104,7 +105,7 @@ impl CreateFlowProcedure {
if create_if_not_exists && or_replace {
// this is forbidden because it is unclear what that combination would mean
return error::UnsupportedSnafu {
operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`".to_string(),
operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`",
}
.fail();
}
@@ -129,9 +130,10 @@ impl CreateFlowProcedure {
.flow_metadata_manager
.flow_route_manager()
.routes(flow_id)
.map_ok(|(_, value)| value.peer)
.try_collect::<Vec<_>>()
.await?;
.await?
.into_iter()
.map(|(_, value)| value.peer)
.collect::<Vec<_>>();
self.data.flow_id = Some(flow_id);
self.data.peers = peers;
info!("Replacing flow, flow_id: {}", flow_id);
@@ -175,6 +177,8 @@ impl CreateFlowProcedure {
self.allocate_flow_id().await?;
}
self.data.state = CreateFlowState::CreateFlows;
// determine flow type
self.data.flow_type = Some(determine_flow_type(&self.data.task));
Ok(Status::executing(true))
}
@@ -309,6 +313,11 @@ impl Procedure for CreateFlowProcedure {
}
}
pub fn determine_flow_type(_flow_task: &CreateFlowTask) -> FlowType {
// TODO(discord9): determine flow type
FlowType::RecordingRule
}
/// The state of [CreateFlowProcedure].
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateFlowState {
@@ -322,6 +331,35 @@ pub enum CreateFlowState {
CreateMetadata,
}
/// The type of flow.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum FlowType {
/// The flow is a recording rule task.
RecordingRule,
/// The flow is a streaming task.
Streaming,
}
impl FlowType {
pub const RECORDING_RULE: &str = "recording_rule";
pub const STREAMING: &str = "streaming";
}
impl Default for FlowType {
fn default() -> Self {
Self::RecordingRule
}
}
impl fmt::Display for FlowType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
FlowType::RecordingRule => write!(f, "{}", FlowType::RECORDING_RULE),
FlowType::Streaming => write!(f, "{}", FlowType::STREAMING),
}
}
}
/// The serializable data.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateFlowData {
@@ -335,6 +373,7 @@ pub struct CreateFlowData {
/// For verify if prev value is consistent when need to update flow metadata.
/// only set when `or_replace` is true.
pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
pub(crate) flow_type: Option<FlowType>,
}
impl From<&CreateFlowData> for CreateRequest {
@@ -342,7 +381,7 @@ impl From<&CreateFlowData> for CreateRequest {
let flow_id = value.flow_id.unwrap();
let source_table_ids = &value.source_table_ids;
CreateRequest {
let mut req = CreateRequest {
flow_id: Some(api::v1::FlowId { id: flow_id }),
source_table_ids: source_table_ids
.iter()
@@ -356,7 +395,11 @@ impl From<&CreateFlowData> for CreateRequest {
comment: value.task.comment.clone(),
sql: value.task.sql.clone(),
flow_options: value.task.flow_options.clone(),
}
};
let flow_type = value.flow_type.unwrap_or_default().to_string();
req.flow_options.insert("flow_type".to_string(), flow_type);
req
}
}
@@ -369,7 +412,7 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa
expire_after,
comment,
sql,
flow_options: options,
flow_options: mut options,
..
} = value.task.clone();
@@ -386,19 +429,21 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa
.map(|(idx, peer)| (idx as u32, FlowRouteValue { peer: peer.clone() }))
.collect::<Vec<_>>();
(
FlowInfoValue {
source_table_ids: value.source_table_ids.clone(),
sink_table_name,
flownode_ids,
catalog_name,
flow_name,
raw_sql: sql,
expire_after,
comment,
options,
},
flow_routes,
)
let flow_type = value.flow_type.unwrap_or_default().to_string();
options.insert("flow_type".to_string(), flow_type);
let flow_info = FlowInfoValue {
source_table_ids: value.source_table_ids.clone(),
sink_table_name,
flownode_ids,
catalog_name,
flow_name,
raw_sql: sql,
expire_after,
comment,
options,
};
(flow_info, flow_routes)
}
}

View File

@@ -128,7 +128,7 @@ impl State for DropDatabaseExecutor {
.await?;
executor.invalidate_table_cache(ddl_ctx).await?;
executor
.on_drop_regions(ddl_ctx, &self.physical_region_routes)
.on_drop_regions(ddl_ctx, &self.physical_region_routes, true)
.await?;
info!("Table: {}({}) is dropped", self.table_name, self.table_id);

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use common_catalog::format_full_flow_name;
use futures::TryStreamExt;
use snafu::{ensure, OptionExt};
use crate::ddl::drop_flow::DropFlowProcedure;
@@ -39,9 +38,10 @@ impl DropFlowProcedure {
.flow_metadata_manager
.flow_route_manager()
.routes(self.data.task.flow_id)
.map_ok(|(_, value)| value)
.try_collect::<Vec<_>>()
.await?;
.await?
.into_iter()
.map(|(_, value)| value)
.collect::<Vec<_>>();
ensure!(
!flow_route_values.is_empty(),
error::FlowRouteNotFoundSnafu {

View File

@@ -156,7 +156,7 @@ impl DropTableProcedure {
pub async fn on_datanode_drop_regions(&mut self) -> Result<Status> {
self.executor
.on_drop_regions(&self.context, &self.data.physical_region_routes)
.on_drop_regions(&self.context, &self.data.physical_region_routes, false)
.await?;
self.data.state = DropTableState::DeleteTombstone;
Ok(Status::executing(true))

View File

@@ -214,6 +214,7 @@ impl DropTableExecutor {
&self,
ctx: &DdlContext,
region_routes: &[RegionRoute],
fast_path: bool,
) -> Result<()> {
let leaders = find_leaders(region_routes);
let mut drop_region_tasks = Vec::with_capacity(leaders.len());
@@ -236,6 +237,7 @@ impl DropTableExecutor {
}),
body: Some(region_request::Body::Drop(PbDropRegionRequest {
region_id: region_id.as_u64(),
fast_path,
})),
};
let datanode = datanode.clone();

View File

@@ -16,9 +16,9 @@ pub mod flow_info;
pub(crate) mod flow_name;
pub(crate) mod flow_route;
pub mod flow_state;
mod flownode_addr_helper;
pub(crate) mod flownode_flow;
pub(crate) mod table_flow;
use std::ops::Deref;
use std::sync::Arc;
@@ -506,7 +506,6 @@ mod tests {
let routes = flow_metadata_manager
.flow_route_manager()
.routes(flow_id)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(
@@ -538,7 +537,6 @@ mod tests {
let nodes = flow_metadata_manager
.table_flow_manager()
.flows(table_id)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(
@@ -727,7 +725,6 @@ mod tests {
let routes = flow_metadata_manager
.flow_route_manager()
.routes(flow_id)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(
@@ -759,7 +756,6 @@ mod tests {
let nodes = flow_metadata_manager
.table_flow_manager()
.flows(table_id)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(

View File

@@ -12,14 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use futures::stream::BoxStream;
use futures::TryStreamExt;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::flow::{flownode_addr_helper, FlowScoped};
use crate::key::node_address::NodeAddressKey;
use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
@@ -167,10 +168,7 @@ impl FlowRouteManager {
}
/// Retrieves all [FlowRouteValue]s of the specified `flow_id`.
pub fn routes(
&self,
flow_id: FlowId,
) -> BoxStream<'static, Result<(FlowRouteKey, FlowRouteValue)>> {
pub async fn routes(&self, flow_id: FlowId) -> Result<Vec<(FlowRouteKey, FlowRouteValue)>> {
let start_key = FlowRouteKey::range_start_key(flow_id);
let req = RangeRequest::new().with_prefix(start_key);
let stream = PaginationStream::new(
@@ -181,7 +179,9 @@ impl FlowRouteManager {
)
.into_stream();
Box::pin(stream)
let mut res = stream.try_collect::<Vec<_>>().await?;
self.remap_flow_route_addresses(&mut res).await?;
Ok(res)
}
/// Builds a create flow routes transaction.
@@ -203,6 +203,28 @@ impl FlowRouteManager {
Ok(Txn::new().and_then(txns))
}
async fn remap_flow_route_addresses(
&self,
flow_routes: &mut [(FlowRouteKey, FlowRouteValue)],
) -> Result<()> {
let keys = flow_routes
.iter()
.map(|(_, value)| NodeAddressKey::with_flownode(value.peer.id))
.collect();
let flow_node_addrs =
flownode_addr_helper::get_flownode_addresses(&self.kv_backend, keys).await?;
for (_, flow_route_value) in flow_routes.iter_mut() {
let flownode_id = flow_route_value.peer.id;
// If an id lacks a corresponding address in `flow_node_addrs`,
// it means the old address in the `flow_route_value` is still valid,
// which is expected.
if let Some(node_addr) = flow_node_addrs.get(&flownode_id) {
flow_route_value.peer.addr = node_addr.peer.addr.clone();
}
}
Ok(())
}
}
#[cfg(test)]

View File

@@ -0,0 +1,47 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use crate::error::Result;
use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
use crate::key::{MetadataKey, MetadataValue};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchGetRequest;
/// Gets the addresses of the flownodes.
/// The result is a map: node_id -> NodeAddressValue
pub(crate) async fn get_flownode_addresses(
kv_backend: &KvBackendRef,
keys: Vec<NodeAddressKey>,
) -> Result<HashMap<u64, NodeAddressValue>> {
if keys.is_empty() {
return Ok(HashMap::default());
}
let req = BatchGetRequest {
keys: keys.into_iter().map(|k| k.to_bytes()).collect(),
};
kv_backend
.batch_get(req)
.await?
.kvs
.into_iter()
.map(|kv| {
let key = NodeAddressKey::from_bytes(&kv.key)?;
let value = NodeAddressValue::try_from_raw_value(&kv.value)?;
Ok((key.node_id, value))
})
.collect()
}
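The callers of `get_flownode_addresses` (such as `remap_flow_route_addresses` above) all follow the same remapping pattern: overwrite a stored peer address only when the batch lookup returned a fresher entry, and keep the old address otherwise. A simplified, std-only sketch of that pattern, with a stand-in `Peer` struct rather than the crate's type:

```rust
use std::collections::HashMap;

// Simplified stand-in for the crate's `Peer`; only id and addr matter here.
#[derive(Debug, Clone, PartialEq)]
struct Peer {
    id: u64,
    addr: String,
}

/// Overwrites each peer's address with the freshest known address, if any.
/// Ids missing from `fresh_addrs` keep their stored address, which is the
/// expected behavior when a node never re-registered.
fn remap_addresses(peers: &mut [Peer], fresh_addrs: &HashMap<u64, String>) {
    for peer in peers.iter_mut() {
        if let Some(addr) = fresh_addrs.get(&peer.id) {
            peer.addr = addr.clone();
        }
    }
}

fn main() {
    let mut peers = vec![
        Peer { id: 1, addr: "10.0.0.1:4001".into() },
        Peer { id: 2, addr: "10.0.0.2:4001".into() },
    ];
    // Only node 1 has a fresher address recorded.
    let fresh = HashMap::from([(1u64, "10.0.0.9:4001".to_string())]);
    remap_addresses(&mut peers, &fresh);
    assert_eq!(peers[0].addr, "10.0.0.9:4001");
    assert_eq!(peers[1].addr, "10.0.0.2:4001"); // untouched
}
```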


@@ -14,7 +14,7 @@
use std::sync::Arc;
use futures::stream::BoxStream;
use futures::TryStreamExt;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
@@ -22,7 +22,8 @@ use snafu::OptionExt;
use table::metadata::TableId;
use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::flow::{flownode_addr_helper, FlowScoped};
use crate::key::node_address::NodeAddressKey;
use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
@@ -196,10 +197,7 @@ impl TableFlowManager {
/// Retrieves all [TableFlowKey]s of the specified `table_id`.
///
/// TODO(discord9): add a cache for this, since range requests do not support caching.
pub fn flows(
&self,
table_id: TableId,
) -> BoxStream<'static, Result<(TableFlowKey, TableFlowValue)>> {
pub async fn flows(&self, table_id: TableId) -> Result<Vec<(TableFlowKey, TableFlowValue)>> {
let start_key = TableFlowKey::range_start_key(table_id);
let req = RangeRequest::new().with_prefix(start_key);
let stream = PaginationStream::new(
@@ -210,7 +208,9 @@ impl TableFlowManager {
)
.into_stream();
Box::pin(stream)
let mut res = stream.try_collect::<Vec<_>>().await?;
self.remap_table_flow_addresses(&mut res).await?;
Ok(res)
}
/// Builds a create table flow transaction.
@@ -238,6 +238,28 @@ impl TableFlowManager {
Ok(Txn::new().and_then(txns))
}
async fn remap_table_flow_addresses(
&self,
table_flows: &mut [(TableFlowKey, TableFlowValue)],
) -> Result<()> {
let keys = table_flows
.iter()
.map(|(_, value)| NodeAddressKey::with_flownode(value.peer.id))
.collect::<Vec<_>>();
let flownode_addrs =
flownode_addr_helper::get_flownode_addresses(&self.kv_backend, keys).await?;
for (_, table_flow_value) in table_flows.iter_mut() {
let flownode_id = table_flow_value.peer.id;
// If an id lacks a corresponding address in `flownode_addrs`,
// it means the old address in `table_flow_value` is still valid,
// which is expected.
if let Some(flownode_addr) = flownode_addrs.get(&flownode_id) {
table_flow_value.peer.addr = flownode_addr.peer.addr.clone();
}
}
Ok(())
}
}
#[cfg(test)]


@@ -39,6 +39,10 @@ impl NodeAddressKey {
pub fn with_datanode(node_id: u64) -> Self {
Self::new(Role::Datanode, node_id)
}
pub fn with_flownode(node_id: u64) -> Self {
Self::new(Role::Flownode, node_id)
}
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]


@@ -34,6 +34,7 @@ pub mod kv_backend;
pub mod leadership_notifier;
pub mod lock_key;
pub mod metrics;
pub mod node_expiry_listener;
pub mod node_manager;
pub mod peer;
pub mod range_stream;


@@ -0,0 +1,152 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Mutex;
use std::time::Duration;
use common_telemetry::{debug, error, info, warn};
use tokio::task::JoinHandle;
use tokio::time::{interval, MissedTickBehavior};
use crate::cluster::{NodeInfo, NodeInfoKey};
use crate::error;
use crate::kv_backend::ResettableKvBackendRef;
use crate::leadership_notifier::LeadershipChangeListener;
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;
/// [NodeExpiryListener] periodically checks all node info in memory and removes
/// expired node info to prevent a memory leak.
pub struct NodeExpiryListener {
handle: Mutex<Option<JoinHandle<()>>>,
max_idle_time: Duration,
in_memory: ResettableKvBackendRef,
}
impl Drop for NodeExpiryListener {
fn drop(&mut self) {
self.stop();
}
}
impl NodeExpiryListener {
pub fn new(max_idle_time: Duration, in_memory: ResettableKvBackendRef) -> Self {
Self {
handle: Mutex::new(None),
max_idle_time,
in_memory,
}
}
async fn start(&self) {
let mut handle = self.handle.lock().unwrap();
if handle.is_none() {
let in_memory = self.in_memory.clone();
let max_idle_time = self.max_idle_time;
let ticker_loop = tokio::spawn(async move {
// Run clean task every minute.
let mut interval = interval(Duration::from_secs(60));
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
interval.tick().await;
if let Err(e) = Self::clean_expired_nodes(&in_memory, max_idle_time).await {
error!(e; "Failed to clean expired node");
}
}
});
*handle = Some(ticker_loop);
}
}
fn stop(&self) {
if let Some(handle) = self.handle.lock().unwrap().take() {
handle.abort();
info!("Node expiry listener stopped")
}
}
/// Cleans expired nodes from memory.
async fn clean_expired_nodes(
in_memory: &ResettableKvBackendRef,
max_idle_time: Duration,
) -> error::Result<()> {
let node_keys = Self::list_expired_nodes(in_memory, max_idle_time).await?;
for key in node_keys {
let key_bytes: Vec<u8> = (&key).into();
if let Err(e) = in_memory.delete(&key_bytes, false).await {
warn!(e; "Failed to delete expired node: {:?}", key_bytes);
} else {
debug!("Deleted expired node key: {:?}", key);
}
}
Ok(())
}
/// Lists expired nodes that have been inactive for more than `max_idle_time`.
async fn list_expired_nodes(
in_memory: &ResettableKvBackendRef,
max_idle_time: Duration,
) -> error::Result<impl Iterator<Item = NodeInfoKey>> {
let prefix = NodeInfoKey::key_prefix_with_cluster_id(0);
let req = RangeRequest::new().with_prefix(prefix);
let current_time_millis = common_time::util::current_time_millis();
let resp = in_memory.range(req).await?;
Ok(resp
.kvs
.into_iter()
.filter_map(move |KeyValue { key, value }| {
let Ok(info) = NodeInfo::try_from(value).inspect_err(|e| {
warn!(e; "Unrecognized node info value");
}) else {
return None;
};
if (current_time_millis - info.last_activity_ts) > max_idle_time.as_millis() as i64
{
NodeInfoKey::try_from(key)
.inspect_err(|e| {
warn!(e; "Unrecognized node info key: {:?}", info.peer);
})
.ok()
.inspect(|node_key| {
debug!("Found expired node: {:?}", node_key);
})
} else {
None
}
}))
}
}
#[async_trait::async_trait]
impl LeadershipChangeListener for NodeExpiryListener {
fn name(&self) -> &str {
"NodeExpiryListener"
}
async fn on_leader_start(&self) -> error::Result<()> {
self.start().await;
info!(
"On leader start, node expiry listener started with max idle time: {:?}",
self.max_idle_time
);
Ok(())
}
async fn on_leader_stop(&self) -> error::Result<()> {
self.stop();
info!("On leader stop, node expiry listener stopped");
Ok(())
}
}
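The expiry predicate inside `list_expired_nodes` boils down to a millisecond comparison against `max_idle_time`. A standalone sketch, with plain node ids standing in for `NodeInfoKey`:

```rust
use std::time::Duration;

/// Returns the ids of nodes whose last activity is older than `max_idle_time`.
/// Mirrors the comparison in `list_expired_nodes`:
/// `(now - last_activity_ts) > max_idle_time`, all in milliseconds.
fn expired_nodes(
    nodes: &[(u64, i64)], // (node_id, last_activity_ts in ms)
    now_ms: i64,
    max_idle_time: Duration,
) -> Vec<u64> {
    let max_idle_ms = max_idle_time.as_millis() as i64;
    nodes
        .iter()
        .filter(|(_, last)| now_ms - last > max_idle_ms)
        .map(|(id, _)| *id)
        .collect()
}

fn main() {
    let now = 1_000_000;
    let nodes = [(1, now - 120_000), (2, now - 30_000)];
    // With a 60s idle limit, only node 1 is expired.
    let expired = expired_nodes(&nodes, now, Duration::from_secs(60));
    assert_eq!(expired, vec![1]);
}
```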


@@ -1218,7 +1218,10 @@ mod tests {
);
let response = mock_region_server
.handle_request(region_id, RegionRequest::Drop(RegionDropRequest {}))
.handle_request(
region_id,
RegionRequest::Drop(RegionDropRequest { fast_path: false }),
)
.await
.unwrap();
assert_eq!(response.affected_rows, 0);
@@ -1310,7 +1313,10 @@ mod tests {
.insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
mock_region_server
.handle_request(region_id, RegionRequest::Drop(RegionDropRequest {}))
.handle_request(
region_id,
RegionRequest::Drop(RegionDropRequest { fast_path: false }),
)
.await
.unwrap_err();


@@ -32,5 +32,5 @@ pub mod types;
pub mod value;
pub mod vectors;
pub use arrow;
pub use arrow::{self, compute};
pub use error::{Error, Result};


@@ -60,12 +60,12 @@ async fn query_flow_state(
#[derive(Clone)]
pub struct HeartbeatTask {
node_id: u64,
node_epoch: u64,
peer_addr: String,
meta_client: Arc<MetaClient>,
report_interval: Duration,
retry_interval: Duration,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
start_time_ms: u64,
running: Arc<AtomicBool>,
query_stat_size: Option<SizeReportSender>,
}
@@ -83,12 +83,12 @@ impl HeartbeatTask {
) -> Self {
Self {
node_id: opts.node_id.unwrap_or(0),
node_epoch: common_time::util::current_time_millis() as u64,
peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
meta_client,
report_interval: heartbeat_opts.interval,
retry_interval: heartbeat_opts.retry_interval,
resp_handler_executor,
start_time_ms: common_time::util::current_time_millis() as u64,
running: Arc::new(AtomicBool::new(false)),
query_stat_size: None,
}
@@ -103,6 +103,11 @@ impl HeartbeatTask {
warn!("Heartbeat task started multiple times");
return Ok(());
}
self.create_streams().await
}
async fn create_streams(&self) -> Result<(), Error> {
info!("Start to establish the heartbeat connection to metasrv.");
let (req_sender, resp_stream) = self
.meta_client
@@ -181,7 +186,7 @@ impl HeartbeatTask {
mut outgoing_rx: mpsc::Receiver<OutgoingMessage>,
) {
let report_interval = self.report_interval;
let start_time_ms = self.start_time_ms;
let node_epoch = self.node_epoch;
let self_peer = Some(Peer {
id: self.node_id,
addr: self.peer_addr.clone(),
@@ -198,7 +203,8 @@ impl HeartbeatTask {
let heartbeat_request = HeartbeatRequest {
peer: self_peer,
info: Self::build_node_info(start_time_ms),
node_epoch,
info: Self::build_node_info(node_epoch),
..Default::default()
};
@@ -230,6 +236,8 @@ impl HeartbeatTask {
// Set the timeout to half of the report interval so that it doesn't delay the heartbeat if something goes horribly wrong.
latest_report = query_flow_state(&query_stat_size, report_interval / 2).await;
}
info!("Flownode heartbeat task stopped.");
});
}
@@ -273,7 +281,7 @@ impl HeartbeatTask {
info!("Try to re-establish the heartbeat connection to metasrv.");
if self.start().await.is_ok() {
if self.create_streams().await.is_ok() {
break;
}
}


@@ -237,6 +237,13 @@ impl Instance {
let output = match stmt {
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
// TODO: remove this when format is supported in datafusion
if let Statement::Explain(explain) = &stmt {
if let Some(format) = explain.format() {
query_ctx.set_explain_format(format.to_string());
}
}
let stmt = QueryStatement::Sql(stmt);
let plan = self
.statement_executor


@@ -25,7 +25,7 @@ use crate::fulltext_index::create::{FulltextIndexCreator, TantivyFulltextIndexCr
use crate::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
use crate::fulltext_index::{Analyzer, Config};
async fn new_bounded_stager(prefix: &str) -> (TempDir, Arc<BoundedStager>) {
async fn new_bounded_stager(prefix: &str) -> (TempDir, Arc<BoundedStager<String>>) {
let staging_dir = create_temp_dir(prefix);
let path = staging_dir.path().to_path_buf();
(
@@ -68,13 +68,13 @@ async fn test_search(
let file_accessor = Arc::new(MockFileAccessor::new(prefix));
let puffin_manager = FsPuffinManager::new(stager, file_accessor);
let file_name = "fulltext_index";
let blob_key = "fulltext_index";
let mut writer = puffin_manager.writer(file_name).await.unwrap();
create_index(prefix, &mut writer, blob_key, texts, config).await;
let file_name = "fulltext_index".to_string();
let blob_key = "fulltext_index".to_string();
let mut writer = puffin_manager.writer(&file_name).await.unwrap();
create_index(prefix, &mut writer, &blob_key, texts, config).await;
let reader = puffin_manager.reader(file_name).await.unwrap();
let index_dir = reader.dir(blob_key).await.unwrap();
let reader = puffin_manager.reader(&file_name).await.unwrap();
let index_dir = reader.dir(&blob_key).await.unwrap();
let searcher = TantivyFulltextIndexSearcher::new(index_dir.path()).unwrap();
let results = searcher.search(query).await.unwrap();


@@ -198,13 +198,13 @@ impl Inner {
}
);
let leader = self
let leader_addr = self
.ask_leader
.as_ref()
.unwrap()
.get_leader()
.context(error::NoLeaderSnafu)?;
let mut leader = self.make_client(leader)?;
let mut leader = self.make_client(&leader_addr)?;
let (sender, receiver) = mpsc::channel::<HeartbeatRequest>(128);
@@ -236,7 +236,11 @@ impl Inner {
.await
.map_err(error::Error::from)?
.context(error::CreateHeartbeatStreamSnafu)?;
info!("Successfully created heartbeat stream to server: {:#?}", res);
info!(
"Successfully created heartbeat stream to server: {}, response: {:#?}",
leader_addr, res
);
Ok((
HeartbeatSender::new(self.id, self.role, sender),


@@ -44,6 +44,7 @@ use mailbox_handler::MailboxHandler;
use on_leader_start_handler::OnLeaderStartHandler;
use publish_heartbeat_handler::PublishHeartbeatHandler;
use region_lease_handler::RegionLeaseHandler;
use remap_flow_peer_handler::RemapFlowPeerHandler;
use response_header_handler::ResponseHeaderHandler;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
@@ -71,6 +72,7 @@ pub mod mailbox_handler;
pub mod on_leader_start_handler;
pub mod publish_heartbeat_handler;
pub mod region_lease_handler;
pub mod remap_flow_peer_handler;
pub mod response_header_handler;
#[async_trait::async_trait]
@@ -573,6 +575,7 @@ impl HeartbeatHandlerGroupBuilder {
self.add_handler_last(publish_heartbeat_handler);
}
self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor));
self.add_handler_last(RemapFlowPeerHandler::default());
if let Some(flow_state_handler) = self.flow_state_handler.take() {
self.add_handler_last(flow_state_handler);
@@ -853,7 +856,7 @@ mod tests {
.unwrap();
let handlers = group.handlers;
assert_eq!(12, handlers.len());
assert_eq!(13, handlers.len());
let names = [
"ResponseHeaderHandler",
@@ -868,6 +871,7 @@ mod tests {
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {
@@ -888,7 +892,7 @@ mod tests {
let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(13, handlers.len());
assert_eq!(14, handlers.len());
let names = [
"ResponseHeaderHandler",
@@ -904,6 +908,7 @@ mod tests {
"CollectStatsHandler",
"FilterInactiveRegionStatsHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {
@@ -921,7 +926,7 @@ mod tests {
let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(13, handlers.len());
assert_eq!(14, handlers.len());
let names = [
"CollectStatsHandler",
@@ -937,6 +942,7 @@ mod tests {
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {
@@ -954,7 +960,7 @@ mod tests {
let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(13, handlers.len());
assert_eq!(14, handlers.len());
let names = [
"ResponseHeaderHandler",
@@ -970,6 +976,7 @@ mod tests {
"CollectStatsHandler",
"FilterInactiveRegionStatsHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {
@@ -987,7 +994,7 @@ mod tests {
let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(13, handlers.len());
assert_eq!(14, handlers.len());
let names = [
"ResponseHeaderHandler",
@@ -1003,6 +1010,7 @@ mod tests {
"FilterInactiveRegionStatsHandler",
"CollectStatsHandler",
"ResponseHeaderHandler",
"RemapFlowPeerHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {
@@ -1020,7 +1028,7 @@ mod tests {
let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(12, handlers.len());
assert_eq!(13, handlers.len());
let names = [
"ResponseHeaderHandler",
@@ -1035,6 +1043,7 @@ mod tests {
"CollectStatsHandler",
"FilterInactiveRegionStatsHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {
@@ -1052,7 +1061,7 @@ mod tests {
let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(12, handlers.len());
assert_eq!(13, handlers.len());
let names = [
"ResponseHeaderHandler",
@@ -1067,6 +1076,7 @@ mod tests {
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"ResponseHeaderHandler",
"RemapFlowPeerHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {
@@ -1084,7 +1094,7 @@ mod tests {
let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(12, handlers.len());
assert_eq!(13, handlers.len());
let names = [
"CollectStatsHandler",
@@ -1099,6 +1109,7 @@ mod tests {
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {


@@ -23,8 +23,8 @@ pub struct CheckLeaderHandler;
#[async_trait::async_trait]
impl HeartbeatHandler for CheckLeaderHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
fn is_acceptable(&self, _role: Role) -> bool {
true
}
async fn handle(


@@ -157,7 +157,7 @@ fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, P
}
async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> {
let key = key.into();
let key = (&key).into();
let value = value.try_into().context(InvalidClusterInfoFormatSnafu)?;
let put_req = PutRequest {
key,


@@ -21,7 +21,7 @@ use common_meta::key::node_address::{NodeAddressKey, NodeAddressValue};
use common_meta::key::{MetadataKey, MetadataValue};
use common_meta::peer::Peer;
use common_meta::rpc::store::PutRequest;
use common_telemetry::{error, warn};
use common_telemetry::{error, info, warn};
use dashmap::DashMap;
use snafu::ResultExt;
@@ -185,6 +185,10 @@ async fn rewrite_node_address(ctx: &mut Context, stat: &Stat) {
match ctx.leader_cached_kv_backend.put(put).await {
Ok(_) => {
info!(
"Successfully updated datanode `NodeAddressValue`: {:?}",
peer
);
// Broadcast cache invalidation.
let cache_idents = stat
.table_ids()
@@ -200,11 +204,14 @@ async fn rewrite_node_address(ctx: &mut Context, stat: &Stat) {
}
}
Err(e) => {
error!(e; "Failed to update NodeAddressValue: {:?}", peer);
error!(e; "Failed to update datanode `NodeAddressValue`: {:?}", peer);
}
}
} else {
warn!("Failed to serialize NodeAddressValue: {:?}", peer);
warn!(
"Failed to serialize datanode `NodeAddressValue`: {:?}",
peer
);
}
}


@@ -0,0 +1,92 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{HeartbeatRequest, Peer, Role};
use common_meta::key::node_address::{NodeAddressKey, NodeAddressValue};
use common_meta::key::{MetadataKey, MetadataValue};
use common_meta::rpc::store::PutRequest;
use common_telemetry::{error, info, warn};
use dashmap::DashMap;
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::Result;
#[derive(Debug, Default)]
pub struct RemapFlowPeerHandler {
/// flow_node_id -> epoch
epoch_cache: DashMap<u64, u64>,
}
#[async_trait::async_trait]
impl HeartbeatHandler for RemapFlowPeerHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Flownode
}
async fn handle(
&self,
req: &HeartbeatRequest,
ctx: &mut Context,
_acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
let Some(peer) = req.peer.as_ref() else {
return Ok(HandleControl::Continue);
};
let current_epoch = req.node_epoch;
let flow_node_id = peer.id;
let refresh = if let Some(mut epoch) = self.epoch_cache.get_mut(&flow_node_id) {
if current_epoch > *epoch.value() {
*epoch.value_mut() = current_epoch;
true
} else {
false
}
} else {
self.epoch_cache.insert(flow_node_id, current_epoch);
true
};
if refresh {
rewrite_node_address(ctx, peer).await;
}
Ok(HandleControl::Continue)
}
}
async fn rewrite_node_address(ctx: &mut Context, peer: &Peer) {
let key = NodeAddressKey::with_flownode(peer.id).to_bytes();
if let Ok(value) = NodeAddressValue::new(peer.clone().into()).try_as_raw_value() {
let put = PutRequest {
key,
value,
prev_kv: false,
};
match ctx.leader_cached_kv_backend.put(put).await {
Ok(_) => {
info!("Successfully updated flow `NodeAddressValue`: {:?}", peer);
// TODO(discord): broadcast cache invalidation to all frontends
}
Err(e) => {
error!(e; "Failed to update flow `NodeAddressValue`: {:?}", peer);
}
}
} else {
warn!("Failed to serialize flow `NodeAddressValue`: {:?}", peer);
}
}
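The epoch check in `RemapFlowPeerHandler::handle` is a monotonic-counter pattern: rewrite the node address on the first heartbeat from a node, or when it reports a strictly newer epoch (meaning it restarted). A std-only sketch using `HashMap` in place of `DashMap`:

```rust
use std::collections::HashMap;

/// Tracks the last seen epoch per node id. `should_refresh` returns true
/// when the address should be rewritten: first heartbeat from a node, or a
/// strictly newer epoch than the cached one.
struct EpochCache {
    epochs: HashMap<u64, u64>,
}

impl EpochCache {
    fn new() -> Self {
        Self { epochs: HashMap::new() }
    }

    fn should_refresh(&mut self, node_id: u64, epoch: u64) -> bool {
        match self.epochs.get_mut(&node_id) {
            Some(seen) if epoch > *seen => {
                *seen = epoch;
                true
            }
            Some(_) => false, // same or older epoch: no refresh
            None => {
                self.epochs.insert(node_id, epoch);
                true // first heartbeat from this node
            }
        }
    }
}

fn main() {
    let mut cache = EpochCache::new();
    assert!(cache.should_refresh(7, 100)); // first sight
    assert!(!cache.should_refresh(7, 100)); // same epoch
    assert!(cache.should_refresh(7, 101)); // node restarted
    assert!(!cache.should_refresh(7, 99)); // stale heartbeat
}
```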


@@ -32,6 +32,7 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBac
use common_meta::leadership_notifier::{
LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
};
use common_meta::node_expiry_listener::NodeExpiryListener;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
@@ -151,6 +152,8 @@ pub struct MetasrvOptions {
#[cfg(feature = "pg_kvbackend")]
/// Lock id for meta kv election. Only effect when using pg_kvbackend.
pub meta_election_lock_id: u64,
#[serde(with = "humantime_serde")]
pub node_max_idle_time: Duration,
}
const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
@@ -192,6 +195,7 @@ impl Default for MetasrvOptions {
meta_table_name: DEFAULT_META_TABLE_NAME.to_string(),
#[cfg(feature = "pg_kvbackend")]
meta_election_lock_id: DEFAULT_META_ELECTION_LOCK_ID,
node_max_idle_time: Duration::from_secs(24 * 60 * 60),
}
}
}
@@ -442,6 +446,10 @@ impl Metasrv {
leadership_change_notifier.add_listener(self.wal_options_allocator.clone());
leadership_change_notifier
.add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
self.options.node_max_idle_time,
self.in_memory.clone(),
)));
if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
}


@@ -68,13 +68,15 @@ impl heartbeat_server::Heartbeat for Metasrv {
};
if pusher_id.is_none() {
pusher_id = register_pusher(&handler_group, header, tx.clone()).await;
pusher_id =
Some(register_pusher(&handler_group, header, tx.clone()).await);
}
if let Some(k) = &pusher_id {
METRIC_META_HEARTBEAT_RECV.with_label_values(&[&k.to_string()]);
} else {
METRIC_META_HEARTBEAT_RECV.with_label_values(&["none"]);
}
let res = handler_group
.handle(req, ctx.clone())
.await
@@ -173,13 +175,13 @@ async fn register_pusher(
handler_group: &HeartbeatHandlerGroup,
header: &RequestHeader,
sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
) -> Option<PusherId> {
) -> PusherId {
let role = header.role();
let id = get_node_id(header);
let pusher_id = PusherId::new(role, id);
let pusher = Pusher::new(sender, header);
handler_group.register_pusher(pusher_id, pusher).await;
Some(pusher_id)
pusher_id
}
#[cfg(test)]


@@ -17,13 +17,15 @@ use std::time::Duration;
use api::v1::meta::{
procedure_service_server, DdlTaskRequest as PbDdlTaskRequest,
DdlTaskResponse as PbDdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse,
DdlTaskResponse as PbDdlTaskResponse, Error, MigrateRegionRequest, MigrateRegionResponse,
ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest,
ResponseHeader,
};
use common_meta::ddl::ExecutorContext;
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest};
use common_meta::rpc::procedure;
use snafu::{ensure, OptionExt, ResultExt};
use common_telemetry::warn;
use snafu::{OptionExt, ResultExt};
use tonic::{Request, Response};
use super::GrpcResult;
@@ -37,6 +39,16 @@ impl procedure_service_server::ProcedureService for Metasrv {
&self,
request: Request<QueryProcedureRequest>,
) -> GrpcResult<ProcedureStateResponse> {
if !self.is_leader() {
let resp = ProcedureStateResponse {
header: Some(ResponseHeader::failed(0, Error::is_not_leader())),
..Default::default()
};
warn!("The current meta is not leader, but a `query procedure state` request has reached the meta. Detail: {:?}.", request);
return Ok(Response::new(resp));
}
let QueryProcedureRequest { header, pid, .. } = request.into_inner();
let _header = header.context(error::MissingRequestHeaderSnafu)?;
let pid = pid.context(error::MissingRequiredParameterSnafu { param: "pid" })?;
@@ -57,6 +69,16 @@ impl procedure_service_server::ProcedureService for Metasrv {
}
async fn ddl(&self, request: Request<PbDdlTaskRequest>) -> GrpcResult<PbDdlTaskResponse> {
if !self.is_leader() {
let resp = PbDdlTaskResponse {
header: Some(ResponseHeader::failed(0, Error::is_not_leader())),
..Default::default()
};
warn!("The current meta is not leader, but a `ddl` request has reached the meta. Detail: {:?}.", request);
return Ok(Response::new(resp));
}
let PbDdlTaskRequest {
header,
query_context,
@@ -99,12 +121,15 @@ impl procedure_service_server::ProcedureService for Metasrv {
&self,
request: Request<MigrateRegionRequest>,
) -> GrpcResult<MigrateRegionResponse> {
ensure!(
self.meta_peer_client().is_leader(),
error::UnexpectedSnafu {
violated: "Trying to submit a region migration procedure to non-leader meta server"
}
);
if !self.is_leader() {
let resp = MigrateRegionResponse {
header: Some(ResponseHeader::failed(0, Error::is_not_leader())),
..Default::default()
};
warn!("The current meta is not leader, but a `migrate` request has reached the meta. Detail: {:?}.", request);
return Ok(Response::new(resp));
}
let MigrateRegionRequest {
header,
@@ -150,6 +175,16 @@ impl procedure_service_server::ProcedureService for Metasrv {
&self,
request: Request<ProcedureDetailRequest>,
) -> GrpcResult<ProcedureDetailResponse> {
if !self.is_leader() {
let resp = ProcedureDetailResponse {
header: Some(ResponseHeader::failed(0, Error::is_not_leader())),
..Default::default()
};
warn!("The current meta is not leader, but a `procedure details` request has reached the meta. Detail: {:?}.", request);
return Ok(Response::new(resp));
}
let ProcedureDetailRequest { header } = request.into_inner();
let _header = header.context(error::MissingRequestHeaderSnafu)?;
let metas = self
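The four not-leader guards added in this file share one shape: instead of surfacing a transport-level error, short-circuit with a response carrying a failed header. A minimal sketch of that guard pattern (the `Reply` enum and `guarded` helper are illustrative, not the metasrv API):

```rust
/// Illustrative outcome of a leader-guarded RPC: either a "not leader"
/// response with a failed header, or the handler's real result.
#[derive(Debug, PartialEq)]
enum Reply<T> {
    NotLeader,
    Ok(T),
}

/// Runs `handle` only on the leader; followers answer `NotLeader` without
/// touching the handler, mirroring the early returns in the diff above.
fn guarded<T>(is_leader: bool, handle: impl FnOnce() -> T) -> Reply<T> {
    if !is_leader {
        return Reply::NotLeader;
    }
    Reply::Ok(handle())
}

fn main() {
    assert_eq!(guarded(false, || 42), Reply::NotLeader);
    assert_eq!(guarded(true, || 42), Reply::Ok(42));
}
```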


@@ -142,6 +142,7 @@ impl DataRegion {
c.column_id = new_column_id_start + delta as u32;
c.column_schema.set_nullable();
match index_options {
IndexOptions::None => {}
IndexOptions::Inverted => {
c.column_schema.set_inverted_index(true);
}


@@ -21,7 +21,7 @@ use api::v1::SemanticType;
use common_telemetry::info;
use common_time::{Timestamp, FOREVER};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
use datatypes::value::Value;
use mito2::engine::MITO_ENGINE_NAME;
use object_store::util::join_dir;
@@ -55,6 +55,8 @@ use crate::error::{
use crate::metrics::PHYSICAL_REGION_COUNT;
use crate::utils::{self, to_data_region_id, to_metadata_region_id};
const DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY: u32 = 1024;
impl MetricEngineInner {
pub async fn create_regions(
&self,
@@ -440,6 +442,7 @@ impl MetricEngineInner {
///
/// Return `[table_id_col, tsid_col]`
fn internal_column_metadata() -> [ColumnMetadata; 2] {
// Safety: BloomFilter is a valid skipping index type
let metric_name_col = ColumnMetadata {
column_id: ReservedColumnId::table_id(),
semantic_type: SemanticType::Tag,
@@ -448,7 +451,11 @@ impl MetricEngineInner {
ConcreteDataType::uint32_datatype(),
false,
)
.with_inverted_index(true),
.with_skipping_options(SkippingIndexOptions {
granularity: DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY,
index_type: datatypes::schema::SkippingIndexType::BloomFilter,
})
.unwrap(),
};
let tsid_col = ColumnMetadata {
column_id: ReservedColumnId::tsid(),


@@ -30,9 +30,10 @@ impl MetricEngineInner {
pub async fn drop_region(
&self,
region_id: RegionId,
_req: RegionDropRequest,
req: RegionDropRequest,
) -> Result<AffectedRows> {
let data_region_id = utils::to_data_region_id(region_id);
let fast_path = req.fast_path;
// enclose the guard in a block to prevent the guard from polluting the async context
let (is_physical_region, is_physical_region_busy) = {
@@ -52,7 +53,7 @@ impl MetricEngineInner {
if is_physical_region {
// check if there is no logical region relates to this physical region
if is_physical_region_busy {
if is_physical_region_busy && !fast_path {
// reject if there is any present logical region
return Err(PhysicalRegionBusySnafu {
region_id: data_region_id,
@@ -60,9 +61,21 @@ impl MetricEngineInner {
.build());
}
self.drop_physical_region(data_region_id).await
return self.drop_physical_region(data_region_id).await;
}
if fast_path {
// For the fast path, we don't delete the metadata in the metadata region;
// we only remove the logical region from the engine state.
//
// The drop database procedure will ensure the metadata region and data region are dropped eventually.
self.state
.write()
.unwrap()
.remove_logical_region(region_id)?;
Ok(0)
} else {
// Cannot merge these two `if`s, otherwise the type checker complains.
let metadata_region_id = self
.state
.read()
@@ -87,13 +100,16 @@ impl MetricEngineInner {
// Since the physical regions are going to be dropped, we don't need to
// update the contents in metadata region.
self.mito
.handle_request(data_region_id, RegionRequest::Drop(RegionDropRequest {}))
.handle_request(
data_region_id,
RegionRequest::Drop(RegionDropRequest { fast_path: false }),
)
.await
.with_context(|_| CloseMitoRegionSnafu { region_id })?;
self.mito
.handle_request(
metadata_region_id,
RegionRequest::Drop(RegionDropRequest {}),
RegionRequest::Drop(RegionDropRequest { fast_path: false }),
)
.await
.with_context(|_| CloseMitoRegionSnafu { region_id })?;
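The drop logic above branches on three flags: whether the region is physical, whether it is busy (has live logical regions), and whether the fast path was requested. A compact sketch of that decision, with an illustrative outcome enum rather than the engine's actual types:

```rust
/// Illustrative outcome of a drop request in the metric engine.
#[derive(Debug, PartialEq)]
enum DropOutcome {
    RejectBusy,
    DropPhysical,
    DetachLogicalOnly, // fast path: engine state only, metadata kept
    DropLogicalFully,
}

/// Mirrors the branching in `drop_region`: a busy physical region is only
/// droppable via the fast path; a logical fast-path drop skips the
/// metadata-region cleanup and leaves it to the drop-database procedure.
fn decide_drop(is_physical: bool, is_busy: bool, fast_path: bool) -> DropOutcome {
    if is_physical {
        if is_busy && !fast_path {
            DropOutcome::RejectBusy
        } else {
            DropOutcome::DropPhysical
        }
    } else if fast_path {
        DropOutcome::DetachLogicalOnly
    } else {
        DropOutcome::DropLogicalFully
    }
}

fn main() {
    assert_eq!(decide_drop(true, true, false), DropOutcome::RejectBusy);
    assert_eq!(decide_drop(true, true, true), DropOutcome::DropPhysical);
    assert_eq!(decide_drop(false, false, true), DropOutcome::DetachLogicalOnly);
    assert_eq!(decide_drop(false, false, false), DropOutcome::DropLogicalFully);
}
```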


@@ -40,6 +40,7 @@ pub struct PhysicalRegionOptions {
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum IndexOptions {
#[default]
None,
Inverted,
Skipping {
granularity: u32,


@@ -146,11 +146,14 @@ impl AccessLayer {
} else {
// Write cache is disabled.
let store = self.object_store.clone();
let path_provider = RegionFilePathFactory::new(self.region_dir.clone());
let indexer_builder = IndexerBuilderImpl {
op_type: request.op_type,
metadata: request.metadata.clone(),
row_group_size: write_opts.row_group_size,
puffin_manager: self.puffin_manager_factory.build(store),
puffin_manager: self
.puffin_manager_factory
.build(store, path_provider.clone()),
intermediate_manager: self.intermediate_manager.clone(),
index_options: request.index_options,
inverted_index_config: request.inverted_index_config,
@@ -161,9 +164,7 @@ impl AccessLayer {
self.object_store.clone(),
request.metadata,
indexer_builder,
RegionFilePathFactory {
region_dir: self.region_dir.clone(),
},
path_provider,
)
.await;
writer
@@ -248,8 +249,18 @@ pub trait FilePathProvider: Send + Sync {
/// Path provider that builds paths in local write cache.
#[derive(Clone)]
pub(crate) struct WriteCachePathProvider {
pub(crate) region_id: RegionId,
pub(crate) file_cache: FileCacheRef,
region_id: RegionId,
file_cache: FileCacheRef,
}
impl WriteCachePathProvider {
/// Creates a new `WriteCachePathProvider` instance.
pub fn new(region_id: RegionId, file_cache: FileCacheRef) -> Self {
Self {
region_id,
file_cache,
}
}
}
impl FilePathProvider for WriteCachePathProvider {
@@ -267,7 +278,14 @@ impl FilePathProvider for WriteCachePathProvider {
/// Path provider that builds paths in region storage path.
#[derive(Clone, Debug)]
pub(crate) struct RegionFilePathFactory {
region_dir: String,
}
impl RegionFilePathFactory {
/// Creates a new `RegionFilePathFactory` instance.
pub fn new(region_dir: String) -> Self {
Self { region_dir }
}
}
impl FilePathProvider for RegionFilePathFactory {

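The constructors above replace the earlier struct-literal construction now that the provider fields are private. A self-contained sketch of the `FilePathProvider` pattern (the trait and type names follow the diff, but the method signatures and path formats here are simplified assumptions):

```rust
// Minimal sketch of the FilePathProvider pattern from the diff.
// The concrete path formats below are assumptions for illustration.
trait FilePathProvider {
    fn build_sst_file_path(&self, file_id: &str) -> String;
    fn build_index_file_path(&self, file_id: &str) -> String;
}

/// Builds paths under the region's storage directory.
struct RegionFilePathFactory {
    region_dir: String,
}

impl RegionFilePathFactory {
    /// Creates a new factory rooted at `region_dir`.
    fn new(region_dir: String) -> Self {
        Self { region_dir }
    }
}

impl FilePathProvider for RegionFilePathFactory {
    fn build_sst_file_path(&self, file_id: &str) -> String {
        format!("{}/{}.parquet", self.region_dir, file_id)
    }
    fn build_index_file_path(&self, file_id: &str) -> String {
        format!("{}/index/{}.puffin", self.region_dir, file_id)
    }
}

fn main() {
    let provider = RegionFilePathFactory::new("data/region-1".to_string());
    println!("{}", provider.build_sst_file_path("abc"));
    println!("{}", provider.build_index_file_path("abc"));
}
```

Hiding the fields behind `new` lets callers such as the write cache swap in a different provider without knowing how paths are derived.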

@@ -187,9 +187,12 @@ impl FileCache {
}
/// Removes a file from the cache explicitly.
/// It always tries to remove the file from the local store because we may not have the file
/// in the memory index if the upload failed.
pub(crate) async fn remove(&self, key: IndexKey) {
let file_path = self.cache_file_path(key);
self.memory_index.remove(&key).await;
// Always delete the file from the local store.
if let Err(e) = self.local_store.delete(&file_path).await {
warn!(e; "Failed to delete a cached file {}", file_path);
}


@@ -22,6 +22,7 @@ use common_telemetry::{debug, info};
use futures::AsyncWriteExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::storage::RegionId;
use crate::access_layer::{
new_fs_cache_store, FilePathProvider, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
@@ -114,15 +115,14 @@ impl WriteCache {
let region_id = write_request.metadata.region_id;
let store = self.file_cache.local_store();
let path_provider = WriteCachePathProvider::new(region_id, self.file_cache.clone());
let indexer = IndexerBuilderImpl {
op_type: write_request.op_type,
metadata: write_request.metadata.clone(),
row_group_size: write_opts.row_group_size,
puffin_manager: self
.puffin_manager_factory
.build(store, path_provider.clone()),
intermediate_manager: self.intermediate_manager.clone(),
index_options: write_request.index_options,
inverted_index_config: write_request.inverted_index_config,
@@ -150,24 +150,41 @@ impl WriteCache {
return Ok(sst_info);
}
let mut upload_tracker = UploadTracker::new(region_id);
let mut err = None;
let remote_store = &upload_request.remote_store;
for sst in &sst_info {
let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
let parquet_path = upload_request
.dest_path_provider
.build_sst_file_path(sst.file_id);
if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await {
err = Some(e);
break;
}
upload_tracker.push_uploaded_file(parquet_path);
if sst.index_metadata.file_size > 0 {
let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin);
let puffin_path = upload_request
.dest_path_provider
.build_index_file_path(sst.file_id);
if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await {
err = Some(e);
break;
}
upload_tracker.push_uploaded_file(puffin_path);
}
}
if let Some(err) = err {
// Cleans files on failure.
upload_tracker
.clean(&sst_info, &self.file_cache, remote_store)
.await;
return Err(err);
}
Ok(sst_info)
}
@@ -333,6 +350,61 @@ pub struct SstUploadRequest {
pub remote_store: ObjectStore,
}
/// A struct that tracks uploaded files and cleans them up if the upload fails.
struct UploadTracker {
/// Id of the region to track.
region_id: RegionId,
/// Paths of files uploaded successfully.
files_uploaded: Vec<String>,
}
impl UploadTracker {
/// Creates a new instance of `UploadTracker` for a given region.
fn new(region_id: RegionId) -> Self {
Self {
region_id,
files_uploaded: Vec::new(),
}
}
/// Adds a file path to the list of uploaded files.
fn push_uploaded_file(&mut self, path: String) {
self.files_uploaded.push(path);
}
/// Cleans up uploaded files and files in the file cache on a best-effort basis.
async fn clean(
&self,
sst_info: &SstInfoArray,
file_cache: &FileCacheRef,
remote_store: &ObjectStore,
) {
common_telemetry::info!(
"Start cleaning files on upload failure, region: {}, num_ssts: {}",
self.region_id,
sst_info.len()
);
// Cleans files in the file cache first.
for sst in sst_info {
let parquet_key = IndexKey::new(self.region_id, sst.file_id, FileType::Parquet);
file_cache.remove(parquet_key).await;
if sst.index_metadata.file_size > 0 {
let puffin_key = IndexKey::new(self.region_id, sst.file_id, FileType::Puffin);
file_cache.remove(puffin_key).await;
}
}
// Cleans uploaded files.
for file_path in &self.files_uploaded {
if let Err(e) = remote_store.delete(file_path).await {
common_telemetry::error!(e; "Failed to delete file {}", file_path);
}
}
}
}
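The upload loop plus `UploadTracker` amounts to a best-effort rollback: record each successful upload, and on the first failure delete everything recorded so far. A simplified synchronous sketch under an assumed in-memory `MockStore` (the real code is async and uses `object_store::ObjectStore`):

```rust
// Simplified, synchronous sketch of the UploadTracker idea.
// `MockStore` is an illustrative stand-in for the remote object store.
use std::collections::HashSet;

struct MockStore {
    files: HashSet<String>,
}

impl MockStore {
    fn upload(&mut self, path: &str, fail: bool) -> Result<(), String> {
        if fail {
            return Err(format!("upload of {path} failed"));
        }
        self.files.insert(path.to_string());
        Ok(())
    }
    fn delete(&mut self, path: &str) -> Result<(), String> {
        self.files.remove(path);
        Ok(())
    }
}

struct UploadTracker {
    files_uploaded: Vec<String>,
}

impl UploadTracker {
    fn new() -> Self {
        Self { files_uploaded: Vec::new() }
    }
    fn push_uploaded_file(&mut self, path: String) {
        self.files_uploaded.push(path);
    }
    /// Deletes every tracked file, logging (not propagating) delete errors.
    fn clean(&self, store: &mut MockStore) {
        for path in &self.files_uploaded {
            if let Err(e) = store.delete(path) {
                eprintln!("failed to delete {path}: {e}");
            }
        }
    }
}

/// Uploads all items; on the first failure, cleans uploaded files and
/// returns the error, mirroring the loop in the diff.
fn upload_all(store: &mut MockStore, items: &[(&str, bool)]) -> Result<(), String> {
    let mut tracker = UploadTracker::new();
    for (path, fail) in items {
        if let Err(e) = store.upload(path, *fail) {
            tracker.clean(store);
            return Err(e);
        }
        tracker.push_uploaded_file(path.to_string());
    }
    Ok(())
}

fn main() {
    let mut store = MockStore { files: HashSet::new() };
    // Second upload fails, so the first file is deleted again.
    let result = upload_all(&mut store, &[("a.parquet", false), ("a.puffin", true)]);
    assert!(result.is_err());
    assert!(store.files.is_empty());
}
```

Collecting the error into a local and breaking out of the loop (as the diff does) keeps a single cleanup point instead of scattering `?` early returns.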
#[cfg(test)]
mod tests {
use common_test_util::temp_dir::create_temp_dir;
@@ -355,9 +427,7 @@ mod tests {
// and now just use local file system to mock.
let mut env = TestEnv::new();
let mock_store = env.init_object_store_manager();
let path_provider = RegionFilePathFactory::new("test".to_string());
let local_dir = create_temp_dir("");
let local_store = new_fs_store(local_dir.path().to_str().unwrap());
@@ -488,9 +558,7 @@ mod tests {
..Default::default()
};
let upload_request = SstUploadRequest {
dest_path_provider: RegionFilePathFactory::new(data_home.clone()),
remote_store: mock_store.clone(),
};


@@ -56,7 +56,10 @@ async fn test_engine_drop_region() {
// It's okay to drop a region that doesn't exist.
engine
.handle_request(
region_id,
RegionRequest::Drop(RegionDropRequest { fast_path: false }),
)
.await
.unwrap_err();
@@ -86,7 +89,10 @@ async fn test_engine_drop_region() {
// drop the created region.
engine
.handle_request(
region_id,
RegionRequest::Drop(RegionDropRequest { fast_path: false }),
)
.await
.unwrap();
assert!(!engine.is_region_exists(region_id));
@@ -192,7 +198,10 @@ async fn test_engine_drop_region_for_custom_store() {
// Drop the custom region.
engine
.handle_request(
custom_region_id,
RegionRequest::Drop(RegionDropRequest { fast_path: false }),
)
.await
.unwrap();
assert!(!engine.is_region_exists(custom_region_id));


@@ -301,10 +301,7 @@ impl PartitionTreeMemtable {
fn update_stats(&self, metrics: &WriteMetrics) {
// Only let the tracker tracks value bytes.
self.alloc_tracker.on_allocation(metrics.value_bytes);
metrics.update_timestamp_range(&self.max_timestamp, &self.min_timestamp);
}
}


@@ -14,6 +14,8 @@
//! Internal metrics of the memtable.
use std::sync::atomic::{AtomicI64, Ordering};
/// Metrics of writing memtables.
pub(crate) struct WriteMetrics {
/// Size allocated by keys.
@@ -26,6 +28,51 @@ pub(crate) struct WriteMetrics {
pub(crate) max_ts: i64,
}
impl WriteMetrics {
/// Updates the min/max timestamp range according to the current write metrics.
pub(crate) fn update_timestamp_range(&self, prev_max_ts: &AtomicI64, prev_min_ts: &AtomicI64) {
loop {
let current_min = prev_min_ts.load(Ordering::Relaxed);
if self.min_ts >= current_min {
break;
}
let Err(updated) = prev_min_ts.compare_exchange(
current_min,
self.min_ts,
Ordering::Relaxed,
Ordering::Relaxed,
) else {
break;
};
if updated == self.min_ts {
break;
}
}
loop {
let current_max = prev_max_ts.load(Ordering::Relaxed);
if self.max_ts <= current_max {
break;
}
let Err(updated) = prev_max_ts.compare_exchange(
current_max,
self.max_ts,
Ordering::Relaxed,
Ordering::Relaxed,
) else {
break;
};
if updated == self.max_ts {
break;
}
}
}
}
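Compared with the previous `fetch_max`/`fetch_min` calls, the loops above only attempt a `compare_exchange` when the candidate value actually extends the range, and retry on races. The same pattern in isolation (the helper name `update_min` is ours, not from the diff):

```rust
// Standalone sketch of a CAS-based monotonic minimum update: only move
// the stored minimum down, retrying when another writer races us.
use std::sync::atomic::{AtomicI64, Ordering};

/// Lowers `prev_min` to `candidate` if `candidate` is smaller.
fn update_min(prev_min: &AtomicI64, candidate: i64) {
    loop {
        let current = prev_min.load(Ordering::Relaxed);
        if candidate >= current {
            break; // nothing to do, skip the write entirely
        }
        // Try to install our smaller value; on failure another thread
        // stored something in the meantime, so reload and retry.
        if prev_min
            .compare_exchange(current, candidate, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok()
        {
            break;
        }
    }
}

fn main() {
    let min = AtomicI64::new(100);
    update_min(&min, 42);
    update_min(&min, 77); // ignored: 77 > 42
    assert_eq!(min.load(Ordering::Relaxed), 42);
}
```

Unlike `fetch_min`, which always performs a read-modify-write, this version performs no store at all when the candidate does not improve the bound.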
impl Default for WriteMetrics {
fn default() -> Self {
Self {


@@ -146,8 +146,7 @@ impl TimeSeriesMemtable {
fn update_stats(&self, stats: WriteMetrics) {
self.alloc_tracker
.on_allocation(stats.key_bytes + stats.value_bytes);
stats.update_timestamp_range(&self.max_timestamp, &self.min_timestamp);
}
fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {


@@ -32,7 +32,6 @@ use tokio::sync::{mpsc, Semaphore};
use tokio_stream::wrappers::ReceiverStream;
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheStrategy;
use crate::config::DEFAULT_SCAN_CHANNEL_SIZE;
use crate::error::Result;
@@ -311,10 +310,13 @@ impl ScanRegion {
let memtables: Vec<_> = memtables
.into_iter()
.filter(|mem| {
if mem.is_empty() {
return false;
}
let stats = mem.stats();
// Safety: the memtable is not empty.
let (start, end) = stats.time_range().unwrap();
// The time range of the memtable is inclusive.
let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
memtable_range.intersects(&time_range)
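The inclusive-range check above reduces to a two-comparison predicate: inclusive ranges `[a0, a1]` and `[b0, b1]` intersect iff `a0 <= b1 && b0 <= a1`. A minimal sketch (plain `i64` pairs stand in for `TimestampRange`):

```rust
// Inclusive interval intersection, as used to skip memtables whose
// time range cannot match the scan's time range.
fn intersects(a: (i64, i64), b: (i64, i64)) -> bool {
    a.0 <= b.1 && b.0 <= a.1
}

fn main() {
    assert!(intersects((0, 10), (5, 20)));
    assert!(intersects((0, 10), (10, 20))); // inclusive endpoints touch
    assert!(!intersects((0, 10), (11, 20)));
}
```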
@@ -424,12 +426,7 @@ impl ScanRegion {
return None;
}
let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
@@ -464,14 +461,8 @@ impl ScanRegion {
return None;
}
let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
BloomFilterIndexApplierBuilder::new(
@@ -496,12 +487,18 @@ impl ScanRegion {
return None;
}
let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
FulltextIndexApplierBuilder::new(
self.access_layer.region_dir().to_string(),
self.version.metadata.region_id,
self.access_layer.object_store().clone(),
self.access_layer.puffin_manager_factory().clone(),
self.version.metadata.as_ref(),
)
.with_file_cache(file_cache)
.with_puffin_metadata_cache(puffin_metadata_cache)
.build(&self.request.filters)
.inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
.ok()


@@ -35,8 +35,8 @@ use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
use store_api::region_request::{
AffectedRows, RegionAlterRequest, RegionCatchupRequest, RegionCloseRequest,
RegionCompactRequest, RegionCreateRequest, RegionFlushRequest, RegionOpenRequest,
RegionRequest, RegionTruncateRequest,
};
use store_api::storage::{RegionId, SequenceNumber};
use tokio::sync::oneshot::{self, Receiver, Sender};
@@ -624,10 +624,10 @@ impl WorkerRequest {
sender: sender.into(),
request: DdlRequest::Create(v),
}),
RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
region_id,
sender: sender.into(),
request: DdlRequest::Drop,
}),
RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
region_id,
@@ -690,7 +690,7 @@ impl WorkerRequest {
#[derive(Debug)]
pub(crate) enum DdlRequest {
Create(RegionCreateRequest),
Drop,
Open((RegionOpenRequest, Option<WalEntryReceiver>)),
Close(RegionCloseRequest),
Alter(RegionAlterRequest),


@@ -174,31 +174,8 @@ impl FileMeta {
.contains(&IndexType::BloomFilterIndex)
}
pub fn index_file_size(&self) -> u64 {
self.index_file_size
}
}


@@ -113,11 +113,9 @@ impl FilePurger for LocalFilePurger {
}
// Purges index content in the stager.
if let Err(e) = sst_layer
.puffin_manager_factory()
.purge_stager(file_meta.file_id)
.await
{
error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",


@@ -103,7 +103,6 @@ pub type BloomFilterOutput = IndexBaseOutput;
#[derive(Default)]
pub struct Indexer {
file_id: FileId,
region_id: RegionId,
puffin_manager: Option<SstPuffinManager>,
inverted_indexer: Option<InvertedIndexer>,
@@ -170,7 +169,7 @@ impl Indexer {
#[async_trait::async_trait]
pub trait IndexerBuilder {
/// Builds an indexer for the given file id.
async fn build(&self, file_id: FileId) -> Indexer;
}
pub(crate) struct IndexerBuilderImpl {
@@ -188,10 +187,9 @@ pub(crate) struct IndexerBuilderImpl {
#[async_trait::async_trait]
impl IndexerBuilder for IndexerBuilderImpl {
/// Sanity checks the arguments and creates a new [Indexer] if they are valid.
async fn build(&self, file_id: FileId) -> Indexer {
let mut indexer = Indexer {
file_id,
region_id: self.metadata.region_id,
..Default::default()
};
@@ -392,30 +390,31 @@ mod tests {
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use super::*;
use crate::access_layer::FilePathProvider;
use crate::config::{FulltextIndexConfig, Mode};
struct MetaConfig {
with_inverted: bool,
with_fulltext: bool,
with_skipping_bloom: bool,
}
fn mock_region_metadata(
MetaConfig {
with_inverted,
with_fulltext,
with_skipping_bloom,
}: MetaConfig,
) -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
let mut column_schema = ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false);
if with_inverted {
column_schema = column_schema.with_inverted_index(true);
}
builder
.push_column_metadata(ColumnMetadata {
column_schema,
semantic_type: SemanticType::Field,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
@@ -433,10 +432,6 @@ mod tests {
column_id: 3,
});
if with_fulltext {
let column_schema =
ColumnSchema::new("text", ConcreteDataType::string_datatype(), true)
@@ -484,6 +479,18 @@ mod tests {
IntermediateManager::init_fs(path).await.unwrap()
}
struct NoopPathProvider;
impl FilePathProvider for NoopPathProvider {
fn build_index_file_path(&self, _file_id: FileId) -> String {
unreachable!()
}
fn build_sst_file_path(&self, _file_id: FileId) -> String {
unreachable!()
}
}
#[tokio::test]
async fn test_build_indexer_basic() {
let (dir, factory) =
@@ -491,7 +498,7 @@ mod tests {
let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
let metadata = mock_region_metadata(MetaConfig {
with_inverted: true,
with_fulltext: true,
with_skipping_bloom: true,
});
@@ -499,14 +506,14 @@ mod tests {
op_type: OperationType::Flush,
metadata,
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random())
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -521,7 +528,7 @@ mod tests {
let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
let metadata = mock_region_metadata(MetaConfig {
with_inverted: true,
with_fulltext: true,
with_skipping_bloom: true,
});
@@ -529,7 +536,7 @@ mod tests {
op_type: OperationType::Flush,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
intermediate_manager: intm_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig {
@@ -539,7 +546,7 @@ mod tests {
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random())
.await;
assert!(indexer.inverted_indexer.is_none());
@@ -550,7 +557,7 @@ mod tests {
op_type: OperationType::Compact,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
intermediate_manager: intm_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
@@ -560,7 +567,7 @@ mod tests {
},
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random())
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -571,7 +578,7 @@ mod tests {
op_type: OperationType::Compact,
metadata,
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
@@ -581,7 +588,7 @@ mod tests {
..Default::default()
},
}
.build(FileId::random())
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -596,7 +603,7 @@ mod tests {
let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
let metadata = mock_region_metadata(MetaConfig {
with_inverted: false,
with_fulltext: true,
with_skipping_bloom: true,
});
@@ -604,14 +611,14 @@ mod tests {
op_type: OperationType::Flush,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
intermediate_manager: intm_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random())
.await;
assert!(indexer.inverted_indexer.is_none());
@@ -619,7 +626,7 @@ mod tests {
assert!(indexer.bloom_filter_indexer.is_some());
let metadata = mock_region_metadata(MetaConfig {
with_inverted: true,
with_fulltext: false,
with_skipping_bloom: true,
});
@@ -627,14 +634,14 @@ mod tests {
op_type: OperationType::Flush,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
intermediate_manager: intm_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random())
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -642,7 +649,7 @@ mod tests {
assert!(indexer.bloom_filter_indexer.is_some());
let metadata = mock_region_metadata(MetaConfig {
with_inverted: true,
with_fulltext: true,
with_skipping_bloom: false,
});
@@ -650,14 +657,14 @@ mod tests {
op_type: OperationType::Flush,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random())
.await;
assert!(indexer.inverted_indexer.is_some());
@@ -672,7 +679,7 @@ mod tests {
let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
let metadata = mock_region_metadata(MetaConfig {
with_inverted: true,
with_fulltext: true,
with_skipping_bloom: true,
});
@@ -680,14 +687,14 @@ mod tests {
op_type: OperationType::Flush,
metadata,
row_group_size: 0,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
}
.build(FileId::random())
.await;
assert!(indexer.inverted_indexer.is_none());


@@ -28,6 +28,7 @@ use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::storage::{ColumnId, RegionId};
use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::index::bloom_filter_index::{
BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader,
@@ -43,7 +44,6 @@ use crate::sst::index::bloom_filter::applier::builder::Predicate;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
@@ -247,11 +247,12 @@ impl BloomFilterIndexApplier {
return Ok(None);
};
let puffin_manager = self.puffin_manager_factory.build(
file_cache.local_store(),
WriteCachePathProvider::new(self.region_id, file_cache.clone()),
);
let reader = puffin_manager
.reader(&file_id)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
@@ -278,12 +279,14 @@ impl BloomFilterIndexApplier {
) -> Result<BlobReader> {
let puffin_manager = self
.puffin_manager_factory
.build(
self.object_store.clone(),
RegionFilePathFactory::new(self.region_dir.clone()),
)
.with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
puffin_manager
.reader(&file_id)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
@@ -447,7 +450,6 @@ mod tests {
let memory_usage_threshold = Some(1024);
let file_id = FileId::random();
let region_dir = "region_dir".to_string();
let mut indexer =
BloomFilterIndexer::new(file_id, &region_metadata, intm_mgr, memory_usage_threshold)
@@ -460,9 +462,12 @@ mod tests {
let mut batch = new_batch("tag2", 10..20);
indexer.update(&mut batch).await.unwrap();
let puffin_manager = factory.build(
object_store.clone(),
RegionFilePathFactory::new(region_dir.clone()),
);
let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
indexer.finish(&mut puffin_writer).await.unwrap();
puffin_writer.finish().await.unwrap();


@@ -356,6 +356,7 @@ pub(crate) mod tests {
use store_api::storage::RegionId;
use super::*;
use crate::access_layer::FilePathProvider;
use crate::read::BatchColumn;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::index::puffin_manager::PuffinManagerFactory;
@@ -368,6 +369,18 @@ pub(crate) mod tests {
IntermediateManager::init_fs(path).await.unwrap()
}
pub struct TestPathProvider;
impl FilePathProvider for TestPathProvider {
fn build_index_file_path(&self, file_id: FileId) -> String {
file_id.to_string()
}
fn build_sst_file_path(&self, file_id: FileId) -> String {
file_id.to_string()
}
}
/// tag_str:
/// - type: string
/// - index: bloom filter
@@ -483,16 +496,16 @@ pub(crate) mod tests {
indexer.update(&mut batch).await.unwrap();
let (_d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
let puffin_manager = factory.build(object_store, TestPathProvider);
let file_id = FileId::random();
let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
let (row_count, byte_count) = indexer.finish(&mut puffin_writer).await.unwrap();
assert_eq!(row_count, 20);
assert!(byte_count > 0);
puffin_writer.finish().await.unwrap();
let puffin_reader = puffin_manager.reader(&file_id).await.unwrap();
// tag_str
{


@@ -15,19 +15,22 @@
use std::collections::BTreeSet;
use std::sync::Arc;
use common_telemetry::warn;
use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{DirGuard, PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::storage::{ColumnId, RegionId};
use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::error::{ApplyFulltextIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::sst::file::FileId;
use crate::sst::index::fulltext_index::INDEX_BLOB_TYPE_TANTIVY;
use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinDir};
use crate::sst::index::TYPE_FULLTEXT_INDEX;
pub mod builder;
@@ -36,6 +39,9 @@ pub struct FulltextIndexApplier {
/// The root directory of the region.
region_dir: String,
/// The region ID.
region_id: RegionId,
/// Queries to apply to the index.
queries: Vec<(ColumnId, String)>,
@@ -44,6 +50,12 @@ pub struct FulltextIndexApplier {
/// Store responsible for accessing index files.
store: ObjectStore,
/// File cache to be used by the `FulltextIndexApplier`.
file_cache: Option<FileCacheRef>,
/// The puffin metadata cache.
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}
pub type FulltextIndexApplierRef = Arc<FulltextIndexApplier>;
@@ -52,20 +64,43 @@ impl FulltextIndexApplier {
/// Creates a new `FulltextIndexApplier`.
pub fn new(
region_dir: String,
region_id: RegionId,
store: ObjectStore,
queries: Vec<(ColumnId, String)>,
puffin_manager_factory: PuffinManagerFactory,
) -> Self {
Self {
region_dir,
region_id,
store,
queries,
puffin_manager_factory,
file_cache: None,
puffin_metadata_cache: None,
}
}
/// Sets the file cache.
pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
self.file_cache = file_cache;
self
}
/// Sets the puffin metadata cache.
pub fn with_puffin_metadata_cache(
mut self,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
) -> Self {
self.puffin_metadata_cache = puffin_metadata_cache;
self
}
/// Applies the queries to the fulltext index of the specified SST file.
pub async fn apply(
&self,
file_id: FileId,
file_size_hint: Option<u64>,
) -> Result<BTreeSet<RowId>> {
let _timer = INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_FULLTEXT_INDEX])
.start_timer();
@@ -74,7 +109,9 @@ impl FulltextIndexApplier {
let mut row_ids = BTreeSet::new();
for (column_id, query) in &self.queries {
let dir = self
.index_dir_path(file_id, *column_id, file_size_hint)
.await?;
let path = match &dir {
Some(dir) => dir.path(),
None => {
@@ -110,15 +147,74 @@ impl FulltextIndexApplier {
&self,
file_id: FileId,
column_id: ColumnId,
file_size_hint: Option<u64>,
) -> Result<Option<SstPuffinDir>> {
let blob_key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{column_id}");
// FAST PATH: Try to read the index from the file cache.
if let Some(file_cache) = &self.file_cache {
let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
if file_cache.get(index_key).await.is_some() {
match self
.get_index_from_file_cache(file_cache, file_id, file_size_hint, &blob_key)
.await
{
Ok(dir) => return Ok(dir),
Err(err) => {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
}
}
}
}
// SLOW PATH: Try to read the index from the remote file.
self.get_index_from_remote_file(file_id, file_size_hint, &blob_key)
.await
}
async fn get_index_from_file_cache(
&self,
file_cache: &FileCacheRef,
file_id: FileId,
file_size_hint: Option<u64>,
blob_key: &str,
) -> Result<Option<SstPuffinDir>> {
match self
.puffin_manager_factory
.build(
file_cache.local_store(),
WriteCachePathProvider::new(self.region_id, file_cache.clone()),
)
.reader(&file_id)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
.dir(blob_key)
.await
{
Ok(dir) => Ok(Some(dir)),
Err(puffin::error::Error::BlobNotFound { .. }) => Ok(None),
Err(err) => Err(err).context(PuffinReadBlobSnafu),
}
}
async fn get_index_from_remote_file(
&self,
file_id: FileId,
file_size_hint: Option<u64>,
blob_key: &str,
) -> Result<Option<SstPuffinDir>> {
match self
.puffin_manager_factory
.build(
self.store.clone(),
RegionFilePathFactory::new(self.region_dir.clone()),
)
.reader(&file_id)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
.dir(blob_key)
.await
{
Ok(dir) => Ok(Some(dir)),

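The fast-path/slow-path split above reads the puffin index from the local file cache when present and falls back to the remote store otherwise. A simplified sketch with string contents standing in for puffin readers (names are illustrative assumptions, and `HashMap`s stand in for the cache and object store):

```rust
// Sketch of the cache-first read pattern added for the fulltext index:
// FAST PATH hits the local file cache, SLOW PATH falls back to remote.
use std::collections::HashMap;

struct FileCache {
    entries: HashMap<String, String>,
}

fn read_index(
    cache: Option<&FileCache>,
    remote: &HashMap<String, String>,
    file_id: &str,
) -> Option<String> {
    // FAST PATH: serve from the local cache when the file is present.
    if let Some(cache) = cache {
        if let Some(content) = cache.entries.get(file_id) {
            return Some(content.clone());
        }
    }
    // SLOW PATH: fall back to the remote store.
    remote.get(file_id).cloned()
}

fn main() {
    let cache = FileCache {
        entries: HashMap::from([("f1".to_string(), "cached".to_string())]),
    };
    let remote = HashMap::from([
        ("f1".to_string(), "remote".to_string()),
        ("f2".to_string(), "remote".to_string()),
    ]);
    assert_eq!(read_index(Some(&cache), &remote, "f1").as_deref(), Some("cached"));
    assert_eq!(read_index(Some(&cache), &remote, "f2").as_deref(), Some("remote"));
    assert_eq!(read_index(None, &remote, "f1").as_deref(), Some("remote"));
}
```

In the diff, a cache-read error additionally logs a warning and falls through to the remote path rather than failing the query.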
View File

@@ -15,9 +15,11 @@
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use store_api::metadata::RegionMetadata;
use store_api::storage::{ColumnId, ConcreteDataType, RegionId};
use crate::cache::file_cache::FileCacheRef;
use crate::error::Result;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
@@ -25,27 +27,49 @@ use crate::sst::index::puffin_manager::PuffinManagerFactory;
/// `FulltextIndexApplierBuilder` is a builder for `FulltextIndexApplier`.
pub struct FulltextIndexApplierBuilder<'a> {
region_dir: String,
region_id: RegionId,
store: ObjectStore,
puffin_manager_factory: PuffinManagerFactory,
metadata: &'a RegionMetadata,
file_cache: Option<FileCacheRef>,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}
impl<'a> FulltextIndexApplierBuilder<'a> {
/// Creates a new `FulltextIndexApplierBuilder`.
pub fn new(
region_dir: String,
region_id: RegionId,
store: ObjectStore,
puffin_manager_factory: PuffinManagerFactory,
metadata: &'a RegionMetadata,
) -> Self {
Self {
region_dir,
region_id,
store,
puffin_manager_factory,
metadata,
file_cache: None,
puffin_metadata_cache: None,
}
}
/// Sets the file cache to be used by the `FulltextIndexApplier`.
pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
self.file_cache = file_cache;
self
}
/// Sets the puffin metadata cache to be used by the `FulltextIndexApplier`.
pub fn with_puffin_metadata_cache(
mut self,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
) -> Self {
self.puffin_metadata_cache = puffin_metadata_cache;
self
}
/// Builds `SstIndexApplier` from the given expressions.
pub fn build(self, exprs: &[Expr]) -> Result<Option<FulltextIndexApplier>> {
let mut queries = Vec::with_capacity(exprs.len());
@@ -58,10 +82,13 @@ impl<'a> FulltextIndexApplierBuilder<'a> {
Ok((!queries.is_empty()).then(|| {
FulltextIndexApplier::new(
self.region_dir,
self.region_id,
self.store,
queries,
self.puffin_manager_factory,
)
.with_file_cache(self.file_cache)
.with_puffin_metadata_cache(self.puffin_metadata_cache)
}))
}


@@ -350,11 +350,11 @@ mod tests {
use store_api::storage::{ConcreteDataType, RegionId};
use super::*;
use crate::access_layer::RegionFilePathFactory;
use crate::read::{Batch, BatchColumn};
use crate::sst::file::FileId;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location;
fn mock_object_store() -> ObjectStore {
ObjectStore::new(Memory::default()).unwrap().finish()
@@ -494,7 +494,6 @@ mod tests {
let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
let region_dir = "region0".to_string();
let sst_file_id = FileId::random();
let file_path = location::index_file_path(&region_dir, sst_file_id);
let object_store = mock_object_store();
let region_metadata = mock_region_metadata();
let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
@@ -514,8 +513,11 @@ mod tests {
let mut batch = new_batch(rows);
indexer.update(&mut batch).await.unwrap();
let puffin_manager = factory.build(object_store.clone());
let mut writer = puffin_manager.writer(&file_path).await.unwrap();
let puffin_manager = factory.build(
object_store.clone(),
RegionFilePathFactory::new(region_dir.clone()),
);
let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap();
let _ = indexer.finish(&mut writer).await.unwrap();
writer.finish().await.unwrap();
@@ -523,6 +525,7 @@ mod tests {
let _d = &d;
let applier = FulltextIndexApplier::new(
region_dir.clone(),
region_metadata.region_id,
object_store.clone(),
queries
.into_iter()
@@ -531,7 +534,7 @@ mod tests {
factory.clone(),
);
async move { applier.apply(sst_file_id).await.unwrap() }.boxed()
async move { applier.apply(sst_file_id, None).await.unwrap() }.boxed()
}
}


@@ -62,7 +62,7 @@ impl Indexer {
async fn build_puffin_writer(&mut self) -> Option<SstPuffinWriter> {
let puffin_manager = self.puffin_manager.take()?;
let err = match puffin_manager.writer(&self.file_path).await {
let err = match puffin_manager.writer(&self.file_id).await {
Ok(writer) => return Some(writer),
Err(err) => err,
};


@@ -28,6 +28,7 @@ use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::storage::RegionId;
use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
use crate::error::{
@@ -38,7 +39,6 @@ use crate::sst::file::FileId;
use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
use crate::sst::index::TYPE_INVERTED_INDEX;
use crate::sst::location;
/// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files
/// and returning the relevant row group ids for further scan.
@@ -172,12 +172,14 @@ impl InvertedIndexApplier {
return Ok(None);
};
let puffin_manager = self.puffin_manager_factory.build(file_cache.local_store());
let puffin_file_name = file_cache.cache_file_path(index_key);
let puffin_manager = self.puffin_manager_factory.build(
file_cache.local_store(),
WriteCachePathProvider::new(self.region_id, file_cache.clone()),
);
// Adds file size hint to the puffin reader to avoid extra metadata read.
let reader = puffin_manager
.reader(&puffin_file_name)
.reader(&file_id)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
@@ -198,12 +200,14 @@ impl InvertedIndexApplier {
) -> Result<BlobReader> {
let puffin_manager = self
.puffin_manager_factory
.build(self.store.clone())
.build(
self.store.clone(),
RegionFilePathFactory::new(self.region_dir.clone()),
)
.with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
let file_path = location::index_file_path(&self.region_dir, file_id);
puffin_manager
.reader(&file_path)
.reader(&file_id)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
@@ -239,10 +243,12 @@ mod tests {
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let file_id = FileId::random();
let region_dir = "region_dir".to_string();
let path = location::index_file_path(&region_dir, file_id);
let puffin_manager = puffin_manager_factory.build(object_store.clone());
let mut writer = puffin_manager.writer(&path).await.unwrap();
let puffin_manager = puffin_manager_factory.build(
object_store.clone(),
RegionFilePathFactory::new(region_dir.clone()),
);
let mut writer = puffin_manager.writer(&file_id).await.unwrap();
writer
.put_blob(INDEX_BLOB_TYPE, Cursor::new(vec![]), Default::default())
.await
@@ -285,10 +291,12 @@ mod tests {
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let file_id = FileId::random();
let region_dir = "region_dir".to_string();
let path = location::index_file_path(&region_dir, file_id);
let puffin_manager = puffin_manager_factory.build(object_store.clone());
let mut writer = puffin_manager.writer(&path).await.unwrap();
let puffin_manager = puffin_manager_factory.build(
object_store.clone(),
RegionFilePathFactory::new(region_dir.clone()),
);
let mut writer = puffin_manager.writer(&file_id).await.unwrap();
writer
.put_blob("invalid_blob_type", Cursor::new(vec![]), Default::default())
.await


@@ -336,13 +336,13 @@ mod tests {
use store_api::storage::RegionId;
use super::*;
use crate::access_layer::RegionFilePathFactory;
use crate::cache::index::inverted_index::InvertedIndexCache;
use crate::metrics::CACHE_BYTES;
use crate::read::BatchColumn;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location;
fn mock_object_store() -> ObjectStore {
ObjectStore::new(Memory::default()).unwrap().finish()
@@ -438,7 +438,6 @@ mod tests {
let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
let region_dir = "region0".to_string();
let sst_file_id = FileId::random();
let file_path = location::index_file_path(&region_dir, sst_file_id);
let object_store = mock_object_store();
let region_metadata = mock_region_metadata();
let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
@@ -460,8 +459,11 @@ mod tests {
creator.update(&mut batch).await.unwrap();
}
let puffin_manager = factory.build(object_store.clone());
let mut writer = puffin_manager.writer(&file_path).await.unwrap();
let puffin_manager = factory.build(
object_store.clone(),
RegionFilePathFactory::new(region_dir.clone()),
);
let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap();
let (row_count, _) = creator.finish(&mut writer).await.unwrap();
assert_eq!(row_count, rows.len() * segment_row_count);
writer.finish().await.unwrap();


@@ -26,18 +26,20 @@ use puffin::puffin_manager::stager::{BoundedStager, Stager};
use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
use snafu::ResultExt;
use crate::access_layer::FilePathProvider;
use crate::error::{PuffinInitStagerSnafu, PuffinPurgeStagerSnafu, Result};
use crate::metrics::{
StagerMetrics, INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_READ_BYTES_TOTAL,
INDEX_PUFFIN_READ_OP_TOTAL, INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL,
};
use crate::sst::file::FileId;
use crate::sst::index::store::{self, InstrumentedStore};
type InstrumentedRangeReader = store::InstrumentedRangeReader<'static>;
type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>;
pub(crate) type SstPuffinManager =
FsPuffinManager<Arc<BoundedStager>, ObjectStorePuffinFileAccessor>;
FsPuffinManager<Arc<BoundedStager<FileId>>, ObjectStorePuffinFileAccessor>;
pub(crate) type SstPuffinReader = <SstPuffinManager as PuffinManager>::Reader;
pub(crate) type SstPuffinWriter = <SstPuffinManager as PuffinManager>::Writer;
pub(crate) type SstPuffinBlob = <SstPuffinReader as PuffinReader>::Blob;
@@ -50,7 +52,7 @@ const STAGING_DIR: &str = "staging";
#[derive(Clone)]
pub struct PuffinManagerFactory {
/// The stager used by the puffin manager.
stager: Arc<BoundedStager>,
stager: Arc<BoundedStager<FileId>>,
/// The size of the write buffer used to create object store.
write_buffer_size: Option<usize>,
@@ -79,15 +81,20 @@ impl PuffinManagerFactory {
})
}
pub(crate) fn build(&self, store: ObjectStore) -> SstPuffinManager {
pub(crate) fn build(
&self,
store: ObjectStore,
path_provider: impl FilePathProvider + 'static,
) -> SstPuffinManager {
let store = InstrumentedStore::new(store).with_write_buffer_size(self.write_buffer_size);
let puffin_file_accessor = ObjectStorePuffinFileAccessor::new(store);
let puffin_file_accessor =
ObjectStorePuffinFileAccessor::new(store, Arc::new(path_provider));
SstPuffinManager::new(self.stager.clone(), puffin_file_accessor)
}
pub(crate) async fn purge_stager(&self, puffin_file_name: &str) -> Result<()> {
pub(crate) async fn purge_stager(&self, file_id: FileId) -> Result<()> {
self.stager
.purge(puffin_file_name)
.purge(&file_id)
.await
.context(PuffinPurgeStagerSnafu)
}
@@ -119,11 +126,15 @@ impl PuffinManagerFactory {
#[derive(Clone)]
pub(crate) struct ObjectStorePuffinFileAccessor {
object_store: InstrumentedStore,
path_provider: Arc<dyn FilePathProvider>,
}
impl ObjectStorePuffinFileAccessor {
pub fn new(object_store: InstrumentedStore) -> Self {
Self { object_store }
pub fn new(object_store: InstrumentedStore, path_provider: Arc<dyn FilePathProvider>) -> Self {
Self {
object_store,
path_provider,
}
}
}
@@ -131,11 +142,13 @@ impl ObjectStorePuffinFileAccessor {
impl PuffinFileAccessor for ObjectStorePuffinFileAccessor {
type Reader = InstrumentedRangeReader;
type Writer = InstrumentedAsyncWrite;
type FileHandle = FileId;
async fn reader(&self, puffin_file_name: &str) -> PuffinResult<Self::Reader> {
async fn reader(&self, handle: &FileId) -> PuffinResult<Self::Reader> {
let file_path = self.path_provider.build_index_file_path(*handle);
self.object_store
.range_reader(
puffin_file_name,
&file_path,
&INDEX_PUFFIN_READ_BYTES_TOTAL,
&INDEX_PUFFIN_READ_OP_TOTAL,
)
@@ -144,10 +157,11 @@ impl PuffinFileAccessor for ObjectStorePuffinFileAccessor {
.context(puffin_error::ExternalSnafu)
}
async fn writer(&self, puffin_file_name: &str) -> PuffinResult<Self::Writer> {
async fn writer(&self, handle: &FileId) -> PuffinResult<Self::Writer> {
let file_path = self.path_provider.build_index_file_path(*handle);
self.object_store
.writer(
puffin_file_name,
&file_path,
&INDEX_PUFFIN_WRITE_BYTES_TOTAL,
&INDEX_PUFFIN_WRITE_OP_TOTAL,
&INDEX_PUFFIN_FLUSH_OP_TOTAL,
@@ -169,20 +183,32 @@ mod tests {
use super::*;
struct TestFilePathProvider;
impl FilePathProvider for TestFilePathProvider {
fn build_index_file_path(&self, file_id: FileId) -> String {
file_id.to_string()
}
fn build_sst_file_path(&self, file_id: FileId) -> String {
file_id.to_string()
}
}
#[tokio::test]
async fn test_puffin_manager_factory() {
let (_dir, factory) =
PuffinManagerFactory::new_for_test_async("test_puffin_manager_factory_").await;
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let manager = factory.build(object_store);
let manager = factory.build(object_store, TestFilePathProvider);
let file_name = "my-puffin-file";
let file_id = FileId::random();
let blob_key = "blob-key";
let dir_key = "dir-key";
let raw_data = b"hello world!";
let mut writer = manager.writer(file_name).await.unwrap();
let mut writer = manager.writer(&file_id).await.unwrap();
writer
.put_blob(blob_key, Cursor::new(raw_data), PutOptions::default())
.await
@@ -203,7 +229,7 @@ mod tests {
.unwrap();
writer.finish().await.unwrap();
let reader = manager.reader(file_name).await.unwrap();
let reader = manager.reader(&file_id).await.unwrap();
let blob_guard = reader.blob(blob_key).await.unwrap();
let blob_reader = blob_guard.reader().await.unwrap();
let meta = blob_reader.metadata().await.unwrap();
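The `TestFilePathProvider` above exercises the new path-provider indirection: the puffin layer keeps only an opaque `FileId`, and a `FilePathProvider` decides where that file lives on the backing store. A simplified sketch of why two implementations exist (the directory layouts below are assumptions for illustration, not the crate's real ones):

```rust
// Simplified stand-in for the crate's FilePathProvider trait;
// u64 stands in for FileId.
trait FilePathProvider {
    fn build_index_file_path(&self, file_id: u64) -> String;
}

/// Remote layout: index files addressed under the region directory
/// (mirrors the role of RegionFilePathFactory).
struct RegionPaths {
    region_dir: String,
}
impl FilePathProvider for RegionPaths {
    fn build_index_file_path(&self, file_id: u64) -> String {
        format!("{}/index/{}.puffin", self.region_dir, file_id)
    }
}

/// Local write-cache layout: flat files keyed by id
/// (mirrors the role of WriteCachePathProvider).
struct CachePaths {
    cache_dir: String,
}
impl FilePathProvider for CachePaths {
    fn build_index_file_path(&self, file_id: u64) -> String {
        format!("{}/{}", self.cache_dir, file_id)
    }
}
```

Swapping the provider is what lets the same reader code hit either the write cache or object storage.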


@@ -131,7 +131,7 @@ mod tests {
#[async_trait::async_trait]
impl IndexerBuilder for NoopIndexBuilder {
async fn build(&self, _file_id: FileId, _path: String) -> Indexer {
async fn build(&self, _file_id: FileId) -> Indexer {
Indexer::default()
}
}


@@ -387,7 +387,11 @@ impl ParquetReaderBuilder {
return false;
}
let apply_res = match index_applier.apply(self.file_handle.file_id()).await {
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let apply_res = match index_applier
.apply(self.file_handle.file_id(), Some(file_size_hint))
.await
{
Ok(res) => res,
Err(err) => {
if cfg!(any(test, feature = "test")) {
@@ -467,9 +471,9 @@ impl ParquetReaderBuilder {
if !self.file_handle.meta_ref().inverted_index_available() {
return false;
}
let file_size_hint = self.file_handle.meta_ref().inverted_index_size();
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let apply_output = match index_applier
.apply(self.file_handle.file_id(), file_size_hint)
.apply(self.file_handle.file_id(), Some(file_size_hint))
.await
{
Ok(output) => output,
@@ -578,11 +582,11 @@ impl ParquetReaderBuilder {
return false;
}
let file_size_hint = self.file_handle.meta_ref().bloom_filter_index_size();
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let apply_output = match index_applier
.apply(
self.file_handle.file_id(),
file_size_hint,
Some(file_size_hint),
parquet_meta
.row_groups()
.iter()


@@ -121,8 +121,7 @@ where
path_provider: P,
) -> ParquetWriter<F, I, P> {
let init_file = FileId::random();
let index_file_path = path_provider.build_index_file_path(init_file);
let indexer = indexer_builder.build(init_file, index_file_path).await;
let indexer = indexer_builder.build(init_file).await;
ParquetWriter {
path_provider,
@@ -140,11 +139,7 @@ where
match self.current_indexer {
None => {
self.current_file = FileId::random();
let index_file_path = self.path_provider.build_index_file_path(self.current_file);
let indexer = self
.indexer_builder
.build(self.current_file, index_file_path)
.await;
let indexer = self.indexer_builder.build(self.current_file).await;
self.current_indexer = Some(indexer);
// safety: self.current_indexer already set above.
self.current_indexer.as_mut().unwrap()


@@ -47,7 +47,8 @@ pub fn sst_region_metadata() -> RegionMetadata {
"tag_0".to_string(),
ConcreteDataType::string_datatype(),
true,
),
)
.with_inverted_index(true),
semantic_type: SemanticType::Tag,
column_id: 0,
})


@@ -836,7 +836,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
for ddl in ddl_requests.drain(..) {
let res = match ddl.request {
DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
DdlRequest::Drop(_) => self.handle_drop_request(ddl.region_id).await,
DdlRequest::Drop => self.handle_drop_request(ddl.region_id).await,
DdlRequest::Open((req, wal_entry_receiver)) => {
self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
.await;


@@ -21,7 +21,7 @@ use common_meta::node_manager::NodeManagerRef;
use common_query::error::Result;
use common_telemetry::tracing_context::TracingContext;
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryStreamExt};
use futures::StreamExt;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
@@ -81,7 +81,6 @@ impl FlowServiceOperator {
.flow_metadata_manager
.flow_route_manager()
.routes(id)
.try_collect::<Vec<_>>()
.await
.map_err(BoxedError::new)
.context(common_query::error::ExecuteSnafu)?;


@@ -583,7 +583,8 @@ impl HistogramFoldStream {
.expect("field column should not be nullable");
counters.push(counter);
}
let result = Self::evaluate_row(self.quantile, &bucket, &counters)?;
// ignore invalid data
let result = Self::evaluate_row(self.quantile, &bucket, &counters).unwrap_or(f64::NAN);
self.output_buffer[self.field_column_index].push_value_ref(ValueRef::from(result));
cursor += bucket_num;
remaining_rows -= bucket_num;
@@ -672,7 +673,7 @@ impl HistogramFoldStream {
if bucket.len() <= 1 {
return Ok(f64::NAN);
}
if *bucket.last().unwrap() != f64::INFINITY {
if bucket.last().unwrap().is_finite() {
return Err(DataFusionError::Execution(
"last bucket should be +Inf".to_string(),
));
@@ -692,8 +693,8 @@ impl HistogramFoldStream {
}
// check input value
debug_assert!(bucket.windows(2).all(|w| w[0] <= w[1]));
debug_assert!(counter.windows(2).all(|w| w[0] <= w[1]));
debug_assert!(bucket.windows(2).all(|w| w[0] <= w[1]), "{bucket:?}");
debug_assert!(counter.windows(2).all(|w| w[0] <= w[1]), "{counter:?}");
let total = *counter.last().unwrap();
let expected_pos = total * quantile;
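The `expected_pos = total * quantile` line is the entry point of Prometheus-style `histogram_quantile` interpolation: find the bucket whose cumulative count crosses that position and interpolate linearly within it. A hedged standalone sketch, assuming sorted upper bounds and monotone cumulative counts with a trailing `+Inf` bucket (the real stream handles more edge cases):

```rust
/// Linear-interpolation quantile over cumulative histogram buckets.
/// `bounds` are bucket upper bounds (last one +Inf), `counts` cumulative.
fn histogram_quantile(quantile: f64, bounds: &[f64], counts: &[f64]) -> f64 {
    let total = *counts.last().unwrap();
    let expected = total * quantile;
    // First bucket whose cumulative count reaches the target position.
    let idx = counts.iter().position(|&c| c >= expected).unwrap();
    if idx == 0 {
        return bounds[0];
    }
    let (lo_bound, lo_count) = (bounds[idx - 1], counts[idx - 1]);
    let (hi_bound, hi_count) = (bounds[idx], counts[idx]);
    if !hi_bound.is_finite() {
        // Target falls in the +Inf bucket: clamp to the last finite bound.
        return lo_bound;
    }
    lo_bound + (hi_bound - lo_bound) * (expected - lo_count) / (hi_count - lo_count)
}
```

This also shows why the diff tightens the sanity check to `is_finite()`: the algorithm only makes sense when exactly the last bound is non-finite.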


@@ -21,6 +21,7 @@ mod idelta;
mod predict_linear;
mod quantile;
mod resets;
mod round;
#[cfg(test)]
mod test_util;
@@ -39,6 +40,7 @@ pub use idelta::IDelta;
pub use predict_linear::PredictLinear;
pub use quantile::QuantileOverTime;
pub use resets::Resets;
pub use round::Round;
pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result<ArrayRef, DataFusionError> {
if let ColumnarValue::Array(array) = columnar_value {


@@ -0,0 +1,105 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use datafusion::error::DataFusionError;
use datafusion_expr::{create_udf, ColumnarValue, ScalarUDF, Volatility};
use datatypes::arrow::array::AsArray;
use datatypes::arrow::datatypes::{DataType, Float64Type};
use datatypes::compute;
use crate::functions::extract_array;
pub struct Round {
nearest: f64,
}
impl Round {
fn new(nearest: f64) -> Self {
Self { nearest }
}
pub const fn name() -> &'static str {
"prom_round"
}
fn input_type() -> Vec<DataType> {
vec![DataType::Float64]
}
pub fn return_type() -> DataType {
DataType::Float64
}
pub fn scalar_udf(nearest: f64) -> ScalarUDF {
create_udf(
Self::name(),
Self::input_type(),
Self::return_type(),
Volatility::Immutable,
Arc::new(move |input: &_| Self::new(nearest).calc(input)) as _,
)
}
fn calc(&self, input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
assert_eq!(input.len(), 1);
let value_array = extract_array(&input[0])?;
if self.nearest == 0.0 {
let values = value_array.as_primitive::<Float64Type>();
let result = compute::unary::<_, _, Float64Type>(values, |a| a.round());
Ok(ColumnarValue::Array(Arc::new(result) as _))
} else {
let values = value_array.as_primitive::<Float64Type>();
let nearest = self.nearest;
let result =
compute::unary::<_, _, Float64Type>(values, |a| ((a / nearest).round() * nearest));
Ok(ColumnarValue::Array(Arc::new(result) as _))
}
}
}
#[cfg(test)]
mod tests {
use datatypes::arrow::array::Float64Array;
use super::*;
fn test_round_f64(value: Vec<f64>, nearest: f64, expected: Vec<f64>) {
let round_udf = Round::scalar_udf(nearest);
let input = vec![ColumnarValue::Array(Arc::new(Float64Array::from(value)))];
let result = round_udf.invoke_batch(&input, 1).unwrap();
let result_array = extract_array(&result).unwrap();
assert_eq!(result_array.len(), 1);
assert_eq!(
result_array.as_primitive::<Float64Type>().values(),
&expected
);
}
#[test]
fn test_round() {
test_round_f64(vec![123.456], 0.001, vec![123.456]);
test_round_f64(vec![123.456], 0.01, vec![123.46000000000001]);
test_round_f64(vec![123.456], 0.1, vec![123.5]);
test_round_f64(vec![123.456], 0.0, vec![123.0]);
test_round_f64(vec![123.456], 1.0, vec![123.0]);
test_round_f64(vec![123.456], 10.0, vec![120.0]);
test_round_f64(vec![123.456], 100.0, vec![100.0]);
test_round_f64(vec![123.456], 105.0, vec![105.0]);
test_round_f64(vec![123.456], 1000.0, vec![0.0]);
}
}
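Stripped of the UDF plumbing, the `calc` kernel above reduces to one scalar formula: round to the nearest multiple of `nearest`, with `nearest == 0.0` meaning plain integer rounding. A minimal sketch:

```rust
/// Scalar core of prom_round: round `value` to the nearest
/// multiple of `nearest`; a zero `nearest` rounds to an integer.
fn prom_round(value: f64, nearest: f64) -> f64 {
    if nearest == 0.0 {
        value.round()
    } else {
        (value / nearest).round() * nearest
    }
}
```

The `123.46000000000001` expectation in the tests is the honest f64 result of `12346.0 * 0.01`, not a typo.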


@@ -36,12 +36,13 @@ use crate::file_metadata::FileMetadata;
pub trait PuffinManager {
type Reader: PuffinReader;
type Writer: PuffinWriter;
type FileHandle: ToString + Clone + Send + Sync;
/// Creates a `PuffinReader` for the specified `puffin_file_name`.
async fn reader(&self, puffin_file_name: &str) -> Result<Self::Reader>;
/// Creates a `PuffinReader` for the specified `handle`.
async fn reader(&self, handle: &Self::FileHandle) -> Result<Self::Reader>;
/// Creates a `PuffinWriter` for the specified `puffin_file_name`.
async fn writer(&self, puffin_file_name: &str) -> Result<Self::Writer>;
/// Creates a `PuffinWriter` for the specified `handle`.
async fn writer(&self, handle: &Self::FileHandle) -> Result<Self::Writer>;
}
/// The `PuffinWriter` trait provides methods for writing blobs and directories to a Puffin file.
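The trait change above is the heart of the refactor: instead of raw `&str` file names, `PuffinManager` gains an associated `FileHandle`, so callers pass typed `FileId`s and path construction moves behind the implementation. A toy version of the new shape (simplified names, not the crate's real signatures):

```rust
/// Toy version of the handle-based trait shape.
trait PuffinManager {
    type FileHandle: ToString + Clone + Send + Sync;
    fn describe(&self, handle: &Self::FileHandle) -> String;
}

struct IdManager;
impl PuffinManager for IdManager {
    // A u64 stands in for the crate's FileId.
    type FileHandle = u64;
    fn describe(&self, handle: &Self::FileHandle) -> String {
        format!("puffin file {}", handle)
    }
}
```

The `ToString` bound is what lets string-keyed consumers (e.g. the metadata cache) keep working against the handle.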


@@ -27,12 +27,13 @@ use crate::error::Result;
pub trait PuffinFileAccessor: Send + Sync + 'static {
type Reader: SizeAwareRangeReader + Sync;
type Writer: AsyncWrite + Unpin + Send;
type FileHandle: ToString + Clone + Send + Sync;
/// Opens a reader for the given puffin file.
async fn reader(&self, puffin_file_name: &str) -> Result<Self::Reader>;
/// Opens a reader for the given puffin file handle.
async fn reader(&self, handle: &Self::FileHandle) -> Result<Self::Reader>;
/// Creates a writer for the given puffin file.
async fn writer(&self, puffin_file_name: &str) -> Result<Self::Writer>;
/// Creates a writer for the given puffin file handle.
async fn writer(&self, handle: &Self::FileHandle) -> Result<Self::Writer>;
}
pub struct MockFileAccessor {
@@ -50,15 +51,16 @@ impl MockFileAccessor {
impl PuffinFileAccessor for MockFileAccessor {
type Reader = FileReader;
type Writer = Compat<File>;
type FileHandle = String;
async fn reader(&self, puffin_file_name: &str) -> Result<Self::Reader> {
Ok(FileReader::new(self.tempdir.path().join(puffin_file_name))
async fn reader(&self, handle: &String) -> Result<Self::Reader> {
Ok(FileReader::new(self.tempdir.path().join(handle))
.await
.unwrap())
}
async fn writer(&self, puffin_file_name: &str) -> Result<Self::Writer> {
let p = self.tempdir.path().join(puffin_file_name);
async fn writer(&self, handle: &String) -> Result<Self::Writer> {
let p = self.tempdir.path().join(handle);
if let Some(p) = p.parent() {
if !tokio::fs::try_exists(p).await.unwrap() {
tokio::fs::create_dir_all(p).await.unwrap();


@@ -61,25 +61,26 @@ impl<S, F> FsPuffinManager<S, F> {
#[async_trait]
impl<S, F> PuffinManager for FsPuffinManager<S, F>
where
S: Stager + Clone + 'static,
F: PuffinFileAccessor + Clone,
S: Stager<FileHandle = F::FileHandle> + Clone + 'static,
{
type Reader = FsPuffinReader<S, F>;
type Writer = FsPuffinWriter<S, F::Writer>;
type FileHandle = F::FileHandle;
async fn reader(&self, puffin_file_name: &str) -> Result<Self::Reader> {
async fn reader(&self, handle: &Self::FileHandle) -> Result<Self::Reader> {
Ok(FsPuffinReader::new(
puffin_file_name.to_string(),
handle.clone(),
self.stager.clone(),
self.puffin_file_accessor.clone(),
self.puffin_metadata_cache.clone(),
))
}
async fn writer(&self, puffin_file_name: &str) -> Result<Self::Writer> {
let writer = self.puffin_file_accessor.writer(puffin_file_name).await?;
async fn writer(&self, handle: &Self::FileHandle) -> Result<Self::Writer> {
let writer = self.puffin_file_accessor.writer(handle).await?;
Ok(FsPuffinWriter::new(
puffin_file_name.to_string(),
handle.clone(),
self.stager.clone(),
writer,
))


@@ -39,9 +39,13 @@ use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager};
use crate::puffin_manager::{BlobGuard, PuffinReader};
/// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files.
pub struct FsPuffinReader<S, F> {
/// The name of the puffin file.
puffin_file_name: String,
pub struct FsPuffinReader<S, F>
where
S: Stager + 'static,
F: PuffinFileAccessor + Clone,
{
/// The handle of the puffin file.
handle: F::FileHandle,
/// The file size hint.
file_size_hint: Option<u64>,
@@ -56,15 +60,19 @@ pub struct FsPuffinReader<S, F> {
puffin_file_metadata_cache: Option<PuffinMetadataCacheRef>,
}
impl<S, F> FsPuffinReader<S, F> {
impl<S, F> FsPuffinReader<S, F>
where
S: Stager + 'static,
F: PuffinFileAccessor + Clone,
{
pub(crate) fn new(
puffin_file_name: String,
handle: F::FileHandle,
stager: S,
puffin_file_accessor: F,
puffin_file_metadata_cache: Option<PuffinMetadataCacheRef>,
) -> Self {
Self {
puffin_file_name,
handle,
file_size_hint: None,
stager,
puffin_file_accessor,
@@ -76,8 +84,8 @@ impl<S, F> FsPuffinReader<S, F> {
#[async_trait]
impl<S, F> PuffinReader for FsPuffinReader<S, F>
where
S: Stager + 'static,
F: PuffinFileAccessor + Clone,
S: Stager<FileHandle = F::FileHandle> + 'static,
{
type Blob = Either<RandomReadBlob<F>, S::Blob>;
type Dir = S::Dir;
@@ -88,19 +96,13 @@ where
}
async fn metadata(&self) -> Result<Arc<FileMetadata>> {
let reader = self
.puffin_file_accessor
.reader(&self.puffin_file_name)
.await?;
let reader = self.puffin_file_accessor.reader(&self.handle).await?;
let mut file = PuffinFileReader::new(reader);
self.get_puffin_file_metadata(&mut file).await
}
async fn blob(&self, key: &str) -> Result<Self::Blob> {
let mut reader = self
.puffin_file_accessor
.reader(&self.puffin_file_name)
.await?;
let mut reader = self.puffin_file_accessor.reader(&self.handle).await?;
if let Some(file_size_hint) = self.file_size_hint {
reader.with_file_size_hint(file_size_hint);
}
@@ -117,7 +119,7 @@ where
let blob = if blob_metadata.compression_codec.is_none() {
// If the blob is not compressed, we can directly read it from the puffin file.
Either::L(RandomReadBlob {
file_name: self.puffin_file_name.clone(),
handle: self.handle.clone(),
accessor: self.puffin_file_accessor.clone(),
blob_metadata,
})
@@ -126,7 +128,7 @@ where
let staged_blob = self
.stager
.get_blob(
self.puffin_file_name.as_str(),
&self.handle,
key,
Box::new(|writer| {
Box::pin(Self::init_blob_to_stager(file, blob_metadata, writer))
@@ -143,17 +145,18 @@ where
async fn dir(&self, key: &str) -> Result<Self::Dir> {
self.stager
.get_dir(
self.puffin_file_name.as_str(),
&self.handle,
key,
Box::new(|writer_provider| {
let accessor = self.puffin_file_accessor.clone();
let puffin_file_name = self.puffin_file_name.clone();
let handle = self.handle.clone();
let key = key.to_string();
Box::pin(Self::init_dir_to_stager(
puffin_file_name,
handle,
key,
writer_provider,
accessor,
self.file_size_hint,
))
}),
)
@@ -170,15 +173,16 @@ where
&self,
reader: &mut PuffinFileReader<F::Reader>,
) -> Result<Arc<FileMetadata>> {
let id = self.handle.to_string();
if let Some(cache) = self.puffin_file_metadata_cache.as_ref() {
if let Some(metadata) = cache.get_metadata(&self.puffin_file_name) {
if let Some(metadata) = cache.get_metadata(&id) {
return Ok(metadata);
}
}
let metadata = Arc::new(reader.metadata().await?);
if let Some(cache) = self.puffin_file_metadata_cache.as_ref() {
cache.put_metadata(self.puffin_file_name.to_string(), metadata.clone());
cache.put_metadata(id, metadata.clone());
}
Ok(metadata)
}
@@ -196,12 +200,16 @@ where
}
async fn init_dir_to_stager(
puffin_file_name: String,
handle: F::FileHandle,
key: String,
writer_provider: DirWriterProviderRef,
accessor: F,
file_size_hint: Option<u64>,
) -> Result<u64> {
let reader = accessor.reader(&puffin_file_name).await?;
let mut reader = accessor.reader(&handle).await?;
if let Some(file_size_hint) = file_size_hint {
reader.with_file_size_hint(file_size_hint);
}
let mut file = PuffinFileReader::new(reader);
let puffin_metadata = file.metadata().await?;
@@ -237,7 +245,7 @@ where
}
);
let reader = accessor.reader(&puffin_file_name).await?;
let reader = accessor.reader(&handle).await?;
let writer = writer_provider.writer(&file_meta.relative_path).await?;
let task = common_runtime::spawn_global(async move {
let reader = PuffinFileReader::new(reader).into_blob_reader(&blob_meta);
@@ -284,8 +292,8 @@ where
}
/// `RandomReadBlob` is a `BlobGuard` that directly reads the blob from the puffin file.
pub struct RandomReadBlob<F> {
file_name: String,
pub struct RandomReadBlob<F: PuffinFileAccessor> {
handle: F::FileHandle,
accessor: F,
blob_metadata: BlobMetadata,
}
@@ -302,7 +310,7 @@ impl<F: PuffinFileAccessor + Clone> BlobGuard for RandomReadBlob<F> {
}
);
let reader = self.accessor.reader(&self.file_name).await?;
let reader = self.accessor.reader(&self.handle).await?;
let blob_reader = PuffinFileReader::new(reader).into_blob_reader(&self.blob_metadata);
Ok(blob_reader)
}


@@ -34,9 +34,9 @@ use crate::puffin_manager::stager::Stager;
use crate::puffin_manager::{PuffinWriter, PutOptions};
/// `FsPuffinWriter` is a `PuffinWriter` that writes blobs and directories to a puffin file.
pub struct FsPuffinWriter<S, W> {
pub struct FsPuffinWriter<S: Stager, W> {
/// The name of the puffin file.
puffin_file_name: String,
handle: S::FileHandle,
/// The stager.
stager: S,
@@ -48,10 +48,10 @@ pub struct FsPuffinWriter<S, W> {
blob_keys: HashSet<String>,
}
impl<S, W> FsPuffinWriter<S, W> {
pub(crate) fn new(puffin_file_name: String, stager: S, writer: W) -> Self {
impl<S: Stager, W> FsPuffinWriter<S, W> {
pub(crate) fn new(handle: S::FileHandle, stager: S, writer: W) -> Self {
Self {
puffin_file_name,
handle,
stager,
puffin_file_writer: PuffinFileWriter::new(writer),
blob_keys: HashSet::new(),
@@ -147,7 +147,7 @@ where
// Move the directory into the stager.
self.stager
.put_dir(&self.puffin_file_name, key, dir_path, dir_size)
.put_dir(&self.handle, key, dir_path, dir_size)
.await?;
Ok(written_bytes)
}


@@ -57,6 +57,7 @@ pub trait InitDirFn = FnOnce(DirWriterProviderRef) -> WriteResult;
pub trait Stager: Send + Sync {
type Blob: BlobGuard + Sync;
type Dir: DirGuard;
type FileHandle: ToString + Clone + Send + Sync;
/// Retrieves a blob, initializing it if necessary using the provided `init_fn`.
///
@@ -64,7 +65,7 @@ pub trait Stager: Send + Sync {
/// The caller is responsible for holding the `BlobGuard` until they are done with the blob.
async fn get_blob<'a>(
&self,
puffin_file_name: &str,
handle: &Self::FileHandle,
key: &str,
init_factory: Box<dyn InitBlobFn + Send + Sync + 'a>,
) -> Result<Self::Blob>;
@@ -75,7 +76,7 @@ pub trait Stager: Send + Sync {
/// The caller is responsible for holding the `DirGuard` until they are done with the directory.
async fn get_dir<'a>(
&self,
puffin_file_name: &str,
handle: &Self::FileHandle,
key: &str,
init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
) -> Result<Self::Dir>;
@@ -83,14 +84,14 @@ pub trait Stager: Send + Sync {
/// Stores a directory in the staging area.
async fn put_dir(
&self,
puffin_file_name: &str,
handle: &Self::FileHandle,
key: &str,
dir_path: PathBuf,
dir_size: u64,
) -> Result<()>;
/// Purges all content for the given puffin file from the staging area.
async fn purge(&self, puffin_file_name: &str) -> Result<()>;
async fn purge(&self, handle: &Self::FileHandle) -> Result<()>;
}
/// `StagerNotifier` provides a way to notify the caller of staging events.
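The reworked `Stager` trait above keys every operation on an associated `FileHandle` type instead of a raw `&str` file name, with `ToString + Clone + Send + Sync` bounds so a handle can still be folded into a string cache key. A minimal standalone sketch of that pattern (a simplified, synchronous analogue with hypothetical names — not the crate's real API):

```rust
use std::collections::HashMap;

// Hypothetical, simplified analogue of the handle-keyed `Stager` trait:
// every operation takes an associated `FileHandle` rather than a `&str`.
trait SimpleStager {
    type FileHandle: ToString + Clone;

    fn put(&mut self, handle: &Self::FileHandle, key: &str, value: Vec<u8>);
    fn get(&self, handle: &Self::FileHandle, key: &str) -> Option<&Vec<u8>>;
    /// Drop every entry that belongs to the given handle.
    fn purge(&mut self, handle: &Self::FileHandle);
}

// A map-backed stager generic over the handle type `H`, mirroring the shape
// of `BoundedStager<H>`; the `ToString` bound is what allows the handle to
// participate in cache-key encoding.
struct MapStager<H> {
    entries: HashMap<String, Vec<u8>>,
    _phantom: std::marker::PhantomData<H>,
}

impl<H> MapStager<H> {
    fn new() -> Self {
        Self {
            entries: HashMap::new(),
            _phantom: std::marker::PhantomData,
        }
    }

    // Derive one string cache key from the handle and the entry key.
    fn cache_key(handle_str: &str, key: &str) -> String {
        format!("{handle_str}/{key}")
    }
}

impl<H: ToString + Clone> SimpleStager for MapStager<H> {
    type FileHandle = H;

    fn put(&mut self, handle: &H, key: &str, value: Vec<u8>) {
        let k = Self::cache_key(&handle.to_string(), key);
        self.entries.insert(k, value);
    }

    fn get(&self, handle: &H, key: &str) -> Option<&Vec<u8>> {
        self.entries.get(&Self::cache_key(&handle.to_string(), key))
    }

    fn purge(&mut self, handle: &H) {
        // Same idea as `invalidate_entries_if` in the diff: remove every
        // entry whose cache key was derived from this handle.
        let prefix = format!("{}/", handle.to_string());
        self.entries.retain(|k, _| !k.starts_with(&prefix));
    }
}
```

Instantiating `MapStager<String>` recovers the old name-keyed behavior, which is also why the tests in this diff only needed `"..."` changed to `"...".to_string()`.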


@@ -48,7 +48,7 @@ const DELETED_EXTENSION: &str = "deleted";
const RECYCLE_BIN_TTL: Duration = Duration::from_secs(60);
/// `BoundedStager` is a `Stager` that uses `moka` to manage the staging area.
pub struct BoundedStager {
pub struct BoundedStager<H> {
/// The base directory of the staging area.
base_dir: PathBuf,
@@ -71,9 +71,11 @@ pub struct BoundedStager {
/// Notifier for the stager.
notifier: Option<Arc<dyn StagerNotifier>>,
_phantom: std::marker::PhantomData<H>,
}
impl BoundedStager {
impl<H: 'static> BoundedStager<H> {
pub async fn new(
base_dir: PathBuf,
capacity: u64,
@@ -124,6 +126,7 @@ impl BoundedStager {
delete_queue,
recycle_bin,
notifier,
_phantom: std::marker::PhantomData,
};
stager.recover().await?;
@@ -133,17 +136,19 @@ impl BoundedStager {
}
#[async_trait]
impl Stager for BoundedStager {
impl<H: ToString + Clone + Send + Sync> Stager for BoundedStager<H> {
type Blob = Arc<FsBlobGuard>;
type Dir = Arc<FsDirGuard>;
type FileHandle = H;
async fn get_blob<'a>(
&self,
puffin_file_name: &str,
handle: &Self::FileHandle,
key: &str,
init_fn: Box<dyn InitBlobFn + Send + Sync + 'a>,
) -> Result<Self::Blob> {
let cache_key = Self::encode_cache_key(puffin_file_name, key);
let handle_str = handle.to_string();
let cache_key = Self::encode_cache_key(&handle_str, key);
let mut miss = false;
let v = self
@@ -169,7 +174,7 @@ impl Stager for BoundedStager {
notifier.on_load_blob(timer.elapsed());
}
let guard = Arc::new(FsBlobGuard {
puffin_file_name: puffin_file_name.to_string(),
handle: handle_str,
path,
delete_queue: self.delete_queue.clone(),
size,
@@ -194,11 +199,13 @@ impl Stager for BoundedStager {
async fn get_dir<'a>(
&self,
puffin_file_name: &str,
handle: &Self::FileHandle,
key: &str,
init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
) -> Result<Self::Dir> {
let cache_key = Self::encode_cache_key(puffin_file_name, key);
let handle_str = handle.to_string();
let cache_key = Self::encode_cache_key(&handle_str, key);
let mut miss = false;
let v = self
@@ -224,7 +231,7 @@ impl Stager for BoundedStager {
notifier.on_load_dir(timer.elapsed());
}
let guard = Arc::new(FsDirGuard {
puffin_file_name: puffin_file_name.to_string(),
handle: handle_str,
path,
size,
delete_queue: self.delete_queue.clone(),
@@ -249,12 +256,13 @@ impl Stager for BoundedStager {
async fn put_dir(
&self,
puffin_file_name: &str,
handle: &Self::FileHandle,
key: &str,
dir_path: PathBuf,
size: u64,
) -> Result<()> {
let cache_key = Self::encode_cache_key(puffin_file_name, key);
let handle_str = handle.to_string();
let cache_key = Self::encode_cache_key(&handle_str, key);
self.cache
.try_get_with(cache_key.clone(), async move {
@@ -275,7 +283,7 @@ impl Stager for BoundedStager {
notifier.on_cache_insert(size);
}
let guard = Arc::new(FsDirGuard {
puffin_file_name: puffin_file_name.to_string(),
handle: handle_str,
path,
size,
delete_queue: self.delete_queue.clone(),
@@ -295,17 +303,17 @@ impl Stager for BoundedStager {
Ok(())
}
async fn purge(&self, puffin_file_name: &str) -> Result<()> {
let file_name = puffin_file_name.to_string();
async fn purge(&self, handle: &Self::FileHandle) -> Result<()> {
let handle_str = handle.to_string();
self.cache
.invalidate_entries_if(move |_k, v| v.puffin_file_name() == file_name)
.invalidate_entries_if(move |_k, v| v.handle() == handle_str)
.unwrap(); // SAFETY: `support_invalidation_closures` is enabled
self.cache.run_pending_tasks().await;
Ok(())
}
}
impl BoundedStager {
impl<H> BoundedStager<H> {
fn encode_cache_key(puffin_file_name: &str, key: &str) -> String {
let mut hasher = Sha256::new();
hasher.update(puffin_file_name);
@@ -400,7 +408,7 @@ impl BoundedStager {
delete_queue: self.delete_queue.clone(),
// placeholder
puffin_file_name: String::new(),
handle: String::new(),
}));
// A duplicate dir will be moved to the delete queue.
let _dup_dir = elems.insert(key, v);
@@ -412,7 +420,7 @@ impl BoundedStager {
delete_queue: self.delete_queue.clone(),
// placeholder
puffin_file_name: String::new(),
handle: String::new(),
}));
// A duplicate file will be moved to the delete queue.
let _dup_file = elems.insert(key, v);
@@ -511,7 +519,7 @@ impl BoundedStager {
}
}
impl Drop for BoundedStager {
impl<H> Drop for BoundedStager<H> {
fn drop(&mut self) {
let _ = self.delete_queue.try_send(DeleteTask::Terminate);
}
@@ -535,10 +543,10 @@ impl CacheValue {
self.size().try_into().unwrap_or(u32::MAX)
}
fn puffin_file_name(&self) -> &str {
fn handle(&self) -> &str {
match self {
CacheValue::File(guard) => &guard.puffin_file_name,
CacheValue::Dir(guard) => &guard.puffin_file_name,
CacheValue::File(guard) => &guard.handle,
CacheValue::Dir(guard) => &guard.handle,
}
}
}
@@ -553,7 +561,7 @@ enum DeleteTask {
/// automatically deleting the file on drop.
#[derive(Debug)]
pub struct FsBlobGuard {
puffin_file_name: String,
handle: String,
path: PathBuf,
size: u64,
delete_queue: Sender<DeleteTask>,
@@ -586,7 +594,7 @@ impl Drop for FsBlobGuard {
/// automatically deleting the directory on drop.
#[derive(Debug)]
pub struct FsDirGuard {
puffin_file_name: String,
handle: String,
path: PathBuf,
size: u64,
delete_queue: Sender<DeleteTask>,
@@ -636,7 +644,7 @@ impl DirWriterProvider for MokaDirWriterProvider {
}
#[cfg(test)]
impl BoundedStager {
impl<H> BoundedStager<H> {
pub async fn must_get_file(&self, puffin_file_name: &str, key: &str) -> fs::File {
let cache_key = Self::encode_cache_key(puffin_file_name, key);
let value = self.cache.get(&cache_key).await.unwrap();
@@ -796,11 +804,11 @@ mod tests {
.await
.unwrap();
let puffin_file_name = "test_get_blob";
let puffin_file_name = "test_get_blob".to_string();
let key = "key";
let reader = stager
.get_blob(
puffin_file_name,
&puffin_file_name,
key,
Box::new(|mut writer| {
Box::pin(async move {
@@ -819,7 +827,7 @@ mod tests {
let buf = reader.read(0..m.content_length).await.unwrap();
assert_eq!(&*buf, b"hello world");
let mut file = stager.must_get_file(puffin_file_name, key).await;
let mut file = stager.must_get_file(&puffin_file_name, key).await;
let mut buf = Vec::new();
file.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"hello world");
@@ -861,11 +869,11 @@ mod tests {
("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
];
let puffin_file_name = "test_get_dir";
let puffin_file_name = "test_get_dir".to_string();
let key = "key";
let dir_path = stager
.get_dir(
puffin_file_name,
&puffin_file_name,
key,
Box::new(|writer_provider| {
Box::pin(async move {
@@ -890,7 +898,7 @@ mod tests {
assert_eq!(buf, *content);
}
let dir_path = stager.must_get_dir(puffin_file_name, key).await;
let dir_path = stager.must_get_dir(&puffin_file_name, key).await;
for (rel_path, content) in &files_in_dir {
let file_path = dir_path.join(rel_path);
let mut file = tokio::fs::File::open(&file_path).await.unwrap();
@@ -929,11 +937,11 @@ mod tests {
.unwrap();
// initialize stager
let puffin_file_name = "test_recover";
let puffin_file_name = "test_recover".to_string();
let blob_key = "blob_key";
let guard = stager
.get_blob(
puffin_file_name,
&puffin_file_name,
blob_key,
Box::new(|mut writer| {
Box::pin(async move {
@@ -957,7 +965,7 @@ mod tests {
let dir_key = "dir_key";
let guard = stager
.get_dir(
puffin_file_name,
&puffin_file_name,
dir_key,
Box::new(|writer_provider| {
Box::pin(async move {
@@ -983,7 +991,7 @@ mod tests {
let reader = stager
.get_blob(
puffin_file_name,
&puffin_file_name,
blob_key,
Box::new(|_| Box::pin(async { Ok(0) })),
)
@@ -999,7 +1007,7 @@ mod tests {
let dir_path = stager
.get_dir(
puffin_file_name,
&puffin_file_name,
dir_key,
Box::new(|_| Box::pin(async { Ok(0) })),
)
@@ -1042,13 +1050,13 @@ mod tests {
.await
.unwrap();
let puffin_file_name = "test_eviction";
let puffin_file_name = "test_eviction".to_string();
let blob_key = "blob_key";
// First time to get the blob
let reader = stager
.get_blob(
puffin_file_name,
&puffin_file_name,
blob_key,
Box::new(|mut writer| {
Box::pin(async move {
@@ -1065,7 +1073,7 @@ mod tests {
// The blob should be evicted
stager.cache.run_pending_tasks().await;
assert!(!stager.in_cache(puffin_file_name, blob_key));
assert!(!stager.in_cache(&puffin_file_name, blob_key));
let stats = notifier.stats();
assert_eq!(
@@ -1089,7 +1097,7 @@ mod tests {
// Second time to get the blob, get from recycle bin
let reader = stager
.get_blob(
puffin_file_name,
&puffin_file_name,
blob_key,
Box::new(|_| async { Ok(0) }.boxed()),
)
@@ -1101,7 +1109,7 @@ mod tests {
// The blob should be evicted
stager.cache.run_pending_tasks().await;
assert!(!stager.in_cache(puffin_file_name, blob_key));
assert!(!stager.in_cache(&puffin_file_name, blob_key));
let stats = notifier.stats();
assert_eq!(
@@ -1134,7 +1142,7 @@ mod tests {
// First time to get the directory
let guard_0 = stager
.get_dir(
puffin_file_name,
&puffin_file_name,
dir_key,
Box::new(|writer_provider| {
Box::pin(async move {
@@ -1161,7 +1169,7 @@ mod tests {
// The directory should be evicted
stager.cache.run_pending_tasks().await;
assert!(!stager.in_cache(puffin_file_name, dir_key));
assert!(!stager.in_cache(&puffin_file_name, dir_key));
let stats = notifier.stats();
assert_eq!(
@@ -1181,7 +1189,7 @@ mod tests {
// Second time to get the directory
let guard_1 = stager
.get_dir(
puffin_file_name,
&puffin_file_name,
dir_key,
Box::new(|_| async { Ok(0) }.boxed()),
)
@@ -1198,7 +1206,7 @@ mod tests {
// Still hold the guard
stager.cache.run_pending_tasks().await;
assert!(!stager.in_cache(puffin_file_name, dir_key));
assert!(!stager.in_cache(&puffin_file_name, dir_key));
let stats = notifier.stats();
assert_eq!(
@@ -1220,7 +1228,7 @@ mod tests {
drop(guard_1);
let guard_2 = stager
.get_dir(
puffin_file_name,
&puffin_file_name,
dir_key,
Box::new(|_| Box::pin(async move { Ok(0) })),
)
@@ -1229,7 +1237,7 @@ mod tests {
// Still hold the guard, so the directory should not be removed even if it's evicted
stager.cache.run_pending_tasks().await;
assert!(!stager.in_cache(puffin_file_name, blob_key));
assert!(!stager.in_cache(&puffin_file_name, blob_key));
for (rel_path, content) in &files_in_dir {
let file_path = guard_2.path().join(rel_path);
@@ -1262,13 +1270,14 @@ mod tests {
.await
.unwrap();
let puffin_file_name = "test_get_blob_concurrency_on_fail";
let puffin_file_name = "test_get_blob_concurrency_on_fail".to_string();
let key = "key";
let stager = Arc::new(stager);
let handles = (0..10)
.map(|_| {
let stager = stager.clone();
let puffin_file_name = puffin_file_name.clone();
let task = async move {
let failed_init = Box::new(|_| {
async {
@@ -1277,7 +1286,7 @@ mod tests {
}
.boxed()
});
stager.get_blob(puffin_file_name, key, failed_init).await
stager.get_blob(&puffin_file_name, key, failed_init).await
};
tokio::spawn(task)
@@ -1289,7 +1298,7 @@ mod tests {
assert!(r.is_err());
}
assert!(!stager.in_cache(puffin_file_name, key));
assert!(!stager.in_cache(&puffin_file_name, key));
}
#[tokio::test]
@@ -1299,13 +1308,14 @@ mod tests {
.await
.unwrap();
let puffin_file_name = "test_get_dir_concurrency_on_fail";
let puffin_file_name = "test_get_dir_concurrency_on_fail".to_string();
let key = "key";
let stager = Arc::new(stager);
let handles = (0..10)
.map(|_| {
let stager = stager.clone();
let puffin_file_name = puffin_file_name.clone();
let task = async move {
let failed_init = Box::new(|_| {
async {
@@ -1314,7 +1324,7 @@ mod tests {
}
.boxed()
});
stager.get_dir(puffin_file_name, key, failed_init).await
stager.get_dir(&puffin_file_name, key, failed_init).await
};
tokio::spawn(task)
@@ -1326,7 +1336,7 @@ mod tests {
assert!(r.is_err());
}
assert!(!stager.in_cache(puffin_file_name, key));
assert!(!stager.in_cache(&puffin_file_name, key));
}
#[tokio::test]
@@ -1343,11 +1353,11 @@ mod tests {
.unwrap();
// initialize stager
let puffin_file_name = "test_purge";
let puffin_file_name = "test_purge".to_string();
let blob_key = "blob_key";
let guard = stager
.get_blob(
puffin_file_name,
&puffin_file_name,
blob_key,
Box::new(|mut writer| {
Box::pin(async move {
@@ -1371,7 +1381,7 @@ mod tests {
let dir_key = "dir_key";
let guard = stager
.get_dir(
puffin_file_name,
&puffin_file_name,
dir_key,
Box::new(|writer_provider| {
Box::pin(async move {
@@ -1390,8 +1400,7 @@ mod tests {
drop(guard);
// purge the stager
stager.purge(puffin_file_name).await.unwrap();
stager.cache.run_pending_tasks().await;
stager.purge(&puffin_file_name).await.unwrap();
let stats = notifier.stats();
assert_eq!(


@@ -27,7 +27,7 @@ use crate::puffin_manager::{
BlobGuard, DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions,
};
async fn new_bounded_stager(prefix: &str, capacity: u64) -> (TempDir, Arc<BoundedStager>) {
async fn new_bounded_stager(prefix: &str, capacity: u64) -> (TempDir, Arc<BoundedStager<String>>) {
let staging_dir = create_temp_dir(prefix);
let path = staging_dir.path().to_path_buf();
(
@@ -52,8 +52,8 @@ async fn test_put_get_file() {
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone());
let puffin_file_name = "puffin_file";
let mut writer = puffin_manager.writer(puffin_file_name).await.unwrap();
let puffin_file_name = "puffin_file".to_string();
let mut writer = puffin_manager.writer(&puffin_file_name).await.unwrap();
let key = "blob_a";
let raw_data = "Hello, world!".as_bytes();
@@ -61,9 +61,9 @@ async fn test_put_get_file() {
writer.finish().await.unwrap();
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
let reader = puffin_manager.reader(&puffin_file_name).await.unwrap();
check_blob(
puffin_file_name,
&puffin_file_name,
key,
raw_data,
&stager,
@@ -76,9 +76,9 @@ async fn test_put_get_file() {
let (_staging_dir, stager) = new_bounded_stager("test_put_get_file_", capacity).await;
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor);
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
let reader = puffin_manager.reader(&puffin_file_name).await.unwrap();
check_blob(
puffin_file_name,
&puffin_file_name,
key,
raw_data,
&stager,
@@ -102,8 +102,8 @@ async fn test_put_get_files() {
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone());
let puffin_file_name = "puffin_file";
let mut writer = puffin_manager.writer(puffin_file_name).await.unwrap();
let puffin_file_name = "puffin_file".to_string();
let mut writer = puffin_manager.writer(&puffin_file_name).await.unwrap();
let blobs = [
("blob_a", "Hello, world!".as_bytes()),
@@ -119,10 +119,10 @@ async fn test_put_get_files() {
writer.finish().await.unwrap();
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
let reader = puffin_manager.reader(&puffin_file_name).await.unwrap();
for (key, raw_data) in &blobs {
check_blob(
puffin_file_name,
&puffin_file_name,
key,
raw_data,
&stager,
@@ -135,10 +135,10 @@ async fn test_put_get_files() {
// renew cache manager
let (_staging_dir, stager) = new_bounded_stager("test_put_get_files_", capacity).await;
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor);
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
let reader = puffin_manager.reader(&puffin_file_name).await.unwrap();
for (key, raw_data) in &blobs {
check_blob(
puffin_file_name,
&puffin_file_name,
key,
raw_data,
&stager,
@@ -164,8 +164,8 @@ async fn test_put_get_dir() {
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone());
let puffin_file_name = "puffin_file";
let mut writer = puffin_manager.writer(puffin_file_name).await.unwrap();
let puffin_file_name = "puffin_file".to_string();
let mut writer = puffin_manager.writer(&puffin_file_name).await.unwrap();
let key = "dir_a";
@@ -181,15 +181,15 @@ async fn test_put_get_dir() {
writer.finish().await.unwrap();
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
check_dir(puffin_file_name, key, &files_in_dir, &stager, &reader).await;
let reader = puffin_manager.reader(&puffin_file_name).await.unwrap();
check_dir(&puffin_file_name, key, &files_in_dir, &stager, &reader).await;
// renew cache manager
let (_staging_dir, stager) = new_bounded_stager("test_put_get_dir_", capacity).await;
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor);
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
check_dir(puffin_file_name, key, &files_in_dir, &stager, &reader).await;
let reader = puffin_manager.reader(&puffin_file_name).await.unwrap();
check_dir(&puffin_file_name, key, &files_in_dir, &stager, &reader).await;
}
}
}
@@ -207,8 +207,8 @@ async fn test_put_get_mix_file_dir() {
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone());
let puffin_file_name = "puffin_file";
let mut writer = puffin_manager.writer(puffin_file_name).await.unwrap();
let puffin_file_name = "puffin_file".to_string();
let mut writer = puffin_manager.writer(&puffin_file_name).await.unwrap();
let blobs = [
("blob_a", "Hello, world!".as_bytes()),
@@ -234,10 +234,10 @@ async fn test_put_get_mix_file_dir() {
writer.finish().await.unwrap();
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
let reader = puffin_manager.reader(&puffin_file_name).await.unwrap();
for (key, raw_data) in &blobs {
check_blob(
puffin_file_name,
&puffin_file_name,
key,
raw_data,
&stager,
@@ -246,17 +246,17 @@ async fn test_put_get_mix_file_dir() {
)
.await;
}
check_dir(puffin_file_name, dir_key, &files_in_dir, &stager, &reader).await;
check_dir(&puffin_file_name, dir_key, &files_in_dir, &stager, &reader).await;
// renew cache manager
let (_staging_dir, stager) =
new_bounded_stager("test_put_get_mix_file_dir_", capacity).await;
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor);
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
let reader = puffin_manager.reader(&puffin_file_name).await.unwrap();
for (key, raw_data) in &blobs {
check_blob(
puffin_file_name,
&puffin_file_name,
key,
raw_data,
&stager,
@@ -265,7 +265,7 @@ async fn test_put_get_mix_file_dir() {
)
.await;
}
check_dir(puffin_file_name, dir_key, &files_in_dir, &stager, &reader).await;
check_dir(&puffin_file_name, dir_key, &files_in_dir, &stager, &reader).await;
}
}
}
@@ -292,7 +292,7 @@ async fn check_blob(
puffin_file_name: &str,
key: &str,
raw_data: &[u8],
stager: &BoundedStager,
stager: &BoundedStager<String>,
puffin_reader: &impl PuffinReader,
compressed: bool,
) {
@@ -346,7 +346,7 @@ async fn check_dir(
puffin_file_name: &str,
key: &str,
files_in_dir: &[(&str, &[u8])],
stager: &BoundedStager,
stager: &BoundedStager<String>,
puffin_reader: &impl PuffinReader,
) {
let res_dir = puffin_reader.dir(key).await.unwrap();
