Files
greptimedb/Cargo.toml
Lei, HUANG 8685ceb232 feat: impl bulk memtable and bridge bulk inserts (#6054)
* feat/bridge-bulk-insert:
 ## Implement Bulk Insert and Update Dependencies

 - **Bulk Insert Implementation**: Added `handle_bulk_inserts` method in `src/operator/src/bulk_insert.rs` to manage bulk insert requests using `FlightDecoder` and `FlightData`.
 - **Dependency Updates**: Updated `Cargo.lock` and `Cargo.toml` to use the latest revision of `greptime-proto` and added new dependencies like `arrow`, `arrow-ipc`, `bytes`, and `prost`.
 - **gRPC Enhancements**: Modified `put_record_batch` method in `src/frontend/src/instance/grpc.rs` and `src/servers/src/grpc/flight.rs` to handle `FlightData` instead of `RawRecordBatch`.
 - **Error Handling**: Added new error types in `src/operator/src/error.rs` for handling Arrow operations and decoding flight data.
 - **Miscellaneous**: Updated `src/operator/src/insert.rs` to expose `partition_manager` and `node_manager` as public fields.

* feat/bridge-bulk-insert:
 - **Update `greptime-proto` Dependency**: Updated the `greptime-proto` dependency to a new revision in `Cargo.lock` and `Cargo.toml`.
 - **Refactor gRPC Query Handling**: Removed `RawRecordBatch` usage from `grpc.rs`, `flight.rs`, `greptime_handler.rs`, and test files, simplifying the gRPC query handling.
 - **Enhance Bulk Insert Logic**: Improved bulk insert logic in `bulk_insert.rs` and `region_request.rs` by using `FlightDecoder` and `BooleanArray` for better performance and clarity.
 - **Add `common-grpc` Dependency**: Added `common-grpc` as a workspace dependency in `store-api/Cargo.toml` to support gRPC functionalities.

* fix: clippy

* fix schema serialization

* feat/bridge-bulk-insert:
 Add error handling for encoding/decoding in `metadata.rs` and `region_request.rs`

 - Introduced new error variants `FlightCodec` and `Prost` in `MetadataError` to handle encoding/decoding failures in `metadata.rs`.
 - Updated `make_region_bulk_inserts` function in `region_request.rs` to use `context` for error handling with `ProstSnafu` and `FlightCodecSnafu`.
 - Enhanced error handling for `FlightData` decoding and `filter_record_batch` operations.

* fix: test

* refactor: rename

* allow empty app_metadata in FlightData

* feat/bridge-bulk-insert:
 - **Remove Logging**: Removed unnecessary logging of affected rows in `region_server.rs`.
 - **Error Handling Enhancement**: Improved error handling in `bulk_insert.rs` by adding context to `split_record_batch` and handling single datanode fast path.
 - **Error Enum Cleanup**: Removed unused `Arrow` error variant from `error.rs`.

* fix: standalone test

* feat/bridge-bulk-insert:
 ### Enhance Bulk Insert Handling and Metadata Management

 - **`lib.rs`**: Enabled the `result_flattening` feature for improved error handling.
 - **`request.rs`**: Made `name_to_index` and `has_null` fields public in `WriteRequest` for better accessibility.
 - **`handle_bulk_insert.rs`**:
   - Added `handle_record_batch` function to streamline processing of bulk insert payloads.
   - Improved error handling and task management for bulk insert operations.
   - Updated `region_metadata_to_column_schema` to return both column schemas and a name-to-index map for efficient data access.

* feat/bridge-bulk-insert:
 - **Refactor `handle_bulk_insert.rs`:**
   - Replaced `handle_record_batch` with `handle_payload` for handling payloads.
   - Modified the fast path to use `common_runtime::spawn_global` for asynchronous task execution.

 - **Optimize `multi_dim.rs`:**
   - Added a fast path for single-region scenarios in `MultiDimPartitionRule::partition_record_batch`.

* feat/bridge-bulk-insert:
 - **Update `greptime-proto` Dependency**: Updated the `greptime-proto` dependency to a new revision in both `Cargo.lock` and `Cargo.toml`.
 - **Optimize Memory Allocation**: Increased initial and builder capacities in `time_series.rs` to improve performance.
 - **Enhance Data Handling**: Modified `bulk_insert.rs` to use `Bytes` for efficient data handling.
 - **Improve Bulk Insert Logic**: Refined the bulk insert logic in `region_request.rs` to handle schema and payload data more effectively and optimize record batch filtering.
 - **String Handling Improvement**: Updated string conversion in `helper.rs` for better performance.

* fix: clippy warnings

* feat/bridge-bulk-insert:
 **Add Metrics and Improve Error Handling**

 - **Metrics Enhancements**: Introduced new metrics for bulk insert operations in `metrics.rs`, `bulk_insert.rs`, `greptime_handler.rs`, and `region_request.rs`. Added `HANDLE_BULK_INSERT_ELAPSED`, `BULK_REQUEST_MESSAGE_SIZE`, and `GRPC_BULK_INSERT_ELAPSED` histograms to
 monitor performance.
 - **Error Handling Improvements**: Removed unnecessary error handling in `handle_bulk_insert.rs` by eliminating redundant `let _ =` patterns.
 - **Dependency Updates**: Added `lazy_static` and `prometheus` to `Cargo.lock` and `Cargo.toml` for metrics support.
 - **Code Refactoring**: Simplified function calls in `region_server.rs` and `handle_bulk_insert.rs` for better readability.

* chore: rebase main

* implement simple bulk memtable

* impl write_bulk

* implement simple bulk memtable

* feat/simple-bulk-memtable:
 ### Enhance Time-Series Memtable and Bulk Insert Handling

 - **Visibility Modifications**: Made `mutable_array` in `PrimitiveVectorBuilder` and `StringVectorBuilder` public in `primitive.rs` and `string.rs`.
 - **New Module**: Added `builder.rs` to `memtable` for time-series builders, including `FieldBuilder` and `StringBuilder` implementations.
 - **Bulk Insert Enhancements**:
   - Added `sequence` field to `BulkPart` in `part.rs` and updated its handling in `simple_bulk_memtable.rs` and `region_write_ctx.rs`.
   - Introduced metrics for bulk insert operations in `metrics.rs` and `bulk_insert.rs`.
 - **Performance Metrics**: Added timing metrics for write operations in `metrics.rs`, `region_write_ctx.rs`, and `handle_write.rs`.
 - **Region Request Handling**: Updated `make_region_bulk_inserts` in `region_request.rs` to include performance metrics.

* feat/simple-bulk-memtable:
 **Improve Memtable Stats Calculation and Add Metrics Timer**

 - **`simple_bulk_memtable.rs`**: Refactored `stats` method to use `num_rows` for checking if rows have been written, improving accuracy in memory table statistics.
 - **`handle_bulk_insert.rs`**: Introduced a metrics timer to measure the elapsed time for processing bulk requests, enhancing performance monitoring.

* feat/simple-bulk-memtable:
 ### Commit Message

 **Enhancements and Bug Fixes**

 - **Dependency Update**: Updated `greptime-proto` dependency to a new revision in `Cargo.lock` and `Cargo.toml`.
 - **Feature Addition**: Implemented `to_mutation` method in `BulkPart` to convert `BulkPart` to `Mutation` for fallback `write_bulk` implementation in `src/mito2/src/memtable/bulk/part.rs`.
 - **Functionality Improvement**: Modified `write_bulk` method in `TimeSeriesMemtable` to support default implementation fallback to row iteration in `src/mito2/src/memtable/time_series.rs`.
 - **Performance Optimization**: Enhanced `bulk_insert` handling by optimizing region request processing and data partitioning in `src/operator/src/bulk_insert.rs`.
 - **Error Handling**: Added `ComputeArrow` error variant for better error management in `src/operator/src/error.rs`.
 - **Code Refactoring**: Simplified region bulk insert request processing in `src/store-api/src/region_request.rs`.

* fix: some clippy warnings

* feat/simple-bulk-memtable:
 ### Commit Summary

 - **Refactor Return Types to `Result`:**
   Updated the return type of the `ranges` method in `memtable.rs`, `bulk.rs`, `partition_tree.rs`, `simple_bulk_memtable.rs`, `time_series.rs`, and `memtable_util.rs` to return `Result<MemtableRanges>` for better error handling.

 - **Enhance Metrics Tracking:**
   Improved metrics tracking by adding `num_rows` and `max_sequence` to `WriteMetrics` in `stats.rs`. Updated related methods in `partition_tree.rs`, `simple_bulk_memtable.rs`, `time_series.rs`, and `scan_region.rs` to utilize these metrics.

 - **Remove Unused Imports:**
   Cleaned up unused imports in `time_series.rs` to streamline the codebase.

* merge main

* remove useless error vairant

* use newer version of proto

* feat/simple-bulk-memtable:
                                                                                                                                 Commit Message

                                                                                                                                     Summary

Enhance FieldBuilder and StringBuilder functionality, add tests, and improve error handling.

                                                                                                                                   Key Changes

 • builder.rs:
    • Added documentation for FieldBuilder methods.
    • Renamed append_string_vector to append_vector in StringBuilder.
 • simple_bulk_memtable.rs:
    • Added new test cases for write_one, write_bulk, is_empty, stats, fork, and sequence_filter.
 • time_series.rs:
    • Improved error handling in ValueBuilder for type mismatches.
 • memtable_util.rs:
    • Removed unused imports and streamlined code.

These changes enhance the robustness and test coverage of the memtable components.

* feat/simple-bulk-memtable:
 Improve Time Partition Matching Logic in `time_partition.rs`

 - Enhanced the `write_bulk` method in `time_partition.rs` to improve the logic for matching partitions based on time ranges.
 - Introduced a new mechanism to filter and select partitions that overlap with the record batch's timestamp range before writing.

* feat/simple-bulk-memtable:
 Improve Metrics Handling in `bulk_insert.rs`

 - Removed the `group_request_timer` and its associated metric observation to streamline the timing logic.
 - Moved the `BULK_REQUEST_ROWS` metric observation to occur after filtering, ensuring accurate row count metrics.

* feat/simple-bulk-memtable:
 **Enhance Stalled Requests Calculation and Update Metrics**

 - **`worker.rs`**: Updated the `stalled_count` method to include both `reqs` and `bulk_reqs` in the calculation of stalled requests.
 - **`bulk_insert.rs`**: Removed duplicate observation of `BULK_REQUEST_MESSAGE_SIZE` metric.
 - **`metrics.rs`**: Changed the bucket strategy for `BULK_REQUEST_ROWS` from linear to exponential, improving the granularity of metrics collection.

* feat/simple-bulk-memtable:
 **Refactor `StringVector` Usage and Update Method Signatures**

 - **`src/datatypes/src/vectors/string.rs`**: Changed `StringVector`'s `array` field from public to private.
 - **`src/mito2/src/memtable/builder.rs`**: Refactored `append_vector` method to `append_array`, updating its usage to work directly with `StringArray` instead of `StringVector`.
 - **`src/mito2/src/memtable/time_series.rs`**: Updated `ValueBuilder` to handle `StringArray` directly, replacing `StringVector` usage with `StringArray` in the `FieldBuilder::String` case.

* feat/simple-bulk-memtable:
 - **Refactor `PrimitiveVectorBuilder`**: Made `mutable_array` private in `src/datatypes/src/vectors/primitive.rs`.
 - **Optimize `ValueBuilder`**: Replaced `UInt64VectorBuilder` and `UInt8VectorBuilder` with `Vec<u64>` and `Vec<u8>` for `sequence` and `op_type` in `src/mito2/src/memtable/time_series.rs`.
 - **Improve Metrics Initialization**: Updated histogram bucket initialization to use `exponential_buckets` in `src/mito2/src/metrics.rs`.

* feat/simple-bulk-memtable:
 Improve error handling in `simple_bulk_memtable.rs` and `time_series.rs`

 - Enhanced error handling by using `OptionExt` for more concise error context management in `simple_bulk_memtable.rs` and `time_series.rs`.
 - Replaced `ok_or` with `with_context` to streamline error context creation in both files.

* feat/simple-bulk-memtable:
 **Enhance Time Partition Handling in `time_partition.rs`**

 - Introduced `create_time_partition` function to streamline the creation of new time partitions, ensuring thread safety by acquiring a lock.
 - Modified logic to handle cases where no matching time partitions exist, creating new partitions as needed.
 - Updated `write_record_batch` and `write_one` methods to utilize the new partition creation logic, improving partition management and data writing efficiency.

* replace proto

* feat/simple-bulk-memtable:
 Update `metrics.rs` to adjust the range of exponential buckets for bulk insert message rows from `10 ~ 1_000_000` to `10 ~ 100_000`.
2025-05-09 02:56:09 +00:00

314 lines
11 KiB
TOML

[workspace]
members = [
"src/api",
"src/auth",
"src/cache",
"src/catalog",
"src/cli",
"src/client",
"src/cmd",
"src/common/base",
"src/common/catalog",
"src/common/config",
"src/common/datasource",
"src/common/decimal",
"src/common/error",
"src/common/frontend",
"src/common/function",
"src/common/greptimedb-telemetry",
"src/common/grpc",
"src/common/grpc-expr",
"src/common/macro",
"src/common/mem-prof",
"src/common/meta",
"src/common/options",
"src/common/plugins",
"src/common/pprof",
"src/common/procedure",
"src/common/procedure-test",
"src/common/query",
"src/common/recordbatch",
"src/common/runtime",
"src/common/session",
"src/common/substrait",
"src/common/telemetry",
"src/common/test-util",
"src/common/time",
"src/common/version",
"src/common/wal",
"src/datanode",
"src/datatypes",
"src/file-engine",
"src/flow",
"src/frontend",
"src/index",
"src/log-query",
"src/log-store",
"src/meta-client",
"src/meta-srv",
"src/metric-engine",
"src/mito2",
"src/object-store",
"src/operator",
"src/partition",
"src/pipeline",
"src/plugins",
"src/promql",
"src/puffin",
"src/query",
"src/servers",
"src/session",
"src/sql",
"src/store-api",
"src/table",
"tests-fuzz",
"tests-integration",
"tests/runner",
]
resolver = "2"
[workspace.package]
version = "0.15.0"
edition = "2021"
license = "Apache-2.0"
[workspace.lints]
clippy.dbg_macro = "warn"
clippy.implicit_clone = "warn"
clippy.result_large_err = "allow"
clippy.large_enum_variant = "allow"
clippy.doc_overindented_list_items = "allow"
rust.unknown_lints = "deny"
rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
[workspace.dependencies]
# DO_NOT_REMOVE_THIS: BEGIN_OF_EXTERNAL_DEPENDENCIES
# We turn off default-features for some dependencies here so the workspaces which inherit them can
# selectively turn them on if needed, since we can override default-features = true (from false)
# for the inherited dependency but cannot do the reverse (override from true to false).
#
# See for more detaiils: https://github.com/rust-lang/cargo/issues/11329
ahash = { version = "0.8", features = ["compile-time-rng"] }
aquamarine = "0.6"
arrow = { version = "54.2", features = ["prettyprint"] }
arrow-array = { version = "54.2", default-features = false, features = ["chrono-tz"] }
arrow-flight = "54.2"
arrow-ipc = { version = "54.2", default-features = false, features = ["lz4", "zstd"] }
arrow-schema = { version = "54.2", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
# Remember to update axum-extra, axum-macros when updating axum
axum = "0.8"
axum-extra = "0.10"
axum-macros = "0.5"
backon = "1"
base64 = "0.22"
bigdecimal = "0.4.2"
bitflags = "2.4.1"
bytemuck = "1.12"
bytes = { version = "1.7", features = ["serde"] }
chrono = { version = "0.4", features = ["serde"] }
chrono-tz = "0.10.1"
clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
crossbeam-utils = "0.8"
dashmap = "6.1"
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "e104c7cf62b11dd5fe41461b82514978234326b4" }
deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"
dotenv = "0.15"
etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "17a3550751c8b1e02ec16be40101d5f24dc255c3" }
hex = "0.4"
http = "1"
humantime = "2.1"
humantime-serde = "1.1"
hyper = "1.1"
hyper-util = "0.1"
itertools = "0.14"
jsonb = { git = "https://github.com/databendlabs/jsonb.git", rev = "8c8d2fc294a39f3ff08909d60f718639cfba3875", default-features = false }
lazy_static = "1.4"
local-ip-address = "0.6"
loki-proto = { git = "https://github.com/GreptimeTeam/loki-proto.git", rev = "1434ecf23a2654025d86188fb5205e7a74b225d3" }
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "5618e779cf2bb4755b499c630fba4c35e91898cb" }
mockall = "0.13"
moka = "0.12"
nalgebra = "0.33"
notify = "8.0"
num_cpus = "1.16"
object_store_opendal = "0.50"
once_cell = "1.18"
opentelemetry-proto = { version = "0.27", features = [
"gen-tonic",
"metrics",
"trace",
"with-serde",
"logs",
] }
parking_lot = "0.12"
parquet = { version = "54.2", default-features = false, features = ["arrow", "async", "object_store"] }
paste = "1.0"
pin-project = "1.0"
prometheus = { version = "0.13.3", features = ["process"] }
promql-parser = { version = "0.5.1", features = ["ser"] }
prost = { version = "0.13", features = ["no-recursion-limit"] }
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.9"
ratelimit = "0.10"
regex = "1.8"
regex-automata = "0.4"
reqwest = { version = "0.12", default-features = false, features = [
"json",
"rustls-tls-native-roots",
"stream",
"multipart",
] }
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "75535b5ad9bae4a5dbb582c82e44dfd81ec10105", features = [
"transport-tls",
] }
rstest = "0.25"
rstest_reuse = "0.7"
rust_decimal = "1.33"
rustc-hash = "2.0"
# It is worth noting that we should try to avoid using aws-lc-rs until it can be compiled on various platforms.
rustls = { version = "0.23.25", default-features = false }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["float_roundtrip"] }
serde_with = "3"
shadow-rs = "1.1"
simd-json = "0.15"
similar-asserts = "1.6.0"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.8"
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "0cf6c04490d59435ee965edd2078e8855bd8471e", features = [
"visitor",
"serde",
] } # branch = "v0.54.x"
sqlx = { version = "0.8", features = [
"runtime-tokio-rustls",
"mysql",
"postgres",
"chrono",
] }
strum = { version = "0.27", features = ["derive"] }
sysinfo = "0.33"
tempfile = "3"
tokio = { version = "1.40", features = ["full"] }
tokio-postgres = "0.7"
tokio-rustls = { version = "0.26.2", default-features = false }
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.8.8"
tonic = { version = "0.12", features = ["tls", "gzip", "zstd"] }
tower = "0.5"
tracing-appender = "0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "fmt"] }
typetag = "0.2"
uuid = { version = "1.7", features = ["serde", "v4", "fast-rng"] }
zstd = "0.13"
# DO_NOT_REMOVE_THIS: END_OF_EXTERNAL_DEPENDENCIES
## workspaces members
api = { path = "src/api" }
auth = { path = "src/auth" }
cache = { path = "src/cache" }
catalog = { path = "src/catalog" }
cli = { path = "src/cli" }
client = { path = "src/client" }
cmd = { path = "src/cmd", default-features = false }
common-base = { path = "src/common/base" }
common-catalog = { path = "src/common/catalog" }
common-config = { path = "src/common/config" }
common-datasource = { path = "src/common/datasource" }
common-decimal = { path = "src/common/decimal" }
common-error = { path = "src/common/error" }
common-frontend = { path = "src/common/frontend" }
common-function = { path = "src/common/function" }
common-greptimedb-telemetry = { path = "src/common/greptimedb-telemetry" }
common-grpc = { path = "src/common/grpc" }
common-grpc-expr = { path = "src/common/grpc-expr" }
common-macro = { path = "src/common/macro" }
common-mem-prof = { path = "src/common/mem-prof" }
common-meta = { path = "src/common/meta" }
common-options = { path = "src/common/options" }
common-plugins = { path = "src/common/plugins" }
common-pprof = { path = "src/common/pprof" }
common-procedure = { path = "src/common/procedure" }
common-procedure-test = { path = "src/common/procedure-test" }
common-query = { path = "src/common/query" }
common-recordbatch = { path = "src/common/recordbatch" }
common-runtime = { path = "src/common/runtime" }
common-session = { path = "src/common/session" }
common-telemetry = { path = "src/common/telemetry" }
common-test-util = { path = "src/common/test-util" }
common-time = { path = "src/common/time" }
common-version = { path = "src/common/version" }
common-wal = { path = "src/common/wal" }
datanode = { path = "src/datanode" }
datatypes = { path = "src/datatypes" }
file-engine = { path = "src/file-engine" }
flow = { path = "src/flow" }
frontend = { path = "src/frontend", default-features = false }
index = { path = "src/index" }
log-query = { path = "src/log-query" }
log-store = { path = "src/log-store" }
meta-client = { path = "src/meta-client" }
meta-srv = { path = "src/meta-srv" }
metric-engine = { path = "src/metric-engine" }
mito2 = { path = "src/mito2" }
object-store = { path = "src/object-store" }
operator = { path = "src/operator" }
otel-arrow-rust = { git = "https://github.com/open-telemetry/otel-arrow", rev = "5d551412d2a12e689cde4d84c14ef29e36784e51", features = [
"server",
] }
partition = { path = "src/partition" }
pipeline = { path = "src/pipeline" }
plugins = { path = "src/plugins" }
promql = { path = "src/promql" }
puffin = { path = "src/puffin" }
query = { path = "src/query" }
servers = { path = "src/servers" }
session = { path = "src/session" }
sql = { path = "src/sql" }
store-api = { path = "src/store-api" }
substrait = { path = "src/common/substrait" }
table = { path = "src/table" }
[workspace.dependencies.meter-macros]
git = "https://github.com/GreptimeTeam/greptime-meter.git"
rev = "5618e779cf2bb4755b499c630fba4c35e91898cb"
[profile.release]
debug = 1
[profile.nightly]
inherits = "release"
strip = "debuginfo"
lto = "thin"
debug = false
incremental = false
[profile.ci]
inherits = "dev"
strip = true
[profile.dev.package.sqlness-runner]
debug = false
strip = true
[profile.dev.package.tests-fuzz]
debug = false
strip = true