Compare commits

..

77 Commits

Author SHA1 Message Date
evenyag
ab1928d5fd feat: refetch sst on open 2025-03-17 15:17:14 +08:00
Sicong Hu
09dacc8e9b feat: add vec_subvector function (#5683)
* feat: add vec_subvector function

* change datatype of arg1 and arg2 from u64 to i64

* add sqlness test

* improve description comments
2025-03-16 10:43:53 +00:00
Ruihang Xia
dec439db2b chore: bump version to 0.14.0 (#5711)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-03-16 09:58:19 +00:00
Ning Sun
dc76571166 feat: move default data path from /tmp to current directory (#5719) 2025-03-16 09:57:46 +00:00
shuiyisong
3e17f8c426 chore: use Bytes instead of string in bulk ingestion (#5717)
chore: use bytes instead of string in bulk log ingestion
2025-03-14 09:31:35 +00:00
yihong
a5df3954f3 chore: update flate2 version (#5706)
Signed-off-by: yihong0618 <zouzou0208@gmail.com>
2025-03-14 02:15:27 +00:00
Ruihang Xia
32fd850c20 perf: support in list in simple filter (#5709)
* feat: support in list in simple filter

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-03-14 01:08:29 +00:00
shuiyisong
2bfdae4f8f feat: add simple extract processor (#5688)
* feat: add simple extract processor

* chore: add test

* chore: add license header

* chore: minor update
2025-03-13 09:19:58 +00:00
shuiyisong
fcb898e9a4 chore: support inverted index in pipeline (#5700)
chore: rebase main
2025-03-13 08:30:29 +00:00
Ning Sun
8fa2fdfc42 feat: make empty parent_span_id null for v1 (#5690) 2025-03-13 07:48:15 +00:00
shuiyisong
4dc1a1d60f chore: support tag in transform (#5701)
chore: support tag in transform to specify tag
2025-03-13 07:27:12 +00:00
Lei, HUANG
e375a18011 fix: conversion from TableMeta to TableMetaBuilder (#5693)
* refactor: use proc macro to generate conversion between TableMeta and TableMetaBuilder

* chore: format

* fix/partition-key-index:
 ### Update `TableMeta` and Add Partition and Alter Table Tests

 - **`metadata.rs`**: Modified `new_meta_builder` method in `TableMeta` to manually remove `value_indices` by setting it to `None` in the `TableMetaBuilder`.
 - **`partition_and_alter.result` & `partition_and_alter.sql`**: Added new test cases for creating, inserting, selecting, altering, and dropping a partitioned table `molestiAe`. These tests verify partitioning on the `sImiLiQUE` column and altering the table with a TTL
 setting.

fix/partition-key-index:
 ### Remove Obsolete TODO Comment in `metadata.rs`

 - Removed an outdated TODO comment regarding the `new_meta_builder` function in `src/table/src/metadata.rs`.

chore: check struct name in derive_meta_builder

refactor: Simplify TableMeta struct name check in macro

refactor: Improve ToMetaBuilder derive macro validation and error handling

refactor: Enforce ToMetaBuilder macro for table::metadata::TableMeta struct

* fix/partition-key-index:
 Update `partition_and_alter.sql` to modify TTL setting

 - Modified the TTL setting for the `molestiAe` table to '1d' in `partition_and_alter.sql`.

* fix: sqlness

* fix/partition-key-index:
 ### Update `TableMeta` and Test File Structure

 - **Enhancement**: Added a note in `metadata.rs` to always use `new_meta_builder` for creating `TableMetaBuilder`.
 - **Refactor**: Renamed test result and SQL files for better organization:
   - `partition_and_alter.result` to `alter/partition_and_alter.result`
   - `partition_and_alter.sql` to `alter/partition_and_alter.sql`

* refactor: Simplify `derive_meta_builder` by initializing fields with `Default::default()`

* fix/partition-key-index:
 ### Commit Summary

 - **Refactor `TableMetaBuilder` Initialization**:
   - Replaced `TableMetaBuilder::default()` with `TableMetaBuilder::empty()` across multiple files for initializing `TableMetaBuilder` instances.
   - Affected files include:
     - `src/catalog/src/system_schema.rs`
     - `src/common/meta/src/key/test_utils.rs`
     - `src/operator/src/req_convert/insert/fill_impure_default.rs`
     - `src/query/src/log_query/planner.rs`
     - `src/query/src/promql/planner.rs`
     - `src/query/src/range_select/plan_rewrite.rs`
     - `src/query/src/sql/show_create_table.rs`
     - `src/table/src/test_util/memtable.rs`
     - `src/table/src/test_util/table_info.rs`

 - **Enhance `TableMetaBuilder`**:
   - Added `custom_constructor` to `TableMeta` and implemented an `empty` method for `TableMetaBuilder`.
   - Modified `TableMetaBuilder` to include a `new_external_table` method with default values.
   - Updated `src/table/src/metadata.rs` to reflect these changes.

 - **Add Testing Feature**:
   - Introduced a conditional compilation for `test_util` in `src/table/src/lib.rs` to include testing utilities when the `testing` feature is enabled.

 - **Update `Cargo.toml`**:
   - Enabled the `testing` feature for the `table` module in `src/common/meta/Cargo.toml`.

 - **Modify `NumbersTable` Initialization**:
   - Replaced `TableMetaBuilder` with direct `TableMeta` struct initialization in `src/table/src/table/numbers.rs`.

 - **Test Result Update**:
   - Updated test results in `tests/cases/standalone/common/alter/partition_and_alter.result` to reflect changes in table meta handling.

* fix: rename default to empty

* docs: add doc for TableMetaBuilder::empty

* chore: Update src/table/src/metadata.rs

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
2025-03-13 06:30:16 +00:00
shuiyisong
e0ff701e51 chore: support application/x-ndjson for log ingest (#5697)
chore: support ndjson content type
2025-03-13 04:29:22 +00:00
Yingwen
25645a3303 feat: expose virtual_host_style config for s3 storage (#5696)
* feat: expose enable_virtual_host_style for s3 storage

* docs: update examples

* test: fix config test
2025-03-12 13:46:56 +00:00
Ruihang Xia
b32ea7d84c feat: add Docker image tag information to step summary in dev-build workflow (#5692)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-03-12 13:45:19 +00:00
discord9
f164f6eaf3 fix: FlowInfoValue's compatibility (#5695) 2025-03-12 09:02:48 +00:00
Yohan Wal
af1920defc feat: add mysql kvbackend (#5528)
* feat: add mysql kvbackend txn support

* chore: error handling

* chore: follow review comments

* chore: follow review comments

* chore: follow review comments

* revert: mysql QAQ

* revert: revert changes to sqls

This reverts commit cf98c50dd9.

* chore: add comments
2025-03-12 06:52:56 +00:00
Lei, HUANG
7c97fae522 chore: check region wal provider on startup to avoid inconsistence (#5687) 2025-03-11 17:51:18 +00:00
AntiTopQuark
b8070adc3a feat: enhancement information_schema.flows (#5623)
* feat: enhancement information_schema.flows

* feat: enhancement information_schema.flows

* u

* u

* u

* u

* u

* u

* u

* u

* u

* update

* update

* update

* delete unused code

* u

* u

* Update src/flow/src/adapter/worker.rs

Co-authored-by: dennis zhuang <killme2008@gmail.com>

* Update src/common/meta/src/key/flow/flow_state.rs

Co-authored-by: dennis zhuang <killme2008@gmail.com>

* Update src/common/meta/src/key/flow/flow_info.rs

Co-authored-by: dennis zhuang <killme2008@gmail.com>

* Update src/common/meta/src/key/flow/flow_state.rs

Co-authored-by: dennis zhuang <killme2008@gmail.com>

* Update src/common/meta/src/key/flow/flow_info.rs

Co-authored-by: dennis zhuang <killme2008@gmail.com>

* u

* u

* u

* u

* u

* u

* chore: fix sqlness

* chore: update proto

* fix: remove date time

* fix: update result of information_schema test

---------

Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: discord9 <discord9@163.com>
2025-03-11 15:08:10 +00:00
yihong
11bfb17328 feat: support export command export data to s3 (#5585)
* feat: s3 first step

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* fix: finish s3 export

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* fix: drop useless comment

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* fix: forget to create_database and copy_from

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* fix: address comment use opendal Fs

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* refactor: make the export mess code clean

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

---------

Signed-off-by: yihong0618 <zouzou0208@gmail.com>
2025-03-11 08:56:59 +00:00
jeremyhi
1d87bd2d43 feat: alter region follower (#5676)
* feat: add region follower manager

* feat: add region procudure

* refactor: make add, remove follower procedure look  nice

* feat: add region follower procedure

* chore: undo some chane, possibly made by AI

* feat: on prepare cheking

* feat: on update metadata

* feat: on broadcast

* chore: unit test

* feat: add remove follower operation

* feat: add or remove region follower procedure

* chore: ut

* chore: rename

* chore: by comment

* chore: by comment

---------

Co-authored-by: jeremy <jeremy@greptime.local>
2025-03-11 08:44:50 +00:00
jeremyhi
ababeaf538 chore: make memorykv write happily (#5686)
chore: make memorykv write happly
2025-03-11 07:37:14 +00:00
Lin Yihai
2cbf51d0be refactor!: Remove Value::DateTime and ValueRef::DateTime. (#5616)
* refactor: Remove Value::DateTime and ValueRef::DateTime

* fix: don't panic if arrow cast field.

* fix: map `ColumnDataType::Datetime` to `ConcreteDataType::timestamp_microsecond_datatype`

* fix: Map `ValueData::DatetimeValue` correctly.

* refactor: Replace `datetime` with `timestamp_micro_second`
2025-03-11 07:03:27 +00:00
Yingwen
3059b04b19 feat: add a gauge for download tasks (#5681) 2025-03-11 06:55:13 +00:00
Yingwen
352b197be4 feat: add hint for logical region in RegionScanner (#5684)
* feat: add a flag to check logical region

* feat: sets logical region hint in metric engine

* refactor: rename to logical_region
2025-03-11 06:34:39 +00:00
Ning Sun
d0254f9705 feat: update promql-parser to 0.5 for duration literal (#5682) 2025-03-11 06:27:36 +00:00
Ning Sun
8a86903c73 feat: add description for each grafana panel (#5673)
* feat: add description for each grafana panel

* Apply suggestions from code review

Co-authored-by: Yingwen <realevenyag@gmail.com>

* fix: unit of write stall

* feat: add jq script to summary the grafana dashboard

* fix: update description

* ci: add ci step to valid grafana and send summary as comment

* ci: update check

* ci: update ci

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
2025-03-11 06:16:49 +00:00
Weny Xu
0bd322a078 perf(prom): optimize label values query (#5653)
perf: optimize label values query
2025-03-10 13:20:47 +00:00
discord9
3811e3f632 feat: also get index file&expose mito in metrics (#5680)
* feat: download index file too

* feat: expose mito in metrics

* chore: fmt
2025-03-10 13:07:08 +00:00
localhost
c14aa176b5 chore: impl ref and ref_mut for json like (#5679)
* chore: impl ref and ref_mut for json like

* chore: add code source
2025-03-10 10:43:15 +00:00
Lei, HUANG
a922dcd9df refactor(mito): move wal sync task to background (#5677)
chore/move-wal-sync-to-bg:
 ### Refactor Log Store Task Management

 - **Error Handling Enhancements**: Updated error handling for task management in `error.rs` by renaming `StartGcTask` and `StopGcTask` to `StartWalTask` and `StopWalTask`, respectively, and added a `name` field for more descriptive error messages.
 - **Task Management Improvements**: Introduced `SyncWalTaskFunction` in `log_store.rs` to handle periodic synchronization of WAL tasks, replacing the previous atomic-based sync logic.
 - **Backend Adjustments**: Modified `backend.rs` to use the new `StartWalTaskSnafu` for starting tasks, ensuring consistency with the updated error handling approach.
2025-03-10 08:22:35 +00:00
dennis zhuang
530ff53422 feat(promql): supports quantile and count_values (#5652)
* feat(promql): supports quantile

* fix: merge_batch

* chore: sqlness test

* test: unit tests

* feat: implements count_values

* fix: typo

* refactor: planner

* chore: apply review suggestions

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
2025-03-10 06:41:40 +00:00
Ruihang Xia
73ca39f37e feat: time series distribution in scanner (#5675)
* define distribution

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: SeqScan support per series distribution

* probe distribution

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* reverse sort order

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* more strict check

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change null's ordering

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: evenyag <realevenyag@gmail.com>
2025-03-10 05:43:17 +00:00
Yingwen
0acc6b0354 fix: correct stalled count (#5678) 2025-03-10 04:25:38 +00:00
Zhenchi
face361fcb feat: introduce roaring bitmap to optimize sparse value scenarios (#5603)
* feat: introduce roaring bitmap to optimize sparse value scenarios

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix taplo

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* polish

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-03-10 04:24:08 +00:00
Yingwen
9860bca986 feat: support exact filter on time index column (#5671)
* feat: add predicate group

* feat: pass predicate group

* feat: memtable prune by time filters

* test: test PruneTimeIterator with time filters

* feat: push down returns exact for timestamp simple filters

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
2025-03-07 21:55:46 +00:00
ZonaHe
3a83c33a48 feat: update dashboard to v0.8.0 (#5666)
Co-authored-by: sunchanglong <sunchanglong@users.noreply.github.com>
Co-authored-by: Ning Sun <sunng@protonmail.com>
2025-03-07 19:47:02 +00:00
Ruihang Xia
373bd59b07 fix: update column requirements to use Column type instead of String (#5672)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-03-07 18:50:15 +00:00
shuiyisong
c8db4b286d fix: use DateTime instead of NaiveDateTime (#5669)
chore: use datetime instead of naivedatetime
2025-03-07 07:41:59 +00:00
Lei, HUANG
56c8c0651f fix: skip schema check to avoid schema mismatch brought by metadata (#5662)
* fix: skip schema check to avoid schema mismatch brought by metadata

* docs: add some comment to remind me add that check back

* test: add sqlness case

* fix/skip-schema-check:
 ### Update CTE Test Cases

 - **Added GRPC Latencies Test**: Introduced a new test case for GRPC latencies in `cte.result` and `cte.sql` under `standalone/common/cte`.
 - **Removed Redundant Test Files**: Deleted `cte.result` and `cte.sql` under `standalone/common/range` as they were duplicates of the new test case.
2025-03-07 05:47:45 +00:00
shuiyisong
448e588fa7 chore: improve /v1/jaeger/api/trace/{trace_id}'s resp (#5663)
* chore: improve jaeger trace api resp

* chore: fix timestamp type

* chore: fix timestamp type

* chore: complete more fields

* chore: change to microseconds

* chore: add empty check & span status code

* chore: minor update

* chore: update test
2025-03-07 04:31:42 +00:00
Yingwen
f4cbf1d776 docs: update cluster dashboard to make opendal panel works (#5661) 2025-03-07 02:49:15 +00:00
discord9
b35eefcf45 perf: rm coalesce batch when target_batch_size > fetch limit (#5658)
* fix: rm coalesce > limit

* fix: only rm one&test: sqlness
2025-03-07 02:45:07 +00:00
yihong
408dd55a2f fix: flaky test in sqlness by fix random port (#5657)
* fix: flaky test in sqlness by fix random port

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* fix: typo

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* fix: panic insead of forever loop

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

---------

Signed-off-by: yihong0618 <zouzou0208@gmail.com>
2025-03-07 00:41:22 +00:00
Ruihang Xia
e463942a5b fix: recover plan schema after dist analyzer (#5665)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-03-07 00:29:55 +00:00
discord9
0124a0d156 fix: window sort not apply when other column alias to time index name (#5634)
* fix: other col alias to time index column handle

* test: update sqlness

* chore: per review

* test: more sqlness

* test: mv some to optimizer folder

* fix: resolve alias properly

* fix: also retain old name

* chore: remove wrong comment

* chore: fix sqlness

* test: standalone/dist more projection diff
2025-03-06 08:05:57 +00:00
liyang
e23628a4e0 ci: bump dev-builder image version to 2024-12-25-a71b93dd-20250305072908 (#5651) 2025-03-06 03:33:17 +00:00
Weny Xu
1d637cad51 fix(metric-engine): group DDL requests (#5628)
* fix(metric-engine): group DDL requests

* test: add sqlness tests

* chore: apply suggestions from CR

* chore: apply suggestions from CR
2025-03-05 09:17:47 +00:00
Lei, HUANG
a56030e6a5 refactor: remove cluster id field (#5610)
* chore: resolve conflicts

* chore: merge main

* test: add compatibility test for DatanodeLeaseKey with missing cluster_id

* test: add compatibility test for DatanodeLeaseKey without cluster_id

* refactor/remove-cluster-id:
 - **Update `greptime-proto` Dependency**: Updated the `greptime-proto` dependency in `Cargo.lock` and `Cargo.toml` to a new revision.
 - **Remove `cluster_id` Usage**: Removed the `cluster_id` field and its related logic from various files, including `cluster.rs`, `datanode.rs`, `rpc.rs`,
 `adapter.rs`, `client.rs`, `ask_leader.rs`, `heartbeat.rs`, `procedure.rs`, `store.rs`, `handler.rs`, `response_header_handler.rs`, `key.rs`, `datanode.rs`,
 `lease.rs`, `metrics.rs`, `cluster.rs`, `heartbeat.rs`, `procedure.rs`, and `store.rs`.
 - **Refactor Tests**: Updated tests in `client.rs`, `response_header_handler.rs`, `store.rs`, and `service` modules to reflect the removal of `cluster_id`.

* fix: clippy

* refactor/remove-cluster-id:
 **Refactor and Cleanup in Meta Server**

 - **`response_header_handler.rs`**: Removed unused import of `HeartbeatResponse` and cleaned up the test function by eliminating the creation of an unused `HeartbeatResponse` object.
 - **`node_lease.rs`**: Simplified parameter handling in `HttpHandler` implementation by using an underscore for unused parameters.

* refactor/remove-cluster-id:
 ### Remove `TableMetadataAllocatorContext` and Refactor Code

 - **Removed `TableMetadataAllocatorContext`**: Eliminated the `TableMetadataAllocatorContext` struct and its usage across multiple files, including `ddl.rs`, `create_table.rs`, `create_view.rs`, `table_meta.rs`, `test_util.rs`, `create_logical_tables.rs`,
 `drop_table.rs`, and `table_meta_alloc.rs`.
 - **Refactored Function Signatures**: Updated function signatures to remove the `TableMetadataAllocatorContext` parameter in methods like `create`, `create_view`, and `alloc` in `table_meta.rs` and `table_meta_alloc.rs`.
 - **Updated Imports**: Adjusted import statements to reflect the removal of `TableMetadataAllocatorContext` in affected files.

 These changes simplify the codebase by removing an unnecessary context struct and updating related function calls.

* refactor/remove-cluster-id:
 ### Update `datanode.rs` to Modify Key Prefix

 - **File Modified**: `src/common/meta/src/datanode.rs`
 - **Key Changes**:
   - Updated `DatanodeStatKey::prefix_key` and `From<DatanodeStatKey>` to remove the cluster ID from the key prefix.
   - Adjusted comments to reflect the changes in key prefix handling.

* reformat code

* refactor/remove-cluster-id:
 ### Commit Summary

 - **Refactor `Pusher` Initialization**: Removed the `RequestHeader` parameter from the `Pusher::new` method across multiple files, including `handler.rs`, `test_util.rs`, and `heartbeat.rs`. This change simplifies the `Pusher` initialization process by eliminating th
 unnecessary parameter.
 - **Update Imports**: Adjusted import statements in `handler.rs` and `test_util.rs` to remove unused `RequestHeader` references, ensuring cleaner and more efficient code.

* chore: update proto
2025-03-05 08:22:18 +00:00
liyang
a71b93dd84 fix: unable to install software-properties-common in dev builder (#5643)
* fix: unable to install software-properties-common in dev builder

* test dev builder

* improve dev-build image

* setup qemu action
2025-03-05 07:07:06 +00:00
Ning Sun
37f8341963 feat: opentelemetry trace new data modeling (#5622)
* feat: include trace v1 encoding

* feat: add trace ingestion in inserter

* feat: add partition rules and index for trace_id

* chore: format

* chore: fmt

* fix: issue introduced with merge

* feat: adjust index and add integration test for v1

* refactor: remove comment key

* fix: update default value of skip index granularity

* fix: update default value of skip index granularity

* refactor: rename some functions

* feat: remove skipping index from span_id

* refactor: made span_id part of primary key for potential dedup purpose

* feat: move the special attribute resource_attribute.service.name to top level

---------

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
2025-03-05 04:08:52 +00:00
Ruihang Xia
b90ef10523 refactor: remove or deprecated existing UDAF implementation (#5637)
* expand macro

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove argmin/argmax (wrong impl)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove mean (unnecessary)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* documentations

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove scipy_*, diff and polyval

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused errors

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy again

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-03-05 01:40:05 +00:00
jeremyhi
c8ffa70ab8 feat: get tables by ids in catalog manager (#5645)
feat: get tabels by ids in catalog manager

Co-authored-by: jeremy <jeremy@greptime.local>
2025-03-05 00:48:03 +00:00
Ning Sun
e0065a5159 ci: remove ubuntu 20.04 runners (#5545)
* ci: remove ubuntu 20.04 runners

* chore: update ec2-github-runner action as author suggests

* fix: use latest ubuntu image for fuzz test

* Update action.yml

* Update action.yml

---------

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
Co-authored-by: liyang <daviderli614@gmail.com>
2025-03-05 00:40:29 +00:00
Lei, HUANG
abf1680d14 fix: interval rewrite rule that messes up show create flow function (#5642)
* fix/interval-cast-rewrite:
 ### Enhance Interval Parsing and Casting

 - **`create_parser.rs`**: Added a test case `test_parse_interval_cast` to verify the parsing of interval casts.
 - **`expand_interval.rs`**: Refactored interval casting logic to handle `CastKind` and `format` attributes. Removed the `create_interval` function and integrated its logic directly into the casting process.
 - **`interval.result`**: Updated test results to reflect changes in interval representation, switching from `IntervalMonthDayNano` to `Utf8` format for interval operations.

* reformat code
2025-03-04 11:55:25 +00:00
Ruihang Xia
0e2fd8e2bd feat: rewrite json_encode_path to geo_path using compound type (#5640)
* function impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* tune type

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy and suggestions

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-03-04 05:10:12 +00:00
Ruihang Xia
0e097732ca feat: support some IP related functions (#5614)
* feat: support some IP related functions

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sort sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* safer shift left

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sort result again

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sort result again

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update against main

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-03-04 05:06:25 +00:00
liyang
bb62dc2491 build: use ubuntu-22.04 base image release dev-build image (#5554)
* build: use ubuntu-22.04 release dev-build image

* ci: use ubuntu-22.04 replace ubuntu-22.04-16-cores
2025-03-04 04:45:55 +00:00
Ruihang Xia
40cf63d3c4 refactor: rename table function to admin function (#5636)
* refactor: rename table function to admin function

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* format

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-03-04 03:54:07 +00:00
dennis zhuang
6187fd975f feat: alias for boolean (#5639) 2025-03-04 03:12:10 +00:00
Ruihang Xia
6c90f25299 feat(log-query): implement compound filter and alias expr (#5596)
* refine alias behavior

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* implement compound

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* support gt, lt, and in

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-03-03 18:52:13 +00:00
Weny Xu
dc24c462dc fix: prevent failover of regions to the same peer (#5632) 2025-03-03 18:41:27 +00:00
shuiyisong
31f29d8a77 chore: support specifying skipping index in pipeline (#5635)
* chore: support setting skipping index in pipeline

* chore: fix typo key

* chore: add test

* chore: fix typo
2025-03-03 18:37:13 +00:00
Lei, HUANG
4a277c21ef fix: properly display CJK characters in table/column comments (#5633)
fix/comment-in-cjk:
 ### Update `OptionMap` Formatting and Add Tests

 - **Enhancements in `OptionMap`**:
   - Changed formatting from `escape_default` to `escape_debug` for better handling of special characters in `src/sql/src/statements/option_map.rs`.
   - Added unit tests to verify the new formatting behavior.

 - **Test Cases for CJK Comments**:
   - Added test cases for tables with comments in CJK (Chinese, Japanese, Korean) characters in `tests/cases/standalone/common/show/show_create.sql` and `show_create.result`.
2025-03-03 12:32:19 +00:00
Weny Xu
ca81fc6a70 fix: refactor region leader state validation (#5626)
* enhance: refactor region leader state validation

* chore: apply suggestions from CR

* chore: add logs
2025-03-03 10:07:25 +00:00
Zhenchi
e714f7df6c fix: out of bound during bloom search (#5625)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-03-03 09:53:14 +00:00
Ruihang Xia
1c04ace4b0 feat: skip printing full config content in sqlness (#5618)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-03-03 09:43:55 +00:00
Weny Xu
95d7ca5382 fix: increase timeout for opening candidate region and log elapsed time (#5627) 2025-03-03 09:16:45 +00:00
yihong
a693583a97 fix: speed up cargo build using sallow clone (#5620)
Signed-off-by: yihong0618 <zouzou0208@gmail.com>
2025-03-03 08:02:12 +00:00
dennis zhuang
87b1408d76 feat: impl topk and bottomk (#5602)
* feat: impl topk and bottomk

* chore: test and project fields

* refactor: prom_topk_bottomk_to_plan

* fix: order

* chore: adds topk plan test

* chore: comment

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
2025-03-03 07:32:24 +00:00
LFC
dee76f0a73 refactor: simplify udf (#5617)
* refactor: simplify udf

* fix tests
2025-03-03 05:52:44 +00:00
yihong
11a4f54c49 fix: update typos rules to fix ci (#5621)
Signed-off-by: yihong0618 <zouzou0208@gmail.com>
2025-03-01 09:21:36 +00:00
Ruihang Xia
d363c8ee3c fix: check physical region before use (#5612)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-02-28 06:46:48 +00:00
xiaoniaoyouhuajiang
50b521c526 feat: add vec_dim function (#5587)
* feat:add `vec_dim` function

* delete unused imports

* Modified to be implemented correctly

* fix comment

* add order for sqlness test
2025-02-27 15:54:48 +00:00
Ning Sun
c9d70e0e28 refactor: add pipeline concept to OTLP traces and remove OTLP over gRPC (#5605) 2025-02-27 14:01:45 +00:00
Weny Xu
c0c87652c3 chore: bump version to 0.13.0 (#5611)
chore: bump main branch version to 0.13.0
2025-02-27 13:19:59 +00:00
discord9
faaa0affd0 docs: tsbs update (#5608)
chore: tsbs update
2025-02-27 08:14:48 +00:00
531 changed files with 20520 additions and 13867 deletions

View File

@@ -3,3 +3,12 @@ linker = "aarch64-linux-gnu-gcc"
[alias]
sqlness = "run --bin sqlness-runner --"
[unstable.git]
shallow_index = true
shallow_deps = true
[unstable.gitoxide]
fetch = true
checkout = true
list_files = true
internal_use_git2 = false

2
.github/CODEOWNERS vendored
View File

@@ -4,7 +4,7 @@
* @GreptimeTeam/db-approver
## [Module] Database Engine
## [Module] Databse Engine
/src/index @zhongzc
/src/mito2 @evenyag @v0y4g3r @waynexia
/src/query @evenyag

View File

@@ -52,7 +52,7 @@ runs:
uses: ./.github/actions/build-greptime-binary
with:
base-image: ubuntu
features: servers/dashboard
features: servers/dashboard,pg_kvbackend,mysql_kvbackend
cargo-profile: ${{ inputs.cargo-profile }}
artifacts-dir: greptime-linux-${{ inputs.arch }}-${{ inputs.version }}
version: ${{ inputs.version }}
@@ -70,7 +70,7 @@ runs:
if: ${{ inputs.arch == 'amd64' && inputs.dev-mode == 'false' }} # Builds greptime for centos if the host machine is amd64.
with:
base-image: centos
features: servers/dashboard
features: servers/dashboard,pg_kvbackend,mysql_kvbackend
cargo-profile: ${{ inputs.cargo-profile }}
artifacts-dir: greptime-linux-${{ inputs.arch }}-centos-${{ inputs.version }}
version: ${{ inputs.version }}

View File

@@ -47,6 +47,7 @@ runs:
shell: pwsh
run: make test sqlness-test
env:
RUSTUP_WINDOWS_PATH_ADD_BIN: 1 # Workaround for https://github.com/nextest-rs/nextest/issues/1493
RUST_BACKTRACE: 1
SQLNESS_OPTS: "--preserve-state"

View File

@@ -64,11 +64,11 @@ inputs:
upload-max-retry-times:
description: Max retry times for uploading artifacts to S3
required: false
default: "30"
default: "20"
upload-retry-timeout:
description: Timeout for uploading artifacts to S3
required: false
default: "120" # minutes
default: "30" # minutes
runs:
using: composite
steps:

View File

@@ -8,15 +8,15 @@ inputs:
default: 2
description: "Number of Datanode replicas"
meta-replicas:
default: 2
default: 1
description: "Number of Metasrv replicas"
image-registry:
image-registry:
default: "docker.io"
description: "Image registry"
image-repository:
image-repository:
default: "greptime/greptimedb"
description: "Image repository"
image-tag:
image-tag:
default: "latest"
description: 'Image tag'
etcd-endpoints:
@@ -32,12 +32,12 @@ runs:
steps:
- name: Install GreptimeDB operator
uses: nick-fields/retry@v3
with:
with:
timeout_minutes: 3
max_attempts: 3
shell: bash
command: |
helm repo add greptime https://greptimeteam.github.io/helm-charts/
helm repo add greptime https://greptimeteam.github.io/helm-charts/
helm repo update
helm upgrade \
--install \
@@ -48,10 +48,10 @@ runs:
--wait-for-jobs
- name: Install GreptimeDB cluster
shell: bash
run: |
run: |
helm upgrade \
--install my-greptimedb \
--set meta.backendStorage.etcd.endpoints=${{ inputs.etcd-endpoints }} \
--set meta.etcdEndpoints=${{ inputs.etcd-endpoints }} \
--set meta.enableRegionFailover=${{ inputs.enable-region-failover }} \
--set image.registry=${{ inputs.image-registry }} \
--set image.repository=${{ inputs.image-repository }} \
@@ -59,7 +59,7 @@ runs:
--set base.podTemplate.main.resources.requests.cpu=50m \
--set base.podTemplate.main.resources.requests.memory=256Mi \
--set base.podTemplate.main.resources.limits.cpu=2000m \
--set base.podTemplate.main.resources.limits.memory=3Gi \
--set base.podTemplate.main.resources.limits.memory=2Gi \
--set frontend.replicas=${{ inputs.frontend-replicas }} \
--set datanode.replicas=${{ inputs.datanode-replicas }} \
--set meta.replicas=${{ inputs.meta-replicas }} \
@@ -72,7 +72,7 @@ runs:
- name: Wait for GreptimeDB
shell: bash
run: |
while true; do
while true; do
PHASE=$(kubectl -n my-greptimedb get gtc my-greptimedb -o jsonpath='{.status.clusterPhase}')
if [ "$PHASE" == "Running" ]; then
echo "Cluster is ready"
@@ -86,10 +86,10 @@ runs:
- name: Print GreptimeDB info
if: always()
shell: bash
run: |
run: |
kubectl get all --show-labels -n my-greptimedb
- name: Describe Nodes
if: always()
shell: bash
run: |
run: |
kubectl describe nodes

View File

@@ -2,14 +2,13 @@ meta:
configData: |-
[runtime]
global_rt_size = 4
[wal]
provider = "kafka"
broker_endpoints = ["kafka.kafka-cluster.svc.cluster.local:9092"]
num_topics = 3
auto_prune_interval = "30s"
trigger_flush_threshold = 100
[datanode]
[datanode.client]
timeout = "120s"
@@ -22,7 +21,7 @@ datanode:
[wal]
provider = "kafka"
broker_endpoints = ["kafka.kafka-cluster.svc.cluster.local:9092"]
overwrite_entry_start_id = true
linger = "2ms"
frontend:
configData: |-
[runtime]

15
.github/labeler.yaml vendored
View File

@@ -1,15 +0,0 @@
ci:
- changed-files:
- any-glob-to-any-file: .github/**
docker:
- changed-files:
- any-glob-to-any-file: docker/**
documentation:
- changed-files:
- any-glob-to-any-file: docs/**
dashboard:
- changed-files:
- any-glob-to-any-file: grafana/**

View File

@@ -1,42 +0,0 @@
#!/bin/bash
# Get current version
CURRENT_VERSION=$1
if [ -z "$CURRENT_VERSION" ]; then
echo "Error: Failed to get current version"
exit 1
fi
# Get the latest version from GitHub Releases
API_RESPONSE=$(curl -s "https://api.github.com/repos/GreptimeTeam/greptimedb/releases/latest")
if [ -z "$API_RESPONSE" ] || [ "$(echo "$API_RESPONSE" | jq -r '.message')" = "Not Found" ]; then
echo "Error: Failed to fetch latest version from GitHub"
exit 1
fi
# Get the latest version
LATEST_VERSION=$(echo "$API_RESPONSE" | jq -r '.tag_name')
if [ -z "$LATEST_VERSION" ] || [ "$LATEST_VERSION" = "null" ]; then
echo "Error: No valid version found in GitHub releases"
exit 1
fi
# Cleaned up version number format (removed possible 'v' prefix and -nightly suffix)
CLEAN_CURRENT=$(echo "$CURRENT_VERSION" | sed 's/^v//' | sed 's/-nightly-.*//')
CLEAN_LATEST=$(echo "$LATEST_VERSION" | sed 's/^v//' | sed 's/-nightly-.*//')
echo "Current version: $CLEAN_CURRENT"
echo "Latest release version: $CLEAN_LATEST"
# Use sort -V to compare versions
HIGHER_VERSION=$(printf "%s\n%s" "$CLEAN_CURRENT" "$CLEAN_LATEST" | sort -V | tail -n1)
if [ "$HIGHER_VERSION" = "$CLEAN_CURRENT" ]; then
echo "Current version ($CLEAN_CURRENT) is NEWER than or EQUAL to latest ($CLEAN_LATEST)"
echo "should-push-latest-tag=true" >> $GITHUB_OUTPUT
else
echo "Current version ($CLEAN_CURRENT) is OLDER than latest ($CLEAN_LATEST)"
echo "should-push-latest-tag=false" >> $GITHUB_OUTPUT
fi

View File

@@ -8,25 +8,24 @@ set -e
# - If it's a nightly build, the version is 'nightly-YYYYMMDD-$(git rev-parse --short HEAD)', like 'nightly-20230712-e5b243c'.
# create_version ${GIHUB_EVENT_NAME} ${NEXT_RELEASE_VERSION} ${NIGHTLY_RELEASE_PREFIX}
function create_version() {
# Read from environment variables.
# Read from envrionment variables.
if [ -z "$GITHUB_EVENT_NAME" ]; then
echo "GITHUB_EVENT_NAME is empty" >&2
echo "GITHUB_EVENT_NAME is empty"
exit 1
fi
if [ -z "$NEXT_RELEASE_VERSION" ]; then
echo "NEXT_RELEASE_VERSION is empty, use version from Cargo.toml" >&2
# NOTE: Need a `v` prefix for the version string.
export NEXT_RELEASE_VERSION=v$(grep '^version = ' Cargo.toml | cut -d '"' -f 2 | head -n 1)
echo "NEXT_RELEASE_VERSION is empty"
exit 1
fi
if [ -z "$NIGHTLY_RELEASE_PREFIX" ]; then
echo "NIGHTLY_RELEASE_PREFIX is empty" >&2
echo "NIGHTLY_RELEASE_PREFIX is empty"
exit 1
fi
# Reuse $NEXT_RELEASE_VERSION to identify whether it's a nightly build.
# It will be like 'nightly-20230808-7d0d8dc6'.
# It will be like 'nigtly-20230808-7d0d8dc6'.
if [ "$NEXT_RELEASE_VERSION" = nightly ]; then
echo "$NIGHTLY_RELEASE_PREFIX-$(date "+%Y%m%d")-$(git rev-parse --short HEAD)"
exit 0
@@ -36,7 +35,7 @@ function create_version() {
# It will be like 'dev-2023080819-f0e7216c'.
if [ "$NEXT_RELEASE_VERSION" = dev ]; then
if [ -z "$COMMIT_SHA" ]; then
echo "COMMIT_SHA is empty in dev build" >&2
echo "COMMIT_SHA is empty in dev build"
exit 1
fi
echo "dev-$(date "+%Y%m%d-%s")-$(echo "$COMMIT_SHA" | cut -c1-8)"
@@ -46,7 +45,7 @@ function create_version() {
# Note: Only output 'version=xxx' to stdout when everything is ok, so that it can be used in GitHub Actions Outputs.
if [ "$GITHUB_EVENT_NAME" = push ]; then
if [ -z "$GITHUB_REF_NAME" ]; then
echo "GITHUB_REF_NAME is empty in push event" >&2
echo "GITHUB_REF_NAME is empty in push event"
exit 1
fi
echo "$GITHUB_REF_NAME"
@@ -55,15 +54,15 @@ function create_version() {
elif [ "$GITHUB_EVENT_NAME" = schedule ]; then
echo "$NEXT_RELEASE_VERSION-$NIGHTLY_RELEASE_PREFIX-$(date "+%Y%m%d")"
else
echo "Unsupported GITHUB_EVENT_NAME: $GITHUB_EVENT_NAME" >&2
echo "Unsupported GITHUB_EVENT_NAME: $GITHUB_EVENT_NAME"
exit 1
fi
}
# You can run as following examples:
# GITHUB_EVENT_NAME=push NEXT_RELEASE_VERSION=v0.4.0 NIGHTLY_RELEASE_PREFIX=nightly GITHUB_REF_NAME=v0.3.0 ./create-version.sh
# GITHUB_EVENT_NAME=workflow_dispatch NEXT_RELEASE_VERSION=v0.4.0 NIGHTLY_RELEASE_PREFIX=nightly ./create-version.sh
# GITHUB_EVENT_NAME=schedule NEXT_RELEASE_VERSION=v0.4.0 NIGHTLY_RELEASE_PREFIX=nightly ./create-version.sh
# GITHUB_EVENT_NAME=schedule NEXT_RELEASE_VERSION=nightly NIGHTLY_RELEASE_PREFIX=nightly ./create-version.sh
# GITHUB_EVENT_NAME=workflow_dispatch COMMIT_SHA=f0e7216c4bb6acce9b29a21ec2d683be2e3f984a NEXT_RELEASE_VERSION=dev NIGHTLY_RELEASE_PREFIX=nightly ./create-version.sh
# GITHUB_EVENT_NAME=push NEXT_RELEASE_VERSION=v0.4.0 NIGHTLY_RELEASE_PREFIX=nigtly GITHUB_REF_NAME=v0.3.0 ./create-version.sh
# GITHUB_EVENT_NAME=workflow_dispatch NEXT_RELEASE_VERSION=v0.4.0 NIGHTLY_RELEASE_PREFIX=nigtly ./create-version.sh
# GITHUB_EVENT_NAME=schedule NEXT_RELEASE_VERSION=v0.4.0 NIGHTLY_RELEASE_PREFIX=nigtly ./create-version.sh
# GITHUB_EVENT_NAME=schedule NEXT_RELEASE_VERSION=nightly NIGHTLY_RELEASE_PREFIX=nigtly ./create-version.sh
# GITHUB_EVENT_NAME=workflow_dispatch COMMIT_SHA=f0e7216c4bb6acce9b29a21ec2d683be2e3f984a NEXT_RELEASE_VERSION=dev NIGHTLY_RELEASE_PREFIX=nigtly ./create-version.sh
create_version

View File

@@ -10,7 +10,7 @@ GREPTIMEDB_IMAGE_TAG=${GREPTIMEDB_IMAGE_TAG:-latest}
ETCD_CHART="oci://registry-1.docker.io/bitnamicharts/etcd"
GREPTIME_CHART="https://greptimeteam.github.io/helm-charts/"
# Create a cluster with 1 control-plane node and 5 workers.
# Ceate a cluster with 1 control-plane node and 5 workers.
function create_kind_cluster() {
cat <<EOF | kind create cluster --name "${CLUSTER}" --image kindest/node:"$KUBERNETES_VERSION" --config=-
kind: Cluster
@@ -68,7 +68,7 @@ function deploy_greptimedb_cluster() {
helm install "$cluster_name" greptime/greptimedb-cluster \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set meta.etcdEndpoints="etcd.$install_namespace:2379" \
-n "$install_namespace"
# Wait for greptimedb cluster to be ready.
@@ -103,7 +103,7 @@ function deploy_greptimedb_cluster_with_s3_storage() {
helm install "$cluster_name" greptime/greptimedb-cluster -n "$install_namespace" \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set meta.etcdEndpoints="etcd.$install_namespace:2379" \
--set storage.s3.bucket="$AWS_CI_TEST_BUCKET" \
--set storage.s3.region="$AWS_REGION" \
--set storage.s3.root="$DATA_ROOT" \

View File

@@ -1,37 +0,0 @@
#!/bin/bash
DEV_BUILDER_IMAGE_TAG=$1
update_dev_builder_version() {
if [ -z "$DEV_BUILDER_IMAGE_TAG" ]; then
echo "Error: Should specify the dev-builder image tag"
exit 1
fi
# Configure Git configs.
git config --global user.email greptimedb-ci@greptime.com
git config --global user.name greptimedb-ci
# Checkout a new branch.
BRANCH_NAME="ci/update-dev-builder-$(date +%Y%m%d%H%M%S)"
git checkout -b $BRANCH_NAME
# Update the dev-builder image tag in the Makefile.
sed -i "s/DEV_BUILDER_IMAGE_TAG ?=.*/DEV_BUILDER_IMAGE_TAG ?= ${DEV_BUILDER_IMAGE_TAG}/g" Makefile
# Commit the changes.
git add Makefile
git commit -m "ci: update dev-builder image tag"
git push origin $BRANCH_NAME
# Create a Pull Request.
gh pr create \
--title "ci: update dev-builder image tag" \
--body "This PR updates the dev-builder image tag" \
--base main \
--head $BRANCH_NAME \
--reviewer zyy17 \
--reviewer daviderli614
}
update_dev_builder_version

View File

@@ -1,46 +0,0 @@
#!/bin/bash
set -e
VERSION=${VERSION}
GITHUB_TOKEN=${GITHUB_TOKEN}
update_helm_charts_version() {
# Configure Git configs.
git config --global user.email update-helm-charts-version@greptime.com
git config --global user.name update-helm-charts-version
# Clone helm-charts repository.
git clone "https://x-access-token:${GITHUB_TOKEN}@github.com/GreptimeTeam/helm-charts.git"
cd helm-charts
# Set default remote for gh CLI
gh repo set-default GreptimeTeam/helm-charts
# Checkout a new branch.
BRANCH_NAME="chore/greptimedb-${VERSION}"
git checkout -b $BRANCH_NAME
# Update version.
make update-version CHART=greptimedb-cluster VERSION=${VERSION}
make update-version CHART=greptimedb-standalone VERSION=${VERSION}
# Update docs.
make docs
# Commit the changes.
git add .
git commit -s -m "chore: Update GreptimeDB version to ${VERSION}"
git push origin $BRANCH_NAME
# Create a Pull Request.
gh pr create \
--title "chore: Update GreptimeDB version to ${VERSION}" \
--body "This PR updates the GreptimeDB version." \
--base main \
--head $BRANCH_NAME \
--reviewer zyy17 \
--reviewer daviderli614
}
update_helm_charts_version

View File

@@ -1,42 +0,0 @@
#!/bin/bash
set -e
VERSION=${VERSION}
GITHUB_TOKEN=${GITHUB_TOKEN}
update_homebrew_greptime_version() {
# Configure Git configs.
git config --global user.email update-greptime-version@greptime.com
git config --global user.name update-greptime-version
# Clone helm-charts repository.
git clone "https://x-access-token:${GITHUB_TOKEN}@github.com/GreptimeTeam/homebrew-greptime.git"
cd homebrew-greptime
# Set default remote for gh CLI
gh repo set-default GreptimeTeam/homebrew-greptime
# Checkout a new branch.
BRANCH_NAME="chore/greptimedb-${VERSION}"
git checkout -b $BRANCH_NAME
# Update version.
make update-greptime-version VERSION=${VERSION}
# Commit the changes.
git add .
git commit -s -m "chore: Update GreptimeDB version to ${VERSION}"
git push origin $BRANCH_NAME
# Create a Pull Request.
gh pr create \
--title "chore: Update GreptimeDB version to ${VERSION}" \
--body "This PR updates the GreptimeDB version." \
--base main \
--head $BRANCH_NAME \
--reviewer zyy17 \
--reviewer daviderli614
}
update_homebrew_greptime_version

View File

@@ -41,7 +41,7 @@ function upload_artifacts() {
# Updates the latest version information in AWS S3 if UPDATE_VERSION_INFO is true.
function update_version_info() {
if [ "$UPDATE_VERSION_INFO" == "true" ]; then
# If it's the official release(like v1.0.0, v1.0.1, v1.0.2, etc.), update latest-version.txt.
# If it's the officail release(like v1.0.0, v1.0.1, v1.0.2, etc.), update latest-version.txt.
if [[ "$VERSION" =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
echo "Updating latest-version.txt"
echo "$VERSION" > latest-version.txt

View File

@@ -55,11 +55,6 @@ on:
description: Build and push images to DockerHub and ACR
required: false
default: true
upload_artifacts_to_s3:
type: boolean
description: Whether upload artifacts to s3
required: false
default: false
cargo_profile:
type: choice
description: The cargo profile to use in building GreptimeDB.
@@ -243,7 +238,7 @@ jobs:
version: ${{ needs.allocate-runners.outputs.version }}
push-latest-tag: false # Don't push the latest tag to registry.
dev-mode: true # Only build the standard images.
- name: Echo Docker image tag to step summary
run: |
echo "## Docker Image Tag" >> $GITHUB_STEP_SUMMARY
@@ -286,7 +281,7 @@ jobs:
aws-cn-access-key-id: ${{ secrets.AWS_CN_ACCESS_KEY_ID }}
aws-cn-secret-access-key: ${{ secrets.AWS_CN_SECRET_ACCESS_KEY }}
aws-cn-region: ${{ vars.AWS_RELEASE_BUCKET_REGION }}
upload-to-s3: ${{ inputs.upload_artifacts_to_s3 }}
upload-to-s3: false
dev-mode: true # Only build the standard images(exclude centos images).
push-latest-tag: false # Don't push the latest tag to registry.
update-version-info: false # Don't update the version info in S3.

View File

@@ -22,7 +22,6 @@ concurrency:
jobs:
check-typos-and-docs:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Check typos and docs
runs-on: ubuntu-latest
steps:
@@ -37,7 +36,6 @@ jobs:
|| (echo "'config/config.md' is not up-to-date, please run 'make config-docs'." && exit 1)
license-header-check:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
runs-on: ubuntu-latest
name: Check License Header
steps:
@@ -47,7 +45,6 @@ jobs:
- uses: korandoru/hawkeye@v5
check:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Check
runs-on: ${{ matrix.os }}
strategy:
@@ -74,7 +71,6 @@ jobs:
run: cargo check --locked --workspace --all-targets
toml:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Toml Check
runs-on: ubuntu-latest
timeout-minutes: 60
@@ -89,7 +85,6 @@ jobs:
run: taplo format --check
build:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Build GreptimeDB binaries
runs-on: ${{ matrix.os }}
strategy:
@@ -116,7 +111,7 @@ jobs:
- name: Build greptime binaries
shell: bash
# `cargo gc` will invoke `cargo build` with specified args
run: cargo gc -- --bin greptime --bin sqlness-runner --features pg_kvbackend
run: cargo gc -- --bin greptime --bin sqlness-runner --features "pg_kvbackend,mysql_kvbackend"
- name: Pack greptime binaries
shell: bash
run: |
@@ -132,7 +127,6 @@ jobs:
version: current
fuzztest:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Fuzz Test
needs: build
runs-on: ubuntu-latest
@@ -189,13 +183,11 @@ jobs:
max-total-time: 120
unstable-fuzztest:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Unstable Fuzz Test
needs: build-greptime-ci
runs-on: ubuntu-latest
timeout-minutes: 60
strategy:
fail-fast: false
matrix:
target: [ "unstable_fuzz_create_table_standalone" ]
steps:
@@ -223,12 +215,12 @@ jobs:
run: |
sudo apt update && sudo apt install -y libfuzzer-14-dev
cargo install cargo-fuzz cargo-gc-bin --force
- name: Download pre-built binary
- name: Download pre-built binariy
uses: actions/download-artifact@v4
with:
name: bin
path: .
- name: Unzip binary
- name: Unzip bianry
run: |
tar -xvf ./bin.tar.gz
rm ./bin.tar.gz
@@ -250,14 +242,8 @@ jobs:
name: unstable-fuzz-logs
path: /tmp/unstable-greptime/
retention-days: 3
- name: Describe pods
if: failure()
shell: bash
run: |
kubectl describe pod -n my-greptimedb
build-greptime-ci:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Build GreptimeDB binary (profile-CI)
runs-on: ${{ matrix.os }}
strategy:
@@ -281,10 +267,10 @@ jobs:
- name: Install cargo-gc-bin
shell: bash
run: cargo install cargo-gc-bin --force
- name: Build greptime binary
- name: Build greptime bianry
shell: bash
# `cargo gc` will invoke `cargo build` with specified args
run: cargo gc --profile ci -- --bin greptime --features pg_kvbackend
run: cargo gc --profile ci -- --bin greptime --features "pg_kvbackend,mysql_kvbackend"
- name: Pack greptime binary
shell: bash
run: |
@@ -299,13 +285,11 @@ jobs:
version: current
distributed-fuzztest:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Fuzz Test (Distributed, ${{ matrix.mode.name }}, ${{ matrix.target }})
runs-on: ubuntu-latest
needs: build-greptime-ci
timeout-minutes: 60
strategy:
fail-fast: false
matrix:
target: [ "fuzz_create_table", "fuzz_alter_table", "fuzz_create_database", "fuzz_create_logical_table", "fuzz_alter_logical_table", "fuzz_insert", "fuzz_insert_logical_table" ]
mode:
@@ -335,9 +319,9 @@ jobs:
name: Setup Minio
uses: ./.github/actions/setup-minio
- if: matrix.mode.kafka
name: Setup Kafka cluster
name: Setup Kafka cluser
uses: ./.github/actions/setup-kafka-cluster
- name: Setup Etcd cluster
- name: Setup Etcd cluser
uses: ./.github/actions/setup-etcd-cluster
# Prepares for fuzz tests
- uses: arduino/setup-protoc@v3
@@ -410,11 +394,6 @@ jobs:
shell: bash
run: |
kubectl describe nodes
- name: Describe pod
if: failure()
shell: bash
run: |
kubectl describe pod -n my-greptimedb
- name: Export kind logs
if: failure()
shell: bash
@@ -437,13 +416,11 @@ jobs:
docker system prune -f
distributed-fuzztest-with-chaos:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Fuzz Test with Chaos (Distributed, ${{ matrix.mode.name }}, ${{ matrix.target }})
runs-on: ubuntu-latest
needs: build-greptime-ci
timeout-minutes: 60
strategy:
fail-fast: false
matrix:
target: ["fuzz_migrate_mito_regions", "fuzz_migrate_metric_regions", "fuzz_failover_mito_regions", "fuzz_failover_metric_regions"]
mode:
@@ -488,9 +465,9 @@ jobs:
name: Setup Minio
uses: ./.github/actions/setup-minio
- if: matrix.mode.kafka
name: Setup Kafka cluster
name: Setup Kafka cluser
uses: ./.github/actions/setup-kafka-cluster
- name: Setup Etcd cluster
- name: Setup Etcd cluser
uses: ./.github/actions/setup-etcd-cluster
# Prepares for fuzz tests
- uses: arduino/setup-protoc@v3
@@ -564,11 +541,6 @@ jobs:
shell: bash
run: |
kubectl describe nodes
- name: Describe pods
if: failure()
shell: bash
run: |
kubectl describe pod -n my-greptimedb
- name: Export kind logs
if: failure()
shell: bash
@@ -591,12 +563,10 @@ jobs:
docker system prune -f
sqlness:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Sqlness Test (${{ matrix.mode.name }})
needs: build
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ ubuntu-latest ]
mode:
@@ -606,7 +576,7 @@ jobs:
- name: "Remote WAL"
opts: "-w kafka -k 127.0.0.1:9092"
kafka: true
- name: "PostgreSQL KvBackend"
- name: "Pg Kvbackend"
opts: "--setup-pg"
kafka: false
timeout-minutes: 60
@@ -636,7 +606,6 @@ jobs:
retention-days: 3
fmt:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Rustfmt
runs-on: ubuntu-latest
timeout-minutes: 60
@@ -654,7 +623,6 @@ jobs:
run: make fmt-check
clippy:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Clippy
runs-on: ubuntu-latest
timeout-minutes: 60
@@ -680,7 +648,6 @@ jobs:
run: make clippy
conflict-check:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
name: Check for conflict
runs-on: ubuntu-latest
steps:
@@ -691,7 +658,7 @@ jobs:
uses: olivernybroe/action-conflict-finder@v4.0
test:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' && github.event_name != 'merge_group' }}
if: github.event_name != 'merge_group'
runs-on: ubuntu-22.04-arm
timeout-minutes: 60
needs: [conflict-check, clippy, fmt]
@@ -706,7 +673,7 @@ jobs:
- name: Install toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
cache: false
cache: false
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
@@ -720,7 +687,7 @@ jobs:
working-directory: tests-integration/fixtures
run: docker compose up -d --wait
- name: Run nextest cases
run: cargo nextest run --workspace -F dashboard -F pg_kvbackend
run: cargo nextest run --workspace -F dashboard -F pg_kvbackend -F mysql_kvbackend
env:
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold"
RUST_BACKTRACE: 1
@@ -743,7 +710,7 @@ jobs:
UNITTEST_LOG_DIR: "__unittest_logs"
coverage:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' && github.event_name == 'merge_group' }}
if: github.event_name == 'merge_group'
runs-on: ubuntu-22.04-8-cores
timeout-minutes: 60
steps:
@@ -773,7 +740,7 @@ jobs:
working-directory: tests-integration/fixtures
run: docker compose up -d --wait
- name: Run nextest cases
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F dashboard -F pg_kvbackend
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F dashboard -F pg_kvbackend -F mysql_kvbackend
env:
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold"
RUST_BACKTRACE: 1
@@ -803,7 +770,6 @@ jobs:
verbose: true
# compat:
# if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
# name: Compatibility Test
# needs: build
# runs-on: ubuntu-22.04

52
.github/workflows/grafana.yml vendored Normal file
View File

@@ -0,0 +1,52 @@
name: Check Grafana Panels
on:
pull_request:
branches:
- main
paths:
- 'grafana/**' # Trigger only when files under the grafana/ directory change
jobs:
check-panels:
runs-on: ubuntu-latest
steps:
# Check out the repository
- name: Checkout repository
uses: actions/checkout@v4
# Install jq (required for the script)
- name: Install jq
run: sudo apt-get install -y jq
# Make the check.sh script executable
- name: Make check.sh executable
run: chmod +x grafana/check.sh
# Run the check.sh script
- name: Run check.sh
run: ./grafana/check.sh
# Only run summary.sh for pull_request events (not for merge queues or final pushes)
- name: Check if this is a pull request
id: check-pr
run: |
if [[ "${{ github.event_name }}" == "pull_request" ]]; then
echo "is_pull_request=true" >> $GITHUB_OUTPUT
else
echo "is_pull_request=false" >> $GITHUB_OUTPUT
fi
# Make the summary.sh script executable
- name: Make summary.sh executable
if: steps.check-pr.outputs.is_pull_request == 'true'
run: chmod +x grafana/summary.sh
# Run the summary.sh script and add its output to the GitHub Job Summary
- name: Run summary.sh and add to Job Summary
if: steps.check-pr.outputs.is_pull_request == 'true'
run: |
SUMMARY=$(./grafana/summary.sh)
echo "### Summary of Grafana Panels" >> $GITHUB_STEP_SUMMARY
echo "$SUMMARY" >> $GITHUB_STEP_SUMMARY

View File

@@ -107,6 +107,7 @@ jobs:
CARGO_BUILD_RUSTFLAGS: "-C linker=lld-link"
RUST_BACKTRACE: 1
CARGO_INCREMENTAL: 0
RUSTUP_WINDOWS_PATH_ADD_BIN: 1 # Workaround for https://github.com/nextest-rs/nextest/issues/1493
GT_S3_BUCKET: ${{ vars.AWS_CI_TEST_BUCKET }}
GT_S3_ACCESS_KEY_ID: ${{ secrets.AWS_CI_TEST_ACCESS_KEY_ID }}
GT_S3_ACCESS_KEY: ${{ secrets.AWS_CI_TEST_SECRET_ACCESS_KEY }}
@@ -117,16 +118,16 @@ jobs:
name: Run clean build on Linux
runs-on: ubuntu-latest
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
timeout-minutes: 45
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
persist-credentials: false
- uses: cachix/install-nix-action@v31
- run: nix develop --command cargo check --bin greptime
env:
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold"
- uses: cachix/install-nix-action@v27
with:
nix_path: nixpkgs=channel:nixos-24.11
- run: nix develop --command cargo build
check-status:
name: Check status

View File

@@ -1,42 +0,0 @@
name: 'PR Labeling'
on:
pull_request_target:
types:
- opened
- synchronize
- reopened
permissions:
contents: read
pull-requests: write
issues: write
jobs:
labeler:
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v4
- uses: actions/labeler@v5
with:
configuration-path: ".github/labeler.yaml"
repo-token: "${{ secrets.GITHUB_TOKEN }}"
size-label:
runs-on: ubuntu-latest
steps:
- uses: pascalgn/size-label-action@v0.5.5
env:
GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}"
with:
sizes: >
{
"0": "XS",
"100": "S",
"300": "M",
"1000": "L",
"1500": "XL",
"2000": "XXL"
}

View File

@@ -24,19 +24,11 @@ on:
description: Release dev-builder-android image
required: false
default: false
update_dev_builder_image_tag:
type: boolean
description: Update the DEV_BUILDER_IMAGE_TAG in Makefile and create a PR
required: false
default: false
jobs:
release-dev-builder-images:
name: Release dev builder images
# The jobs are triggered by the following events:
# 1. Manually triggered workflow_dispatch event
# 2. Push event when the PR that modifies the `rust-toolchain.toml` or `docker/dev-builder/**` is merged to main
if: ${{ github.event_name == 'push' || inputs.release_dev_builder_ubuntu_image || inputs.release_dev_builder_centos_image || inputs.release_dev_builder_android_image }}
if: ${{ inputs.release_dev_builder_ubuntu_image || inputs.release_dev_builder_centos_image || inputs.release_dev_builder_android_image }} # Only manually trigger this job.
runs-on: ubuntu-latest
outputs:
version: ${{ steps.set-version.outputs.version }}
@@ -65,9 +57,9 @@ jobs:
version: ${{ env.VERSION }}
dockerhub-image-registry-username: ${{ secrets.DOCKERHUB_USERNAME }}
dockerhub-image-registry-token: ${{ secrets.DOCKERHUB_TOKEN }}
build-dev-builder-ubuntu: ${{ inputs.release_dev_builder_ubuntu_image || github.event_name == 'push' }}
build-dev-builder-centos: ${{ inputs.release_dev_builder_centos_image || github.event_name == 'push' }}
build-dev-builder-android: ${{ inputs.release_dev_builder_android_image || github.event_name == 'push' }}
build-dev-builder-ubuntu: ${{ inputs.release_dev_builder_ubuntu_image }}
build-dev-builder-centos: ${{ inputs.release_dev_builder_centos_image }}
build-dev-builder-android: ${{ inputs.release_dev_builder_android_image }}
release-dev-builder-images-ecr:
name: Release dev builder images to AWS ECR
@@ -93,7 +85,7 @@ jobs:
- name: Push dev-builder-ubuntu image
shell: bash
if: ${{ inputs.release_dev_builder_ubuntu_image || github.event_name == 'push' }}
if: ${{ inputs.release_dev_builder_ubuntu_image }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -114,7 +106,7 @@ jobs:
- name: Push dev-builder-centos image
shell: bash
if: ${{ inputs.release_dev_builder_centos_image || github.event_name == 'push' }}
if: ${{ inputs.release_dev_builder_centos_image }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -135,7 +127,7 @@ jobs:
- name: Push dev-builder-android image
shell: bash
if: ${{ inputs.release_dev_builder_android_image || github.event_name == 'push' }}
if: ${{ inputs.release_dev_builder_android_image }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -170,7 +162,7 @@ jobs:
- name: Push dev-builder-ubuntu image
shell: bash
if: ${{ inputs.release_dev_builder_ubuntu_image || github.event_name == 'push' }}
if: ${{ inputs.release_dev_builder_ubuntu_image }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -184,7 +176,7 @@ jobs:
- name: Push dev-builder-centos image
shell: bash
if: ${{ inputs.release_dev_builder_centos_image || github.event_name == 'push' }}
if: ${{ inputs.release_dev_builder_centos_image }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -198,7 +190,7 @@ jobs:
- name: Push dev-builder-android image
shell: bash
if: ${{ inputs.release_dev_builder_android_image || github.event_name == 'push' }}
if: ${{ inputs.release_dev_builder_android_image }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -209,24 +201,3 @@ jobs:
quay.io/skopeo/stable:latest \
copy -a docker://docker.io/$IMAGE_NAMESPACE/dev-builder-android:$IMAGE_VERSION \
docker://$ACR_IMAGE_REGISTRY/$IMAGE_NAMESPACE/dev-builder-android:$IMAGE_VERSION
update-dev-builder-image-tag:
name: Update dev-builder image tag
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
if: ${{ github.event_name == 'push' || inputs.update_dev_builder_image_tag }}
needs: [
release-dev-builder-images
]
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Update dev-builder image tag
shell: bash
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
./.github/scripts/update-dev-builder-version.sh ${{ needs.release-dev-builder-images.outputs.version }}

View File

@@ -88,8 +88,10 @@ env:
# Controls whether to run tests, include unit-test, integration-test and sqlness.
DISABLE_RUN_TESTS: ${{ inputs.skip_test || vars.DEFAULT_SKIP_TEST }}
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nightly-20230313;
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nigthly-20230313;
NIGHTLY_RELEASE_PREFIX: nightly
# Note: The NEXT_RELEASE_VERSION should be modified manually by every formal release.
NEXT_RELEASE_VERSION: v0.14.0
jobs:
allocate-runners:
@@ -110,8 +112,6 @@ jobs:
# The 'version' use as the global tag name of the release workflow.
version: ${{ steps.create-version.outputs.version }}
should-push-latest-tag: ${{ steps.check-version.outputs.should-push-latest-tag }}
steps:
- name: Checkout
uses: actions/checkout@v4
@@ -126,7 +126,7 @@ jobs:
# The create-version will create a global variable named 'version' in the global workflows.
# - If it's a tag push release, the version is the tag name(${{ github.ref_name }});
# - If it's a scheduled release, the version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-$buildTime', like v0.2.0-nightly-20230313;
# - If it's a scheduled release, the version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-$buildTime', like v0.2.0-nigthly-20230313;
# - If it's a manual release, the version is '${{ env.NEXT_RELEASE_VERSION }}-<short-git-sha>-YYYYMMDDSS', like v0.2.0-e5b243c-2023071245;
- name: Create version
id: create-version
@@ -135,13 +135,9 @@ jobs:
env:
GITHUB_EVENT_NAME: ${{ github.event_name }}
GITHUB_REF_NAME: ${{ github.ref_name }}
NEXT_RELEASE_VERSION: ${{ env.NEXT_RELEASE_VERSION }}
NIGHTLY_RELEASE_PREFIX: ${{ env.NIGHTLY_RELEASE_PREFIX }}
- name: Check version
id: check-version
run: |
./.github/scripts/check-version.sh "${{ steps.create-version.outputs.version }}"
- name: Allocate linux-amd64 runner
if: ${{ inputs.build_linux_amd64_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
uses: ./.github/actions/start-runner
@@ -321,7 +317,7 @@ jobs:
image-registry-username: ${{ secrets.DOCKERHUB_USERNAME }}
image-registry-password: ${{ secrets.DOCKERHUB_TOKEN }}
version: ${{ needs.allocate-runners.outputs.version }}
push-latest-tag: ${{ needs.allocate-runners.outputs.should-push-latest-tag == 'true' && github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
push-latest-tag: true
- name: Set build image result
id: set-build-image-result
@@ -339,7 +335,7 @@ jobs:
build-windows-artifacts,
release-images-to-dockerhub,
]
runs-on: ubuntu-latest-16-cores
runs-on: ubuntu-latest
# When we push to ACR, it's easy to fail due to some unknown network issues.
# However, we don't want to fail the whole workflow because of this.
# The ACR have daily sync with DockerHub, so don't worry about the image not being updated.
@@ -368,7 +364,7 @@ jobs:
dev-mode: false
upload-to-s3: true
update-version-info: true
push-latest-tag: ${{ needs.allocate-runners.outputs.should-push-latest-tag == 'true' && github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
push-latest-tag: true
publish-github-release:
name: Create GitHub release and upload artifacts
@@ -395,7 +391,7 @@ jobs:
### Stop runners ###
# It's very necessary to split the job of releasing runners into 'stop-linux-amd64-runner' and 'stop-linux-arm64-runner'.
# Because we can terminate the specified EC2 instance immediately after the job is finished without unnecessary waiting.
# Because we can terminate the specified EC2 instance immediately after the job is finished without uncessary waiting.
stop-linux-amd64-runner: # It's always run as the last job in the workflow to make sure that the runner is released.
name: Stop linux-amd64 runner
# Only run this job when the runner is allocated.
@@ -448,10 +444,10 @@ jobs:
aws-region: ${{ vars.EC2_RUNNER_REGION }}
github-token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }}
bump-downstream-repo-versions:
name: Bump downstream repo versions
bump-doc-version:
name: Bump doc version
if: ${{ github.event_name == 'push' || github.event_name == 'schedule' }}
needs: [allocate-runners, publish-github-release]
needs: [allocate-runners]
runs-on: ubuntu-latest
# Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
permissions:
@@ -463,58 +459,13 @@ jobs:
fetch-depth: 0
persist-credentials: false
- uses: ./.github/actions/setup-cyborg
- name: Bump downstream repo versions
- name: Bump doc version
working-directory: cyborg
run: pnpm tsx bin/bump-versions.ts
run: pnpm tsx bin/bump-doc-version.ts
env:
TARGET_REPOS: website,docs,demo
VERSION: ${{ needs.allocate-runners.outputs.version }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
WEBSITE_REPO_TOKEN: ${{ secrets.WEBSITE_REPO_TOKEN }}
DOCS_REPO_TOKEN: ${{ secrets.DOCS_REPO_TOKEN }}
DEMO_REPO_TOKEN: ${{ secrets.DEMO_REPO_TOKEN }}
bump-helm-charts-version:
name: Bump helm charts version
if: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
needs: [allocate-runners, publish-github-release]
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Bump helm charts version
env:
GITHUB_TOKEN: ${{ secrets.HELM_CHARTS_REPO_TOKEN }}
VERSION: ${{ needs.allocate-runners.outputs.version }}
run: |
./.github/scripts/update-helm-charts-version.sh
bump-homebrew-greptime-version:
name: Bump homebrew greptime version
if: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
needs: [allocate-runners, publish-github-release]
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Bump homebrew greptime version
env:
GITHUB_TOKEN: ${{ secrets.HOMEBREW_GREPTIME_REPO_TOKEN }}
VERSION: ${{ needs.allocate-runners.outputs.version }}
run: |
./.github/scripts/update-homebrew-greptme-version.sh
notification:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' && (github.event_name == 'push' || github.event_name == 'schedule') && always() }}

View File

@@ -11,17 +11,14 @@ concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true
permissions:
issues: write
contents: write
pull-requests: write
jobs:
check:
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- uses: actions/checkout@v4
with:
persist-credentials: false
- uses: ./.github/actions/setup-cyborg
- name: Check Pull Request
working-directory: cyborg

4
.gitignore vendored
View File

@@ -57,7 +57,3 @@ tests-fuzz/corpus/
## default data home
greptimedb_data
# github
!/.github

232
Cargo.lock generated
View File

@@ -185,7 +185,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"common-base",
"common-decimal",
@@ -710,7 +710,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"api",
"async-trait",
@@ -1324,7 +1324,7 @@ dependencies = [
[[package]]
name = "cache"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"catalog",
"common-error",
@@ -1348,7 +1348,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"api",
"arrow",
@@ -1661,7 +1661,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "cli"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"async-trait",
"auth",
@@ -1693,6 +1693,7 @@ dependencies = [
"humantime",
"meta-client",
"nu-ansi-term",
"opendal",
"query",
"rand",
"reqwest",
@@ -1703,7 +1704,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.12.2",
"substrait 0.14.0",
"table",
"tempfile",
"tokio",
@@ -1712,7 +1713,7 @@ dependencies = [
[[package]]
name = "client"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"api",
"arc-swap",
@@ -1739,7 +1740,7 @@ dependencies = [
"rand",
"serde_json",
"snafu 0.8.5",
"substrait 0.12.2",
"substrait 0.14.0",
"substrait 0.37.3",
"tokio",
"tokio-stream",
@@ -1780,7 +1781,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"async-trait",
"auth",
@@ -1791,7 +1792,6 @@ dependencies = [
"clap 4.5.19",
"cli",
"client",
"colored",
"common-base",
"common-catalog",
"common-config",
@@ -1826,10 +1826,7 @@ dependencies = [
"mito2",
"moka",
"nu-ansi-term",
"object-store",
"parquet",
"plugins",
"pprof",
"prometheus",
"prost 0.13.3",
"query",
@@ -1845,7 +1842,7 @@ dependencies = [
"similar-asserts",
"snafu 0.8.5",
"store-api",
"substrait 0.12.2",
"substrait 0.14.0",
"table",
"temp-env",
"tempfile",
@@ -1862,16 +1859,6 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0"
[[package]]
name = "colored"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c"
dependencies = [
"lazy_static",
"windows-sys 0.59.0",
]
[[package]]
name = "combine"
version = "4.6.7"
@@ -1901,7 +1888,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"anymap2",
"async-trait",
@@ -1923,11 +1910,11 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.12.2"
version = "0.14.0"
[[package]]
name = "common-config"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"common-base",
"common-error",
@@ -1952,7 +1939,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"arrow",
"arrow-schema",
@@ -1988,7 +1975,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"bigdecimal 0.4.5",
"common-error",
@@ -2001,7 +1988,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"http 1.1.0",
"snafu 0.8.5",
@@ -2011,7 +1998,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"async-trait",
"common-error",
@@ -2021,7 +2008,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -2029,6 +2016,7 @@ dependencies = [
"arc-swap",
"async-trait",
"bincode",
"chrono",
"common-base",
"common-catalog",
"common-error",
@@ -2040,6 +2028,8 @@ dependencies = [
"common-time",
"common-version",
"datafusion",
"datafusion-common",
"datafusion-expr",
"datatypes",
"derive_more",
"geo",
@@ -2069,7 +2059,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"async-trait",
"common-runtime",
@@ -2086,7 +2076,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"api",
"arrow-flight",
@@ -2114,7 +2104,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"api",
"common-base",
@@ -2133,7 +2123,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"arc-swap",
"common-query",
@@ -2147,7 +2137,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"common-error",
"common-macro",
@@ -2160,7 +2150,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"anymap2",
"api",
@@ -2208,6 +2198,7 @@ dependencies = [
"serde_with",
"session",
"snafu 0.8.5",
"sqlx",
"store-api",
"strum 0.25.0",
"table",
@@ -2220,7 +2211,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2229,11 +2220,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "0.12.2"
version = "0.14.0"
[[package]]
name = "common-pprof"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"common-error",
"common-macro",
@@ -2245,7 +2236,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"async-stream",
"async-trait",
@@ -2272,7 +2263,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"async-trait",
"common-procedure",
@@ -2280,7 +2271,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"api",
"async-trait",
@@ -2306,7 +2297,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"arc-swap",
"common-error",
@@ -2325,7 +2316,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -2355,7 +2346,7 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"atty",
"backtrace",
@@ -2383,7 +2374,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"client",
"common-query",
@@ -2395,7 +2386,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"arrow",
"chrono",
@@ -2413,7 +2404,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"build-data",
"const_format",
@@ -2423,7 +2414,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"common-base",
"common-error",
@@ -3354,7 +3345,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"api",
"arrow-flight",
@@ -3406,7 +3397,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.12.2",
"substrait 0.14.0",
"table",
"tokio",
"toml 0.8.19",
@@ -3415,7 +3406,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"arrow",
"arrow-array",
@@ -4059,7 +4050,7 @@ dependencies = [
[[package]]
name = "file-engine"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"api",
"async-trait",
@@ -4128,11 +4119,12 @@ dependencies = [
[[package]]
name = "flate2"
version = "1.0.34"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0"
checksum = "11faaf5a5236997af9848be0bef4db95824b1d534ebc64d0f0c6cf3e67bd38dc"
dependencies = [
"crc32fast",
"libz-rs-sys",
"libz-sys",
"miniz_oxide",
]
@@ -4169,7 +4161,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"api",
"arrow",
@@ -4179,6 +4171,7 @@ dependencies = [
"bytes",
"cache",
"catalog",
"chrono",
"client",
"common-base",
"common-catalog",
@@ -4230,7 +4223,7 @@ dependencies = [
"snafu 0.8.5",
"store-api",
"strum 0.25.0",
"substrait 0.12.2",
"substrait 0.14.0",
"table",
"tokio",
"tonic 0.12.3",
@@ -4285,7 +4278,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"api",
"arc-swap",
@@ -4713,7 +4706,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=072ce580502e015df1a6b03a185b60309a7c2a7a#072ce580502e015df1a6b03a185b60309a7c2a7a"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=c5419bbd20cb42e568ec325a4d71a3c94cc327e1#c5419bbd20cb42e568ec325a4d71a3c94cc327e1"
dependencies = [
"prost 0.13.3",
"serde",
@@ -5553,7 +5546,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -5578,6 +5571,7 @@ dependencies = [
"rand",
"regex",
"regex-automata 0.4.8",
"roaring",
"serde",
"serde_json",
"snafu 0.8.5",
@@ -5909,15 +5903,15 @@ dependencies = [
[[package]]
name = "jsonpath-rust"
version = "0.7.3"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69a61b87f6a55cc6c28fed5739dd36b9642321ce63e4a5e4a4715d69106f4a10"
checksum = "0c00ae348f9f8fd2d09f82a98ca381c60df9e0820d8d79fce43e649b4dc3128b"
dependencies = [
"pest",
"pest_derive",
"regex",
"serde_json",
"thiserror 1.0.64",
"thiserror 2.0.12",
]
[[package]]
@@ -6285,6 +6279,15 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "libz-rs-sys"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "902bc563b5d65ad9bba616b490842ef0651066a1a1dc3ce1087113ffcb873c8d"
dependencies = [
"zlib-rs",
]
[[package]]
name = "libz-sys"
version = "1.1.20"
@@ -6345,7 +6348,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "log-query"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"chrono",
"common-error",
@@ -6357,7 +6360,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"async-stream",
"async-trait",
@@ -6650,7 +6653,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"api",
"async-trait",
@@ -6677,7 +6680,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"api",
"async-trait",
@@ -6728,6 +6731,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"strum 0.25.0",
"table",
"tokio",
"tokio-postgres",
@@ -6763,7 +6767,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"api",
"aquamarine",
@@ -6828,9 +6832,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.8.0"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1"
checksum = "8e3e04debbb59698c15bacbb6d93584a8c0ca9cc3213cb423d31f760d8843ce5"
dependencies = [
"adler2",
]
@@ -6861,7 +6865,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"api",
"aquamarine",
@@ -7558,7 +7562,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"anyhow",
"bytes",
@@ -7807,7 +7811,7 @@ dependencies = [
[[package]]
name = "operator"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -7855,7 +7859,7 @@ dependencies = [
"sql",
"sqlparser 0.52.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=71dd86058d2af97b9925093d40c4e03360403170)",
"store-api",
"substrait 0.12.2",
"substrait 0.14.0",
"table",
"tokio",
"tokio-util",
@@ -8092,7 +8096,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"api",
"async-trait",
@@ -8282,7 +8286,7 @@ dependencies = [
"rand",
"ring",
"rust_decimal",
"thiserror 2.0.6",
"thiserror 2.0.12",
"tokio",
"tokio-rustls 0.26.0",
"tokio-util",
@@ -8360,7 +8364,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8394,7 +8398,7 @@ dependencies = [
"greptime-proto",
"itertools 0.10.5",
"jsonb",
"jsonpath-rust 0.7.3",
"jsonpath-rust 0.7.5",
"lazy_static",
"moka",
"once_cell",
@@ -8500,7 +8504,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"auth",
"clap 4.5.19",
@@ -8762,7 +8766,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"ahash 0.8.11",
"async-trait",
@@ -8772,6 +8776,7 @@ dependencies = [
"common-recordbatch",
"common-telemetry",
"datafusion",
"datafusion-common",
"datafusion-expr",
"datatypes",
"futures",
@@ -8785,8 +8790,9 @@ dependencies = [
[[package]]
name = "promql-parser"
version = "0.4.3"
source = "git+https://github.com/GreptimeTeam/promql-parser.git?rev=27abb8e16003a50c720f00d6c85f41f5fa2a2a8e#27abb8e16003a50c720f00d6c85f41f5fa2a2a8e"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c6b1429bdd199d53bd58b745075c1652efedbe2746e5d4f0d56d3184dda48ec"
dependencies = [
"cfgrammar",
"chrono",
@@ -9007,7 +9013,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"async-compression 0.4.13",
"async-trait",
@@ -9048,7 +9054,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9113,7 +9119,7 @@ dependencies = [
"sqlparser 0.52.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=71dd86058d2af97b9925093d40c4e03360403170)",
"statrs",
"store-api",
"substrait 0.12.2",
"substrait 0.14.0",
"table",
"tokio",
"tokio-stream",
@@ -9644,6 +9650,16 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "roaring"
version = "0.10.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41589aba99537475bf697f2118357cad1c31590c5a1b9f6d9fc4ad6d07503661"
dependencies = [
"bytemuck",
"byteorder",
]
[[package]]
name = "robust"
version = "1.1.0"
@@ -10458,7 +10474,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -10575,7 +10591,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"api",
"arc-swap",
@@ -10884,7 +10900,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"api",
"chrono",
@@ -10938,7 +10954,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -11063,7 +11079,7 @@ dependencies = [
"serde_json",
"sha2",
"smallvec",
"thiserror 2.0.6",
"thiserror 2.0.12",
"tokio",
"tokio-stream",
"tracing",
@@ -11148,7 +11164,7 @@ dependencies = [
"smallvec",
"sqlx-core",
"stringprep",
"thiserror 2.0.6",
"thiserror 2.0.12",
"tracing",
"whoami",
]
@@ -11186,7 +11202,7 @@ dependencies = [
"smallvec",
"sqlx-core",
"stringprep",
"thiserror 2.0.6",
"thiserror 2.0.12",
"tracing",
"whoami",
]
@@ -11255,7 +11271,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"api",
"aquamarine",
@@ -11385,7 +11401,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"async-trait",
"bytes",
@@ -11566,7 +11582,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"api",
"async-trait",
@@ -11817,7 +11833,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"arbitrary",
"async-trait",
@@ -11861,7 +11877,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.12.2"
version = "0.14.0"
dependencies = [
"api",
"arrow-flight",
@@ -11927,7 +11943,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.12.2",
"substrait 0.14.0",
"table",
"tempfile",
"time",
@@ -11967,11 +11983,11 @@ dependencies = [
[[package]]
name = "thiserror"
version = "2.0.6"
version = "2.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fec2a1820ebd077e2b90c4df007bebf344cd394098a13c563957d0afc83ea47"
checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708"
dependencies = [
"thiserror-impl 2.0.6",
"thiserror-impl 2.0.12",
]
[[package]]
@@ -11987,9 +12003,9 @@ dependencies = [
[[package]]
name = "thiserror-impl"
version = "2.0.6"
version = "2.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d65750cab40f4ff1929fb1ba509e9914eb756131cef4210da8d5d700d26f6312"
checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d"
dependencies = [
"proc-macro2",
"quote",
@@ -13948,6 +13964,12 @@ dependencies = [
"syn 2.0.96",
]
[[package]]
name = "zlib-rs"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b20717f0917c908dc63de2e44e97f1e6b126ca58d0e391cee86d504eb8fbd05"
[[package]]
name = "zstd"
version = "0.11.2+zstd.1.5.2"

View File

@@ -67,7 +67,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.12.2"
version = "0.14.0"
edition = "2021"
license = "Apache-2.0"
@@ -126,10 +126,11 @@ deadpool-postgres = "0.12"
derive_builder = "0.12"
dotenv = "0.15"
etcd-client = "0.14"
flate2 = { version = "1.1.0", default-features = false, features = ["zlib-rs"] }
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "072ce580502e015df1a6b03a185b60309a7c2a7a" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "c5419bbd20cb42e568ec325a4d71a3c94cc327e1" }
hex = "0.4"
http = "1"
humantime = "2.1"
@@ -160,9 +161,7 @@ parquet = { version = "53.0.0", default-features = false, features = ["arrow", "
paste = "1.0"
pin-project = "1.0"
prometheus = { version = "0.13.3", features = ["process"] }
promql-parser = { git = "https://github.com/GreptimeTeam/promql-parser.git", features = [
"ser",
], rev = "27abb8e16003a50c720f00d6c85f41f5fa2a2a8e" }
promql-parser = { version = "0.5", features = ["ser"] }
prost = "0.13"
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.8"
@@ -190,6 +189,10 @@ shadow-rs = "0.38"
similar-asserts = "1.6.0"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.8"
sqlx = { version = "0.8", features = [
"runtime-tokio-rustls",
"mysql",
] }
sysinfo = "0.30"
# on branch v0.52.x
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "71dd86058d2af97b9925093d40c4e03360403170", features = [

View File

@@ -8,7 +8,7 @@ CARGO_BUILD_OPTS := --locked
IMAGE_REGISTRY ?= docker.io
IMAGE_NAMESPACE ?= greptime
IMAGE_TAG ?= latest
DEV_BUILDER_IMAGE_TAG ?= 2024-12-25-9d0fa5d5-20250124085746
DEV_BUILDER_IMAGE_TAG ?= 2024-12-25-a71b93dd-20250305072908
BUILDX_MULTI_PLATFORM_BUILD ?= false
BUILDX_BUILDER_NAME ?= gtbuilder
BASE_IMAGE ?= ubuntu
@@ -60,6 +60,8 @@ ifeq ($(BUILDX_MULTI_PLATFORM_BUILD), all)
BUILDX_MULTI_PLATFORM_BUILD_OPTS := --platform linux/amd64,linux/arm64 --push
else ifeq ($(BUILDX_MULTI_PLATFORM_BUILD), amd64)
BUILDX_MULTI_PLATFORM_BUILD_OPTS := --platform linux/amd64 --push
else ifeq ($(BUILDX_MULTI_PLATFORM_BUILD), arm64)
BUILDX_MULTI_PLATFORM_BUILD_OPTS := --platform linux/arm64 --push
else
BUILDX_MULTI_PLATFORM_BUILD_OPTS := -o type=docker
endif

View File

@@ -112,7 +112,7 @@ Start a GreptimeDB container with:
```shell
docker run -p 127.0.0.1:4000-4003:4000-4003 \
-v "$(pwd)/greptimedb:/tmp/greptimedb" \
-v "$(pwd)/greptimedb:./greptimedb_data" \
--name greptime --rm \
greptime/greptimedb:latest standalone start \
--http-addr 0.0.0.0:4000 \

View File

@@ -101,7 +101,7 @@
| `flow` | -- | -- | flow engine options. |
| `flow.num_workers` | Integer | `0` | The number of flow worker in flownode.<br/>Not setting(or set to 0) this value will use the number of CPU cores divided by 2. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.data_home` | String | `./greptimedb_data/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
@@ -181,7 +181,7 @@
| `region_engine.metric` | -- | -- | Metric engine options. |
| `region_engine.metric.experimental_sparse_primary_key_encoding` | Bool | `false` | Whether to enable the experimental sparse primary key encoding. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
@@ -279,7 +279,7 @@
| `datanode.client.connect_timeout` | String | `10s` | -- |
| `datanode.client.tcp_nodelay` | Bool | `true` | -- |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
@@ -308,7 +308,7 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `data_home` | String | `/tmp/metasrv/` | The working home directory. |
| `data_home` | String | `./greptimedb_data/metasrv/` | The working home directory. |
| `bind_addr` | String | `127.0.0.1:3002` | The bind address of metasrv. |
| `server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
| `store_addrs` | Array | -- | Store server address default to etcd store.<br/>For postgres store, the format is:<br/>"password=password dbname=postgres user=postgres host=localhost port=5432"<br/>For etcd store, the format is:<br/>"127.0.0.1:2379" |
@@ -352,7 +352,7 @@
| `wal.backoff_base` | Integer | `2` | Exponential backoff rate, i.e. next backoff = base * current backoff. |
| `wal.backoff_deadline` | String | `5mins` | Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
@@ -442,7 +442,7 @@
| `wal.dump_index_interval` | String | `60s` | The interval for dumping WAL indexes.<br/>**It's only used when the provider is `kafka`**. |
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/><br/>This option ensures that when Kafka messages are deleted, the system<br/>can still successfully replay memtable data without throwing an<br/>out-of-range error.<br/>However, enabling this option might lead to unexpected data loss,<br/>as the system will skip over missing entries instead of treating<br/>them as critical errors. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.data_home` | String | `./greptimedb_data/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
@@ -522,7 +522,7 @@
| `region_engine.metric` | -- | -- | Metric engine options. |
| `region_engine.metric.experimental_sparse_primary_key_encoding` | Bool | `false` | Whether to enable the experimental sparse primary key encoding. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
@@ -579,7 +579,7 @@
| `heartbeat.interval` | String | `3s` | Interval for sending heartbeat messages to the metasrv. |
| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `/tmp/greptimedb/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |

View File

@@ -119,7 +119,7 @@ provider = "raft_engine"
## The directory to store the WAL files.
## **It's only used when the provider is `raft_engine`**.
## @toml2docs:none-default
dir = "/tmp/greptimedb/wal"
dir = "./greptimedb_data/wal"
## The size of the WAL segment file.
## **It's only used when the provider is `raft_engine`**.
@@ -265,7 +265,7 @@ overwrite_entry_start_id = false
## The data storage options.
[storage]
## The working home directory.
data_home = "/tmp/greptimedb/"
data_home = "./greptimedb_data/"
## The storage type used to store the data.
## - `File`: the data is stored in the local file system.
@@ -618,7 +618,7 @@ experimental_sparse_primary_key_encoding = false
## The logging options.
[logging]
## The directory to store the log files. If set to empty, logs will not be written to files.
dir = "/tmp/greptimedb/logs"
dir = "./greptimedb_data/logs"
## The log level. Can be `info`/`debug`/`warn`/`error`.
## @toml2docs:none-default

View File

@@ -76,7 +76,7 @@ retry_interval = "3s"
## The logging options.
[logging]
## The directory to store the log files. If set to empty, logs will not be written to files.
dir = "/tmp/greptimedb/logs"
dir = "./greptimedb_data/logs"
## The log level. Can be `info`/`debug`/`warn`/`error`.
## @toml2docs:none-default
@@ -121,4 +121,3 @@ sample_ratio = 1.0
## The tokio console address.
## @toml2docs:none-default
#+ tokio_console_addr = "127.0.0.1"

View File

@@ -189,7 +189,7 @@ tcp_nodelay = true
## The logging options.
[logging]
## The directory to store the log files. If set to empty, logs will not be written to files.
dir = "/tmp/greptimedb/logs"
dir = "./greptimedb_data/logs"
## The log level. Can be `info`/`debug`/`warn`/`error`.
## @toml2docs:none-default

View File

@@ -1,5 +1,5 @@
## The working home directory.
data_home = "/tmp/metasrv/"
data_home = "./greptimedb_data/metasrv/"
## The bind address of metasrv.
bind_addr = "127.0.0.1:3002"
@@ -177,7 +177,7 @@ backoff_deadline = "5mins"
## The logging options.
[logging]
## The directory to store the log files. If set to empty, logs will not be written to files.
dir = "/tmp/greptimedb/logs"
dir = "./greptimedb_data/logs"
## The log level. Can be `info`/`debug`/`warn`/`error`.
## @toml2docs:none-default

View File

@@ -164,7 +164,7 @@ provider = "raft_engine"
## The directory to store the WAL files.
## **It's only used when the provider is `raft_engine`**.
## @toml2docs:none-default
dir = "/tmp/greptimedb/wal"
dir = "./greptimedb_data/wal"
## The size of the WAL segment file.
## **It's only used when the provider is `raft_engine`**.
@@ -352,7 +352,7 @@ retry_delay = "500ms"
## The data storage options.
[storage]
## The working home directory.
data_home = "/tmp/greptimedb/"
data_home = "./greptimedb_data/"
## The storage type used to store the data.
## - `File`: the data is stored in the local file system.
@@ -705,7 +705,7 @@ experimental_sparse_primary_key_encoding = false
## The logging options.
[logging]
## The directory to store the log files. If set to empty, logs will not be written to files.
dir = "/tmp/greptimedb/logs"
dir = "./greptimedb_data/logs"
## The log level. Can be `info`/`debug`/`warn`/`error`.
## @toml2docs:none-default

View File

@@ -1,156 +0,0 @@
/*
* Copyright 2023 Greptime Team
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as core from "@actions/core";
import {obtainClient} from "@/common";
interface RepoConfig {
tokenEnv: string;
repo: string;
workflowLogic: (version: string) => [string, string] | null;
}
const REPO_CONFIGS: Record<string, RepoConfig> = {
website: {
tokenEnv: "WEBSITE_REPO_TOKEN",
repo: "website",
workflowLogic: (version: string) => {
// Skip nightly versions for website
if (version.includes('nightly')) {
console.log('Nightly version detected for website, skipping workflow trigger.');
return null;
}
return ['bump-patch-version.yml', version];
}
},
demo: {
tokenEnv: "DEMO_REPO_TOKEN",
repo: "demo-scene",
workflowLogic: (version: string) => {
// Skip nightly versions for demo
if (version.includes('nightly')) {
console.log('Nightly version detected for demo, skipping workflow trigger.');
return null;
}
return ['bump-patch-version.yml', version];
}
},
docs: {
tokenEnv: "DOCS_REPO_TOKEN",
repo: "docs",
workflowLogic: (version: string) => {
// Check if it's a nightly version
if (version.includes('nightly')) {
return ['bump-nightly-version.yml', version];
}
const parts = version.split('.');
if (parts.length !== 3) {
throw new Error('Invalid version format');
}
// If patch version (last number) is 0, it's a major version
// Return only major.minor version
if (parts[2] === '0') {
return ['bump-version.yml', `${parts[0]}.${parts[1]}`];
}
// Otherwise it's a patch version, use full version
return ['bump-patch-version.yml', version];
}
}
};
async function triggerWorkflow(repoConfig: RepoConfig, workflowId: string, version: string) {
const client = obtainClient(repoConfig.tokenEnv);
try {
await client.rest.actions.createWorkflowDispatch({
owner: "GreptimeTeam",
repo: repoConfig.repo,
workflow_id: workflowId,
ref: "main",
inputs: {
version,
},
});
console.log(`Successfully triggered ${workflowId} workflow for ${repoConfig.repo} with version ${version}`);
} catch (error) {
core.setFailed(`Failed to trigger workflow for ${repoConfig.repo}: ${error.message}`);
throw error;
}
}
async function processRepo(repoName: string, version: string) {
const repoConfig = REPO_CONFIGS[repoName];
if (!repoConfig) {
throw new Error(`Unknown repository: ${repoName}`);
}
try {
const workflowResult = repoConfig.workflowLogic(version);
if (workflowResult === null) {
// Skip this repo (e.g., nightly version for website)
return;
}
const [workflowId, apiVersion] = workflowResult;
await triggerWorkflow(repoConfig, workflowId, apiVersion);
} catch (error) {
core.setFailed(`Error processing ${repoName} with version ${version}: ${error.message}`);
throw error;
}
}
async function main() {
const version = process.env.VERSION;
if (!version) {
core.setFailed("VERSION environment variable is required");
process.exit(1);
}
// Remove 'v' prefix if exists
const cleanVersion = version.startsWith('v') ? version.slice(1) : version;
// Get target repositories from environment variable
// Default to both if not specified
const targetRepos = process.env.TARGET_REPOS?.split(',').map(repo => repo.trim()) || ['website', 'docs'];
console.log(`Processing version ${cleanVersion} for repositories: ${targetRepos.join(', ')}`);
const errors: string[] = [];
// Process each repository
for (const repo of targetRepos) {
try {
await processRepo(repo, cleanVersion);
} catch (error) {
errors.push(`${repo}: ${error.message}`);
}
}
if (errors.length > 0) {
core.setFailed(`Failed to process some repositories: ${errors.join('; ')}`);
process.exit(1);
}
console.log('All repositories processed successfully');
}
// Execute main function
main().catch((error) => {
core.setFailed(`Unexpected error: ${error.message}`);
process.exit(1);
});

View File

@@ -55,25 +55,12 @@ async function main() {
await client.rest.issues.addLabels({
owner, repo, issue_number: number, labels: [labelDocsRequired],
})
// Get available assignees for the docs repo
const assigneesResponse = await docsClient.rest.issues.listAssignees({
owner: 'GreptimeTeam',
repo: 'docs',
})
const validAssignees = assigneesResponse.data.map(assignee => assignee.login)
core.info(`Available assignees: ${validAssignees.join(', ')}`)
// Check if the actor is a valid assignee, otherwise fallback to fengjiachun
const assignee = validAssignees.includes(actor) ? actor : 'fengjiachun'
core.info(`Assigning issue to: ${assignee}`)
await docsClient.rest.issues.create({
owner: 'GreptimeTeam',
repo: 'docs',
title: `Update docs for ${title}`,
body: `A document change request is generated from ${html_url}`,
assignee: assignee,
assignee: actor,
}).then((res) => {
core.info(`Created issue ${res.data}`)
})

View File

@@ -1,4 +1,4 @@
FROM ubuntu:20.04 as builder
FROM ubuntu:22.04 as builder
ARG CARGO_PROFILE
ARG FEATURES

View File

@@ -1,4 +1,4 @@
FROM ubuntu:22.04
FROM ubuntu:latest
# The binary name of GreptimeDB executable.
# Defaults to "greptime", but sometimes in other projects it might be different.

View File

@@ -1,4 +1,4 @@
FROM ubuntu:20.04
FROM ubuntu:22.04
# The root path under which contains all the dependencies to build this Dockerfile.
ARG DOCKER_BUILD_ROOT=.
@@ -41,7 +41,7 @@ RUN mv protoc3/include/* /usr/local/include/
# and the repositories are pulled from trusted sources (still us, of course). Doing so does not violate the intention
# of the Git's addition to the "safe.directory" at the first place (see the commit message here:
# https://github.com/git/git/commit/8959555cee7ec045958f9b6dd62e541affb7e7d9).
# There's also another solution to this, that we add the desired submodules to the safe directory, instead of using
# There's also another solution to this, that we add the desired submodules to the safe directory, instead of using
# wildcard here. However, that requires the git's config files and the submodules all owned by the very same user.
# It's troublesome to do this since the dev build runs in Docker, which is under user "root"; while outside the Docker,
# it can be a different user that have prepared the submodules.

View File

@@ -1,51 +0,0 @@
# Use the legacy glibc 2.28.
FROM ubuntu:18.10
ENV LANG en_US.utf8
WORKDIR /greptimedb
# Use old-releases.ubuntu.com to avoid 404s: https://help.ubuntu.com/community/EOLUpgrades.
RUN echo "deb http://old-releases.ubuntu.com/ubuntu/ cosmic main restricted universe multiverse\n\
deb http://old-releases.ubuntu.com/ubuntu/ cosmic-updates main restricted universe multiverse\n\
deb http://old-releases.ubuntu.com/ubuntu/ cosmic-security main restricted universe multiverse" > /etc/apt/sources.list
# Install dependencies.
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
libssl-dev \
tzdata \
curl \
ca-certificates \
git \
build-essential \
unzip \
pkg-config
# Install protoc.
ENV PROTOC_VERSION=29.3
RUN if [ "$(uname -m)" = "x86_64" ]; then \
PROTOC_ZIP=protoc-${PROTOC_VERSION}-linux-x86_64.zip; \
elif [ "$(uname -m)" = "aarch64" ]; then \
PROTOC_ZIP=protoc-${PROTOC_VERSION}-linux-aarch_64.zip; \
else \
echo "Unsupported architecture"; exit 1; \
fi && \
curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/${PROTOC_ZIP} && \
unzip -o ${PROTOC_ZIP} -d /usr/local bin/protoc && \
unzip -o ${PROTOC_ZIP} -d /usr/local 'include/*' && \
rm -f ${PROTOC_ZIP}
# Install Rust.
SHELL ["/bin/bash", "-c"]
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --no-modify-path --default-toolchain none -y
ENV PATH /root/.cargo/bin/:$PATH
# Install Rust toolchains.
ARG RUST_TOOLCHAIN
RUN rustup toolchain install ${RUST_TOOLCHAIN}
# Install cargo-binstall with a specific version to adapt the current rust toolchain.
# Note: if we use the latest version, we may encounter the following `use of unstable library feature 'io_error_downcast'` error.
RUN cargo install cargo-binstall --version 1.6.6 --locked
# Install nextest.
RUN cargo binstall cargo-nextest --no-confirm

View File

@@ -0,0 +1,66 @@
FROM ubuntu:20.04
# The root path under which contains all the dependencies to build this Dockerfile.
ARG DOCKER_BUILD_ROOT=.
ENV LANG en_US.utf8
WORKDIR /greptimedb
RUN apt-get update && \
DEBIAN_FRONTEND=noninteractive apt-get install -y software-properties-common
# Install dependencies.
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
libssl-dev \
tzdata \
curl \
unzip \
ca-certificates \
git \
build-essential \
pkg-config
ARG TARGETPLATFORM
RUN echo "target platform: $TARGETPLATFORM"
ARG PROTOBUF_VERSION=29.3
# Install protobuf, because the one in the apt is too old (v3.12).
RUN if [ "$TARGETPLATFORM" = "linux/arm64" ]; then \
curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOBUF_VERSION}/protoc-${PROTOBUF_VERSION}-linux-aarch_64.zip && \
unzip protoc-${PROTOBUF_VERSION}-linux-aarch_64.zip -d protoc3; \
elif [ "$TARGETPLATFORM" = "linux/amd64" ]; then \
curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOBUF_VERSION}/protoc-${PROTOBUF_VERSION}-linux-x86_64.zip && \
unzip protoc-${PROTOBUF_VERSION}-linux-x86_64.zip -d protoc3; \
fi
RUN mv protoc3/bin/* /usr/local/bin/
RUN mv protoc3/include/* /usr/local/include/
# Silence all `safe.directory` warnings, to avoid the "detect dubious repository" error when building with submodules.
# Disabling the safe directory check here won't pose extra security issues, because in our usage for this dev build
# image, we use it solely on our own environment (that github action's VM, or ECS created dynamically by ourselves),
# and the repositories are pulled from trusted sources (still us, of course). Doing so does not violate the intention
# of the Git's addition to the "safe.directory" at the first place (see the commit message here:
# https://github.com/git/git/commit/8959555cee7ec045958f9b6dd62e541affb7e7d9).
# There's also another solution to this, that we add the desired submodules to the safe directory, instead of using
# wildcard here. However, that requires the git's config files and the submodules all owned by the very same user.
# It's troublesome to do this since the dev build runs in Docker, which is under user "root"; while outside the Docker,
# it can be a different user that have prepared the submodules.
RUN git config --global --add safe.directory '*'
# Install Rust.
SHELL ["/bin/bash", "-c"]
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --no-modify-path --default-toolchain none -y
ENV PATH /root/.cargo/bin/:$PATH
# Install Rust toolchains.
ARG RUST_TOOLCHAIN
RUN rustup toolchain install ${RUST_TOOLCHAIN}
# Install cargo-binstall with a specific version to adapt the current rust toolchain.
# Note: if we use the latest version, we may encounter the following `use of unstable library feature 'io_error_downcast'` error.
# compile from source take too long, so we use the precompiled binary instead
COPY $DOCKER_BUILD_ROOT/docker/dev-builder/binstall/pull_binstall.sh /usr/local/bin/pull_binstall.sh
RUN chmod +x /usr/local/bin/pull_binstall.sh && /usr/local/bin/pull_binstall.sh
# Install nextest.
RUN cargo binstall cargo-nextest --no-confirm

View File

@@ -25,7 +25,7 @@ services:
- --initial-cluster-state=new
- *etcd_initial_cluster_token
volumes:
- /tmp/greptimedb-cluster-docker-compose/etcd0:/var/lib/etcd
- ./greptimedb-cluster-docker-compose/etcd0:/var/lib/etcd
healthcheck:
test: [ "CMD", "etcdctl", "--endpoints=http://etcd0:2379", "endpoint", "health" ]
interval: 5s
@@ -68,12 +68,13 @@ services:
- datanode
- start
- --node-id=0
- --data-home=/greptimedb_data
- --rpc-bind-addr=0.0.0.0:3001
- --rpc-server-addr=datanode0:3001
- --metasrv-addrs=metasrv:3002
- --http-addr=0.0.0.0:5000
volumes:
- /tmp/greptimedb-cluster-docker-compose/datanode0:/tmp/greptimedb
- ./greptimedb-cluster-docker-compose/datanode0:/greptimedb_data
healthcheck:
test: [ "CMD", "curl", "-fv", "http://datanode0:5000/health" ]
interval: 5s

View File

@@ -0,0 +1,40 @@
# TSBS benchmark - v0.12.0
## Environment
### Amazon EC2
| | |
|---------|-------------------------|
| Machine | c5d.2xlarge |
| CPU | 8 core |
| Memory | 16GB |
| Disk | 100GB (GP3) |
| OS | Ubuntu Server 24.04 LTS |
## Write performance
| Environment | Ingest rate (rows/s) |
|-----------------|----------------------|
| EC2 c5d.2xlarge | 326839.28 |
## Query performance
| Query type | EC2 c5d.2xlarge (ms) |
|-----------------------|----------------------|
| cpu-max-all-1 | 12.46 |
| cpu-max-all-8 | 24.20 |
| double-groupby-1 | 673.08 |
| double-groupby-5 | 963.99 |
| double-groupby-all | 1330.05 |
| groupby-orderby-limit | 952.46 |
| high-cpu-1 | 5.08 |
| high-cpu-all | 4638.57 |
| lastpoint | 591.02 |
| single-groupby-1-1-1 | 4.06 |
| single-groupby-1-1-12 | 4.73 |
| single-groupby-1-8-1 | 8.23 |
| single-groupby-5-1-1 | 4.61 |
| single-groupby-5-1-12 | 5.61 |
| single-groupby-5-8-1 | 9.74 |

19
grafana/check.sh Executable file
View File

@@ -0,0 +1,19 @@
#!/usr/bin/env bash
BASEDIR=$(dirname "$0")
# Use jq to check for panels with empty or missing descriptions
invalid_panels=$(cat $BASEDIR/greptimedb-cluster.json | jq -r '
.panels[]
| select((.type == "stats" or .type == "timeseries") and (.description == "" or .description == null))
')
# Check if any invalid panels were found
if [[ -n "$invalid_panels" ]]; then
echo "Error: The following panels have empty or missing descriptions:"
echo "$invalid_panels"
exit 1
else
echo "All panels with type 'stats' or 'timeseries' have valid descriptions."
exit 0
fi

File diff suppressed because it is too large Load Diff

11
grafana/summary.sh Executable file
View File

@@ -0,0 +1,11 @@
#!/usr/bin/env bash
BASEDIR=$(dirname "$0")
echo '| Title | Description | Expressions |
|---|---|---|'
cat $BASEDIR/greptimedb-cluster.json | jq -r '
.panels |
map(select(.type == "stat" or .type == "timeseries")) |
.[] | "| \(.title) | \(.description | gsub("\n"; "<br>")) | \(.targets | map(.expr // .rawSql | "`\(.|gsub("\n"; "<br>"))`") | join("<br>")) |"
'

View File

@@ -53,54 +53,6 @@ get_arch_type() {
esac
}
# Verify SHA256 checksum
verify_sha256() {
file="$1"
expected_sha256="$2"
if command -v sha256sum >/dev/null 2>&1; then
actual_sha256=$(sha256sum "$file" | cut -d' ' -f1)
elif command -v shasum >/dev/null 2>&1; then
actual_sha256=$(shasum -a 256 "$file" | cut -d' ' -f1)
else
echo "Warning: No SHA256 verification tool found (sha256sum or shasum). Skipping checksum verification."
return 0
fi
if [ "$actual_sha256" = "$expected_sha256" ]; then
echo "SHA256 checksum verified successfully."
return 0
else
echo "Error: SHA256 checksum verification failed!"
echo "Expected: $expected_sha256"
echo "Actual: $actual_sha256"
return 1
fi
}
# Prompt for user confirmation (compatible with different shells)
prompt_confirmation() {
message="$1"
printf "%s (y/N): " "$message"
# Try to read user input, fallback if read fails
answer=""
if read answer </dev/tty 2>/dev/null; then
case "$answer" in
[Yy]|[Yy][Ee][Ss])
return 0
;;
*)
return 1
;;
esac
else
echo ""
echo "Cannot read user input. Defaulting to No."
return 1
fi
}
download_artifact() {
if [ -n "${OS_TYPE}" ] && [ -n "${ARCH_TYPE}" ]; then
# Use the latest stable released version.
@@ -119,104 +71,17 @@ download_artifact() {
fi
echo "Downloading ${BIN}, OS: ${OS_TYPE}, Arch: ${ARCH_TYPE}, Version: ${VERSION}"
PKG_NAME="${BIN}-${OS_TYPE}-${ARCH_TYPE}-${VERSION}"
PACKAGE_NAME="${PKG_NAME}.tar.gz"
SHA256_FILE="${PKG_NAME}.sha256sum"
PACKAGE_NAME="${BIN}-${OS_TYPE}-${ARCH_TYPE}-${VERSION}.tar.gz"
if [ -n "${PACKAGE_NAME}" ]; then
# Check if files already exist and prompt for override
if [ -f "${PACKAGE_NAME}" ]; then
echo "File ${PACKAGE_NAME} already exists."
if prompt_confirmation "Do you want to override it?"; then
echo "Overriding existing file..."
rm -f "${PACKAGE_NAME}"
else
echo "Skipping download. Using existing file."
fi
fi
if [ -f "${BIN}" ]; then
echo "Binary ${BIN} already exists."
if prompt_confirmation "Do you want to override it?"; then
echo "Will override existing binary..."
rm -f "${BIN}"
else
echo "Installation cancelled."
exit 0
fi
fi
# Download package if not exists
if [ ! -f "${PACKAGE_NAME}" ]; then
echo "Downloading ${PACKAGE_NAME}..."
# Use curl instead of wget for better compatibility
if command -v curl >/dev/null 2>&1; then
if ! curl -L -o "${PACKAGE_NAME}" "https://github.com/${GITHUB_ORG}/${GITHUB_REPO}/releases/download/${VERSION}/${PACKAGE_NAME}"; then
echo "Error: Failed to download ${PACKAGE_NAME}"
exit 1
fi
elif command -v wget >/dev/null 2>&1; then
if ! wget -O "${PACKAGE_NAME}" "https://github.com/${GITHUB_ORG}/${GITHUB_REPO}/releases/download/${VERSION}/${PACKAGE_NAME}"; then
echo "Error: Failed to download ${PACKAGE_NAME}"
exit 1
fi
else
echo "Error: Neither curl nor wget is available for downloading."
exit 1
fi
fi
# Download and verify SHA256 checksum
echo "Downloading SHA256 checksum..."
sha256_download_success=0
if command -v curl >/dev/null 2>&1; then
if curl -L -s -o "${SHA256_FILE}" "https://github.com/${GITHUB_ORG}/${GITHUB_REPO}/releases/download/${VERSION}/${SHA256_FILE}" 2>/dev/null; then
sha256_download_success=1
fi
elif command -v wget >/dev/null 2>&1; then
if wget -q -O "${SHA256_FILE}" "https://github.com/${GITHUB_ORG}/${GITHUB_REPO}/releases/download/${VERSION}/${SHA256_FILE}" 2>/dev/null; then
sha256_download_success=1
fi
fi
if [ $sha256_download_success -eq 1 ] && [ -f "${SHA256_FILE}" ]; then
expected_sha256=$(cat "${SHA256_FILE}" | cut -d' ' -f1)
if [ -n "$expected_sha256" ]; then
if ! verify_sha256 "${PACKAGE_NAME}" "${expected_sha256}"; then
echo "SHA256 verification failed. Removing downloaded file."
rm -f "${PACKAGE_NAME}" "${SHA256_FILE}"
exit 1
fi
else
echo "Warning: Could not parse SHA256 checksum from file."
fi
rm -f "${SHA256_FILE}"
else
echo "Warning: Could not download SHA256 checksum file. Skipping verification."
fi
wget "https://github.com/${GITHUB_ORG}/${GITHUB_REPO}/releases/download/${VERSION}/${PACKAGE_NAME}"
# Extract the binary and clean the rest.
echo "Extracting ${PACKAGE_NAME}..."
if ! tar xf "${PACKAGE_NAME}"; then
echo "Error: Failed to extract ${PACKAGE_NAME}"
exit 1
fi
# Find the binary in the extracted directory
extracted_dir="${PACKAGE_NAME%.tar.gz}"
if [ -f "${extracted_dir}/${BIN}" ]; then
mv "${extracted_dir}/${BIN}" "${PWD}/"
rm -f "${PACKAGE_NAME}"
rm -rf "${extracted_dir}"
chmod +x "${BIN}"
echo "Installation completed successfully!"
echo "Run './${BIN} --help' to get started"
else
echo "Error: Binary ${BIN} not found in extracted archive"
rm -f "${PACKAGE_NAME}"
rm -rf "${extracted_dir}"
exit 1
fi
tar xvf "${PACKAGE_NAME}" && \
mv "${PACKAGE_NAME%.tar.gz}/${BIN}" "${PWD}" && \
rm -r "${PACKAGE_NAME}" && \
rm -r "${PACKAGE_NAME%.tar.gz}" && \
echo "Run './${BIN} --help' to get started"
fi
fi
}

View File

@@ -19,9 +19,7 @@ use common_decimal::decimal128::{DECIMAL128_DEFAULT_SCALE, DECIMAL128_MAX_PRECIS
use common_decimal::Decimal128;
use common_time::time::Time;
use common_time::timestamp::TimeUnit;
use common_time::{
Date, DateTime, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp,
};
use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp};
use datatypes::prelude::{ConcreteDataType, ValueRef};
use datatypes::scalars::ScalarVector;
use datatypes::types::{
@@ -29,8 +27,8 @@ use datatypes::types::{
};
use datatypes::value::{OrderedF32, OrderedF64, Value};
use datatypes::vectors::{
BinaryVector, BooleanVector, DateTimeVector, DateVector, Decimal128Vector, Float32Vector,
Float64Vector, Int32Vector, Int64Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector,
BinaryVector, BooleanVector, DateVector, Decimal128Vector, Float32Vector, Float64Vector,
Int32Vector, Int64Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector,
IntervalYearMonthVector, PrimitiveVector, StringVector, TimeMicrosecondVector,
TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector,
TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt32Vector,
@@ -118,7 +116,7 @@ impl From<ColumnDataTypeWrapper> for ConcreteDataType {
ColumnDataType::Json => ConcreteDataType::json_datatype(),
ColumnDataType::String => ConcreteDataType::string_datatype(),
ColumnDataType::Date => ConcreteDataType::date_datatype(),
ColumnDataType::Datetime => ConcreteDataType::datetime_datatype(),
ColumnDataType::Datetime => ConcreteDataType::timestamp_microsecond_datatype(),
ColumnDataType::TimestampSecond => ConcreteDataType::timestamp_second_datatype(),
ColumnDataType::TimestampMillisecond => {
ConcreteDataType::timestamp_millisecond_datatype()
@@ -271,7 +269,6 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
ConcreteDataType::Binary(_) => ColumnDataType::Binary,
ConcreteDataType::String(_) => ColumnDataType::String,
ConcreteDataType::Date(_) => ColumnDataType::Date,
ConcreteDataType::DateTime(_) => ColumnDataType::Datetime,
ConcreteDataType::Timestamp(t) => match t {
TimestampType::Second(_) => ColumnDataType::TimestampSecond,
TimestampType::Millisecond(_) => ColumnDataType::TimestampMillisecond,
@@ -476,7 +473,6 @@ pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) {
Value::String(val) => values.string_values.push(val.as_utf8().to_string()),
Value::Binary(val) => values.binary_values.push(val.to_vec()),
Value::Date(val) => values.date_values.push(val.val()),
Value::DateTime(val) => values.datetime_values.push(val.val()),
Value::Timestamp(val) => match val.unit() {
TimeUnit::Second => values.timestamp_second_values.push(val.value()),
TimeUnit::Millisecond => values.timestamp_millisecond_values.push(val.value()),
@@ -577,12 +573,11 @@ pub fn pb_value_to_value_ref<'a>(
ValueData::BinaryValue(bytes) => ValueRef::Binary(bytes.as_slice()),
ValueData::StringValue(string) => ValueRef::String(string.as_str()),
ValueData::DateValue(d) => ValueRef::Date(Date::from(*d)),
ValueData::DatetimeValue(d) => ValueRef::DateTime(DateTime::new(*d)),
ValueData::TimestampSecondValue(t) => ValueRef::Timestamp(Timestamp::new_second(*t)),
ValueData::TimestampMillisecondValue(t) => {
ValueRef::Timestamp(Timestamp::new_millisecond(*t))
}
ValueData::TimestampMicrosecondValue(t) => {
ValueData::DatetimeValue(t) | ValueData::TimestampMicrosecondValue(t) => {
ValueRef::Timestamp(Timestamp::new_microsecond(*t))
}
ValueData::TimestampNanosecondValue(t) => {
@@ -651,7 +646,6 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) ->
ConcreteDataType::Binary(_) => Arc::new(BinaryVector::from(values.binary_values)),
ConcreteDataType::String(_) => Arc::new(StringVector::from_vec(values.string_values)),
ConcreteDataType::Date(_) => Arc::new(DateVector::from_vec(values.date_values)),
ConcreteDataType::DateTime(_) => Arc::new(DateTimeVector::from_vec(values.datetime_values)),
ConcreteDataType::Timestamp(unit) => match unit {
TimestampType::Second(_) => Arc::new(TimestampSecondVector::from_vec(
values.timestamp_second_values,
@@ -787,11 +781,6 @@ pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec<
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::DateTime(_) => values
.datetime_values
.into_iter()
.map(|v| Value::DateTime(v.into()))
.collect(),
ConcreteDataType::Date(_) => values
.date_values
.into_iter()
@@ -947,9 +936,6 @@ pub fn to_proto_value(value: Value) -> Option<v1::Value> {
Value::Date(v) => v1::Value {
value_data: Some(ValueData::DateValue(v.val())),
},
Value::DateTime(v) => v1::Value {
value_data: Some(ValueData::DatetimeValue(v.val())),
},
Value::Timestamp(v) => match v.unit() {
TimeUnit::Second => v1::Value {
value_data: Some(ValueData::TimestampSecondValue(v.value())),
@@ -1066,7 +1052,6 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue {
Value::String(v) => Some(ValueData::StringValue(v.as_utf8().to_string())),
Value::Binary(v) => Some(ValueData::BinaryValue(v.to_vec())),
Value::Date(v) => Some(ValueData::DateValue(v.val())),
Value::DateTime(v) => Some(ValueData::DatetimeValue(v.val())),
Value::Timestamp(v) => Some(match v.unit() {
TimeUnit::Second => ValueData::TimestampSecondValue(v.value()),
TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(v.value()),
@@ -1248,7 +1233,7 @@ mod tests {
ColumnDataTypeWrapper::date_datatype().into()
);
assert_eq!(
ConcreteDataType::datetime_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
ColumnDataTypeWrapper::datetime_datatype().into()
);
assert_eq!(
@@ -1339,10 +1324,6 @@ mod tests {
ColumnDataTypeWrapper::date_datatype(),
ConcreteDataType::date_datatype().try_into().unwrap()
);
assert_eq!(
ColumnDataTypeWrapper::datetime_datatype(),
ConcreteDataType::datetime_datatype().try_into().unwrap()
);
assert_eq!(
ColumnDataTypeWrapper::timestamp_millisecond_datatype(),
ConcreteDataType::timestamp_millisecond_datatype()
@@ -1830,17 +1811,6 @@ mod tests {
]
);
test_convert_values!(
datetime,
vec![1.into(), 2.into(), 3.into()],
datetime,
vec![
Value::DateTime(1.into()),
Value::DateTime(2.into()),
Value::DateTime(3.into())
]
);
#[test]
fn test_vectors_to_rows_for_different_types() {
let boolean_vec = BooleanVector::from_vec(vec![true, false, true]);

View File

@@ -15,8 +15,8 @@
use std::collections::HashMap;
use datatypes::schema::{
ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextOptions, SkippingIndexType,
COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY,
ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextOptions, SkippingIndexOptions,
SkippingIndexType, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY,
};
use greptime_proto::v1::{Analyzer, SkippingIndexType as PbSkippingIndexType};
use snafu::ResultExt;
@@ -103,6 +103,13 @@ pub fn contains_fulltext(options: &Option<ColumnOptions>) -> bool {
.is_some_and(|o| o.options.contains_key(FULLTEXT_GRPC_KEY))
}
/// Checks if the `ColumnOptions` contains skipping index options.
pub fn contains_skipping(options: &Option<ColumnOptions>) -> bool {
options
.as_ref()
.is_some_and(|o| o.options.contains_key(SKIPPING_INDEX_GRPC_KEY))
}
/// Tries to construct a `ColumnOptions` from the given `FulltextOptions`.
pub fn options_from_fulltext(fulltext: &FulltextOptions) -> Result<Option<ColumnOptions>> {
let mut options = ColumnOptions::default();
@@ -113,6 +120,27 @@ pub fn options_from_fulltext(fulltext: &FulltextOptions) -> Result<Option<Column
Ok((!options.options.is_empty()).then_some(options))
}
/// Tries to construct a `ColumnOptions` from the given `SkippingIndexOptions`.
pub fn options_from_skipping(skipping: &SkippingIndexOptions) -> Result<Option<ColumnOptions>> {
let mut options = ColumnOptions::default();
let v = serde_json::to_string(skipping).context(error::SerializeJsonSnafu)?;
options
.options
.insert(SKIPPING_INDEX_GRPC_KEY.to_string(), v);
Ok((!options.options.is_empty()).then_some(options))
}
/// Tries to construct a `ColumnOptions` for inverted index.
pub fn options_from_inverted() -> ColumnOptions {
let mut options = ColumnOptions::default();
options
.options
.insert(INVERTED_INDEX_GRPC_KEY.to_string(), "true".to_string());
options
}
/// Tries to construct a `FulltextAnalyzer` from the given analyzer.
pub fn as_fulltext_option(analyzer: Analyzer) -> FulltextAnalyzer {
match analyzer {

View File

@@ -38,6 +38,7 @@ use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use session::context::{Channel, QueryContext};
use snafu::prelude::*;
use table::dist_table::DistTable;
use table::metadata::TableId;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::table_name::TableName;
use table::TableRef;
@@ -286,6 +287,28 @@ impl CatalogManager for KvBackendCatalogManager {
return Ok(None);
}
async fn tables_by_ids(
&self,
catalog: &str,
schema: &str,
table_ids: &[TableId],
) -> Result<Vec<TableRef>> {
let table_info_values = self
.table_metadata_manager
.table_info_manager()
.batch_get(table_ids)
.await
.context(TableMetadataManagerSnafu)?;
let tables = table_info_values
.into_values()
.filter(|t| t.table_info.catalog_name == catalog && t.table_info.schema_name == schema)
.map(build_table)
.collect::<Result<Vec<_>>>()?;
Ok(tables)
}
fn tables<'a>(
&'a self,
catalog: &'a str,

View File

@@ -87,6 +87,14 @@ pub trait CatalogManager: Send + Sync {
query_ctx: Option<&QueryContext>,
) -> Result<Option<TableRef>>;
/// Returns the tables by table ids.
async fn tables_by_ids(
&self,
catalog: &str,
schema: &str,
table_ids: &[TableId],
) -> Result<Vec<TableRef>>;
/// Returns all tables with a stream by catalog and schema.
fn tables<'a>(
&'a self,

View File

@@ -14,7 +14,7 @@
use std::any::Any;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock, Weak};
use async_stream::{stream, try_stream};
@@ -28,6 +28,7 @@ use common_meta::kv_backend::memory::MemoryKvBackend;
use futures_util::stream::BoxStream;
use session::context::QueryContext;
use snafu::OptionExt;
use table::metadata::TableId;
use table::TableRef;
use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
@@ -143,6 +144,33 @@ impl CatalogManager for MemoryCatalogManager {
Ok(result)
}
async fn tables_by_ids(
&self,
catalog: &str,
schema: &str,
table_ids: &[TableId],
) -> Result<Vec<TableRef>> {
let catalogs = self.catalogs.read().unwrap();
let schemas = catalogs.get(catalog).context(CatalogNotFoundSnafu {
catalog_name: catalog,
})?;
let tables = schemas
.get(schema)
.context(SchemaNotFoundSnafu { catalog, schema })?;
let filter_ids: HashSet<_> = table_ids.iter().collect();
// It is very inefficient, but we do not need to optimize it since it will not be called in `MemoryCatalogManager`.
let tables = tables
.values()
.filter(|t| filter_ids.contains(&t.table_info().table_id()))
.cloned()
.collect::<Vec<_>>();
Ok(tables)
}
fn tables<'a>(
&'a self,
catalog: &'a str,

View File

@@ -77,7 +77,7 @@ trait SystemSchemaProviderInner {
fn system_table(&self, name: &str) -> Option<SystemTableRef>;
fn table_info(catalog_name: String, table: &SystemTableRef) -> TableInfoRef {
let table_meta = TableMetaBuilder::default()
let table_meta = TableMetaBuilder::empty()
.schema(table.schema())
.primary_key_indices(vec![])
.next_column_id(0)

View File

@@ -365,10 +365,6 @@ impl InformationSchemaColumnsBuilder {
self.numeric_scales.push(None);
match &column_schema.data_type {
ConcreteDataType::DateTime(datetime_type) => {
self.datetime_precisions
.push(Some(datetime_type.precision() as i64));
}
ConcreteDataType::Timestamp(ts_type) => {
self.datetime_precisions
.push(Some(ts_type.precision() as i64));

View File

@@ -28,16 +28,19 @@ use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::prelude::ConcreteDataType as CDT;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::timestamp::TimestampMillisecond;
use datatypes::value::Value;
use datatypes::vectors::{
Int64VectorBuilder, StringVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder, VectorRef,
Int64VectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
UInt32VectorBuilder, UInt64VectorBuilder, VectorRef,
};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use crate::error::{
CreateRecordBatchSnafu, FlowInfoNotFoundSnafu, InternalSnafu, JsonSnafu, ListFlowsSnafu, Result,
CreateRecordBatchSnafu, FlowInfoNotFoundSnafu, InternalSnafu, JsonSnafu, ListFlowsSnafu,
Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::{Predicates, FLOWS};
use crate::system_schema::information_schema::InformationTable;
@@ -59,6 +62,10 @@ pub const SOURCE_TABLE_IDS: &str = "source_table_ids";
pub const SINK_TABLE_NAME: &str = "sink_table_name";
pub const FLOWNODE_IDS: &str = "flownode_ids";
pub const OPTIONS: &str = "options";
pub const CREATED_TIME: &str = "created_time";
pub const UPDATED_TIME: &str = "updated_time";
pub const LAST_EXECUTION_TIME: &str = "last_execution_time";
pub const SOURCE_TABLE_NAMES: &str = "source_table_names";
/// The `information_schema.flows` to provides information about flows in databases.
#[derive(Debug)]
@@ -99,6 +106,14 @@ impl InformationSchemaFlows {
(SINK_TABLE_NAME, CDT::string_datatype(), false),
(FLOWNODE_IDS, CDT::string_datatype(), true),
(OPTIONS, CDT::string_datatype(), true),
(CREATED_TIME, CDT::timestamp_millisecond_datatype(), false),
(UPDATED_TIME, CDT::timestamp_millisecond_datatype(), false),
(
LAST_EXECUTION_TIME,
CDT::timestamp_millisecond_datatype(),
true,
),
(SOURCE_TABLE_NAMES, CDT::string_datatype(), true),
]
.into_iter()
.map(|(name, ty, nullable)| ColumnSchema::new(name, ty, nullable))
@@ -170,6 +185,10 @@ struct InformationSchemaFlowsBuilder {
sink_table_names: StringVectorBuilder,
flownode_id_groups: StringVectorBuilder,
option_groups: StringVectorBuilder,
created_time: TimestampMillisecondVectorBuilder,
updated_time: TimestampMillisecondVectorBuilder,
last_execution_time: TimestampMillisecondVectorBuilder,
source_table_names: StringVectorBuilder,
}
impl InformationSchemaFlowsBuilder {
@@ -196,6 +215,10 @@ impl InformationSchemaFlowsBuilder {
sink_table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
flownode_id_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
option_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
created_time: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
updated_time: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
last_execution_time: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
source_table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
}
}
@@ -235,13 +258,14 @@ impl InformationSchemaFlowsBuilder {
catalog_name: catalog_name.to_string(),
flow_name: flow_name.to_string(),
})?;
self.add_flow(&predicates, flow_id.flow_id(), flow_info, &flow_stat)?;
self.add_flow(&predicates, flow_id.flow_id(), flow_info, &flow_stat)
.await?;
}
self.finish()
}
fn add_flow(
async fn add_flow(
&mut self,
predicates: &Predicates,
flow_id: FlowId,
@@ -290,6 +314,36 @@ impl InformationSchemaFlowsBuilder {
input: format!("{:?}", flow_info.options()),
},
)?));
self.created_time
.push(Some(flow_info.created_time().timestamp_millis().into()));
self.updated_time
.push(Some(flow_info.updated_time().timestamp_millis().into()));
self.last_execution_time
.push(flow_stat.as_ref().and_then(|state| {
state
.last_exec_time_map
.get(&flow_id)
.map(|v| TimestampMillisecond::new(*v))
}));
let mut source_table_names = vec![];
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;
for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
source_table_names.extend(
catalog_manager
.tables_by_ids(&catalog_name, &schema_name, flow_info.source_table_ids())
.await?
.into_iter()
.map(|table| table.table_info().full_table_name()),
);
}
let source_table_names = source_table_names.join(",");
self.source_table_names.push(Some(&source_table_names));
Ok(())
}
@@ -307,6 +361,10 @@ impl InformationSchemaFlowsBuilder {
Arc::new(self.sink_table_names.finish()),
Arc::new(self.flownode_id_groups.finish()),
Arc::new(self.option_groups.finish()),
Arc::new(self.created_time.finish()),
Arc::new(self.updated_time.finish()),
Arc::new(self.last_execution_time.finish()),
Arc::new(self.source_table_names.finish()),
];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}

View File

@@ -20,7 +20,7 @@ use datatypes::vectors::{Int64Vector, StringVector, VectorRef};
use super::table_names::*;
use crate::system_schema::utils::tables::{
bigint_column, datetime_column, string_column, string_columns,
bigint_column, string_column, string_columns, timestamp_micro_column,
};
const NO_VALUE: &str = "NO";
@@ -163,17 +163,17 @@ pub(super) fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec<VectorRef>
string_column("EVENT_BODY"),
string_column("EVENT_DEFINITION"),
string_column("EVENT_TYPE"),
datetime_column("EXECUTE_AT"),
timestamp_micro_column("EXECUTE_AT"),
bigint_column("INTERVAL_VALUE"),
string_column("INTERVAL_FIELD"),
string_column("SQL_MODE"),
datetime_column("STARTS"),
datetime_column("ENDS"),
timestamp_micro_column("STARTS"),
timestamp_micro_column("ENDS"),
string_column("STATUS"),
string_column("ON_COMPLETION"),
datetime_column("CREATED"),
datetime_column("LAST_ALTERED"),
datetime_column("LAST_EXECUTED"),
timestamp_micro_column("CREATED"),
timestamp_micro_column("LAST_ALTERED"),
timestamp_micro_column("LAST_EXECUTED"),
string_column("EVENT_COMMENT"),
bigint_column("ORIGINATOR"),
string_column("CHARACTER_SET_CLIENT"),
@@ -204,10 +204,10 @@ pub(super) fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec<VectorRef>
bigint_column("INITIAL_SIZE"),
bigint_column("MAXIMUM_SIZE"),
bigint_column("AUTOEXTEND_SIZE"),
datetime_column("CREATION_TIME"),
datetime_column("LAST_UPDATE_TIME"),
datetime_column("LAST_ACCESS_TIME"),
datetime_column("RECOVER_TIME"),
timestamp_micro_column("CREATION_TIME"),
timestamp_micro_column("LAST_UPDATE_TIME"),
timestamp_micro_column("LAST_ACCESS_TIME"),
timestamp_micro_column("RECOVER_TIME"),
bigint_column("TRANSACTION_COUNTER"),
string_column("VERSION"),
string_column("ROW_FORMAT"),
@@ -217,9 +217,9 @@ pub(super) fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec<VectorRef>
bigint_column("MAX_DATA_LENGTH"),
bigint_column("INDEX_LENGTH"),
bigint_column("DATA_FREE"),
datetime_column("CREATE_TIME"),
datetime_column("UPDATE_TIME"),
datetime_column("CHECK_TIME"),
timestamp_micro_column("CREATE_TIME"),
timestamp_micro_column("UPDATE_TIME"),
timestamp_micro_column("CHECK_TIME"),
string_column("CHECKSUM"),
string_column("STATUS"),
string_column("EXTRA"),
@@ -330,8 +330,8 @@ pub(super) fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec<VectorRef>
string_column("SQL_DATA_ACCESS"),
string_column("SQL_PATH"),
string_column("SECURITY_TYPE"),
datetime_column("CREATED"),
datetime_column("LAST_ALTERED"),
timestamp_micro_column("CREATED"),
timestamp_micro_column("LAST_ALTERED"),
string_column("SQL_MODE"),
string_column("ROUTINE_COMMENT"),
string_column("DEFINER"),
@@ -383,7 +383,7 @@ pub(super) fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec<VectorRef>
string_column("ACTION_REFERENCE_NEW_TABLE"),
string_column("ACTION_REFERENCE_OLD_ROW"),
string_column("ACTION_REFERENCE_NEW_ROW"),
datetime_column("CREATED"),
timestamp_micro_column("CREATED"),
string_column("SQL_MODE"),
string_column("DEFINER"),
string_column("CHARACTER_SET_CLIENT"),

View File

@@ -20,17 +20,18 @@ use common_catalog::consts::INFORMATION_SCHEMA_PARTITIONS_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_time::datetime::DateTime;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::timestamp::TimestampMicrosecond;
use datatypes::value::Value;
use datatypes::vectors::{
ConstantVector, DateTimeVector, DateTimeVectorBuilder, Int64Vector, Int64VectorBuilder,
MutableVector, StringVector, StringVectorBuilder, UInt64VectorBuilder,
ConstantVector, Int64Vector, Int64VectorBuilder, MutableVector, StringVector,
StringVectorBuilder, TimestampMicrosecondVector, TimestampMicrosecondVectorBuilder,
UInt64VectorBuilder,
};
use futures::{StreamExt, TryStreamExt};
use partition::manager::PartitionInfo;
@@ -127,9 +128,21 @@ impl InformationSchemaPartitions {
ColumnSchema::new("max_data_length", ConcreteDataType::int64_datatype(), true),
ColumnSchema::new("index_length", ConcreteDataType::int64_datatype(), true),
ColumnSchema::new("data_free", ConcreteDataType::int64_datatype(), true),
ColumnSchema::new("create_time", ConcreteDataType::datetime_datatype(), true),
ColumnSchema::new("update_time", ConcreteDataType::datetime_datatype(), true),
ColumnSchema::new("check_time", ConcreteDataType::datetime_datatype(), true),
ColumnSchema::new(
"create_time",
ConcreteDataType::timestamp_microsecond_datatype(),
true,
),
ColumnSchema::new(
"update_time",
ConcreteDataType::timestamp_microsecond_datatype(),
true,
),
ColumnSchema::new(
"check_time",
ConcreteDataType::timestamp_microsecond_datatype(),
true,
),
ColumnSchema::new("checksum", ConcreteDataType::int64_datatype(), true),
ColumnSchema::new(
"partition_comment",
@@ -200,7 +213,7 @@ struct InformationSchemaPartitionsBuilder {
partition_names: StringVectorBuilder,
partition_ordinal_positions: Int64VectorBuilder,
partition_expressions: StringVectorBuilder,
create_times: DateTimeVectorBuilder,
create_times: TimestampMicrosecondVectorBuilder,
partition_ids: UInt64VectorBuilder,
}
@@ -220,7 +233,7 @@ impl InformationSchemaPartitionsBuilder {
partition_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
partition_ordinal_positions: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
partition_expressions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
create_times: DateTimeVectorBuilder::with_capacity(INIT_CAPACITY),
create_times: TimestampMicrosecondVectorBuilder::with_capacity(INIT_CAPACITY),
partition_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
}
}
@@ -324,7 +337,7 @@ impl InformationSchemaPartitionsBuilder {
};
self.partition_expressions.push(expressions.as_deref());
self.create_times.push(Some(DateTime::from(
self.create_times.push(Some(TimestampMicrosecond::from(
table_info.meta.created_on.timestamp_millis(),
)));
self.partition_ids.push(Some(partition.id.as_u64()));
@@ -342,8 +355,8 @@ impl InformationSchemaPartitionsBuilder {
Arc::new(Int64Vector::from(vec![None])),
rows_num,
));
let null_datetime_vector = Arc::new(ConstantVector::new(
Arc::new(DateTimeVector::from(vec![None])),
let null_timestampmicrosecond_vector = Arc::new(ConstantVector::new(
Arc::new(TimestampMicrosecondVector::from(vec![None])),
rows_num,
));
let partition_methods = Arc::new(ConstantVector::new(
@@ -373,8 +386,8 @@ impl InformationSchemaPartitionsBuilder {
null_i64_vector.clone(),
Arc::new(self.create_times.finish()),
// TODO(dennis): supports update_time
null_datetime_vector.clone(),
null_datetime_vector,
null_timestampmicrosecond_vector.clone(),
null_timestampmicrosecond_vector,
null_i64_vector,
null_string_vector.clone(),
null_string_vector.clone(),

View File

@@ -30,7 +30,8 @@ use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{
DateTimeVectorBuilder, StringVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder,
StringVectorBuilder, TimestampMicrosecondVectorBuilder, UInt32VectorBuilder,
UInt64VectorBuilder,
};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
@@ -105,9 +106,21 @@ impl InformationSchemaTables {
ColumnSchema::new(TABLE_ROWS, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(DATA_FREE, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(AUTO_INCREMENT, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(CREATE_TIME, ConcreteDataType::datetime_datatype(), true),
ColumnSchema::new(UPDATE_TIME, ConcreteDataType::datetime_datatype(), true),
ColumnSchema::new(CHECK_TIME, ConcreteDataType::datetime_datatype(), true),
ColumnSchema::new(
CREATE_TIME,
ConcreteDataType::timestamp_microsecond_datatype(),
true,
),
ColumnSchema::new(
UPDATE_TIME,
ConcreteDataType::timestamp_microsecond_datatype(),
true,
),
ColumnSchema::new(
CHECK_TIME,
ConcreteDataType::timestamp_microsecond_datatype(),
true,
),
ColumnSchema::new(TABLE_COLLATION, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(CHECKSUM, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(CREATE_OPTIONS, ConcreteDataType::string_datatype(), true),
@@ -182,9 +195,9 @@ struct InformationSchemaTablesBuilder {
max_index_length: UInt64VectorBuilder,
data_free: UInt64VectorBuilder,
auto_increment: UInt64VectorBuilder,
create_time: DateTimeVectorBuilder,
update_time: DateTimeVectorBuilder,
check_time: DateTimeVectorBuilder,
create_time: TimestampMicrosecondVectorBuilder,
update_time: TimestampMicrosecondVectorBuilder,
check_time: TimestampMicrosecondVectorBuilder,
table_collation: StringVectorBuilder,
checksum: UInt64VectorBuilder,
create_options: StringVectorBuilder,
@@ -219,9 +232,9 @@ impl InformationSchemaTablesBuilder {
max_index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
data_free: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
auto_increment: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
create_time: DateTimeVectorBuilder::with_capacity(INIT_CAPACITY),
update_time: DateTimeVectorBuilder::with_capacity(INIT_CAPACITY),
check_time: DateTimeVectorBuilder::with_capacity(INIT_CAPACITY),
create_time: TimestampMicrosecondVectorBuilder::with_capacity(INIT_CAPACITY),
update_time: TimestampMicrosecondVectorBuilder::with_capacity(INIT_CAPACITY),
check_time: TimestampMicrosecondVectorBuilder::with_capacity(INIT_CAPACITY),
table_collation: StringVectorBuilder::with_capacity(INIT_CAPACITY),
checksum: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
create_options: StringVectorBuilder::with_capacity(INIT_CAPACITY),

View File

@@ -51,10 +51,10 @@ pub fn bigint_column(name: &str) -> ColumnSchema {
)
}
pub fn datetime_column(name: &str) -> ColumnSchema {
pub fn timestamp_micro_column(name: &str) -> ColumnSchema {
ColumnSchema::new(
str::to_lowercase(name),
ConcreteDataType::datetime_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
false,
)
}

View File

@@ -6,6 +6,7 @@ license.workspace = true
[features]
pg_kvbackend = ["common-meta/pg_kvbackend"]
mysql_kvbackend = ["common-meta/mysql_kvbackend"]
[lints]
workspace = true
@@ -43,6 +44,10 @@ futures.workspace = true
humantime.workspace = true
meta-client.workspace = true
nu-ansi-term = "0.46"
opendal = { version = "0.51.1", features = [
"services-fs",
"services-s3",
] }
query.workspace = true
rand.workspace = true
reqwest.workspace = true

View File

@@ -23,6 +23,8 @@ use common_error::ext::BoxedError;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
#[cfg(feature = "mysql_kvbackend")]
use common_meta::kv_backend::rds::MySqlStore;
#[cfg(feature = "pg_kvbackend")]
use common_meta::kv_backend::rds::PgStore;
use common_meta::peer::Peer;
@@ -63,6 +65,9 @@ pub struct BenchTableMetadataCommand {
#[cfg(feature = "pg_kvbackend")]
#[clap(long)]
postgres_addr: Option<String>,
#[cfg(feature = "mysql_kvbackend")]
#[clap(long)]
mysql_addr: Option<String>,
#[clap(long)]
count: u32,
}
@@ -86,6 +91,16 @@ impl BenchTableMetadataCommand {
kv_backend
};
#[cfg(feature = "mysql_kvbackend")]
let kv_backend = if let Some(mysql_addr) = &self.mysql_addr {
info!("Using mysql as kv backend");
MySqlStore::with_url(mysql_addr, "greptime_metakv", 128)
.await
.unwrap()
} else {
kv_backend
};
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));
let tool = BenchTableMetadata {

View File

@@ -276,6 +276,24 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("OpenDAL operator failed"))]
OpenDal {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: opendal::Error,
},
#[snafu(display("S3 config need be set"))]
S3ConfigNotSet {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Output directory not set"))]
OutputDirNotSet {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -319,6 +337,9 @@ impl ErrorExt for Error {
| Error::BuildClient { .. } => StatusCode::Unexpected,
Error::Other { source, .. } => source.status_code(),
Error::OpenDal { .. } => StatusCode::Internal,
Error::S3ConfigNotSet { .. } => StatusCode::InvalidArguments,
Error::OutputDirNotSet { .. } => StatusCode::InvalidArguments,
Error::BuildRuntime { source, .. } => source.status_code(),

View File

@@ -21,15 +21,18 @@ use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use common_error::ext::BoxedError;
use common_telemetry::{debug, error, info};
use opendal::layers::LoggingLayer;
use opendal::{services, Operator};
use serde_json::Value;
use snafu::{OptionExt, ResultExt};
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use crate::database::{parse_proxy_opts, DatabaseClient};
use crate::error::{EmptyResultSnafu, Error, FileIoSnafu, Result, SchemaNotFoundSnafu};
use crate::error::{
EmptyResultSnafu, Error, OpenDalSnafu, OutputDirNotSetSnafu, Result, S3ConfigNotSetSnafu,
SchemaNotFoundSnafu,
};
use crate::{database, Tool};
type TableReference = (String, String, String);
@@ -52,8 +55,9 @@ pub struct ExportCommand {
addr: String,
/// Directory to put the exported data. E.g.: /tmp/greptimedb-export
/// for local export.
#[clap(long)]
output_dir: String,
output_dir: Option<String>,
/// The name of the catalog to export.
#[clap(long, default_value = "greptime-*")]
@@ -101,10 +105,51 @@ pub struct ExportCommand {
/// Disable proxy server, if set, will not use any proxy.
#[clap(long)]
no_proxy: bool,
/// if export data to s3
#[clap(long)]
s3: bool,
/// The s3 bucket name
/// if s3 is set, this is required
#[clap(long)]
s3_bucket: Option<String>,
/// The s3 endpoint
/// if s3 is set, this is required
#[clap(long)]
s3_endpoint: Option<String>,
/// The s3 access key
/// if s3 is set, this is required
#[clap(long)]
s3_access_key: Option<String>,
/// The s3 secret key
/// if s3 is set, this is required
#[clap(long)]
s3_secret_key: Option<String>,
/// The s3 region
/// if s3 is set, this is required
#[clap(long)]
s3_region: Option<String>,
}
impl ExportCommand {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
if self.s3
&& (self.s3_bucket.is_none()
|| self.s3_endpoint.is_none()
|| self.s3_access_key.is_none()
|| self.s3_secret_key.is_none()
|| self.s3_region.is_none())
{
return Err(BoxedError::new(S3ConfigNotSetSnafu {}.build()));
}
if !self.s3 && self.output_dir.is_none() {
return Err(BoxedError::new(OutputDirNotSetSnafu {}.build()));
}
let (catalog, schema) =
database::split_database(&self.database).map_err(BoxedError::new)?;
let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
@@ -126,24 +171,43 @@ impl ExportCommand {
target: self.target.clone(),
start_time: self.start_time.clone(),
end_time: self.end_time.clone(),
s3: self.s3,
s3_bucket: self.s3_bucket.clone(),
s3_endpoint: self.s3_endpoint.clone(),
s3_access_key: self.s3_access_key.clone(),
s3_secret_key: self.s3_secret_key.clone(),
s3_region: self.s3_region.clone(),
}))
}
}
#[derive(Clone)]
pub struct Export {
catalog: String,
schema: Option<String>,
database_client: DatabaseClient,
output_dir: String,
output_dir: Option<String>,
parallelism: usize,
target: ExportTarget,
start_time: Option<String>,
end_time: Option<String>,
s3: bool,
s3_bucket: Option<String>,
s3_endpoint: Option<String>,
s3_access_key: Option<String>,
s3_secret_key: Option<String>,
s3_region: Option<String>,
}
impl Export {
fn catalog_path(&self) -> PathBuf {
PathBuf::from(&self.output_dir).join(&self.catalog)
if self.s3 {
PathBuf::from(&self.catalog)
} else if let Some(dir) = &self.output_dir {
PathBuf::from(dir).join(&self.catalog)
} else {
unreachable!("catalog_path: output_dir must be set when not using s3")
}
}
async fn get_db_names(&self) -> Result<Vec<String>> {
@@ -300,19 +364,23 @@ impl Export {
let timer = Instant::now();
let db_names = self.get_db_names().await?;
let db_count = db_names.len();
let operator = self.build_operator().await?;
for schema in db_names {
let db_dir = self.catalog_path().join(format!("{schema}/"));
tokio::fs::create_dir_all(&db_dir)
.await
.context(FileIoSnafu)?;
let file = db_dir.join("create_database.sql");
let mut file = File::create(file).await.context(FileIoSnafu)?;
let create_database = self
.show_create("DATABASE", &self.catalog, &schema, None)
.await?;
file.write_all(create_database.as_bytes())
.await
.context(FileIoSnafu)?;
let file_path = self.get_file_path(&schema, "create_database.sql");
self.write_to_storage(&operator, &file_path, create_database.into_bytes())
.await?;
info!(
"Exported {}.{} database creation SQL to {}",
self.catalog,
schema,
self.format_output_path(&file_path)
);
}
let elapsed = timer.elapsed();
@@ -326,149 +394,267 @@ impl Export {
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let db_names = self.get_db_names().await?;
let db_count = db_names.len();
let operator = Arc::new(self.build_operator().await?);
let mut tasks = Vec::with_capacity(db_names.len());
for schema in db_names {
let semaphore_moved = semaphore.clone();
let export_self = self.clone();
let operator = operator.clone();
tasks.push(async move {
let _permit = semaphore_moved.acquire().await.unwrap();
let (metric_physical_tables, remaining_tables, views) =
self.get_table_list(&self.catalog, &schema).await?;
let table_count =
metric_physical_tables.len() + remaining_tables.len() + views.len();
let db_dir = self.catalog_path().join(format!("{schema}/"));
tokio::fs::create_dir_all(&db_dir)
.await
.context(FileIoSnafu)?;
let file = db_dir.join("create_tables.sql");
let mut file = File::create(file).await.context(FileIoSnafu)?;
for (c, s, t) in metric_physical_tables.into_iter().chain(remaining_tables) {
let create_table = self.show_create("TABLE", &c, &s, Some(&t)).await?;
file.write_all(create_table.as_bytes())
.await
.context(FileIoSnafu)?;
}
for (c, s, v) in views {
let create_view = self.show_create("VIEW", &c, &s, Some(&v)).await?;
file.write_all(create_view.as_bytes())
.await
.context(FileIoSnafu)?;
let (metric_physical_tables, remaining_tables, views) = export_self
.get_table_list(&export_self.catalog, &schema)
.await?;
// Create directory if needed for file system storage
if !export_self.s3 {
let db_dir = format!("{}/{}/", export_self.catalog, schema);
operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
}
let file_path = export_self.get_file_path(&schema, "create_tables.sql");
let mut content = Vec::new();
// Add table creation SQL
for (c, s, t) in metric_physical_tables.iter().chain(&remaining_tables) {
let create_table = export_self.show_create("TABLE", c, s, Some(t)).await?;
content.extend_from_slice(create_table.as_bytes());
}
// Add view creation SQL
for (c, s, v) in &views {
let create_view = export_self.show_create("VIEW", c, s, Some(v)).await?;
content.extend_from_slice(create_view.as_bytes());
}
// Write to storage
export_self
.write_to_storage(&operator, &file_path, content)
.await?;
info!(
"Finished exporting {}.{schema} with {table_count} table schemas to path: {}",
self.catalog,
db_dir.to_string_lossy()
"Finished exporting {}.{schema} with {} table schemas to path: {}",
export_self.catalog,
metric_physical_tables.len() + remaining_tables.len() + views.len(),
export_self.format_output_path(&file_path)
);
Ok::<(), Error>(())
});
}
let success = futures::future::join_all(tasks)
.await
.into_iter()
.filter(|r| match r {
Ok(_) => true,
Err(e) => {
error!(e; "export schema job failed");
false
}
})
.count();
let success = self.execute_tasks(tasks).await;
let elapsed = timer.elapsed();
info!("Success {success}/{db_count} jobs, cost: {elapsed:?}");
Ok(())
}
async fn build_operator(&self) -> Result<Operator> {
if self.s3 {
self.build_s3_operator().await
} else {
self.build_fs_operator().await
}
}
async fn build_s3_operator(&self) -> Result<Operator> {
let mut builder = services::S3::default().root("").bucket(
self.s3_bucket
.as_ref()
.expect("s3_bucket must be provided when s3 is enabled"),
);
if let Some(endpoint) = self.s3_endpoint.as_ref() {
builder = builder.endpoint(endpoint);
}
if let Some(region) = self.s3_region.as_ref() {
builder = builder.region(region);
}
if let Some(key_id) = self.s3_access_key.as_ref() {
builder = builder.access_key_id(key_id);
}
if let Some(secret_key) = self.s3_secret_key.as_ref() {
builder = builder.secret_access_key(secret_key);
}
let op = Operator::new(builder)
.context(OpenDalSnafu)?
.layer(LoggingLayer::default())
.finish();
Ok(op)
}
async fn build_fs_operator(&self) -> Result<Operator> {
let root = self
.output_dir
.as_ref()
.context(OutputDirNotSetSnafu)?
.clone();
let op = Operator::new(services::Fs::default().root(&root))
.context(OpenDalSnafu)?
.layer(LoggingLayer::default())
.finish();
Ok(op)
}
async fn export_database_data(&self) -> Result<()> {
let timer = Instant::now();
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let db_names = self.get_db_names().await?;
let db_count = db_names.len();
let mut tasks = Vec::with_capacity(db_count);
let operator = Arc::new(self.build_operator().await?);
let with_options = build_with_options(&self.start_time, &self.end_time);
for schema in db_names {
let semaphore_moved = semaphore.clone();
let export_self = self.clone();
let with_options_clone = with_options.clone();
let operator = operator.clone();
tasks.push(async move {
let _permit = semaphore_moved.acquire().await.unwrap();
let db_dir = self.catalog_path().join(format!("{schema}/"));
tokio::fs::create_dir_all(&db_dir)
.await
.context(FileIoSnafu)?;
let with_options = match (&self.start_time, &self.end_time) {
(Some(start_time), Some(end_time)) => {
format!(
"WITH (FORMAT='parquet', start_time='{}', end_time='{}')",
start_time, end_time
)
}
(Some(start_time), None) => {
format!("WITH (FORMAT='parquet', start_time='{}')", start_time)
}
(None, Some(end_time)) => {
format!("WITH (FORMAT='parquet', end_time='{}')", end_time)
}
(None, None) => "WITH (FORMAT='parquet')".to_string(),
};
// Create directory if not using S3
if !export_self.s3 {
let db_dir = format!("{}/{}/", export_self.catalog, schema);
operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
}
let (path, connection_part) = export_self.get_storage_params(&schema);
// Execute COPY DATABASE TO command
let sql = format!(
r#"COPY DATABASE "{}"."{}" TO '{}' {};"#,
self.catalog,
schema,
db_dir.to_str().unwrap(),
with_options
r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
export_self.catalog, schema, path, with_options_clone, connection_part
);
info!("Executing sql: {sql}");
export_self.database_client.sql_in_public(&sql).await?;
info!(
"Finished exporting {}.{} data to {}",
export_self.catalog, schema, path
);
info!("Executing sql: {sql}");
// Create copy_from.sql file
let copy_database_from_sql = format!(
r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({}){};"#,
export_self.catalog, schema, path, with_options_clone, connection_part
);
self.database_client.sql_in_public(&sql).await?;
let copy_from_path = export_self.get_file_path(&schema, "copy_from.sql");
export_self
.write_to_storage(
&operator,
&copy_from_path,
copy_database_from_sql.into_bytes(),
)
.await?;
info!(
"Finished exporting {}.{schema} data into path: {}",
self.catalog,
db_dir.to_string_lossy()
);
// The export copy from sql
let copy_from_file = db_dir.join("copy_from.sql");
let mut writer =
BufWriter::new(File::create(copy_from_file).await.context(FileIoSnafu)?);
let copy_database_from_sql = format!(
r#"COPY DATABASE "{}"."{}" FROM '{}' WITH (FORMAT='parquet');"#,
self.catalog,
"Finished exporting {}.{} copy_from.sql to {}",
export_self.catalog,
schema,
db_dir.to_str().unwrap()
export_self.format_output_path(&copy_from_path)
);
writer
.write(copy_database_from_sql.as_bytes())
.await
.context(FileIoSnafu)?;
writer.flush().await.context(FileIoSnafu)?;
info!("Finished exporting {}.{schema} copy_from.sql", self.catalog);
Ok::<(), Error>(())
})
});
}
let success = futures::future::join_all(tasks)
let success = self.execute_tasks(tasks).await;
let elapsed = timer.elapsed();
info!("Success {success}/{db_count} jobs, costs: {elapsed:?}");
Ok(())
}
fn get_file_path(&self, schema: &str, file_name: &str) -> String {
format!("{}/{}/{}", self.catalog, schema, file_name)
}
fn format_output_path(&self, file_path: &str) -> String {
if self.s3 {
format!(
"s3://{}/{}",
self.s3_bucket.as_ref().unwrap_or(&String::new()),
file_path
)
} else {
format!(
"{}/{}",
self.output_dir.as_ref().unwrap_or(&String::new()),
file_path
)
}
}
async fn write_to_storage(
&self,
op: &Operator,
file_path: &str,
content: Vec<u8>,
) -> Result<()> {
op.write(file_path, content).await.context(OpenDalSnafu)
}
fn get_storage_params(&self, schema: &str) -> (String, String) {
if self.s3 {
let s3_path = format!(
"s3://{}/{}/{}/",
// Safety: s3_bucket is required when s3 is enabled
self.s3_bucket.as_ref().unwrap(),
self.catalog,
schema
);
// endpoint is optional
let endpoint_option = if let Some(endpoint) = self.s3_endpoint.as_ref() {
format!(", ENDPOINT='{}'", endpoint)
} else {
String::new()
};
// Safety: All s3 options are required
let connection_options = format!(
"ACCESS_KEY_ID='{}', SECRET_ACCESS_KEY='{}', REGION='{}'{}",
self.s3_access_key.as_ref().unwrap(),
self.s3_secret_key.as_ref().unwrap(),
self.s3_region.as_ref().unwrap(),
endpoint_option
);
(s3_path, format!(" CONNECTION ({})", connection_options))
} else {
(
self.catalog_path()
.join(format!("{schema}/"))
.to_string_lossy()
.to_string(),
String::new(),
)
}
}
async fn execute_tasks(
&self,
tasks: Vec<impl std::future::Future<Output = Result<()>>>,
) -> usize {
futures::future::join_all(tasks)
.await
.into_iter()
.filter(|r| match r {
Ok(_) => true,
Err(e) => {
error!(e; "export database job failed");
error!(e; "export job failed");
false
}
})
.count();
let elapsed = timer.elapsed();
info!("Success {success}/{db_count} jobs, costs: {elapsed:?}");
Ok(())
.count()
}
}
@@ -493,3 +679,15 @@ impl Tool for Export {
}
}
}
/// Builds the WITH options string for SQL commands, assuming consistent syntax across S3 and local exports.
fn build_with_options(start_time: &Option<String>, end_time: &Option<String>) -> String {
let mut options = vec!["format = 'parquet'".to_string()];
if let Some(start) = start_time {
options.push(format!("start_time = '{}'", start));
}
if let Some(end) = end_time {
options.push(format!("end_time = '{}'", end));
}
options.join(", ")
}

View File

@@ -9,10 +9,6 @@ default-run = "greptime"
name = "greptime"
path = "src/bin/greptime.rs"
[[bin]]
name = "objbench"
path = "src/bin/objbench.rs"
[features]
default = ["servers/pprof", "servers/mem-prof"]
tokio-console = ["common-telemetry/tokio-console"]
@@ -24,7 +20,6 @@ workspace = true
async-trait.workspace = true
auth.workspace = true
base64.workspace = true
colored = "2.0"
cache.workspace = true
catalog.workspace = true
chrono.workspace = true
@@ -60,9 +55,6 @@ futures.workspace = true
human-panic = "2.0"
humantime.workspace = true
lazy_static.workspace = true
object-store.workspace = true
parquet = "53"
pprof = "0.14"
meta-client.workspace = true
meta-srv.workspace = true
metric-engine.workspace = true

View File

@@ -21,8 +21,6 @@ use cmd::{cli, datanode, flownode, frontend, metasrv, standalone, App};
use common_version::version;
use servers::install_ring_crypto_provider;
pub mod objbench;
#[derive(Parser)]
#[command(name = "greptime", author, version, long_version = version(), about)]
#[command(propagate_version = true)]

View File

@@ -1,602 +0,0 @@
// Copyright 2025 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::{Path, PathBuf};
use std::time::Instant;
use clap::Parser;
use cmd::error::{self, Result};
use colored::Colorize;
use datanode::config::ObjectStoreConfig;
use mito2::config::{FulltextIndexConfig, MitoConfig, Mode};
use mito2::read::Source;
use mito2::sst::file::{FileHandle, FileId, FileMeta};
use mito2::sst::file_purger::{FilePurger, FilePurgerRef, PurgeRequest};
use mito2::sst::parquet::{WriteOptions, PARQUET_METADATA_KEY};
use mito2::{build_access_layer, Metrics, OperationType, SstWriteRequest};
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
#[tokio::main]
pub async fn main() {
// common_telemetry::init_default_ut_logging();
let cmd = Command::parse();
if let Err(e) = cmd.run().await {
eprintln!("{}: {}", "Error".red().bold(), e);
std::process::exit(1);
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(default)]
pub struct StorageConfigWrapper {
storage: StorageConfig,
}
/// Storage engine config
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(default)]
pub struct StorageConfig {
/// The working directory of database
pub data_home: String,
#[serde(flatten)]
pub store: ObjectStoreConfig,
}
#[derive(Debug, Parser)]
pub struct Command {
/// Path to the object-store config file (TOML). Must deserialize into datanode::config::ObjectStoreConfig.
#[clap(long, value_name = "FILE")]
pub config: PathBuf,
/// Source SST file path in object-store (e.g. "region_dir/<uuid>.parquet").
#[clap(long, value_name = "PATH")]
pub source: String,
/// Target SST file path in object-store; its parent directory is used as destination region dir.
#[clap(long, value_name = "PATH")]
pub target: String,
/// Verbose output
#[clap(short, long, default_value_t = false)]
pub verbose: bool,
/// Output file path for pprof flamegraph (enables profiling)
#[clap(long, value_name = "FILE")]
pub pprof_file: Option<PathBuf>,
}
impl Command {
pub async fn run(&self) -> Result<()> {
if self.verbose {
common_telemetry::init_default_ut_logging();
}
println!("{}", "Starting objbench...".cyan().bold());
// Build object store from config
let cfg_str = std::fs::read_to_string(&self.config).map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("failed to read config {}: {e}", self.config.display()),
}
.build()
})?;
let store_cfg: StorageConfigWrapper = toml::from_str(&cfg_str).map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("failed to parse config {}: {e}", self.config.display()),
}
.build()
})?;
let object_store = build_object_store(&store_cfg.storage).await?;
println!("{} Object store initialized", "".green());
// Prepare source identifiers
let (src_region_dir, src_file_id) = split_sst_path(&self.source)?;
println!("{} Source path parsed: {}", "".green(), self.source);
// Load parquet metadata to extract RegionMetadata and file stats
println!("{}", "Loading parquet metadata...".yellow());
let file_size = object_store
.stat(&self.source)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("stat failed: {e}"),
}
.build()
})?
.content_length();
let parquet_meta = load_parquet_metadata(object_store.clone(), &self.source, file_size)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("read parquet metadata failed: {e}"),
}
.build()
})?;
let region_meta = extract_region_metadata(&self.source, &parquet_meta)?;
let num_rows = parquet_meta.file_metadata().num_rows() as u64;
let num_row_groups = parquet_meta.num_row_groups() as u64;
println!(
"{} Metadata loaded - rows: {}, size: {} bytes",
"".green(),
num_rows,
file_size
);
// Build a FileHandle for the source file
let file_meta = FileMeta {
region_id: region_meta.region_id,
file_id: src_file_id,
time_range: Default::default(),
level: 0,
file_size,
available_indexes: Default::default(),
index_file_size: 0,
num_rows,
num_row_groups,
sequence: None,
};
let src_handle = FileHandle::new(file_meta, new_noop_file_purger());
// Build the reader for a single file via ParquetReaderBuilder
println!("{}", "Building reader...".yellow());
let (_src_access_layer, _cache_manager) =
build_access_layer_simple(src_region_dir.clone(), object_store.clone()).await?;
let reader_build_start = Instant::now();
let reader = mito2::sst::parquet::reader::ParquetReaderBuilder::new(
src_region_dir.clone(),
src_handle.clone(),
object_store.clone(),
)
.expected_metadata(Some(region_meta.clone()))
.build()
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("build reader failed: {e}"),
}
.build()
})?;
let reader_build_elapsed = reader_build_start.elapsed();
let total_rows = reader.parquet_metadata().file_metadata().num_rows();
println!("{} Reader built in {:?}", "".green(), reader_build_elapsed);
// Prepare target access layer for writing
println!("{}", "Preparing target access layer...".yellow());
let (tgt_access_layer, tgt_cache_manager) =
build_access_layer_simple(self.target.clone(), object_store.clone()).await?;
// Build write request
let fulltext_index_config = FulltextIndexConfig {
create_on_compaction: Mode::Disable,
..Default::default()
};
let write_opts = WriteOptions::default();
let write_req = SstWriteRequest {
op_type: OperationType::Compact,
metadata: region_meta,
source: Source::Reader(Box::new(reader)),
cache_manager: tgt_cache_manager,
storage: None,
max_sequence: None,
index_options: Default::default(),
inverted_index_config: MitoConfig::default().inverted_index,
fulltext_index_config,
bloom_filter_index_config: MitoConfig::default().bloom_filter_index,
};
// Write SST
println!("{}", "Writing SST...".yellow());
let mut metrics = Metrics::default();
// Start profiling if pprof_file is specified
#[cfg(unix)]
let profiler_guard = if self.pprof_file.is_some() {
println!("{} Starting profiling...", "".yellow());
Some(
pprof::ProfilerGuardBuilder::default()
.frequency(99)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to start profiler: {e}"),
}
.build()
})?,
)
} else {
None
};
#[cfg(not(unix))]
if self.pprof_file.is_some() {
eprintln!(
"{}: Profiling is not supported on this platform",
"Warning".yellow()
);
}
let write_start = Instant::now();
let infos = tgt_access_layer
.write_sst(write_req, &write_opts, &mut metrics)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("write_sst failed: {e}"),
}
.build()
})?;
let write_elapsed = write_start.elapsed();
// Stop profiling and generate flamegraph if enabled
#[cfg(unix)]
if let (Some(guard), Some(pprof_file)) = (profiler_guard, &self.pprof_file) {
println!("{} Generating flamegraph...", "🔥".yellow());
match guard.report().build() {
Ok(report) => {
let mut flamegraph_data = Vec::new();
if let Err(e) = report.flamegraph(&mut flamegraph_data) {
eprintln!(
"{}: Failed to generate flamegraph: {}",
"Warning".yellow(),
e
);
} else if let Err(e) = std::fs::write(pprof_file, flamegraph_data) {
eprintln!(
"{}: Failed to write flamegraph to {}: {}",
"Warning".yellow(),
pprof_file.display(),
e
);
} else {
println!(
"{} Flamegraph saved to {}",
"".green(),
pprof_file.display().to_string().cyan()
);
}
}
Err(e) => {
eprintln!(
"{}: Failed to generate pprof report: {}",
"Warning".yellow(),
e
);
}
}
}
assert_eq!(infos.len(), 1);
let dst_file_id = infos[0].file_id;
let dst_file_path = format!("{}{}", self.target, dst_file_id.as_parquet(),);
// Report results with ANSI colors
println!("\n{} {}", "Write complete!".green().bold(), "".green());
println!(" {}: {}", "Destination file".bold(), dst_file_path.cyan());
println!(" {}: {}", "Rows".bold(), total_rows.to_string().cyan());
println!(
" {}: {}",
"File size".bold(),
format!("{} bytes", file_size).cyan()
);
println!(
" {}: {:?}",
"Reader build time".bold(),
reader_build_elapsed
);
println!(" {}: {:?}", "Total time".bold(), write_elapsed);
// Print metrics in a formatted way
println!(
" {}: {:?}, sum: {:?}",
"Metrics".bold(),
metrics,
metrics.sum()
);
// Print infos
println!(" {}: {:?}", "Index".bold(), infos[0].index_metadata);
// Cleanup
println!("\n{}", "Cleaning up...".yellow());
object_store.delete(&dst_file_path).await.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to delete dest file {}: {}", dst_file_path, e),
}
.build()
})?;
println!("{} Temporary file deleted", "".green());
println!("\n{}", "Benchmark completed successfully!".green().bold());
Ok(())
}
}
fn split_sst_path(path: &str) -> Result<(String, FileId)> {
let p = Path::new(path);
let file_name = p.file_name().and_then(|s| s.to_str()).ok_or_else(|| {
error::IllegalConfigSnafu {
msg: "invalid source path".to_string(),
}
.build()
})?;
let uuid_str = file_name.strip_suffix(".parquet").ok_or_else(|| {
error::IllegalConfigSnafu {
msg: "expect .parquet file".to_string(),
}
.build()
})?;
let file_id = FileId::parse_str(uuid_str).map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("invalid file id: {e}"),
}
.build()
})?;
let parent = p
.parent()
.and_then(|s| s.to_str())
.unwrap_or("")
.to_string();
Ok((parent, file_id))
}
fn extract_region_metadata(
file_path: &str,
meta: &parquet::file::metadata::ParquetMetaData,
) -> Result<RegionMetadataRef> {
use parquet::format::KeyValue;
let kvs: Option<&Vec<KeyValue>> = meta.file_metadata().key_value_metadata();
let Some(kvs) = kvs else {
return Err(error::IllegalConfigSnafu {
msg: format!("{file_path}: missing parquet key_value metadata"),
}
.build());
};
let json = kvs
.iter()
.find(|kv| kv.key == PARQUET_METADATA_KEY)
.and_then(|kv| kv.value.as_ref())
.ok_or_else(|| {
error::IllegalConfigSnafu {
msg: format!("{file_path}: key {PARQUET_METADATA_KEY} not found or empty"),
}
.build()
})?;
let region: RegionMetadata = RegionMetadata::from_json(json).map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("invalid region metadata json: {e}"),
}
.build()
})?;
Ok(std::sync::Arc::new(region))
}
async fn build_object_store(sc: &StorageConfig) -> Result<ObjectStore> {
use datanode::config::ObjectStoreConfig::*;
let oss = &sc.store;
match oss {
File(_) => {
use object_store::services::Fs;
let builder = Fs::default().root(&sc.data_home);
Ok(ObjectStore::new(builder)
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("init fs backend failed: {e}"),
}
.build()
})?
.finish())
}
S3(s3) => {
use common_base::secrets::ExposeSecret;
use object_store::services::S3;
use object_store::util;
let root = util::normalize_dir(&s3.root);
let mut builder = S3::default()
.root(&root)
.bucket(&s3.bucket)
.access_key_id(s3.access_key_id.expose_secret())
.secret_access_key(s3.secret_access_key.expose_secret());
if let Some(ep) = &s3.endpoint {
builder = builder.endpoint(ep);
}
if let Some(region) = &s3.region {
builder = builder.region(region);
}
if s3.enable_virtual_host_style {
builder = builder.enable_virtual_host_style();
}
Ok(ObjectStore::new(builder)
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("init s3 backend failed: {e}"),
}
.build()
})?
.finish())
}
Oss(oss) => {
use common_base::secrets::ExposeSecret;
use object_store::services::Oss;
use object_store::util;
let root = util::normalize_dir(&oss.root);
let builder = Oss::default()
.root(&root)
.bucket(&oss.bucket)
.endpoint(&oss.endpoint)
.access_key_id(oss.access_key_id.expose_secret())
.access_key_secret(oss.access_key_secret.expose_secret());
Ok(ObjectStore::new(builder)
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("init oss backend failed: {e}"),
}
.build()
})?
.finish())
}
Azblob(az) => {
use common_base::secrets::ExposeSecret;
use object_store::services::Azblob;
use object_store::util;
let root = util::normalize_dir(&az.root);
let mut builder = Azblob::default()
.root(&root)
.container(&az.container)
.endpoint(&az.endpoint)
.account_name(az.account_name.expose_secret())
.account_key(az.account_key.expose_secret());
if let Some(token) = &az.sas_token {
builder = builder.sas_token(token);
}
Ok(ObjectStore::new(builder)
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("init azblob backend failed: {e}"),
}
.build()
})?
.finish())
}
Gcs(gcs) => {
use common_base::secrets::ExposeSecret;
use object_store::services::Gcs;
use object_store::util;
let root = util::normalize_dir(&gcs.root);
let builder = Gcs::default()
.root(&root)
.bucket(&gcs.bucket)
.scope(&gcs.scope)
.credential_path(gcs.credential_path.expose_secret())
.credential(gcs.credential.expose_secret())
.endpoint(&gcs.endpoint);
Ok(ObjectStore::new(builder)
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("init gcs backend failed: {e}"),
}
.build()
})?
.finish())
}
}
}
async fn build_access_layer_simple(
region_dir: String,
object_store: ObjectStore,
) -> Result<(
std::sync::Arc<mito2::AccessLayer>,
std::sync::Arc<mito2::CacheManager>,
)> {
// Minimal index aux path setup
let mut mito_cfg = MitoConfig::default();
// Use a temporary directory as aux path
let data_home = std::env::temp_dir().join("greptime_objbench");
let _ = std::fs::create_dir_all(&data_home);
let _ = mito_cfg.index.sanitize(
data_home.to_str().unwrap_or("/tmp"),
&mito_cfg.inverted_index,
);
let access_layer = build_access_layer(&region_dir, object_store, &mito_cfg)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("build_access_layer failed: {e}"),
}
.build()
})?;
Ok((
access_layer,
std::sync::Arc::new(mito2::CacheManager::default()),
))
}
fn new_noop_file_purger() -> FilePurgerRef {
#[derive(Debug)]
struct Noop;
impl FilePurger for Noop {
fn send_request(&self, _request: PurgeRequest) {}
}
std::sync::Arc::new(Noop)
}
async fn load_parquet_metadata(
object_store: ObjectStore,
path: &str,
file_size: u64,
) -> std::result::Result<
parquet::file::metadata::ParquetMetaData,
Box<dyn std::error::Error + Send + Sync>,
> {
use parquet::file::metadata::ParquetMetaDataReader;
use parquet::file::FOOTER_SIZE;
let actual_size = if file_size == 0 {
object_store.stat(path).await?.content_length()
} else {
file_size
};
if actual_size < FOOTER_SIZE as u64 {
return Err("file too small".into());
}
let prefetch: u64 = 64 * 1024;
let start = actual_size.saturating_sub(prefetch);
let buffer = object_store
.read_with(path)
.range(start..actual_size)
.await?
.to_vec();
let buffer_len = buffer.len();
let mut footer = [0; 8];
footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
let metadata_len = ParquetMetaDataReader::decode_footer(&footer)? as u64;
if actual_size - (FOOTER_SIZE as u64) < metadata_len {
return Err("invalid footer/metadata length".into());
}
if (metadata_len as usize) <= buffer_len - FOOTER_SIZE {
let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE;
let meta = ParquetMetaDataReader::decode_metadata(
&buffer[metadata_start..buffer_len - FOOTER_SIZE],
)?;
Ok(meta)
} else {
let metadata_start = actual_size - metadata_len - FOOTER_SIZE as u64;
let data = object_store
.read_with(path)
.range(metadata_start..(actual_size - FOOTER_SIZE as u64))
.await?
.to_vec();
let meta = ParquetMetaDataReader::decode_metadata(&data)?;
Ok(meta)
}
}
#[cfg(test)]
mod tests {
use super::StorageConfigWrapper;
#[test]
fn test_decode() {
let cfg = std::fs::read_to_string("/home/lei/datanode-bulk.toml").unwrap();
let storage: StorageConfigWrapper = toml::from_str(&cfg).unwrap();
println!("{:?}", storage);
}
}

View File

@@ -287,7 +287,6 @@ impl StartCommand {
.await
.context(StartDatanodeSnafu)?;
let cluster_id = 0; // TODO(hl): read from config
let member_id = opts
.node_id
.context(MissingConfigSnafu { msg: "'node_id'" })?;
@@ -296,13 +295,10 @@ impl StartCommand {
msg: "'meta_client_options'",
})?;
let meta_client = meta_client::create_meta_client(
cluster_id,
MetaClientType::Datanode { member_id },
meta_config,
)
.await
.context(MetaClientInitSnafu)?;
let meta_client =
meta_client::create_meta_client(MetaClientType::Datanode { member_id }, meta_config)
.await
.context(MetaClientInitSnafu)?;
let meta_backend = Arc::new(MetaKvBackend {
client: meta_client.clone(),
@@ -410,7 +406,7 @@ mod tests {
sync_write = false
[storage]
data_home = "/tmp/greptimedb/"
data_home = "./greptimedb_data/"
type = "File"
[[storage.providers]]
@@ -424,7 +420,7 @@ mod tests {
[logging]
level = "debug"
dir = "/tmp/greptimedb/test/logs"
dir = "./greptimedb_data/test/logs"
"#;
write!(file, "{}", toml_str).unwrap();
@@ -471,7 +467,7 @@ mod tests {
assert_eq!(10000, ddl_timeout.as_millis());
assert_eq!(3000, timeout.as_millis());
assert!(tcp_nodelay);
assert_eq!("/tmp/greptimedb/", options.storage.data_home);
assert_eq!("./greptimedb_data/", options.storage.data_home);
assert!(matches!(
&options.storage.store,
ObjectStoreConfig::File(FileConfig { .. })
@@ -487,7 +483,10 @@ mod tests {
));
assert_eq!("debug", options.logging.level.unwrap());
assert_eq!("/tmp/greptimedb/test/logs".to_string(), options.logging.dir);
assert_eq!(
"./greptimedb_data/test/logs".to_string(),
options.logging.dir
);
}
#[test]
@@ -530,7 +529,7 @@ mod tests {
let options = cmd
.load_options(&GlobalOptions {
log_dir: Some("/tmp/greptimedb/test/logs".to_string()),
log_dir: Some("./greptimedb_data/test/logs".to_string()),
log_level: Some("debug".to_string()),
#[cfg(feature = "tokio-console")]
@@ -540,7 +539,7 @@ mod tests {
.component;
let logging_opt = options.logging;
assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir);
assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
assert_eq!("debug", logging_opt.level.as_ref().unwrap());
}
@@ -569,11 +568,11 @@ mod tests {
[storage]
type = "File"
data_home = "/tmp/greptimedb/"
data_home = "./greptimedb_data/"
[logging]
level = "debug"
dir = "/tmp/greptimedb/test/logs"
dir = "./greptimedb_data/test/logs"
"#;
write!(file, "{}", toml_str).unwrap();

View File

@@ -241,9 +241,6 @@ impl StartCommand {
let mut opts = opts.component;
opts.grpc.detect_server_addr();
// TODO(discord9): make it not optionale after cluster id is required
let cluster_id = opts.cluster_id.unwrap_or(0);
let member_id = opts
.node_id
.context(MissingConfigSnafu { msg: "'node_id'" })?;
@@ -252,13 +249,10 @@ impl StartCommand {
msg: "'meta_client_options'",
})?;
let meta_client = meta_client::create_meta_client(
cluster_id,
MetaClientType::Flownode { member_id },
meta_config,
)
.await
.context(MetaClientInitSnafu)?;
let meta_client =
meta_client::create_meta_client(MetaClientType::Flownode { member_id }, meta_config)
.await
.context(MetaClientInitSnafu)?;
let cache_max_capacity = meta_config.metadata_cache_max_capacity;
let cache_ttl = meta_config.metadata_cache_ttl;

View File

@@ -295,14 +295,10 @@ impl StartCommand {
let cache_ttl = meta_client_options.metadata_cache_ttl;
let cache_tti = meta_client_options.metadata_cache_tti;
let cluster_id = 0; // (TODO: jeremy): It is currently a reserved field and has not been enabled.
let meta_client = meta_client::create_meta_client(
cluster_id,
MetaClientType::Frontend,
meta_client_options,
)
.await
.context(MetaClientInitSnafu)?;
let meta_client =
meta_client::create_meta_client(MetaClientType::Frontend, meta_client_options)
.await
.context(MetaClientInitSnafu)?;
// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend =
@@ -452,7 +448,7 @@ mod tests {
[logging]
level = "debug"
dir = "/tmp/greptimedb/test/logs"
dir = "./greptimedb_data/test/logs"
"#;
write!(file, "{}", toml_str).unwrap();
@@ -470,7 +466,10 @@ mod tests {
assert_eq!(ReadableSize::gb(2), fe_opts.http.body_limit);
assert_eq!("debug", fe_opts.logging.level.as_ref().unwrap());
assert_eq!("/tmp/greptimedb/test/logs".to_string(), fe_opts.logging.dir);
assert_eq!(
"./greptimedb_data/test/logs".to_string(),
fe_opts.logging.dir
);
assert!(!fe_opts.opentsdb.enable);
}
@@ -509,7 +508,7 @@ mod tests {
let options = cmd
.load_options(&GlobalOptions {
log_dir: Some("/tmp/greptimedb/test/logs".to_string()),
log_dir: Some("./greptimedb_data/test/logs".to_string()),
log_level: Some("debug".to_string()),
#[cfg(feature = "tokio-console")]
@@ -519,7 +518,7 @@ mod tests {
.component;
let logging_opt = options.logging;
assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir);
assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
assert_eq!("debug", logging_opt.level.as_ref().unwrap());
}

View File

@@ -337,7 +337,7 @@ mod tests {
[logging]
level = "debug"
dir = "/tmp/greptimedb/test/logs"
dir = "./greptimedb_data/test/logs"
[failure_detector]
threshold = 8.0
@@ -358,7 +358,10 @@ mod tests {
assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs);
assert_eq!(SelectorType::LeaseBased, options.selector);
assert_eq!("debug", options.logging.level.as_ref().unwrap());
assert_eq!("/tmp/greptimedb/test/logs".to_string(), options.logging.dir);
assert_eq!(
"./greptimedb_data/test/logs".to_string(),
options.logging.dir
);
assert_eq!(8.0, options.failure_detector.threshold);
assert_eq!(
100.0,
@@ -396,7 +399,7 @@ mod tests {
let options = cmd
.load_options(&GlobalOptions {
log_dir: Some("/tmp/greptimedb/test/logs".to_string()),
log_dir: Some("./greptimedb_data/test/logs".to_string()),
log_level: Some("debug".to_string()),
#[cfg(feature = "tokio-console")]
@@ -406,7 +409,7 @@ mod tests {
.component;
let logging_opt = options.logging;
assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir);
assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
assert_eq!("debug", logging_opt.level.as_ref().unwrap());
}
@@ -424,7 +427,7 @@ mod tests {
[logging]
level = "debug"
dir = "/tmp/greptimedb/test/logs"
dir = "./greptimedb_data/test/logs"
"#;
write!(file, "{}", toml_str).unwrap();

View File

@@ -852,7 +852,7 @@ mod tests {
[wal]
provider = "raft_engine"
dir = "/tmp/greptimedb/test/wal"
dir = "./greptimedb_data/test/wal"
file_size = "1GB"
purge_threshold = "50GB"
purge_interval = "10m"
@@ -860,7 +860,7 @@ mod tests {
sync_write = false
[storage]
data_home = "/tmp/greptimedb/"
data_home = "./greptimedb_data/"
type = "File"
[[storage.providers]]
@@ -892,7 +892,7 @@ mod tests {
[logging]
level = "debug"
dir = "/tmp/greptimedb/test/logs"
dir = "./greptimedb_data/test/logs"
"#;
write!(file, "{}", toml_str).unwrap();
let cmd = StartCommand {
@@ -922,7 +922,10 @@ mod tests {
let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
unreachable!()
};
assert_eq!("/tmp/greptimedb/test/wal", raft_engine_config.dir.unwrap());
assert_eq!(
"./greptimedb_data/test/wal",
raft_engine_config.dir.unwrap()
);
assert!(matches!(
&dn_opts.storage.store,
@@ -946,7 +949,7 @@ mod tests {
}
assert_eq!("debug", logging_opts.level.as_ref().unwrap());
assert_eq!("/tmp/greptimedb/test/logs".to_string(), logging_opts.dir);
assert_eq!("./greptimedb_data/test/logs".to_string(), logging_opts.dir);
}
#[test]
@@ -958,7 +961,7 @@ mod tests {
let opts = cmd
.load_options(&GlobalOptions {
log_dir: Some("/tmp/greptimedb/test/logs".to_string()),
log_dir: Some("./greptimedb_data/test/logs".to_string()),
log_level: Some("debug".to_string()),
#[cfg(feature = "tokio-console")]
@@ -967,7 +970,7 @@ mod tests {
.unwrap()
.component;
assert_eq!("/tmp/greptimedb/test/logs", opts.logging.dir);
assert_eq!("./greptimedb_data/test/logs", opts.logging.dir);
assert_eq!("debug", opts.logging.level.unwrap());
}

View File

@@ -56,13 +56,13 @@ fn test_load_datanode_example_config() {
metadata_cache_tti: Duration::from_secs(300),
}),
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some("/tmp/greptimedb/wal".to_string()),
dir: Some("./greptimedb_data/wal".to_string()),
sync_period: Some(Duration::from_secs(10)),
recovery_parallelism: 2,
..Default::default()
}),
storage: StorageConfig {
data_home: "/tmp/greptimedb/".to_string(),
data_home: "./greptimedb_data/".to_string(),
..Default::default()
},
region_engine: vec![
@@ -159,10 +159,10 @@ fn test_load_metasrv_example_config() {
let expected = GreptimeOptions::<MetasrvOptions> {
component: MetasrvOptions {
selector: SelectorType::default(),
data_home: "/tmp/metasrv/".to_string(),
data_home: "./greptimedb_data/metasrv/".to_string(),
server_addr: "127.0.0.1:3002".to_string(),
logging: LoggingOptions {
dir: "/tmp/greptimedb/logs".to_string(),
dir: "./greptimedb_data/logs".to_string(),
level: Some("info".to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
@@ -202,7 +202,7 @@ fn test_load_standalone_example_config() {
component: StandaloneOptions {
default_timezone: Some("UTC".to_string()),
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some("/tmp/greptimedb/wal".to_string()),
dir: Some("./greptimedb_data/wal".to_string()),
sync_period: Some(Duration::from_secs(10)),
recovery_parallelism: 2,
..Default::default()
@@ -219,7 +219,7 @@ fn test_load_standalone_example_config() {
}),
],
storage: StorageConfig {
data_home: "/tmp/greptimedb/".to_string(),
data_home: "./greptimedb_data/".to_string(),
..Default::default()
},
logging: LoggingOptions {

View File

@@ -130,3 +130,10 @@ pub const SEMANTIC_TYPE_TIME_INDEX: &str = "TIMESTAMP";
pub fn is_readonly_schema(schema: &str) -> bool {
matches!(schema, INFORMATION_SCHEMA_NAME)
}
// ---- special table and fields ----
pub const TRACE_ID_COLUMN: &str = "trace_id";
pub const SPAN_ID_COLUMN: &str = "span_id";
pub const SPAN_NAME_COLUMN: &str = "span_name";
pub const PARENT_SPAN_ID_COLUMN: &str = "parent_span_id";
// ---- End of special table and fields ----

View File

@@ -161,7 +161,7 @@ mod tests {
[wal]
provider = "raft_engine"
dir = "/tmp/greptimedb/wal"
dir = "./greptimedb_data/wal"
file_size = "1GB"
purge_threshold = "50GB"
purge_interval = "10m"
@@ -170,7 +170,7 @@ mod tests {
[logging]
level = "debug"
dir = "/tmp/greptimedb/test/logs"
dir = "./greptimedb_data/test/logs"
"#;
write!(file, "{}", toml_str).unwrap();
@@ -246,7 +246,7 @@ mod tests {
let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else {
unreachable!()
};
assert_eq!(raft_engine_config.dir.unwrap(), "/tmp/greptimedb/wal");
assert_eq!(raft_engine_config.dir.unwrap(), "./greptimedb_data/wal");
// Should be default values.
assert_eq!(opts.node_id, None);

View File

@@ -17,6 +17,7 @@ api.workspace = true
arc-swap = "1.0"
async-trait.workspace = true
bincode = "1.3"
chrono.workspace = true
common-base.workspace = true
common-catalog.workspace = true
common-error.workspace = true
@@ -28,6 +29,8 @@ common-telemetry.workspace = true
common-time.workspace = true
common-version.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datatypes.workspace = true
derive_more = { version = "1", default-features = false, features = ["display"] }
geo = { version = "0.29", optional = true }

View File

@@ -26,9 +26,9 @@ use crate::flush_flow::FlushFlowFunction;
use crate::function_registry::FunctionRegistry;
/// Table functions
pub(crate) struct TableFunction;
pub(crate) struct AdminFunction;
impl TableFunction {
impl AdminFunction {
/// Register all table functions to [`FunctionRegistry`].
pub fn register(registry: &FunctionRegistry) {
registry.register_async(Arc::new(MigrateRegionFunction));

View File

@@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod geo_path;
mod hll;
mod uddsketch_state;
pub use geo_path::{GeoPathAccumulator, GEO_PATH_NAME};
pub(crate) use hll::HllStateType;
pub use hll::{HllState, HLL_MERGE_NAME, HLL_NAME};
pub use uddsketch_state::{UddSketchState, UDDSKETCH_STATE_NAME};

View File

@@ -0,0 +1,433 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use datafusion::arrow::array::{Array, ArrayRef};
use datafusion::common::cast::as_primitive_array;
use datafusion::error::{DataFusionError, Result as DfResult};
use datafusion::logical_expr::{Accumulator as DfAccumulator, AggregateUDF, Volatility};
use datafusion::prelude::create_udaf;
use datafusion_common::cast::{as_list_array, as_struct_array};
use datafusion_common::utils::SingleRowListArrayBuilder;
use datafusion_common::ScalarValue;
use datatypes::arrow::array::{Float64Array, Int64Array, ListArray, StructArray};
use datatypes::arrow::datatypes::{
DataType, Field, Float64Type, Int64Type, TimeUnit, TimestampNanosecondType,
};
use datatypes::compute::{self, sort_to_indices};
pub const GEO_PATH_NAME: &str = "geo_path";
const LATITUDE_FIELD: &str = "lat";
const LONGITUDE_FIELD: &str = "lng";
const TIMESTAMP_FIELD: &str = "timestamp";
const DEFAULT_LIST_FIELD_NAME: &str = "item";
#[derive(Debug, Default)]
pub struct GeoPathAccumulator {
lat: Vec<Option<f64>>,
lng: Vec<Option<f64>>,
timestamp: Vec<Option<i64>>,
}
impl GeoPathAccumulator {
pub fn new() -> Self {
Self::default()
}
pub fn udf_impl() -> AggregateUDF {
create_udaf(
GEO_PATH_NAME,
// Input types: lat, lng, timestamp
vec![
DataType::Float64,
DataType::Float64,
DataType::Timestamp(TimeUnit::Nanosecond, None),
],
// Output type: list of points {[lat], [lng]}
Arc::new(DataType::Struct(
vec![
Field::new(
LATITUDE_FIELD,
DataType::List(Arc::new(Field::new(
DEFAULT_LIST_FIELD_NAME,
DataType::Float64,
true,
))),
false,
),
Field::new(
LONGITUDE_FIELD,
DataType::List(Arc::new(Field::new(
DEFAULT_LIST_FIELD_NAME,
DataType::Float64,
true,
))),
false,
),
]
.into(),
)),
Volatility::Immutable,
// Create the accumulator
Arc::new(|_| Ok(Box::new(GeoPathAccumulator::new()))),
// Intermediate state types
Arc::new(vec![DataType::Struct(
vec![
Field::new(
LATITUDE_FIELD,
DataType::List(Arc::new(Field::new(
DEFAULT_LIST_FIELD_NAME,
DataType::Float64,
true,
))),
false,
),
Field::new(
LONGITUDE_FIELD,
DataType::List(Arc::new(Field::new(
DEFAULT_LIST_FIELD_NAME,
DataType::Float64,
true,
))),
false,
),
Field::new(
TIMESTAMP_FIELD,
DataType::List(Arc::new(Field::new(
DEFAULT_LIST_FIELD_NAME,
DataType::Int64,
true,
))),
false,
),
]
.into(),
)]),
)
}
}
impl DfAccumulator for GeoPathAccumulator {
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion::error::Result<()> {
if values.len() != 3 {
return Err(DataFusionError::Internal(format!(
"Expected 3 columns for geo_path, got {}",
values.len()
)));
}
let lat_array = as_primitive_array::<Float64Type>(&values[0])?;
let lng_array = as_primitive_array::<Float64Type>(&values[1])?;
let ts_array = as_primitive_array::<TimestampNanosecondType>(&values[2])?;
let size = lat_array.len();
self.lat.reserve(size);
self.lng.reserve(size);
for idx in 0..size {
self.lat.push(if lat_array.is_null(idx) {
None
} else {
Some(lat_array.value(idx))
});
self.lng.push(if lng_array.is_null(idx) {
None
} else {
Some(lng_array.value(idx))
});
self.timestamp.push(if ts_array.is_null(idx) {
None
} else {
Some(ts_array.value(idx))
});
}
Ok(())
}
fn evaluate(&mut self) -> DfResult<ScalarValue> {
let unordered_lng_array = Float64Array::from(self.lng.clone());
let unordered_lat_array = Float64Array::from(self.lat.clone());
let ts_array = Int64Array::from(self.timestamp.clone());
let ordered_indices = sort_to_indices(&ts_array, None, None)?;
let lat_array = compute::take(&unordered_lat_array, &ordered_indices, None)?;
let lng_array = compute::take(&unordered_lng_array, &ordered_indices, None)?;
let lat_list = Arc::new(SingleRowListArrayBuilder::new(lat_array).build_list_array());
let lng_list = Arc::new(SingleRowListArrayBuilder::new(lng_array).build_list_array());
let result = ScalarValue::Struct(Arc::new(StructArray::new(
vec![
Field::new(
LATITUDE_FIELD,
DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
false,
),
Field::new(
LONGITUDE_FIELD,
DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
false,
),
]
.into(),
vec![lat_list, lng_list],
None,
)));
Ok(result)
}
fn size(&self) -> usize {
// Base size of GeoPathAccumulator struct fields
let mut total_size = std::mem::size_of::<Self>();
// Size of vectors (approximation)
total_size += self.lat.capacity() * std::mem::size_of::<Option<f64>>();
total_size += self.lng.capacity() * std::mem::size_of::<Option<f64>>();
total_size += self.timestamp.capacity() * std::mem::size_of::<Option<i64>>();
total_size
}
fn state(&mut self) -> datafusion::error::Result<Vec<ScalarValue>> {
let lat_array = Arc::new(ListArray::from_iter_primitive::<Float64Type, _, _>(vec![
Some(self.lat.clone()),
]));
let lng_array = Arc::new(ListArray::from_iter_primitive::<Float64Type, _, _>(vec![
Some(self.lng.clone()),
]));
let ts_array = Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
Some(self.timestamp.clone()),
]));
let state_struct = StructArray::new(
vec![
Field::new(
LATITUDE_FIELD,
DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
false,
),
Field::new(
LONGITUDE_FIELD,
DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
false,
),
Field::new(
TIMESTAMP_FIELD,
DataType::List(Arc::new(Field::new("item", DataType::Int64, true))),
false,
),
]
.into(),
vec![lat_array, lng_array, ts_array],
None,
);
Ok(vec![ScalarValue::Struct(Arc::new(state_struct))])
}
fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion::error::Result<()> {
if states.len() != 1 {
return Err(DataFusionError::Internal(format!(
"Expected 1 states for geo_path, got {}",
states.len()
)));
}
for state in states {
let state = as_struct_array(state)?;
let lat_list = as_list_array(state.column(0))?.value(0);
let lat_array = as_primitive_array::<Float64Type>(&lat_list)?;
let lng_list = as_list_array(state.column(1))?.value(0);
let lng_array = as_primitive_array::<Float64Type>(&lng_list)?;
let ts_list = as_list_array(state.column(2))?.value(0);
let ts_array = as_primitive_array::<Int64Type>(&ts_list)?;
self.lat.extend(lat_array);
self.lng.extend(lng_array);
self.timestamp.extend(ts_array);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use datafusion::arrow::array::{Float64Array, TimestampNanosecondArray};
use datafusion::scalar::ScalarValue;
use super::*;
#[test]
fn test_geo_path_basic() {
let mut accumulator = GeoPathAccumulator::new();
// Create test data
let lat_array = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0]));
let lng_array = Arc::new(Float64Array::from(vec![4.0, 5.0, 6.0]));
let ts_array = Arc::new(TimestampNanosecondArray::from(vec![100, 200, 300]));
// Update batch
accumulator
.update_batch(&[lat_array, lng_array, ts_array])
.unwrap();
// Evaluate
let result = accumulator.evaluate().unwrap();
if let ScalarValue::Struct(struct_array) = result {
// Verify structure
let fields = struct_array.fields().clone();
assert_eq!(fields.len(), 2);
assert_eq!(fields[0].name(), LATITUDE_FIELD);
assert_eq!(fields[1].name(), LONGITUDE_FIELD);
// Verify data
let columns = struct_array.columns();
assert_eq!(columns.len(), 2);
// Check latitude values
let lat_list = as_list_array(&columns[0]).unwrap().value(0);
let lat_array = as_primitive_array::<Float64Type>(&lat_list).unwrap();
assert_eq!(lat_array.len(), 3);
assert_eq!(lat_array.value(0), 1.0);
assert_eq!(lat_array.value(1), 2.0);
assert_eq!(lat_array.value(2), 3.0);
// Check longitude values
let lng_list = as_list_array(&columns[1]).unwrap().value(0);
let lng_array = as_primitive_array::<Float64Type>(&lng_list).unwrap();
assert_eq!(lng_array.len(), 3);
assert_eq!(lng_array.value(0), 4.0);
assert_eq!(lng_array.value(1), 5.0);
assert_eq!(lng_array.value(2), 6.0);
} else {
panic!("Expected Struct scalar value");
}
}
#[test]
fn test_geo_path_sort_by_timestamp() {
let mut accumulator = GeoPathAccumulator::new();
// Create test data with unordered timestamps
let lat_array = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0]));
let lng_array = Arc::new(Float64Array::from(vec![4.0, 5.0, 6.0]));
let ts_array = Arc::new(TimestampNanosecondArray::from(vec![300, 100, 200]));
// Update batch
accumulator
.update_batch(&[lat_array, lng_array, ts_array])
.unwrap();
// Evaluate
let result = accumulator.evaluate().unwrap();
if let ScalarValue::Struct(struct_array) = result {
// Extract arrays
let columns = struct_array.columns();
// Check latitude values
let lat_list = as_list_array(&columns[0]).unwrap().value(0);
let lat_array = as_primitive_array::<Float64Type>(&lat_list).unwrap();
assert_eq!(lat_array.len(), 3);
assert_eq!(lat_array.value(0), 2.0); // timestamp 100
assert_eq!(lat_array.value(1), 3.0); // timestamp 200
assert_eq!(lat_array.value(2), 1.0); // timestamp 300
// Check longitude values (should be sorted by timestamp)
let lng_list = as_list_array(&columns[1]).unwrap().value(0);
let lng_array = as_primitive_array::<Float64Type>(&lng_list).unwrap();
assert_eq!(lng_array.len(), 3);
assert_eq!(lng_array.value(0), 5.0); // timestamp 100
assert_eq!(lng_array.value(1), 6.0); // timestamp 200
assert_eq!(lng_array.value(2), 4.0); // timestamp 300
} else {
panic!("Expected Struct scalar value");
}
}
#[test]
fn test_geo_path_merge() {
let mut accumulator1 = GeoPathAccumulator::new();
let mut accumulator2 = GeoPathAccumulator::new();
// Create test data for first accumulator
let lat_array1 = Arc::new(Float64Array::from(vec![1.0]));
let lng_array1 = Arc::new(Float64Array::from(vec![4.0]));
let ts_array1 = Arc::new(TimestampNanosecondArray::from(vec![100]));
// Create test data for second accumulator
let lat_array2 = Arc::new(Float64Array::from(vec![2.0]));
let lng_array2 = Arc::new(Float64Array::from(vec![5.0]));
let ts_array2 = Arc::new(TimestampNanosecondArray::from(vec![200]));
// Update batches
accumulator1
.update_batch(&[lat_array1, lng_array1, ts_array1])
.unwrap();
accumulator2
.update_batch(&[lat_array2, lng_array2, ts_array2])
.unwrap();
// Get states
let state1 = accumulator1.state().unwrap();
let state2 = accumulator2.state().unwrap();
// Create a merged accumulator
let mut merged = GeoPathAccumulator::new();
// Extract the struct arrays from the states
let state_array1 = match &state1[0] {
ScalarValue::Struct(array) => array.clone(),
_ => panic!("Expected Struct scalar value"),
};
let state_array2 = match &state2[0] {
ScalarValue::Struct(array) => array.clone(),
_ => panic!("Expected Struct scalar value"),
};
// Merge state arrays
merged.merge_batch(&[state_array1]).unwrap();
merged.merge_batch(&[state_array2]).unwrap();
// Evaluate merged result
let result = merged.evaluate().unwrap();
if let ScalarValue::Struct(struct_array) = result {
// Extract arrays
let columns = struct_array.columns();
// Check latitude values
let lat_list = as_list_array(&columns[0]).unwrap().value(0);
let lat_array = as_primitive_array::<Float64Type>(&lat_list).unwrap();
assert_eq!(lat_array.len(), 2);
assert_eq!(lat_array.value(0), 1.0); // timestamp 100
assert_eq!(lat_array.value(1), 2.0); // timestamp 200
// Check longitude values (should be sorted by timestamp)
let lng_list = as_list_array(&columns[1]).unwrap().value(0);
let lng_array = as_primitive_array::<Float64Type>(&lng_list).unwrap();
assert_eq!(lng_array.len(), 2);
assert_eq!(lng_array.value(0), 4.0); // timestamp 100
assert_eq!(lng_array.value(1), 5.0); // timestamp 200
} else {
panic!("Expected Struct scalar value");
}
}
}

View File

@@ -12,6 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Two UDAFs are implemented for HyperLogLog:
//!
//! - `hll`: Accepts a string column and aggregates the values into a
//! HyperLogLog state.
//! - `hll_merge`: Accepts a binary column of states generated by `hll`
//! and merges them into a single state.
//!
//! The states can be then used to estimate the cardinality of the
//! values in the column by `hll_count` UDF.
use std::sync::Arc;
use common_query::prelude::*;

View File

@@ -12,6 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Implementation of the `uddsketch_state` UDAF that generate the state of
//! UDDSketch for a given set of values.
//!
//! The generated state can be used to compute approximate quantiles using
//! `uddsketch_calc` UDF.
use std::sync::Arc;
use common_query::prelude::*;

View File

@@ -63,7 +63,7 @@ pub trait Function: fmt::Display + Sync + Send {
fn signature(&self) -> Signature;
/// Evaluate the function, e.g. run/execute the function.
fn eval(&self, _func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef>;
fn eval(&self, ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef>;
}
pub type FunctionRef = Arc<dyn Function>;

View File

@@ -18,11 +18,13 @@ use std::sync::{Arc, RwLock};
use once_cell::sync::Lazy;
use crate::admin::AdminFunction;
use crate::function::{AsyncFunctionRef, FunctionRef};
use crate::scalars::aggregate::{AggregateFunctionMetaRef, AggregateFunctions};
use crate::scalars::date::DateFunction;
use crate::scalars::expression::ExpressionFunction;
use crate::scalars::hll_count::HllCalcFunction;
use crate::scalars::ip::IpFunctions;
use crate::scalars::json::JsonFunction;
use crate::scalars::matches::MatchesFunction;
use crate::scalars::math::MathFunction;
@@ -30,7 +32,6 @@ use crate::scalars::timestamp::TimestampFunction;
use crate::scalars::uddsketch_calc::UddSketchCalcFunction;
use crate::scalars::vector::VectorFunction;
use crate::system::SystemFunction;
use crate::table::TableFunction;
#[derive(Default)]
pub struct FunctionRegistry {
@@ -118,7 +119,7 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
// System and administration functions
SystemFunction::register(&function_registry);
TableFunction::register(&function_registry);
AdminFunction::register(&function_registry);
// Json related functions
JsonFunction::register(&function_registry);
@@ -130,6 +131,9 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
#[cfg(feature = "geo")]
crate::scalars::geo::GeoFunctions::register(&function_registry);
// Ip functions
IpFunctions::register(&function_registry);
Arc::new(function_registry)
});

View File

@@ -15,11 +15,11 @@
#![feature(let_chains)]
#![feature(try_blocks)]
mod admin;
mod flush_flow;
mod macros;
pub mod scalars;
mod system;
mod table;
pub mod aggr;
pub mod function;

View File

@@ -23,6 +23,7 @@ pub mod math;
pub mod vector;
pub(crate) mod hll_count;
pub mod ip;
#[cfg(test)]
pub(crate) mod test;
pub(crate) mod timestamp;

View File

@@ -12,24 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod argmax;
mod argmin;
mod diff;
mod mean;
mod polyval;
mod scipy_stats_norm_cdf;
mod scipy_stats_norm_pdf;
//! # Deprecate Warning:
//!
//! This module is deprecated and will be removed in the future.
//! All UDAF implementation here are not maintained and should
//! not be used before they are refactored into the `src/aggr`
//! version.
use std::sync::Arc;
pub use argmax::ArgmaxAccumulatorCreator;
pub use argmin::ArgminAccumulatorCreator;
use common_query::logical_plan::AggregateFunctionCreatorRef;
pub use diff::DiffAccumulatorCreator;
pub use mean::MeanAccumulatorCreator;
pub use polyval::PolyvalAccumulatorCreator;
pub use scipy_stats_norm_cdf::ScipyStatsNormCdfAccumulatorCreator;
pub use scipy_stats_norm_pdf::ScipyStatsNormPdfAccumulatorCreator;
use crate::function_registry::FunctionRegistry;
use crate::scalars::vector::product::VectorProductCreator;
@@ -76,31 +68,22 @@ pub(crate) struct AggregateFunctions;
impl AggregateFunctions {
pub fn register(registry: &FunctionRegistry) {
macro_rules! register_aggr_func {
($name :expr, $arg_count :expr, $creator :ty) => {
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
$name,
$arg_count,
Arc::new(|| Arc::new(<$creator>::default())),
)));
};
}
register_aggr_func!("diff", 1, DiffAccumulatorCreator);
register_aggr_func!("mean", 1, MeanAccumulatorCreator);
register_aggr_func!("polyval", 2, PolyvalAccumulatorCreator);
register_aggr_func!("argmax", 1, ArgmaxAccumulatorCreator);
register_aggr_func!("argmin", 1, ArgminAccumulatorCreator);
register_aggr_func!("scipystatsnormcdf", 2, ScipyStatsNormCdfAccumulatorCreator);
register_aggr_func!("scipystatsnormpdf", 2, ScipyStatsNormPdfAccumulatorCreator);
register_aggr_func!("vec_sum", 1, VectorSumCreator);
register_aggr_func!("vec_product", 1, VectorProductCreator);
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
"vec_sum",
1,
Arc::new(|| Arc::new(VectorSumCreator::default())),
)));
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
"vec_product",
1,
Arc::new(|| Arc::new(VectorProductCreator::default())),
)));
#[cfg(feature = "geo")]
register_aggr_func!(
registry.register_aggregate_function(Arc::new(AggregateFunctionMeta::new(
"json_encode_path",
3,
super::geo::encoding::JsonPathEncodeFunctionCreator
);
Arc::new(|| Arc::new(super::geo::encoding::JsonPathEncodeFunctionCreator::default())),
)));
}
}

View File

@@ -1,208 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp::Ordering;
use std::sync::Arc;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{
BadAccumulatorImplSnafu, CreateAccumulatorSnafu, InvalidInputStateSnafu, Result,
};
use common_query::logical_plan::accumulator::AggrFuncTypeStore;
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
use common_query::prelude::*;
use datatypes::prelude::*;
use datatypes::types::{LogicalPrimitiveType, WrapperType};
use datatypes::vectors::{ConstantVector, Helper};
use datatypes::with_match_primitive_type_id;
use snafu::ensure;
// https://numpy.org/doc/stable/reference/generated/numpy.argmax.html
// return the index of the max value
#[derive(Debug, Default)]
pub struct Argmax<T> {
max: Option<T>,
n: u64,
}
impl<T> Argmax<T>
where
T: PartialOrd + Copy,
{
fn update(&mut self, value: T, index: u64) {
if let Some(Ordering::Less) = self.max.partial_cmp(&Some(value)) {
self.max = Some(value);
self.n = index;
}
}
}
impl<T> Accumulator for Argmax<T>
where
T: WrapperType + PartialOrd,
{
fn state(&self) -> Result<Vec<Value>> {
match self.max {
Some(max) => Ok(vec![max.into(), self.n.into()]),
_ => Ok(vec![Value::Null, self.n.into()]),
}
}
fn update_batch(&mut self, values: &[VectorRef]) -> Result<()> {
if values.is_empty() {
return Ok(());
}
let column = &values[0];
let column: &<T as Scalar>::VectorType = if column.is_const() {
let column: &ConstantVector = unsafe { Helper::static_cast(column) };
unsafe { Helper::static_cast(column.inner()) }
} else {
unsafe { Helper::static_cast(column) }
};
for (i, v) in column.iter_data().enumerate() {
if let Some(value) = v {
self.update(value, i as u64);
}
}
Ok(())
}
fn merge_batch(&mut self, states: &[VectorRef]) -> Result<()> {
if states.is_empty() {
return Ok(());
}
ensure!(
states.len() == 2,
BadAccumulatorImplSnafu {
err_msg: "expect 2 states in `merge_batch`",
}
);
let max = &states[0];
let index = &states[1];
let max: &<T as Scalar>::VectorType = unsafe { Helper::static_cast(max) };
let index: &<u64 as Scalar>::VectorType = unsafe { Helper::static_cast(index) };
index
.iter_data()
.flatten()
.zip(max.iter_data().flatten())
.for_each(|(i, max)| self.update(max, i));
Ok(())
}
fn evaluate(&self) -> Result<Value> {
match self.max {
Some(_) => Ok(self.n.into()),
_ => Ok(Value::Null),
}
}
}
#[as_aggr_func_creator]
#[derive(Debug, Default, AggrFuncTypeStore)]
pub struct ArgmaxAccumulatorCreator {}
impl AggregateFunctionCreator for ArgmaxAccumulatorCreator {
fn creator(&self) -> AccumulatorCreatorFunction {
let creator: AccumulatorCreatorFunction = Arc::new(move |types: &[ConcreteDataType]| {
let input_type = &types[0];
with_match_primitive_type_id!(
input_type.logical_type_id(),
|$S| {
Ok(Box::new(Argmax::<<$S as LogicalPrimitiveType>::Wrapper>::default()))
},
{
let err_msg = format!(
"\"ARGMAX\" aggregate function not support data type {:?}",
input_type.logical_type_id(),
);
CreateAccumulatorSnafu { err_msg }.fail()?
}
)
});
creator
}
fn output_type(&self) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::uint64_datatype())
}
fn state_types(&self) -> Result<Vec<ConcreteDataType>> {
let input_types = self.input_types()?;
ensure!(input_types.len() == 1, InvalidInputStateSnafu);
Ok(vec![
input_types.into_iter().next().unwrap(),
ConcreteDataType::uint64_datatype(),
])
}
}
#[cfg(test)]
mod test {
use datatypes::vectors::Int32Vector;
use super::*;
#[test]
fn test_update_batch() {
// test update empty batch, expect not updating anything
let mut argmax = Argmax::<i32>::default();
argmax.update_batch(&[]).unwrap();
assert_eq!(Value::Null, argmax.evaluate().unwrap());
// test update one not-null value
let mut argmax = Argmax::<i32>::default();
let v: Vec<VectorRef> = vec![Arc::new(Int32Vector::from(vec![Some(42)]))];
argmax.update_batch(&v).unwrap();
assert_eq!(Value::from(0_u64), argmax.evaluate().unwrap());
// test update one null value
let mut argmax = Argmax::<i32>::default();
let v: Vec<VectorRef> = vec![Arc::new(Int32Vector::from(vec![Option::<i32>::None]))];
argmax.update_batch(&v).unwrap();
assert_eq!(Value::Null, argmax.evaluate().unwrap());
// test update no null-value batch
let mut argmax = Argmax::<i32>::default();
let v: Vec<VectorRef> = vec![Arc::new(Int32Vector::from(vec![
Some(-1i32),
Some(1),
Some(3),
]))];
argmax.update_batch(&v).unwrap();
assert_eq!(Value::from(2_u64), argmax.evaluate().unwrap());
// test update null-value batch
let mut argmax = Argmax::<i32>::default();
let v: Vec<VectorRef> = vec![Arc::new(Int32Vector::from(vec![
Some(-2i32),
None,
Some(4),
]))];
argmax.update_batch(&v).unwrap();
assert_eq!(Value::from(2_u64), argmax.evaluate().unwrap());
// test update with constant vector
let mut argmax = Argmax::<i32>::default();
let v: Vec<VectorRef> = vec![Arc::new(ConstantVector::new(
Arc::new(Int32Vector::from_vec(vec![4])),
10,
))];
argmax.update_batch(&v).unwrap();
assert_eq!(Value::from(0_u64), argmax.evaluate().unwrap());
}
}

View File

@@ -1,216 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp::Ordering;
use std::sync::Arc;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{
BadAccumulatorImplSnafu, CreateAccumulatorSnafu, InvalidInputStateSnafu, Result,
};
use common_query::logical_plan::accumulator::AggrFuncTypeStore;
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
use common_query::prelude::*;
use datatypes::prelude::*;
use datatypes::vectors::{ConstantVector, Helper};
use datatypes::with_match_primitive_type_id;
use snafu::ensure;
// // https://numpy.org/doc/stable/reference/generated/numpy.argmin.html
#[derive(Debug, Default)]
pub struct Argmin<T> {
min: Option<T>,
n: u32,
}
impl<T> Argmin<T>
where
T: Copy + PartialOrd,
{
fn update(&mut self, value: T, index: u32) {
match self.min {
Some(min) => {
if let Some(Ordering::Greater) = min.partial_cmp(&value) {
self.min = Some(value);
self.n = index;
}
}
None => {
self.min = Some(value);
self.n = index;
}
}
}
}
impl<T> Accumulator for Argmin<T>
where
T: WrapperType + PartialOrd,
{
fn state(&self) -> Result<Vec<Value>> {
match self.min {
Some(min) => Ok(vec![min.into(), self.n.into()]),
_ => Ok(vec![Value::Null, self.n.into()]),
}
}
fn update_batch(&mut self, values: &[VectorRef]) -> Result<()> {
if values.is_empty() {
return Ok(());
}
ensure!(values.len() == 1, InvalidInputStateSnafu);
let column = &values[0];
let column: &<T as Scalar>::VectorType = if column.is_const() {
let column: &ConstantVector = unsafe { Helper::static_cast(column) };
unsafe { Helper::static_cast(column.inner()) }
} else {
unsafe { Helper::static_cast(column) }
};
for (i, v) in column.iter_data().enumerate() {
if let Some(value) = v {
self.update(value, i as u32);
}
}
Ok(())
}
fn merge_batch(&mut self, states: &[VectorRef]) -> Result<()> {
if states.is_empty() {
return Ok(());
}
ensure!(
states.len() == 2,
BadAccumulatorImplSnafu {
err_msg: "expect 2 states in `merge_batch`",
}
);
let min = &states[0];
let index = &states[1];
let min: &<T as Scalar>::VectorType = unsafe { Helper::static_cast(min) };
let index: &<u32 as Scalar>::VectorType = unsafe { Helper::static_cast(index) };
index
.iter_data()
.flatten()
.zip(min.iter_data().flatten())
.for_each(|(i, min)| self.update(min, i));
Ok(())
}
fn evaluate(&self) -> Result<Value> {
match self.min {
Some(_) => Ok(self.n.into()),
_ => Ok(Value::Null),
}
}
}
#[as_aggr_func_creator]
#[derive(Debug, Default, AggrFuncTypeStore)]
pub struct ArgminAccumulatorCreator {}
impl AggregateFunctionCreator for ArgminAccumulatorCreator {
fn creator(&self) -> AccumulatorCreatorFunction {
let creator: AccumulatorCreatorFunction = Arc::new(move |types: &[ConcreteDataType]| {
let input_type = &types[0];
with_match_primitive_type_id!(
input_type.logical_type_id(),
|$S| {
Ok(Box::new(Argmin::<<$S as LogicalPrimitiveType>::Wrapper>::default()))
},
{
let err_msg = format!(
"\"ARGMIN\" aggregate function not support data type {:?}",
input_type.logical_type_id(),
);
CreateAccumulatorSnafu { err_msg }.fail()?
}
)
});
creator
}
fn output_type(&self) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::uint32_datatype())
}
fn state_types(&self) -> Result<Vec<ConcreteDataType>> {
let input_types = self.input_types()?;
ensure!(input_types.len() == 1, InvalidInputStateSnafu);
Ok(vec![
input_types.into_iter().next().unwrap(),
ConcreteDataType::uint32_datatype(),
])
}
}
#[cfg(test)]
mod test {
use datatypes::vectors::Int32Vector;
use super::*;
#[test]
fn test_update_batch() {
// test update empty batch, expect not updating anything
let mut argmin = Argmin::<i32>::default();
argmin.update_batch(&[]).unwrap();
assert_eq!(Value::Null, argmin.evaluate().unwrap());
// test update one not-null value
let mut argmin = Argmin::<i32>::default();
let v: Vec<VectorRef> = vec![Arc::new(Int32Vector::from(vec![Some(42)]))];
argmin.update_batch(&v).unwrap();
assert_eq!(Value::from(0_u32), argmin.evaluate().unwrap());
// test update one null value
let mut argmin = Argmin::<i32>::default();
let v: Vec<VectorRef> = vec![Arc::new(Int32Vector::from(vec![Option::<i32>::None]))];
argmin.update_batch(&v).unwrap();
assert_eq!(Value::Null, argmin.evaluate().unwrap());
// test update no null-value batch
let mut argmin = Argmin::<i32>::default();
let v: Vec<VectorRef> = vec![Arc::new(Int32Vector::from(vec![
Some(-1i32),
Some(1),
Some(3),
]))];
argmin.update_batch(&v).unwrap();
assert_eq!(Value::from(0_u32), argmin.evaluate().unwrap());
// test update null-value batch
let mut argmin = Argmin::<i32>::default();
let v: Vec<VectorRef> = vec![Arc::new(Int32Vector::from(vec![
Some(-2i32),
None,
Some(4),
]))];
argmin.update_batch(&v).unwrap();
assert_eq!(Value::from(0_u32), argmin.evaluate().unwrap());
// test update with constant vector
let mut argmin = Argmin::<i32>::default();
let v: Vec<VectorRef> = vec![Arc::new(ConstantVector::new(
Arc::new(Int32Vector::from_vec(vec![4])),
10,
))];
argmin.update_batch(&v).unwrap();
assert_eq!(Value::from(0_u32), argmin.evaluate().unwrap());
}
}

View File

@@ -1,252 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::marker::PhantomData;
use std::sync::Arc;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{
CreateAccumulatorSnafu, DowncastVectorSnafu, FromScalarValueSnafu, InvalidInputStateSnafu,
Result,
};
use common_query::logical_plan::accumulator::AggrFuncTypeStore;
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
use common_query::prelude::*;
use datatypes::prelude::*;
use datatypes::value::ListValue;
use datatypes::vectors::{ConstantVector, Helper, ListVector};
use datatypes::with_match_primitive_type_id;
use num_traits::AsPrimitive;
use snafu::{ensure, OptionExt, ResultExt};
// https://numpy.org/doc/stable/reference/generated/numpy.diff.html
// I is the input type, O is the output type.
#[derive(Debug, Default)]
pub struct Diff<I, O> {
values: Vec<I>,
_phantom: PhantomData<O>,
}
impl<I, O> Diff<I, O> {
fn push(&mut self, value: I) {
self.values.push(value);
}
}
impl<I, O> Accumulator for Diff<I, O>
where
I: WrapperType,
O: WrapperType,
I::Native: AsPrimitive<O::Native>,
O::Native: std::ops::Sub<Output = O::Native>,
{
fn state(&self) -> Result<Vec<Value>> {
let nums = self
.values
.iter()
.map(|&n| n.into())
.collect::<Vec<Value>>();
Ok(vec![Value::List(ListValue::new(
nums,
I::LogicalType::build_data_type(),
))])
}
fn update_batch(&mut self, values: &[VectorRef]) -> Result<()> {
if values.is_empty() {
return Ok(());
}
ensure!(values.len() == 1, InvalidInputStateSnafu);
let column = &values[0];
let mut len = 1;
let column: &<I as Scalar>::VectorType = if column.is_const() {
len = column.len();
let column: &ConstantVector = unsafe { Helper::static_cast(column) };
unsafe { Helper::static_cast(column.inner()) }
} else {
unsafe { Helper::static_cast(column) }
};
(0..len).for_each(|_| {
for v in column.iter_data().flatten() {
self.push(v);
}
});
Ok(())
}
fn merge_batch(&mut self, states: &[VectorRef]) -> Result<()> {
if states.is_empty() {
return Ok(());
}
let states = &states[0];
let states = states
.as_any()
.downcast_ref::<ListVector>()
.with_context(|| DowncastVectorSnafu {
err_msg: format!(
"expect ListVector, got vector type {}",
states.vector_type_name()
),
})?;
for state in states.values_iter() {
if let Some(state) = state.context(FromScalarValueSnafu)? {
self.update_batch(&[state])?;
}
}
Ok(())
}
fn evaluate(&self) -> Result<Value> {
if self.values.is_empty() || self.values.len() == 1 {
return Ok(Value::Null);
}
let diff = self
.values
.windows(2)
.map(|x| {
let native = x[1].into_native().as_() - x[0].into_native().as_();
O::from_native(native).into()
})
.collect::<Vec<Value>>();
let diff = Value::List(ListValue::new(diff, O::LogicalType::build_data_type()));
Ok(diff)
}
}
#[as_aggr_func_creator]
#[derive(Debug, Default, AggrFuncTypeStore)]
pub struct DiffAccumulatorCreator {}
impl AggregateFunctionCreator for DiffAccumulatorCreator {
fn creator(&self) -> AccumulatorCreatorFunction {
let creator: AccumulatorCreatorFunction = Arc::new(move |types: &[ConcreteDataType]| {
let input_type = &types[0];
with_match_primitive_type_id!(
input_type.logical_type_id(),
|$S| {
Ok(Box::new(Diff::<<$S as LogicalPrimitiveType>::Wrapper, <<$S as LogicalPrimitiveType>::LargestType as LogicalPrimitiveType>::Wrapper>::default()))
},
{
let err_msg = format!(
"\"DIFF\" aggregate function not support data type {:?}",
input_type.logical_type_id(),
);
CreateAccumulatorSnafu { err_msg }.fail()?
}
)
});
creator
}
fn output_type(&self) -> Result<ConcreteDataType> {
let input_types = self.input_types()?;
ensure!(input_types.len() == 1, InvalidInputStateSnafu);
with_match_primitive_type_id!(
input_types[0].logical_type_id(),
|$S| {
Ok(ConcreteDataType::list_datatype($S::default().into()))
},
{
unreachable!()
}
)
}
fn state_types(&self) -> Result<Vec<ConcreteDataType>> {
let input_types = self.input_types()?;
ensure!(input_types.len() == 1, InvalidInputStateSnafu);
with_match_primitive_type_id!(
input_types[0].logical_type_id(),
|$S| {
Ok(vec![ConcreteDataType::list_datatype($S::default().into())])
},
{
unreachable!()
}
)
}
}
#[cfg(test)]
mod test {
use datatypes::vectors::Int32Vector;
use super::*;
#[test]
fn test_update_batch() {
// test update empty batch, expect not updating anything
let mut diff = Diff::<i32, i64>::default();
diff.update_batch(&[]).unwrap();
assert!(diff.values.is_empty());
assert_eq!(Value::Null, diff.evaluate().unwrap());
// test update one not-null value
let mut diff = Diff::<i32, i64>::default();
let v: Vec<VectorRef> = vec![Arc::new(Int32Vector::from(vec![Some(42)]))];
diff.update_batch(&v).unwrap();
assert_eq!(Value::Null, diff.evaluate().unwrap());
// test update one null value
let mut diff = Diff::<i32, i64>::default();
let v: Vec<VectorRef> = vec![Arc::new(Int32Vector::from(vec![Option::<i32>::None]))];
diff.update_batch(&v).unwrap();
assert_eq!(Value::Null, diff.evaluate().unwrap());
// test update no null-value batch
let mut diff = Diff::<i32, i64>::default();
let v: Vec<VectorRef> = vec![Arc::new(Int32Vector::from(vec![
Some(-1i32),
Some(1),
Some(2),
]))];
let values = vec![Value::from(2_i64), Value::from(1_i64)];
diff.update_batch(&v).unwrap();
assert_eq!(
Value::List(ListValue::new(values, ConcreteDataType::int64_datatype())),
diff.evaluate().unwrap()
);
// test update null-value batch
let mut diff = Diff::<i32, i64>::default();
let v: Vec<VectorRef> = vec![Arc::new(Int32Vector::from(vec![
Some(-2i32),
None,
Some(3),
Some(4),
]))];
let values = vec![Value::from(5_i64), Value::from(1_i64)];
diff.update_batch(&v).unwrap();
assert_eq!(
Value::List(ListValue::new(values, ConcreteDataType::int64_datatype())),
diff.evaluate().unwrap()
);
// test update with constant vector
let mut diff = Diff::<i32, i64>::default();
let v: Vec<VectorRef> = vec![Arc::new(ConstantVector::new(
Arc::new(Int32Vector::from_vec(vec![4])),
4,
))];
let values = vec![Value::from(0_i64), Value::from(0_i64), Value::from(0_i64)];
diff.update_batch(&v).unwrap();
assert_eq!(
Value::List(ListValue::new(values, ConcreteDataType::int64_datatype())),
diff.evaluate().unwrap()
);
}
}

View File

@@ -1,238 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::marker::PhantomData;
use std::sync::Arc;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{
BadAccumulatorImplSnafu, CreateAccumulatorSnafu, DowncastVectorSnafu, InvalidInputStateSnafu,
Result,
};
use common_query::logical_plan::accumulator::AggrFuncTypeStore;
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
use common_query::prelude::*;
use datatypes::prelude::*;
use datatypes::types::WrapperType;
use datatypes::vectors::{ConstantVector, Float64Vector, Helper, UInt64Vector};
use datatypes::with_match_primitive_type_id;
use num_traits::AsPrimitive;
use snafu::{ensure, OptionExt};
#[derive(Debug, Default)]
pub struct Mean<T> {
sum: f64,
n: u64,
_phantom: PhantomData<T>,
}
impl<T> Mean<T>
where
T: WrapperType,
T::Native: AsPrimitive<f64>,
{
#[inline(always)]
fn push(&mut self, value: T) {
self.sum += value.into_native().as_();
self.n += 1;
}
#[inline(always)]
fn update(&mut self, sum: f64, n: u64) {
self.sum += sum;
self.n += n;
}
}
impl<T> Accumulator for Mean<T>
where
T: WrapperType,
T::Native: AsPrimitive<f64>,
{
fn state(&self) -> Result<Vec<Value>> {
Ok(vec![self.sum.into(), self.n.into()])
}
fn update_batch(&mut self, values: &[VectorRef]) -> Result<()> {
if values.is_empty() {
return Ok(());
}
ensure!(values.len() == 1, InvalidInputStateSnafu);
let column = &values[0];
let mut len = 1;
let column: &<T as Scalar>::VectorType = if column.is_const() {
len = column.len();
let column: &ConstantVector = unsafe { Helper::static_cast(column) };
unsafe { Helper::static_cast(column.inner()) }
} else {
unsafe { Helper::static_cast(column) }
};
(0..len).for_each(|_| {
for v in column.iter_data().flatten() {
self.push(v);
}
});
Ok(())
}
fn merge_batch(&mut self, states: &[VectorRef]) -> Result<()> {
if states.is_empty() {
return Ok(());
}
ensure!(
states.len() == 2,
BadAccumulatorImplSnafu {
err_msg: "expect 2 states in `merge_batch`",
}
);
let sum = &states[0];
let n = &states[1];
let sum = sum
.as_any()
.downcast_ref::<Float64Vector>()
.with_context(|| DowncastVectorSnafu {
err_msg: format!(
"expect Float64Vector, got vector type {}",
sum.vector_type_name()
),
})?;
let n = n
.as_any()
.downcast_ref::<UInt64Vector>()
.with_context(|| DowncastVectorSnafu {
err_msg: format!(
"expect UInt64Vector, got vector type {}",
sum.vector_type_name()
),
})?;
sum.iter_data().zip(n.iter_data()).for_each(|(sum, n)| {
if let (Some(sum), Some(n)) = (sum, n) {
self.update(sum, n);
}
});
Ok(())
}
fn evaluate(&self) -> Result<Value> {
if self.n == 0 {
return Ok(Value::Null);
}
let values = self.sum / self.n as f64;
Ok(values.into())
}
}
#[as_aggr_func_creator]
#[derive(Debug, Default, AggrFuncTypeStore)]
pub struct MeanAccumulatorCreator {}
impl AggregateFunctionCreator for MeanAccumulatorCreator {
fn creator(&self) -> AccumulatorCreatorFunction {
let creator: AccumulatorCreatorFunction = Arc::new(move |types: &[ConcreteDataType]| {
let input_type = &types[0];
with_match_primitive_type_id!(
input_type.logical_type_id(),
|$S| {
Ok(Box::new(Mean::<<$S as LogicalPrimitiveType>::Native>::default()))
},
{
let err_msg = format!(
"\"MEAN\" aggregate function not support data type {:?}",
input_type.logical_type_id(),
);
CreateAccumulatorSnafu { err_msg }.fail()?
}
)
});
creator
}
fn output_type(&self) -> Result<ConcreteDataType> {
let input_types = self.input_types()?;
ensure!(input_types.len() == 1, InvalidInputStateSnafu);
Ok(ConcreteDataType::float64_datatype())
}
fn state_types(&self) -> Result<Vec<ConcreteDataType>> {
let input_types = self.input_types()?;
ensure!(input_types.len() == 1, InvalidInputStateSnafu);
Ok(vec![
ConcreteDataType::float64_datatype(),
ConcreteDataType::uint64_datatype(),
])
}
}
#[cfg(test)]
mod test {
use datatypes::vectors::Int32Vector;
use super::*;
#[test]
fn test_update_batch() {
// test update empty batch, expect not updating anything
let mut mean = Mean::<i32>::default();
mean.update_batch(&[]).unwrap();
assert_eq!(Value::Null, mean.evaluate().unwrap());
// test update one not-null value
let mut mean = Mean::<i32>::default();
let v: Vec<VectorRef> = vec![Arc::new(Int32Vector::from(vec![Some(42)]))];
mean.update_batch(&v).unwrap();
assert_eq!(Value::from(42.0_f64), mean.evaluate().unwrap());
// test update one null value
let mut mean = Mean::<i32>::default();
let v: Vec<VectorRef> = vec![Arc::new(Int32Vector::from(vec![Option::<i32>::None]))];
mean.update_batch(&v).unwrap();
assert_eq!(Value::Null, mean.evaluate().unwrap());
// test update no null-value batch
let mut mean = Mean::<i32>::default();
let v: Vec<VectorRef> = vec![Arc::new(Int32Vector::from(vec![
Some(-1i32),
Some(1),
Some(2),
]))];
mean.update_batch(&v).unwrap();
assert_eq!(Value::from(0.6666666666666666), mean.evaluate().unwrap());
// test update null-value batch
let mut mean = Mean::<i32>::default();
let v: Vec<VectorRef> = vec![Arc::new(Int32Vector::from(vec![
Some(-2i32),
None,
Some(3),
Some(4),
]))];
mean.update_batch(&v).unwrap();
assert_eq!(Value::from(1.6666666666666667), mean.evaluate().unwrap());
// test update with constant vector
let mut mean = Mean::<i32>::default();
let v: Vec<VectorRef> = vec![Arc::new(ConstantVector::new(
Arc::new(Int32Vector::from_vec(vec![4])),
10,
))];
mean.update_batch(&v).unwrap();
assert_eq!(Value::from(4.0), mean.evaluate().unwrap());
}
}

View File

@@ -1,329 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::marker::PhantomData;
use std::sync::Arc;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{
self, BadAccumulatorImplSnafu, CreateAccumulatorSnafu, DowncastVectorSnafu,
FromScalarValueSnafu, InvalidInputColSnafu, InvalidInputStateSnafu, Result,
};
use common_query::logical_plan::accumulator::AggrFuncTypeStore;
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
use common_query::prelude::*;
use datatypes::prelude::*;
use datatypes::types::{LogicalPrimitiveType, WrapperType};
use datatypes::value::ListValue;
use datatypes::vectors::{ConstantVector, Helper, Int64Vector, ListVector};
use datatypes::with_match_primitive_type_id;
use num_traits::AsPrimitive;
use snafu::{ensure, OptionExt, ResultExt};
// https://numpy.org/doc/stable/reference/generated/numpy.polyval.html
#[derive(Debug, Default)]
pub struct Polyval<T, PolyT>
where
T: WrapperType,
T::Native: AsPrimitive<PolyT::Native>,
PolyT: WrapperType,
PolyT::Native: std::ops::Mul<Output = PolyT::Native>,
{
values: Vec<T>,
// DataFusion casts constant in into i64 type.
x: Option<i64>,
_phantom: PhantomData<PolyT>,
}
impl<T, PolyT> Polyval<T, PolyT>
where
T: WrapperType,
T::Native: AsPrimitive<PolyT::Native>,
PolyT: WrapperType,
PolyT::Native: std::ops::Mul<Output = PolyT::Native>,
{
fn push(&mut self, value: T) {
self.values.push(value);
}
}
impl<T, PolyT> Accumulator for Polyval<T, PolyT>
where
T: WrapperType,
T::Native: AsPrimitive<PolyT::Native>,
PolyT: WrapperType + std::iter::Sum<<PolyT as WrapperType>::Native>,
PolyT::Native: std::ops::Mul<Output = PolyT::Native> + std::iter::Sum<PolyT::Native>,
i64: AsPrimitive<<PolyT as WrapperType>::Native>,
{
fn state(&self) -> Result<Vec<Value>> {
let nums = self
.values
.iter()
.map(|&n| n.into())
.collect::<Vec<Value>>();
Ok(vec![
Value::List(ListValue::new(nums, T::LogicalType::build_data_type())),
self.x.into(),
])
}
fn update_batch(&mut self, values: &[VectorRef]) -> Result<()> {
if values.is_empty() {
return Ok(());
}
ensure!(values.len() == 2, InvalidInputStateSnafu);
ensure!(values[0].len() == values[1].len(), InvalidInputStateSnafu);
if values[0].len() == 0 {
return Ok(());
}
// This is a unary accumulator, so only one column is provided.
let column = &values[0];
let mut len = 1;
let column: &<T as Scalar>::VectorType = if column.is_const() {
len = column.len();
let column: &ConstantVector = unsafe { Helper::static_cast(column) };
unsafe { Helper::static_cast(column.inner()) }
} else {
unsafe { Helper::static_cast(column) }
};
(0..len).for_each(|_| {
for v in column.iter_data().flatten() {
self.push(v);
}
});
let x = &values[1];
let x = Helper::check_get_scalar::<i64>(x).context(error::InvalidInputTypeSnafu {
err_msg: "expecting \"POLYVAL\" function's second argument to be a positive integer",
})?;
// `get(0)` is safe because we have checked `values[1].len() == values[0].len() != 0`
let first = x.get(0);
ensure!(!first.is_null(), InvalidInputColSnafu);
for i in 1..x.len() {
ensure!(first == x.get(i), InvalidInputColSnafu);
}
let first = match first {
Value::Int64(v) => v,
// unreachable because we have checked `first` is not null and is i64 above
_ => unreachable!(),
};
if let Some(x) = self.x {
ensure!(x == first, InvalidInputColSnafu);
} else {
self.x = Some(first);
};
Ok(())
}
// DataFusion executes accumulators in partitions. In some execution stage, DataFusion will
// merge states from other accumulators (returned by `state()` method).
fn merge_batch(&mut self, states: &[VectorRef]) -> Result<()> {
if states.is_empty() {
return Ok(());
}
ensure!(
states.len() == 2,
BadAccumulatorImplSnafu {
err_msg: "expect 2 states in `merge_batch`",
}
);
let x = &states[1];
let x = x
.as_any()
.downcast_ref::<Int64Vector>()
.with_context(|| DowncastVectorSnafu {
err_msg: format!(
"expect Int64Vector, got vector type {}",
x.vector_type_name()
),
})?;
let x = x.get(0);
if x.is_null() {
return Ok(());
}
let x = match x {
Value::Int64(x) => x,
_ => unreachable!(),
};
self.x = Some(x);
let values = &states[0];
let values = values
.as_any()
.downcast_ref::<ListVector>()
.with_context(|| DowncastVectorSnafu {
err_msg: format!(
"expect ListVector, got vector type {}",
values.vector_type_name()
),
})?;
for value in values.values_iter() {
if let Some(value) = value.context(FromScalarValueSnafu)? {
let column: &<T as Scalar>::VectorType = unsafe { Helper::static_cast(&value) };
for v in column.iter_data().flatten() {
self.push(v);
}
}
}
Ok(())
}
// DataFusion expects this function to return the final value of this aggregator.
fn evaluate(&self) -> Result<Value> {
if self.values.is_empty() {
return Ok(Value::Null);
}
let x = if let Some(x) = self.x {
x
} else {
return Ok(Value::Null);
};
let len = self.values.len();
let polyval: PolyT = self
.values
.iter()
.enumerate()
.map(|(i, &value)| value.into_native().as_() * x.pow((len - 1 - i) as u32).as_())
.sum();
Ok(polyval.into())
}
}
#[as_aggr_func_creator]
#[derive(Debug, Default, AggrFuncTypeStore)]
pub struct PolyvalAccumulatorCreator {}
impl AggregateFunctionCreator for PolyvalAccumulatorCreator {
fn creator(&self) -> AccumulatorCreatorFunction {
let creator: AccumulatorCreatorFunction = Arc::new(move |types: &[ConcreteDataType]| {
let input_type = &types[0];
with_match_primitive_type_id!(
input_type.logical_type_id(),
|$S| {
Ok(Box::new(Polyval::<<$S as LogicalPrimitiveType>::Wrapper, <<$S as LogicalPrimitiveType>::LargestType as LogicalPrimitiveType>::Wrapper>::default()))
},
{
let err_msg = format!(
"\"POLYVAL\" aggregate function not support data type {:?}",
input_type.logical_type_id(),
);
CreateAccumulatorSnafu { err_msg }.fail()?
}
)
});
creator
}
fn output_type(&self) -> Result<ConcreteDataType> {
let input_types = self.input_types()?;
ensure!(input_types.len() == 2, InvalidInputStateSnafu);
let input_type = self.input_types()?[0].logical_type_id();
with_match_primitive_type_id!(
input_type,
|$S| {
Ok(<<$S as LogicalPrimitiveType>::LargestType as LogicalPrimitiveType>::build_data_type())
},
{
unreachable!()
}
)
}
fn state_types(&self) -> Result<Vec<ConcreteDataType>> {
let input_types = self.input_types()?;
ensure!(input_types.len() == 2, InvalidInputStateSnafu);
Ok(vec![
ConcreteDataType::list_datatype(input_types.into_iter().next().unwrap()),
ConcreteDataType::int64_datatype(),
])
}
}
#[cfg(test)]
mod test {
use datatypes::vectors::Int32Vector;
use super::*;
#[test]
fn test_update_batch() {
// test update empty batch, expect not updating anything
let mut polyval = Polyval::<i32, i64>::default();
polyval.update_batch(&[]).unwrap();
assert!(polyval.values.is_empty());
assert_eq!(Value::Null, polyval.evaluate().unwrap());
// test update one not-null value
let mut polyval = Polyval::<i32, i64>::default();
let v: Vec<VectorRef> = vec![
Arc::new(Int32Vector::from(vec![Some(3)])),
Arc::new(Int64Vector::from(vec![Some(2_i64)])),
];
polyval.update_batch(&v).unwrap();
assert_eq!(Value::Int64(3), polyval.evaluate().unwrap());
// test update one null value
let mut polyval = Polyval::<i32, i64>::default();
let v: Vec<VectorRef> = vec![
Arc::new(Int32Vector::from(vec![Option::<i32>::None])),
Arc::new(Int64Vector::from(vec![Some(2_i64)])),
];
polyval.update_batch(&v).unwrap();
assert_eq!(Value::Null, polyval.evaluate().unwrap());
// test update no null-value batch
let mut polyval = Polyval::<i32, i64>::default();
let v: Vec<VectorRef> = vec![
Arc::new(Int32Vector::from(vec![Some(3), Some(0), Some(1)])),
Arc::new(Int64Vector::from(vec![
Some(2_i64),
Some(2_i64),
Some(2_i64),
])),
];
polyval.update_batch(&v).unwrap();
assert_eq!(Value::Int64(13), polyval.evaluate().unwrap());
// test update null-value batch
let mut polyval = Polyval::<i32, i64>::default();
let v: Vec<VectorRef> = vec![
Arc::new(Int32Vector::from(vec![Some(3), Some(0), None, Some(1)])),
Arc::new(Int64Vector::from(vec![
Some(2_i64),
Some(2_i64),
Some(2_i64),
Some(2_i64),
])),
];
polyval.update_batch(&v).unwrap();
assert_eq!(Value::Int64(13), polyval.evaluate().unwrap());
// test update with constant vector
let mut polyval = Polyval::<i32, i64>::default();
let v: Vec<VectorRef> = vec![
Arc::new(ConstantVector::new(
Arc::new(Int32Vector::from_vec(vec![4])),
2,
)),
Arc::new(Int64Vector::from(vec![Some(5_i64), Some(5_i64)])),
];
polyval.update_batch(&v).unwrap();
assert_eq!(Value::Int64(24), polyval.evaluate().unwrap());
}
}

View File

@@ -1,270 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{
self, BadAccumulatorImplSnafu, CreateAccumulatorSnafu, DowncastVectorSnafu,
FromScalarValueSnafu, GenerateFunctionSnafu, InvalidInputColSnafu, InvalidInputStateSnafu,
Result,
};
use common_query::logical_plan::accumulator::AggrFuncTypeStore;
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
use common_query::prelude::*;
use datatypes::prelude::*;
use datatypes::value::{ListValue, OrderedFloat};
use datatypes::vectors::{ConstantVector, Float64Vector, Helper, ListVector};
use datatypes::with_match_primitive_type_id;
use num_traits::AsPrimitive;
use snafu::{ensure, OptionExt, ResultExt};
use statrs::distribution::{ContinuousCDF, Normal};
use statrs::statistics::Statistics;
// https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.norm.html
#[derive(Debug, Default)]
pub struct ScipyStatsNormCdf<T> {
values: Vec<T>,
x: Option<f64>,
}
impl<T> ScipyStatsNormCdf<T> {
fn push(&mut self, value: T) {
self.values.push(value);
}
}
impl<T> Accumulator for ScipyStatsNormCdf<T>
where
T: WrapperType + std::iter::Sum<T>,
T::Native: AsPrimitive<f64>,
{
fn state(&self) -> Result<Vec<Value>> {
let nums = self
.values
.iter()
.map(|&x| x.into())
.collect::<Vec<Value>>();
Ok(vec![
Value::List(ListValue::new(nums, T::LogicalType::build_data_type())),
self.x.into(),
])
}
fn update_batch(&mut self, values: &[VectorRef]) -> Result<()> {
if values.is_empty() {
return Ok(());
}
ensure!(values.len() == 2, InvalidInputStateSnafu);
ensure!(values[1].len() == values[0].len(), InvalidInputStateSnafu);
if values[0].len() == 0 {
return Ok(());
}
let column = &values[0];
let mut len = 1;
let column: &<T as Scalar>::VectorType = if column.is_const() {
len = column.len();
let column: &ConstantVector = unsafe { Helper::static_cast(column) };
unsafe { Helper::static_cast(column.inner()) }
} else {
unsafe { Helper::static_cast(column) }
};
let x = &values[1];
let x = Helper::check_get_scalar::<f64>(x).context(error::InvalidInputTypeSnafu {
err_msg: "expecting \"SCIPYSTATSNORMCDF\" function's second argument to be a positive integer",
})?;
let first = x.get(0);
ensure!(!first.is_null(), InvalidInputColSnafu);
let first = match first {
Value::Float64(OrderedFloat(v)) => v,
// unreachable because we have checked `first` is not null and is i64 above
_ => unreachable!(),
};
if let Some(x) = self.x {
ensure!(x == first, InvalidInputColSnafu);
} else {
self.x = Some(first);
};
(0..len).for_each(|_| {
for v in column.iter_data().flatten() {
self.push(v);
}
});
Ok(())
}
fn merge_batch(&mut self, states: &[VectorRef]) -> Result<()> {
if states.is_empty() {
return Ok(());
}
ensure!(
states.len() == 2,
BadAccumulatorImplSnafu {
err_msg: "expect 2 states in `merge_batch`",
}
);
let x = &states[1];
let x = x
.as_any()
.downcast_ref::<Float64Vector>()
.with_context(|| DowncastVectorSnafu {
err_msg: format!(
"expect Float64Vector, got vector type {}",
x.vector_type_name()
),
})?;
let x = x.get(0);
if x.is_null() {
return Ok(());
}
let x = match x {
Value::Float64(OrderedFloat(x)) => x,
_ => unreachable!(),
};
self.x = Some(x);
let values = &states[0];
let values = values
.as_any()
.downcast_ref::<ListVector>()
.with_context(|| DowncastVectorSnafu {
err_msg: format!(
"expect ListVector, got vector type {}",
values.vector_type_name()
),
})?;
for value in values.values_iter() {
if let Some(value) = value.context(FromScalarValueSnafu)? {
let column: &<T as Scalar>::VectorType = unsafe { Helper::static_cast(&value) };
for v in column.iter_data().flatten() {
self.push(v);
}
}
}
Ok(())
}
fn evaluate(&self) -> Result<Value> {
let mean = self.values.iter().map(|v| v.into_native().as_()).mean();
let std_dev = self.values.iter().map(|v| v.into_native().as_()).std_dev();
if mean.is_nan() || std_dev.is_nan() {
Ok(Value::Null)
} else {
let x = if let Some(x) = self.x {
x
} else {
return Ok(Value::Null);
};
let n = Normal::new(mean, std_dev).context(GenerateFunctionSnafu)?;
Ok(n.cdf(x).into())
}
}
}
#[as_aggr_func_creator]
#[derive(Debug, Default, AggrFuncTypeStore)]
pub struct ScipyStatsNormCdfAccumulatorCreator {}
impl AggregateFunctionCreator for ScipyStatsNormCdfAccumulatorCreator {
fn creator(&self) -> AccumulatorCreatorFunction {
let creator: AccumulatorCreatorFunction = Arc::new(move |types: &[ConcreteDataType]| {
let input_type = &types[0];
with_match_primitive_type_id!(
input_type.logical_type_id(),
|$S| {
Ok(Box::new(ScipyStatsNormCdf::<<$S as LogicalPrimitiveType>::Wrapper>::default()))
},
{
let err_msg = format!(
"\"SCIPYSTATSNORMCDF\" aggregate function not support data type {:?}",
input_type.logical_type_id(),
);
CreateAccumulatorSnafu { err_msg }.fail()?
}
)
});
creator
}
fn output_type(&self) -> Result<ConcreteDataType> {
let input_types = self.input_types()?;
ensure!(input_types.len() == 2, InvalidInputStateSnafu);
Ok(ConcreteDataType::float64_datatype())
}
fn state_types(&self) -> Result<Vec<ConcreteDataType>> {
let input_types = self.input_types()?;
ensure!(input_types.len() == 2, InvalidInputStateSnafu);
Ok(vec![
ConcreteDataType::list_datatype(input_types[0].clone()),
ConcreteDataType::float64_datatype(),
])
}
}
#[cfg(test)]
mod test {
use datatypes::vectors::{Float64Vector, Int32Vector};
use super::*;
#[test]
fn test_update_batch() {
// test update empty batch, expect not updating anything
let mut scipy_stats_norm_cdf = ScipyStatsNormCdf::<i32>::default();
scipy_stats_norm_cdf.update_batch(&[]).unwrap();
assert!(scipy_stats_norm_cdf.values.is_empty());
assert_eq!(Value::Null, scipy_stats_norm_cdf.evaluate().unwrap());
// test update no null-value batch
let mut scipy_stats_norm_cdf = ScipyStatsNormCdf::<i32>::default();
let v: Vec<VectorRef> = vec![
Arc::new(Int32Vector::from(vec![Some(-1i32), Some(1), Some(2)])),
Arc::new(Float64Vector::from(vec![
Some(2.0_f64),
Some(2.0_f64),
Some(2.0_f64),
])),
];
scipy_stats_norm_cdf.update_batch(&v).unwrap();
assert_eq!(
Value::from(0.8086334555398362),
scipy_stats_norm_cdf.evaluate().unwrap()
);
// test update null-value batch
let mut scipy_stats_norm_cdf = ScipyStatsNormCdf::<i32>::default();
let v: Vec<VectorRef> = vec![
Arc::new(Int32Vector::from(vec![Some(-2i32), None, Some(3), Some(4)])),
Arc::new(Float64Vector::from(vec![
Some(2.0_f64),
None,
Some(2.0_f64),
Some(2.0_f64),
])),
];
scipy_stats_norm_cdf.update_batch(&v).unwrap();
assert_eq!(
Value::from(0.5412943699039795),
scipy_stats_norm_cdf.evaluate().unwrap()
);
}
}

View File

@@ -1,271 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{
self, BadAccumulatorImplSnafu, CreateAccumulatorSnafu, DowncastVectorSnafu,
FromScalarValueSnafu, GenerateFunctionSnafu, InvalidInputColSnafu, InvalidInputStateSnafu,
Result,
};
use common_query::logical_plan::accumulator::AggrFuncTypeStore;
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
use common_query::prelude::*;
use datatypes::prelude::*;
use datatypes::value::{ListValue, OrderedFloat};
use datatypes::vectors::{ConstantVector, Float64Vector, Helper, ListVector};
use datatypes::with_match_primitive_type_id;
use num_traits::AsPrimitive;
use snafu::{ensure, OptionExt, ResultExt};
use statrs::distribution::{Continuous, Normal};
use statrs::statistics::Statistics;
// https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.norm.html
#[derive(Debug, Default)]
pub struct ScipyStatsNormPdf<T> {
values: Vec<T>,
x: Option<f64>,
}
impl<T> ScipyStatsNormPdf<T> {
fn push(&mut self, value: T) {
self.values.push(value);
}
}
impl<T> Accumulator for ScipyStatsNormPdf<T>
where
T: WrapperType,
T::Native: AsPrimitive<f64> + std::iter::Sum<T>,
{
fn state(&self) -> Result<Vec<Value>> {
let nums = self
.values
.iter()
.map(|&x| x.into())
.collect::<Vec<Value>>();
Ok(vec![
Value::List(ListValue::new(nums, T::LogicalType::build_data_type())),
self.x.into(),
])
}
fn update_batch(&mut self, values: &[VectorRef]) -> Result<()> {
if values.is_empty() {
return Ok(());
}
ensure!(values.len() == 2, InvalidInputStateSnafu);
ensure!(values[1].len() == values[0].len(), InvalidInputStateSnafu);
if values[0].len() == 0 {
return Ok(());
}
let column = &values[0];
let mut len = 1;
let column: &<T as Scalar>::VectorType = if column.is_const() {
len = column.len();
let column: &ConstantVector = unsafe { Helper::static_cast(column) };
unsafe { Helper::static_cast(column.inner()) }
} else {
unsafe { Helper::static_cast(column) }
};
let x = &values[1];
let x = Helper::check_get_scalar::<f64>(x).context(error::InvalidInputTypeSnafu {
err_msg: "expecting \"SCIPYSTATSNORMPDF\" function's second argument to be a positive integer",
})?;
let first = x.get(0);
ensure!(!first.is_null(), InvalidInputColSnafu);
let first = match first {
Value::Float64(OrderedFloat(v)) => v,
// unreachable because we have checked `first` is not null and is i64 above
_ => unreachable!(),
};
if let Some(x) = self.x {
ensure!(x == first, InvalidInputColSnafu);
} else {
self.x = Some(first);
};
(0..len).for_each(|_| {
for v in column.iter_data().flatten() {
self.push(v);
}
});
Ok(())
}
fn merge_batch(&mut self, states: &[VectorRef]) -> Result<()> {
if states.is_empty() {
return Ok(());
}
ensure!(
states.len() == 2,
BadAccumulatorImplSnafu {
err_msg: "expect 2 states in `merge_batch`",
}
);
let x = &states[1];
let x = x
.as_any()
.downcast_ref::<Float64Vector>()
.with_context(|| DowncastVectorSnafu {
err_msg: format!(
"expect Float64Vector, got vector type {}",
x.vector_type_name()
),
})?;
let x = x.get(0);
if x.is_null() {
return Ok(());
}
let x = match x {
Value::Float64(OrderedFloat(x)) => x,
_ => unreachable!(),
};
self.x = Some(x);
let values = &states[0];
let values = values
.as_any()
.downcast_ref::<ListVector>()
.with_context(|| DowncastVectorSnafu {
err_msg: format!(
"expect ListVector, got vector type {}",
values.vector_type_name()
),
})?;
for value in values.values_iter() {
if let Some(value) = value.context(FromScalarValueSnafu)? {
let column: &<T as Scalar>::VectorType = unsafe { Helper::static_cast(&value) };
for v in column.iter_data().flatten() {
self.push(v);
}
}
}
Ok(())
}
fn evaluate(&self) -> Result<Value> {
let mean = self.values.iter().map(|v| v.into_native().as_()).mean();
let std_dev = self.values.iter().map(|v| v.into_native().as_()).std_dev();
if mean.is_nan() || std_dev.is_nan() {
Ok(Value::Null)
} else {
let x = if let Some(x) = self.x {
x
} else {
return Ok(Value::Null);
};
let n = Normal::new(mean, std_dev).context(GenerateFunctionSnafu)?;
Ok(n.pdf(x).into())
}
}
}
#[as_aggr_func_creator]
#[derive(Debug, Default, AggrFuncTypeStore)]
pub struct ScipyStatsNormPdfAccumulatorCreator {}
impl AggregateFunctionCreator for ScipyStatsNormPdfAccumulatorCreator {
fn creator(&self) -> AccumulatorCreatorFunction {
let creator: AccumulatorCreatorFunction = Arc::new(move |types: &[ConcreteDataType]| {
let input_type = &types[0];
with_match_primitive_type_id!(
input_type.logical_type_id(),
|$S| {
Ok(Box::new(ScipyStatsNormPdf::<<$S as LogicalPrimitiveType>::Wrapper>::default()))
},
{
let err_msg = format!(
"\"SCIPYSTATSNORMpdf\" aggregate function not support data type {:?}",
input_type.logical_type_id(),
);
CreateAccumulatorSnafu { err_msg }.fail()?
}
)
});
creator
}
fn output_type(&self) -> Result<ConcreteDataType> {
let input_types = self.input_types()?;
ensure!(input_types.len() == 2, InvalidInputStateSnafu);
Ok(ConcreteDataType::float64_datatype())
}
fn state_types(&self) -> Result<Vec<ConcreteDataType>> {
let input_types = self.input_types()?;
ensure!(input_types.len() == 2, InvalidInputStateSnafu);
Ok(vec![
ConcreteDataType::list_datatype(input_types[0].clone()),
ConcreteDataType::float64_datatype(),
])
}
}
#[cfg(test)]
mod test {
use datatypes::vectors::{Float64Vector, Int32Vector};
use super::*;
#[test]
fn test_update_batch() {
// test update empty batch, expect not updating anything
let mut scipy_stats_norm_pdf = ScipyStatsNormPdf::<i32>::default();
scipy_stats_norm_pdf.update_batch(&[]).unwrap();
assert!(scipy_stats_norm_pdf.values.is_empty());
assert_eq!(Value::Null, scipy_stats_norm_pdf.evaluate().unwrap());
// test update no null-value batch
let mut scipy_stats_norm_pdf = ScipyStatsNormPdf::<i32>::default();
let v: Vec<VectorRef> = vec![
Arc::new(Int32Vector::from(vec![Some(-1i32), Some(1), Some(2)])),
Arc::new(Float64Vector::from(vec![
Some(2.0_f64),
Some(2.0_f64),
Some(2.0_f64),
])),
];
scipy_stats_norm_pdf.update_batch(&v).unwrap();
assert_eq!(
Value::from(0.17843340219081558),
scipy_stats_norm_pdf.evaluate().unwrap()
);
// test update null-value batch
let mut scipy_stats_norm_pdf = ScipyStatsNormPdf::<i32>::default();
let v: Vec<VectorRef> = vec![
Arc::new(Int32Vector::from(vec![Some(-2i32), None, Some(3), Some(4)])),
Arc::new(Float64Vector::from(vec![
Some(2.0_f64),
None,
Some(2.0_f64),
Some(2.0_f64),
])),
];
scipy_stats_norm_pdf.update_batch(&v).unwrap();
assert_eq!(
Value::from(0.12343972049858312),
scipy_stats_norm_pdf.evaluate().unwrap()
);
}
}

View File

@@ -58,7 +58,7 @@ impl Function for DateAddFunction {
)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
@@ -146,7 +146,7 @@ mod tests {
let time_vector = TimestampSecondVector::from(times.clone());
let interval_vector = IntervalDayTimeVector::from_vec(intervals);
let args: Vec<VectorRef> = vec![Arc::new(time_vector), Arc::new(interval_vector)];
let vector = f.eval(FunctionContext::default(), &args).unwrap();
let vector = f.eval(&FunctionContext::default(), &args).unwrap();
assert_eq!(4, vector.len());
for (i, _t) in times.iter().enumerate() {
@@ -178,7 +178,7 @@ mod tests {
let date_vector = DateVector::from(dates.clone());
let interval_vector = IntervalYearMonthVector::from_vec(intervals);
let args: Vec<VectorRef> = vec![Arc::new(date_vector), Arc::new(interval_vector)];
let vector = f.eval(FunctionContext::default(), &args).unwrap();
let vector = f.eval(&FunctionContext::default(), &args).unwrap();
assert_eq!(4, vector.len());
for (i, _t) in dates.iter().enumerate() {

View File

@@ -43,7 +43,6 @@ impl Function for DateFormatFunction {
helper::one_of_sigs2(
vec![
ConcreteDataType::date_datatype(),
ConcreteDataType::datetime_datatype(),
ConcreteDataType::timestamp_second_datatype(),
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
@@ -53,7 +52,7 @@ impl Function for DateFormatFunction {
)
}
fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
fn eval(&self, func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
@@ -105,22 +104,6 @@ impl Function for DateFormatFunction {
results.push(result.as_deref());
}
}
ConcreteDataType::DateTime(_) => {
for i in 0..size {
let datetime = left.get(i).as_datetime();
let format = formats.get(i).as_string();
let result = match (datetime, format) {
(Some(datetime), Some(fmt)) => datetime
.as_formatted_string(&fmt, Some(&func_ctx.query_ctx.timezone()))
.map_err(BoxedError::new)
.context(error::ExecuteSnafu)?,
_ => None,
};
results.push(result.as_deref());
}
}
_ => {
return UnsupportedInputDataTypeSnafu {
function: NAME,
@@ -147,7 +130,7 @@ mod tests {
use common_query::prelude::{TypeSignature, Volatility};
use datatypes::prelude::{ConcreteDataType, ScalarVector};
use datatypes::value::Value;
use datatypes::vectors::{DateTimeVector, DateVector, StringVector, TimestampSecondVector};
use datatypes::vectors::{DateVector, StringVector, TimestampSecondVector};
use super::{DateFormatFunction, *};
@@ -169,16 +152,11 @@ mod tests {
ConcreteDataType::string_datatype(),
f.return_type(&[ConcreteDataType::date_datatype()]).unwrap()
);
assert_eq!(
ConcreteDataType::string_datatype(),
f.return_type(&[ConcreteDataType::datetime_datatype()])
.unwrap()
);
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable
} if sigs.len() == 6));
} if sigs.len() == 5));
}
#[test]
@@ -202,7 +180,7 @@ mod tests {
let time_vector = TimestampSecondVector::from(times.clone());
let interval_vector = StringVector::from_vec(formats);
let args: Vec<VectorRef> = vec![Arc::new(time_vector), Arc::new(interval_vector)];
let vector = f.eval(FunctionContext::default(), &args).unwrap();
let vector = f.eval(&FunctionContext::default(), &args).unwrap();
assert_eq!(4, vector.len());
for (i, _t) in times.iter().enumerate() {
@@ -243,48 +221,7 @@ mod tests {
let date_vector = DateVector::from(dates.clone());
let interval_vector = StringVector::from_vec(formats);
let args: Vec<VectorRef> = vec![Arc::new(date_vector), Arc::new(interval_vector)];
let vector = f.eval(FunctionContext::default(), &args).unwrap();
assert_eq!(4, vector.len());
for (i, _t) in dates.iter().enumerate() {
let v = vector.get(i);
let result = results.get(i).unwrap();
if result.is_none() {
assert_eq!(Value::Null, v);
continue;
}
match v {
Value::String(s) => {
assert_eq!(s.as_utf8(), result.unwrap());
}
_ => unreachable!(),
}
}
}
#[test]
fn test_datetime_date_format() {
let f = DateFormatFunction;
let dates = vec![Some(123), None, Some(42), None];
let formats = vec![
"%Y-%m-%d %T.%3f",
"%Y-%m-%d %T.%3f",
"%Y-%m-%d %T.%3f",
"%Y-%m-%d %T.%3f",
];
let results = [
Some("1970-01-01 00:00:00.123"),
None,
Some("1970-01-01 00:00:00.042"),
None,
];
let date_vector = DateTimeVector::from(dates.clone());
let interval_vector = StringVector::from_vec(formats);
let args: Vec<VectorRef> = vec![Arc::new(date_vector), Arc::new(interval_vector)];
let vector = f.eval(FunctionContext::default(), &args).unwrap();
let vector = f.eval(&FunctionContext::default(), &args).unwrap();
assert_eq!(4, vector.len());
for (i, _t) in dates.iter().enumerate() {

View File

@@ -58,7 +58,7 @@ impl Function for DateSubFunction {
)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
@@ -118,11 +118,6 @@ mod tests {
ConcreteDataType::date_datatype(),
f.return_type(&[ConcreteDataType::date_datatype()]).unwrap()
);
assert_eq!(
ConcreteDataType::datetime_datatype(),
f.return_type(&[ConcreteDataType::datetime_datatype()])
.unwrap()
);
assert!(
matches!(f.signature(),
Signature {
@@ -151,7 +146,7 @@ mod tests {
let time_vector = TimestampSecondVector::from(times.clone());
let interval_vector = IntervalDayTimeVector::from_vec(intervals);
let args: Vec<VectorRef> = vec![Arc::new(time_vector), Arc::new(interval_vector)];
let vector = f.eval(FunctionContext::default(), &args).unwrap();
let vector = f.eval(&FunctionContext::default(), &args).unwrap();
assert_eq!(4, vector.len());
for (i, _t) in times.iter().enumerate() {
@@ -189,7 +184,7 @@ mod tests {
let date_vector = DateVector::from(dates.clone());
let interval_vector = IntervalYearMonthVector::from_vec(intervals);
let args: Vec<VectorRef> = vec![Arc::new(date_vector), Arc::new(interval_vector)];
let vector = f.eval(FunctionContext::default(), &args).unwrap();
let vector = f.eval(&FunctionContext::default(), &args).unwrap();
assert_eq!(4, vector.len());
for (i, _t) in dates.iter().enumerate() {

View File

@@ -55,7 +55,7 @@ impl Function for IsNullFunction {
fn eval(
&self,
_func_ctx: FunctionContext,
_func_ctx: &FunctionContext,
columns: &[VectorRef],
) -> common_query::error::Result<VectorRef> {
ensure!(
@@ -102,7 +102,7 @@ mod tests {
let values = vec![None, Some(3.0), None];
let args: Vec<VectorRef> = vec![Arc::new(Float32Vector::from(values))];
let vector = is_null.eval(FunctionContext::default(), &args).unwrap();
let vector = is_null.eval(&FunctionContext::default(), &args).unwrap();
let expect: VectorRef = Arc::new(BooleanVector::from_vec(vec![true, false, true]));
assert_eq!(expect, vector);
}

View File

@@ -118,7 +118,7 @@ impl Function for GeohashFunction {
Signature::one_of(signatures, Volatility::Stable)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 3,
InvalidFuncArgsSnafu {
@@ -218,7 +218,7 @@ impl Function for GeohashNeighboursFunction {
Signature::one_of(signatures, Volatility::Stable)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 3,
InvalidFuncArgsSnafu {

Some files were not shown because too many files have changed in this diff Show More