Compare commits

...

22 Commits

Author SHA1 Message Date
Lei, HUANG
97e9b97a57 feat/bulk-support-flow-batch:
Update `greptime-proto` Dependency

 - Updated the `greptime-proto` dependency in both `Cargo.lock` and `Cargo.toml`, bumping the pinned git revision from `f0913f179ee1d2ce428f8b85a9ea12b5f69ad636` to `17971523673f4fbc982510d3c9d6647ff642e16f`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-16 06:28:12 +00:00
Lei, HUANG
7fc74e2928 feat/bulk-support-flow-batch:
## Update `handle_mark_window_dirty` Method in `flownode_impl.rs`

 - Replaced `unimplemented!()` with `unreachable!()` in the `handle_mark_window_dirty` method for both `FlowDualEngine` and `StreamingEngine` implementations in `flownode_impl.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-16 06:22:25 +00:00
Lei, HUANG
77f20ede7a feat/bulk-support-flow-batch:
**Refactor and Enhance Timestamp Handling in `bulk_insert.rs`**

 - **Refactored Timestamp Extraction**: Moved timestamp extraction logic to a new method `maybe_update_flow_dirty_window` to improve code readability and maintainability.
 - **Enhanced Flow Update Logic**: Updated the flow dirty window update mechanism to conditionally notify flownodes only if they are configured, using `table_info` and `record_batch`.
 - **Imports Adjusted**: Updated imports to reflect changes in table metadata handling, replacing `TableId` with `TableInfoRef`.

 Files affected:
 - `src/operator/src/bulk_insert.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-16 06:22:24 +00:00
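
The commit above only names the new helper, so below is a minimal, self-contained sketch of the pattern it describes: skip the work entirely when no flownode is configured, otherwise collect the batch's time-index values and forward them. Every name and type here (`Flownode`, `DirtyWindow`, the helper itself) is an illustrative stand-in, not the actual GreptimeDB API.

```rust
/// Illustrative stand-ins; the real code uses GreptimeDB's proto types and node manager.
struct DirtyWindow {
    table_id: u32,
    timestamps: Vec<i64>,
}

trait Flownode {
    fn mark_window_dirty(&self, req: &DirtyWindow);
}

/// Sketch of the "notify only when flownodes are configured" pattern: the
/// timestamp extraction cost is skipped when there is nobody to notify.
fn maybe_update_flow_dirty_window(
    flownodes: &[Box<dyn Flownode>],
    table_id: u32,
    timestamps_in_batch: &[i64],
) {
    if flownodes.is_empty() {
        return; // no flownode configured, nothing to do
    }
    let req = DirtyWindow {
        table_id,
        timestamps: timestamps_in_batch.to_vec(),
    };
    for node in flownodes {
        node.mark_window_dirty(&req);
    }
}

fn main() {
    struct Printer;
    impl Flownode for Printer {
        fn mark_window_dirty(&self, req: &DirtyWindow) {
            println!("table {} dirty at {:?}", req.table_id, req.timestamps);
        }
    }
    let nodes: Vec<Box<dyn Flownode>> = vec![Box::new(Printer)];
    maybe_update_flow_dirty_window(&nodes, 42, &[1_700_000_000_000]);
}
```
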
Lei, HUANG
ced018fce0 feat/bulk-support-flow-batch:
### Update Metrics in Batching Mode Engine

 - **Modified Metrics**: Replaced `METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE` with `METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW` to track the count of time windows instead of their range.
   - Files affected: `engine.rs`, `metrics.rs`
 - **New Method**: Added `len()` method to `DirtyTimeWindows` to return the number of dirty windows.
   - File affected: `state.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-16 06:22:24 +00:00
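
As a rough illustration of the metric change, the sketch below registers a per-flow gauge with the `prometheus` crate and sets it from a `len()` accessor, i.e. tracking a count of windows rather than a range. The metric name and the simplified `DirtyTimeWindows` are assumptions for the example, not the definitions in `metrics.rs`/`state.rs`.

```rust
use std::collections::BTreeMap;

use lazy_static::lazy_static;
use prometheus::{register_gauge_vec, GaugeVec};

lazy_static! {
    /// Illustrative metric; the real one is defined in the flow crate's metrics.rs.
    static ref DIRTY_WINDOW_COUNT: GaugeVec = register_gauge_vec!(
        "example_flow_dirty_time_window_count",
        "Number of dirty time windows per flow",
        &["flow_id"]
    )
    .unwrap();
}

/// Simplified stand-in for DirtyTimeWindows: windows keyed by aligned start time.
#[derive(Default)]
struct DirtyTimeWindows {
    windows: BTreeMap<i64, Option<i64>>,
}

impl DirtyTimeWindows {
    fn add_window(&mut self, start: i64, end: Option<i64>) {
        self.windows.insert(start, end);
    }

    /// Number of dirty windows, mirroring the accessor added in state.rs.
    fn len(&self) -> usize {
        self.windows.len()
    }
}

fn main() {
    let mut state = DirtyTimeWindows::default();
    state.add_window(1_700_000_000, None);
    state.add_window(1_700_000_060, None);
    // The gauge now reflects how many windows are dirty, not how wide they span.
    DIRTY_WINDOW_COUNT
        .with_label_values(&["1024"])
        .set(state.len() as f64);
    println!("dirty windows: {}", state.len());
}
```
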
Lei, HUANG
41dacff283 feat/bulk-support-flow-batch:
**Refactor Timestamp Handling and Update Dependencies**

 - **Dependency Update**: Updated `greptime-proto` dependency in `Cargo.lock` and `Cargo.toml` to a new revision.
 - **Batching Engine Refactor**: Modified `src/flow/src/batching_mode/engine.rs` to consume raw `timestamps` instead of precomputed `dirty_time_ranges`.
 - **Bulk Insert Refactor**: Updated `src/operator/src/bulk_insert.rs` to refactor timestamp extraction and handling. Replaced `compute_timestamp_range` with `extract_timestamps` and adjusted related logic to handle timestamps directly.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-16 06:22:22 +00:00
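
Where the earlier code reduced a batch to a single min/max range, the new path hands over every timestamp. A minimal sketch of such an extraction, written directly against the `arrow` crate and limited to millisecond precision; the real `extract_timestamps` goes through GreptimeDB's vector abstractions and supports all timestamp units.

```rust
use arrow::array::{Array, TimestampMillisecondArray};

/// Collect all non-null values of the time-index column as raw i64 milliseconds.
fn extract_timestamps(time_index: &TimestampMillisecondArray) -> Vec<i64> {
    (0..time_index.len())
        .filter(|&i| !time_index.is_null(i))
        .map(|i| time_index.value(i))
        .collect()
}

fn main() {
    let col = TimestampMillisecondArray::from(vec![
        Some(1_700_000_000_000),
        None,
        Some(1_700_000_060_000),
    ]);
    assert_eq!(
        extract_timestamps(&col),
        vec![1_700_000_000_000, 1_700_000_060_000]
    );
}
```
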
discord9
94a14b6da7 metrics: more useful metrics batching mode 2025-06-16 06:22:06 +00:00
discord9
6ad3a32cb2 feat: metrics 2025-06-16 06:22:06 +00:00
discord9
ac00314578 feat: mark dirty time window 2025-06-16 06:22:05 +00:00
Lei, HUANG
2f08bee08f feat/bulk-support-flow-batch:
### Add Dirty Window Handling in Flow Module

 - **Updated `greptime-proto` Dependency**: Updated the `greptime-proto` dependency to a new revision in `Cargo.lock` and `Cargo.toml`.
 - **Flow Module Enhancements**:
   - Added `DirtyWindowRequest` handling in `flow.rs`, `node_manager.rs`, `test_util.rs`, `flownode_impl.rs`, and `server.rs`.
   - Implemented `handle_mark_window_dirty` function to manage dirty time windows.
 - **Bulk Insert Enhancements**:
   - Modified `bulk_insert.rs` to notify flownodes about dirty time windows using `update_flow_dirty_window`.
 - **Removed Unused Imports**: Cleaned up unused imports in `greptime_handler.rs`, `grpc.rs`, and `mod.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-16 06:22:03 +00:00
Lei, HUANG
8ebb31cdcd feat/bulk-support-flow-batch:
### Refactor and Enhance Timestamp Handling in gRPC and Bulk Insert

 - **Refactor Table Handling**:
   - Updated the `put_record_batch` method to use `TableRef` instead of `TableId` in `greptime_handler.rs` and the `grpc.rs` modules.
   - Modified `handle_bulk_insert` to accept `TableRef` and extract `TableId` internally in `bulk_insert.rs`.

 - **Enhance Timestamp Processing**:
   - Added `compute_timestamp_range` function to calculate timestamp range in `bulk_insert.rs`.
   - Introduced error handling for invalid time index types in `error.rs`.

 - **Test Adjustments**:
   - Updated `DummyInstance` implementation in `tests/mod.rs` to align with new method signatures.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-16 06:21:32 +00:00
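
For reference, the range computation described above amounts to one pass over the time-index values. The hypothetical helper below shows just that pass and leaves out the time-index type validation that the new `error.rs` variant covers.

```rust
/// Sketch: min/max over the time-index values of a bulk-insert batch.
/// Returns None for an empty batch.
fn compute_timestamp_range(timestamps: &[i64]) -> Option<(i64, i64)> {
    let first = *timestamps.first()?;
    Some(
        timestamps
            .iter()
            .fold((first, first), |(min, max), &ts| (min.min(ts), max.max(ts))),
    )
}

fn main() {
    assert_eq!(compute_timestamp_range(&[]), None);
    assert_eq!(compute_timestamp_range(&[3, 1, 2]), Some((1, 3)));
}
```
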
localhost
f4f8d65a39 fix: event api content type only check type and subtype (#6317)
* fix: event api content type only check type and subtype

Signed-off-by: paomian <xpaomian@gmail.com>

* chore: make clippy happy

Signed-off-by: paomian <xpaomian@gmail.com>

---------

Signed-off-by: paomian <xpaomian@gmail.com>
2025-06-13 18:50:05 +00:00
Lei, HUANG
b31990e881 chore: add connection info to QueryContext (#6319)
chore/add-conn-info-to-query-ctx:
 ### Add Connection Information to Query Context

 - **`src/frontend/src/instance.rs`**: Updated to use `query_ctx.conn_info().to_string()` for connection information instead of a placeholder string.
 - **`src/session/src/context.rs`**: Introduced `conn_info` field in `QueryContext` and added a method `conn_info()` to retrieve it. Updated `QueryContextBuilder` to handle `conn_info`.
 - **`src/session/src/lib.rs`**: Modified `Session` to include `conn_info` in the query context building process.

 These changes enhance the query context by incorporating connection information, allowing for more detailed session management.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-13 18:42:13 +00:00
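
A minimal sketch of the shape of this change, using stand-in types; the real `QueryContext`, `QueryContextBuilder`, and connection-info struct carry far more state than shown here.

```rust
use std::fmt;

/// Stand-in for the session's connection info (client address plus protocol channel).
#[derive(Clone, Default)]
struct ConnInfo {
    client_addr: Option<String>,
    channel: &'static str,
}

impl fmt::Display for ConnInfo {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{}[{}]",
            self.channel,
            self.client_addr.as_deref().unwrap_or("unknown")
        )
    }
}

/// Stand-in QueryContext carrying the new conn_info field.
struct QueryContext {
    conn_info: ConnInfo,
}

impl QueryContext {
    fn conn_info(&self) -> &ConnInfo {
        &self.conn_info
    }
}

#[derive(Default)]
struct QueryContextBuilder {
    conn_info: ConnInfo,
}

impl QueryContextBuilder {
    fn conn_info(mut self, conn_info: ConnInfo) -> Self {
        self.conn_info = conn_info;
        self
    }

    fn build(self) -> QueryContext {
        QueryContext {
            conn_info: self.conn_info,
        }
    }
}

fn main() {
    // The session injects its connection info while building the query context,
    // so downstream consumers (e.g. process listings) can show where a query came from.
    let ctx = QueryContextBuilder::default()
        .conn_info(ConnInfo {
            client_addr: Some("10.0.0.1:53422".into()),
            channel: "mysql",
        })
        .build();
    println!("connection: {}", ctx.conn_info());
}
```
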
Lei, HUANG
6da633e70d feat: support killing process (#6309)
* feat/kill-process:
 ### Add Cancellation Support and Enhance Process Management

 - **Cancellation Handle Implementation**: Introduced `CancellationHandle` in `cancellation_handle.rs` to facilitate cancellation of futures and streams.
 - **Process Management Enhancements**:
   - Updated `ProcessManager` in `process_manager.rs` to support cancellable processes using `CancellableProcess`.
   - Added `kill_process` method for terminating processes.
 - **Stream Wrapper Update**:
   - Replaced `StreamWrapper` with `CancellableStreamWrapper` in `stream_wrapper.rs` and `instance.rs` to handle stream cancellation.
 - **Error Handling**:
   - Added `StreamCancelled` error variant in `error.rs` to handle stream cancellation scenarios.
 - **gRPC Handler Update**:
   - Added `kill_process` gRPC method in `frontend_grpc_handler.rs` to allow external process termination.
 - **Dependency Updates**:
   - Updated `Cargo.lock` and `Cargo.toml` to include `common-base` and `tokio-util`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/kill-process:
 **Enhancements and Bug Fixes**

 - **Dependency Update**: Updated `greptime-proto` dependency in `Cargo.lock` and `Cargo.toml` to a new revision.
 - **Error Handling Improvements**:
   - Modified error variants in `src/catalog/src/error.rs` and `src/common/frontend/src/error.rs` to improve error messages and handling.
   - Added `FrontendNotFound` error variant for better error specificity.
 - **Process Management Enhancements**:
   - Updated `ProcessManager` in `src/catalog/src/process_manager.rs` to include `kill_process` functionality with server address validation.
   - Enhanced `FrontendClient` trait in `src/common/frontend/src/selector.rs` to support `kill_process` requests.
 - **gRPC Handler Update**:
   - Refactored `FrontendGrpcHandler` in `src/servers/src/grpc/frontend_grpc_handler.rs` to handle `kill_process` requests asynchronously and return process status.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/kill-process:
 ### Add Kill Process Functionality

 - **`Cargo.lock`, `Cargo.toml`**: Added `common-frontend` as a dependency.
 - **`server.rs`, `builder.rs`, `instance.rs`**: Updated `FrontendInvoker` and `FrontendBuilder` to support process management.
 - **`error.rs`**: Introduced `InvalidProcessId` error for handling invalid process IDs.
 - **`statement.rs`, `kill.rs`**: Implemented `execute_kill` method in `StatementExecutor` to handle the `KILL` statement.
 - **`parser.rs`, `statement.rs`**: Updated SQL parser to recognize and parse the `KILL` statement.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/kill-process:
 ## Add Cancellation Support to Query Execution

 - **`process_manager.rs`**: Updated `CancellationHandle` initialization to use `default()` method.
 - **`cancellation_handle.rs`**: Implemented `Debug` trait for `CancellationHandle` and added `Cancellation` and `CancellableFuture` structs to support cancellable futures.
 - **`error.rs`**: Introduced `Cancelled` error variant to handle query cancellations.
 - **`instance.rs`**: Integrated `CancellableFuture` to manage query execution with cancellation support.
 - **`stream_wrapper.rs`**: Modified `CancellableStreamWrapper` to use the new `waker()` method for cancellation handling.
 - **`statement.rs`**: Added `#[allow(clippy::too_many_arguments)]` to `StatementExecutor::new` to suppress clippy warnings.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
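
The stream side mirrors `CancellableFuture`: poll the inner stream, register the handle's waker, and short-circuit once the handle fires. Below is a self-contained sketch of that polling pattern; it is not the actual `CancellableStreamWrapper`, which wraps GreptimeDB's record-batch streams and surfaces the `StreamCancelled` error rather than a plain string.

```rust
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

use futures::stream::{Stream, StreamExt};
use futures::task::AtomicWaker;

/// Stand-in for the cancellation handle that lives in common-base.
#[derive(Default)]
struct CancellationHandle {
    waker: AtomicWaker,
    cancelled: AtomicBool,
}

impl CancellationHandle {
    fn cancel(&self) {
        if self
            .cancelled
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            self.waker.wake();
        }
    }

    fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Relaxed)
    }
}

/// Wraps a stream so it yields an error once the handle is cancelled.
struct CancellableStreamWrapper<S> {
    inner: S,
    handle: Arc<CancellationHandle>,
}

impl<S: Stream + Unpin> Stream for CancellableStreamWrapper<S> {
    type Item = Result<S::Item, &'static str>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        if this.handle.is_cancelled() {
            return Poll::Ready(Some(Err("stream cancelled")));
        }
        if let Poll::Ready(item) = Pin::new(&mut this.inner).poll_next(cx) {
            return Poll::Ready(item.map(Ok));
        }
        // Register the cancellation waker, then re-check to close the race with cancel().
        this.handle.waker.register(cx.waker());
        if this.handle.is_cancelled() {
            return Poll::Ready(Some(Err("stream cancelled")));
        }
        Poll::Pending
    }
}

fn main() {
    futures::executor::block_on(async {
        let handle = Arc::new(CancellationHandle::default());
        let mut wrapped = CancellableStreamWrapper {
            inner: futures::stream::iter(vec![1, 2, 3]),
            handle: handle.clone(),
        };
        assert_eq!(wrapped.next().await, Some(Ok(1)));
        handle.cancel();
        assert_eq!(wrapped.next().await, Some(Err("stream cancelled")));
    });
}
```
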

* feat/kill-process:
 - **Add `MetaClientMissing` Error**: Introduced a new error variant `MetaClientMissing` in `error.rs` to handle missing meta client scenarios.
 - **Refactor Cancellation Handling**: Merged `cancellation_handle.rs` into `cancellation.rs` and updated related logic in `process_manager.rs`, `instance.rs`, and `stream_wrapper.rs`.
 - **Enhance Process Management**: Improved process management logic in `process_manager.rs` to handle process cancellation more effectively.
 - **Update Tests**: Added and updated tests in `cancellation.rs` and `stream_wrapper.rs` to cover new cancellation logic and error handling.
 - **Cargo.toml Update**: Adjusted workspace settings in `Cargo.toml` for `common-frontend`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/kill-process:
 - **Add Tests for Process Management**: Introduced multiple async tests in `process_manager.rs` to verify query registration, deregistration, cancellation, and process killing functionalities.
 - **Update Error Message in SQL Parser**: Modified the expected error message in `parser.rs` to clarify the expected token as a "process id string literal".

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/kill-process:
 ### Add Process Count Metrics to Catalog

 - **`metrics.rs`**: Introduced a new metric `PROCESS_LIST_COUNT` to track the count of running processes per catalog using `IntGaugeVec`.
 - **`process_manager.rs`**: Updated `CancellableProcess` to increment and decrement `PROCESS_LIST_COUNT` upon creation and destruction, respectively. Added a `Drop` implementation for `CancellableProcess` to handle metric updates.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/kill-process:
 ### Fix process removal logic in `process_manager.rs`

 - Corrected the condition for removing an entry from the catalog in `ProcessManager` by using `o.get()` instead of `o.get_mut()`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/kill-process:
 - **Error Handling Improvements**:
   - Updated status codes for `Error::FrontendNotFound` and `Error::MetaClientMissing` to `StatusCode::Unexpected` in `src/catalog/src/error.rs`.
   - Changed `InvokeFrontend` error display message and status code in `src/common/frontend/src/error.rs`.
   - Added `ProcessManagerMissing` error in `src/operator/src/error.rs` and updated its handling in `src/operator/src/statement/kill.rs`.

 - **Process Management Enhancements**:
   - Added documentation for `ProcessManager` and `register_query` in `src/catalog/src/process_manager.rs`.
   - Modified `kill_process` response handling in `src/servers/src/grpc/frontend_grpc_handler.rs`.

 - **Cancellation Logic Update**:
   - Improved cancellation logic in `src/common/base/src/cancellation.rs` to use `compare_exchange` for atomic operations.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/kill-process:
 ### Add Process Kill Count Metric and Refactor Cancellation Handle

 - **Metrics Update**: Added a new metric `PROCESS_KILL_COUNT` in `metrics.rs` to track the count of completed kill process requests per catalog.
 - **Refactor Cancellation Handle**: Renamed `cancellation_handler` to `cancellation_handle` across multiple files for consistency:
   - `process_manager.rs`
   - `instance.rs`
   - `stream_wrapper.rs`
 - **Process Management**: Updated process management logic in `process_manager.rs` to increment the `PROCESS_KILL_COUNT` metric upon successful process termination.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/kill-process:
 Update metric description in `metrics.rs`

 - Changed the description of `PROCESS_KILL_COUNT` in `metrics.rs` to reflect the count of killed processes rather than running processes.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/kill-process:
 Update `greptime-proto` Dependency and Fix Response Field

 - **Updated Dependency**: Changed the `greptime-proto` Git revision in `Cargo.lock` and `Cargo.toml` to `f0913f1`.
 - **Code Fix**: Modified `frontend_grpc_handler.rs` to correct the response field from `found` to `success` in `KillProcessResponse`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-13 13:30:25 +00:00
zyy17
9633e794c7 fix: always use linux path style in windows platform unit tests (#6314)
Signed-off-by: zyy17 <zyylsxm@gmail.com>
2025-06-13 07:15:53 +00:00
Yingwen
eaf1e1198f refactor: Extract mito codec part into a new crate (#6307)
* chore: add a new crate mito-codec

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: port necessary mods for primary key codec

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: use codec utils in mito-codec

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: remove unused mods

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fix clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: remove Partition::is_partition_column()

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: remove duplicated test utils

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: remove unused comment

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: fix is_partition_column check

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-13 07:14:29 +00:00
ZonaHe
505bf25505 feat: update dashboard to v0.9.3 (#6311)
Co-authored-by: ZonaHex <ZonaHex@users.noreply.github.com>
2025-06-13 07:13:12 +00:00
Ning Sun
f1b29ece3c feat: process id for session, query context and postgres (#6301)
* feat: process id for session, query context and postgres

Signed-off-by: Ning Sun <sunning@greptime.com>

* feat: add sql functions to retrieve connection/process id

Signed-off-by: Ning Sun <sunning@greptime.com>

---------

Signed-off-by: Ning Sun <sunning@greptime.com>
2025-06-12 16:53:57 +00:00
discord9
74df12e8c0 fix: check for zero parallelism (#6310)
* fix: check for zero parallelism

Signed-off-by: discord9 <discord9@163.com>

* chore: silently use default value

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2025-06-12 15:58:59 +00:00
discord9
be6a5d2da8 feat: parallelism hint in grpc (#6306)
* feat: parallelism hint in grpc

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* chore: comment

Signed-off-by: discord9 <discord9@163.com>

* chore:docs

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2025-06-12 10:12:45 +00:00
Ruihang Xia
7468a8ab2a feat: organize EXPLAIN ANALYZE VERBOSE's output in JSON format (#6308)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-06-12 09:55:53 +00:00
Lei, HUANG
5bb0466ff2 feat: introduce file group in compaction (#6261)
* fix/file-group-in-compaction:
 ### Enhance Compaction Logic with File Grouping

 - **`run.rs`**: Introduced `FileGroup` struct to manage groups of `FileHandle` objects, allowing for more efficient compaction operations. Updated `Ranged` and `Item` trait implementations to work with `FileGroup`.
 - **`test_util.rs`**: Added `new_file_handle_with_sequence` function to support file handles with sequence numbers, enhancing test utilities.
 - **`twcs.rs`**: Modified `TwcsPicker` to utilize `FileGroup` for managing files within windows, improving compaction logic. Updated `Window` struct to use `HashMap` for storing `FileGroup` objects.
 - **`version_util.rs`**: Updated version control utilities to handle sequence numbers in file metadata, aligning with new compaction logic.

Signed-off-by: Lei, HUANG <lhuang@greptime.com>
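
A rough sketch of the grouping idea described above: files emitted by the same compaction task share a sequence, so a time window buckets its files by that sequence and later treats each bucket as a single compaction item. The types below are simplified stand-ins for mito2's `FileHandle`/`FileGroup`, and the real picker also takes overlapping time ranges into account when forming groups.

```rust
use std::collections::HashMap;

/// Simplified file metadata: only what the grouping needs.
#[derive(Debug, Clone)]
struct FileHandle {
    file_id: u64,
    /// Sequence of the compaction task that produced the file, if any.
    sequence: Option<u64>,
}

/// Files produced by the same compaction task, compacted as one unit.
#[derive(Debug, Default)]
struct FileGroup {
    files: Vec<FileHandle>,
}

/// Bucket the files of one time window by their originating sequence,
/// mirroring the Window struct's map of sequence -> FileGroup.
fn assign_file_groups(files: Vec<FileHandle>) -> HashMap<Option<u64>, FileGroup> {
    let mut groups: HashMap<Option<u64>, FileGroup> = HashMap::new();
    for file in files {
        groups.entry(file.sequence).or_default().files.push(file);
    }
    groups
}

fn main() {
    let files = vec![
        FileHandle { file_id: 1, sequence: Some(10) },
        FileHandle { file_id: 2, sequence: Some(10) },
        FileHandle { file_id: 3, sequence: None },
    ];
    let groups = assign_file_groups(files);
    assert_eq!(groups.len(), 2);
    assert_eq!(groups[&Some(10)].files.len(), 2);
    println!("{:?}", groups);
}
```
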

* fix/file-group-in-compaction:
 ### Add Test for File Group Assignment in TWCS

 - **Enhancements in `twcs.rs`:**
   - Added a new test `test_assign_file_groups_to_windows` to verify the correct assignment of file groups to windows.
   - Enhanced `test_assign_compacting_to_windows` with a new case to ensure files with overlapping time ranges and the same sequence are treated as one `FileGroup`.

Signed-off-by: Lei, HUANG <lhuang@greptime.com>

* fix/file-group-in-compaction:
 **Enhance Compaction Task Documentation and Initialization**

 - **`run.rs`**: Added documentation for `FileGroup` to clarify its role in representing a group of files created by the same compaction task.
 - **`twcs.rs`**: Introduced comments in the `Window` struct to explain the mapping of file sequences to file groups, indicating files created from the same compaction task. Simplified the initialization of the `files` hashmap using `HashMap::from`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <lhuang@greptime.com>
Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-12 09:33:40 +00:00
Ruihang Xia
f6db419afd feat: support using expressions as literal in PromQL (#6297)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-06-12 08:18:10 +00:00
134 changed files with 2725 additions and 644 deletions

Cargo.lock generated
View File

@@ -1621,6 +1621,7 @@ dependencies = [
"cache",
"catalog",
"chrono",
"common-base",
"common-catalog",
"common-error",
"common-frontend",
@@ -4755,6 +4756,7 @@ dependencies = [
"substrait 0.15.0",
"table",
"tokio",
"tokio-util",
"toml 0.8.19",
"tonic 0.12.3",
"tower 0.5.2",
@@ -5141,7 +5143,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5f6119ac7952878d39dcde0343c4bf828d18ffc8#5f6119ac7952878d39dcde0343c4bf828d18ffc8"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=17971523673f4fbc982510d3c9d6647ff642e16f#17971523673f4fbc982510d3c9d6647ff642e16f"
dependencies = [
"prost 0.13.5",
"serde",
@@ -7248,6 +7250,7 @@ dependencies = [
"humantime-serde",
"itertools 0.14.0",
"lazy_static",
"mito-codec",
"mito2",
"mur3",
"object-store",
@@ -7313,6 +7316,29 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "mito-codec"
version = "0.15.0"
dependencies = [
"api",
"bytes",
"common-base",
"common-decimal",
"common-error",
"common-macro",
"common-recordbatch",
"common-telemetry",
"common-time",
"datafusion-common",
"datafusion-expr",
"datatypes",
"memcomparable",
"paste",
"serde",
"snafu 0.8.5",
"store-api",
]
[[package]]
name = "mito2"
version = "0.15.0"
@@ -7355,6 +7381,7 @@ dependencies = [
"lazy_static",
"log-store",
"memcomparable",
"mito-codec",
"moka",
"object-store",
"parquet",
@@ -8394,6 +8421,7 @@ dependencies = [
"common-catalog",
"common-datasource",
"common-error",
"common-frontend",
"common-function",
"common-grpc",
"common-grpc-expr",
@@ -8894,8 +8922,7 @@ dependencies = [
[[package]]
name = "pgwire"
version = "0.30.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ca6c26b25be998208a13ff2f0c55b567363f34675410e6d6f1c513a150583fd"
source = "git+https://github.com/sunng87/pgwire?rev=127573d997228cfb70c7699881c568eae8131270#127573d997228cfb70c7699881c568eae8131270"
dependencies = [
"async-trait",
"bytes",

View File

@@ -49,6 +49,7 @@ members = [
"src/meta-client",
"src/meta-srv",
"src/metric-engine",
"src/mito-codec",
"src/mito2",
"src/object-store",
"src/operator",
@@ -133,7 +134,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5f6119ac7952878d39dcde0343c4bf828d18ffc8" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "17971523673f4fbc982510d3c9d6647ff642e16f" }
hex = "0.4"
http = "1"
humantime = "2.1"
@@ -274,6 +275,7 @@ log-store = { path = "src/log-store" }
meta-client = { path = "src/meta-client" }
meta-srv = { path = "src/meta-srv" }
metric-engine = { path = "src/metric-engine" }
mito-codec = { path = "src/mito-codec" }
mito2 = { path = "src/mito2" }
object-store = { path = "src/object-store" }
operator = { path = "src/operator" }

View File

@@ -17,6 +17,7 @@ arrow-schema.workspace = true
async-stream.workspace = true
async-trait.workspace = true
bytes.workspace = true
common-base.workspace = true
common-catalog.workspace = true
common-error.workspace = true
common-frontend.workspace = true

View File

@@ -278,12 +278,25 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to list frontend nodes"))]
ListProcess {
#[snafu(display("Failed to invoke frontend services"))]
InvokeFrontend {
source: common_frontend::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Meta client is not provided"))]
MetaClientMissing {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to find frontend node: {}", addr))]
FrontendNotFound {
addr: String,
#[snafu(implicit)]
location: Location,
},
}
impl Error {
@@ -352,7 +365,10 @@ impl ErrorExt for Error {
Error::GetViewCache { source, .. } | Error::GetTableCache { source, .. } => {
source.status_code()
}
Error::ListProcess { source, .. } => source.status_code(),
Error::InvokeFrontend { source, .. } => source.status_code(),
Error::FrontendNotFound { .. } | Error::MetaClientMissing { .. } => {
StatusCode::Unexpected
}
}
}

View File

@@ -34,4 +34,20 @@ lazy_static! {
register_histogram!("greptime_catalog_kv_get", "catalog kv get").unwrap();
pub static ref METRIC_CATALOG_KV_BATCH_GET: Histogram =
register_histogram!("greptime_catalog_kv_batch_get", "catalog kv batch get").unwrap();
/// Count of running process in each catalog.
pub static ref PROCESS_LIST_COUNT: IntGaugeVec = register_int_gauge_vec!(
"greptime_process_list_count",
"Running process count per catalog",
&["catalog"]
)
.unwrap();
/// Count of killed process in each catalog.
pub static ref PROCESS_KILL_COUNT: IntCounterVec = register_int_counter_vec!(
"greptime_process_kill_count",
"Completed kill process requests count",
&["catalog"]
)
.unwrap();
}

View File

@@ -14,24 +14,32 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use api::v1::frontend::{ListProcessRequest, ProcessInfo};
use api::v1::frontend::{KillProcessRequest, ListProcessRequest, ProcessInfo};
use common_base::cancellation::CancellationHandle;
use common_frontend::selector::{FrontendSelector, MetaClientSelector};
use common_telemetry::{debug, info};
use common_time::util::current_time_millis;
use meta_client::MetaClientRef;
use snafu::ResultExt;
use snafu::{ensure, OptionExt, ResultExt};
use crate::error;
use crate::metrics::{PROCESS_KILL_COUNT, PROCESS_LIST_COUNT};
pub type ProcessManagerRef = Arc<ProcessManager>;
/// Query process manager.
pub struct ProcessManager {
/// Local frontend server address,
server_addr: String,
/// Next process id for local queries.
next_id: AtomicU64,
catalogs: RwLock<HashMap<String, HashMap<u64, ProcessInfo>>>,
/// Running process per catalog.
catalogs: RwLock<HashMap<String, HashMap<u64, CancellableProcess>>>,
/// Frontend selector to locate frontend nodes.
frontend_selector: Option<MetaClientSelector>,
}
@@ -50,6 +58,7 @@ impl ProcessManager {
impl ProcessManager {
/// Registers a submitted query. Use the provided id if present.
#[must_use]
pub fn register_query(
self: &Arc<Self>,
catalog: String,
@@ -68,16 +77,21 @@ impl ProcessManager {
client,
frontend: self.server_addr.clone(),
};
let cancellation_handle = Arc::new(CancellationHandle::default());
let cancellable_process = CancellableProcess::new(cancellation_handle.clone(), process);
self.catalogs
.write()
.unwrap()
.entry(catalog.clone())
.or_default()
.insert(id, process);
.insert(id, cancellable_process);
Ticket {
catalog,
manager: self.clone(),
id,
cancellation_handle,
}
}
@@ -91,30 +105,25 @@ impl ProcessManager {
if let Entry::Occupied(mut o) = self.catalogs.write().unwrap().entry(catalog) {
let process = o.get_mut().remove(&id);
debug!("Deregister process: {:?}", process);
if o.get_mut().is_empty() {
if o.get().is_empty() {
o.remove();
}
}
}
pub fn deregister_all_queries(&self) {
self.catalogs.write().unwrap().clear();
info!("All queries on {} has been deregistered", self.server_addr);
}
/// List local running processes in given catalog.
pub fn local_processes(&self, catalog: Option<&str>) -> error::Result<Vec<ProcessInfo>> {
let catalogs = self.catalogs.read().unwrap();
let result = if let Some(catalog) = catalog {
if let Some(catalogs) = catalogs.get(catalog) {
catalogs.values().cloned().collect()
catalogs.values().map(|p| p.process.clone()).collect()
} else {
vec![]
}
} else {
catalogs
.values()
.flat_map(|v| v.values().cloned())
.flat_map(|v| v.values().map(|p| p.process.clone()))
.collect()
};
Ok(result)
@@ -129,14 +138,14 @@ impl ProcessManager {
let frontends = remote_frontend_selector
.select(|node| node.peer.addr != self.server_addr)
.await
.context(error::ListProcessSnafu)?;
.context(error::InvokeFrontendSnafu)?;
for mut f in frontends {
processes.extend(
f.list_process(ListProcessRequest {
catalog: catalog.unwrap_or_default().to_string(),
})
.await
.context(error::ListProcessSnafu)?
.context(error::InvokeFrontendSnafu)?
.processes,
);
}
@@ -144,12 +153,64 @@ impl ProcessManager {
processes.extend(self.local_processes(catalog)?);
Ok(processes)
}
/// Kills query with provided catalog and id.
pub async fn kill_process(
&self,
server_addr: String,
catalog: String,
id: u64,
) -> error::Result<bool> {
if server_addr == self.server_addr {
if let Some(catalogs) = self.catalogs.write().unwrap().get_mut(&catalog) {
if let Some(process) = catalogs.remove(&id) {
process.handle.cancel();
info!(
"Killed process, catalog: {}, id: {:?}",
process.process.catalog, process.process.id
);
PROCESS_KILL_COUNT.with_label_values(&[&catalog]).inc();
Ok(true)
} else {
debug!("Failed to kill process, id not found: {}", id);
Ok(false)
}
} else {
debug!("Failed to kill process, catalog not found: {}", catalog);
Ok(false)
}
} else {
let mut nodes = self
.frontend_selector
.as_ref()
.context(error::MetaClientMissingSnafu)?
.select(|node| node.peer.addr == server_addr)
.await
.context(error::InvokeFrontendSnafu)?;
ensure!(
!nodes.is_empty(),
error::FrontendNotFoundSnafu { addr: server_addr }
);
let request = KillProcessRequest {
server_addr,
catalog,
process_id: id,
};
nodes[0]
.kill_process(request)
.await
.context(error::InvokeFrontendSnafu)?;
Ok(true)
}
}
}
pub struct Ticket {
pub(crate) catalog: String,
pub(crate) manager: ProcessManagerRef,
pub(crate) id: u64,
pub cancellation_handle: Arc<CancellationHandle>,
}
impl Drop for Ticket {
@@ -159,6 +220,37 @@ impl Drop for Ticket {
}
}
struct CancellableProcess {
handle: Arc<CancellationHandle>,
process: ProcessInfo,
}
impl Drop for CancellableProcess {
fn drop(&mut self) {
PROCESS_LIST_COUNT
.with_label_values(&[&self.process.catalog])
.dec();
}
}
impl CancellableProcess {
fn new(handle: Arc<CancellationHandle>, process: ProcessInfo) -> Self {
PROCESS_LIST_COUNT
.with_label_values(&[&process.catalog])
.inc();
Self { handle, process }
}
}
impl Debug for CancellableProcess {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CancellableProcess")
.field("cancelled", &self.handle.is_cancelled())
.field("process", &self.process)
.finish()
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -185,4 +277,212 @@ mod tests {
drop(ticket);
assert_eq!(process_manager.local_processes(None).unwrap().len(), 0);
}
#[tokio::test]
async fn test_register_query_with_custom_id() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let custom_id = 12345;
let ticket = process_manager.clone().register_query(
"public".to_string(),
vec!["test".to_string()],
"SELECT * FROM table".to_string(),
"client1".to_string(),
Some(custom_id),
);
assert_eq!(ticket.id, custom_id);
let running_processes = process_manager.local_processes(None).unwrap();
assert_eq!(running_processes.len(), 1);
assert_eq!(running_processes[0].id, custom_id);
assert_eq!(&running_processes[0].client, "client1");
}
#[tokio::test]
async fn test_multiple_queries_same_catalog() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let ticket1 = process_manager.clone().register_query(
"public".to_string(),
vec!["schema1".to_string()],
"SELECT * FROM table1".to_string(),
"client1".to_string(),
None,
);
let ticket2 = process_manager.clone().register_query(
"public".to_string(),
vec!["schema2".to_string()],
"SELECT * FROM table2".to_string(),
"client2".to_string(),
None,
);
let running_processes = process_manager.local_processes(Some("public")).unwrap();
assert_eq!(running_processes.len(), 2);
// Verify both processes are present
let ids: Vec<u64> = running_processes.iter().map(|p| p.id).collect();
assert!(ids.contains(&ticket1.id));
assert!(ids.contains(&ticket2.id));
}
#[tokio::test]
async fn test_multiple_catalogs() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let _ticket1 = process_manager.clone().register_query(
"catalog1".to_string(),
vec!["schema1".to_string()],
"SELECT * FROM table1".to_string(),
"client1".to_string(),
None,
);
let _ticket2 = process_manager.clone().register_query(
"catalog2".to_string(),
vec!["schema2".to_string()],
"SELECT * FROM table2".to_string(),
"client2".to_string(),
None,
);
// Test listing processes for specific catalog
let catalog1_processes = process_manager.local_processes(Some("catalog1")).unwrap();
assert_eq!(catalog1_processes.len(), 1);
assert_eq!(&catalog1_processes[0].catalog, "catalog1");
let catalog2_processes = process_manager.local_processes(Some("catalog2")).unwrap();
assert_eq!(catalog2_processes.len(), 1);
assert_eq!(&catalog2_processes[0].catalog, "catalog2");
// Test listing all processes
let all_processes = process_manager.local_processes(None).unwrap();
assert_eq!(all_processes.len(), 2);
}
#[tokio::test]
async fn test_deregister_query() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let ticket = process_manager.clone().register_query(
"public".to_string(),
vec!["test".to_string()],
"SELECT * FROM table".to_string(),
"client1".to_string(),
None,
);
assert_eq!(process_manager.local_processes(None).unwrap().len(), 1);
process_manager.deregister_query("public".to_string(), ticket.id);
assert_eq!(process_manager.local_processes(None).unwrap().len(), 0);
}
#[tokio::test]
async fn test_cancellation_handle() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let ticket = process_manager.clone().register_query(
"public".to_string(),
vec!["test".to_string()],
"SELECT * FROM table".to_string(),
"client1".to_string(),
None,
);
assert!(!ticket.cancellation_handle.is_cancelled());
ticket.cancellation_handle.cancel();
assert!(ticket.cancellation_handle.is_cancelled());
}
#[tokio::test]
async fn test_kill_local_process() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let ticket = process_manager.clone().register_query(
"public".to_string(),
vec!["test".to_string()],
"SELECT * FROM table".to_string(),
"client1".to_string(),
None,
);
assert!(!ticket.cancellation_handle.is_cancelled());
let killed = process_manager
.kill_process(
"127.0.0.1:8000".to_string(),
"public".to_string(),
ticket.id,
)
.await
.unwrap();
assert!(killed);
assert_eq!(process_manager.local_processes(None).unwrap().len(), 0);
}
#[tokio::test]
async fn test_kill_nonexistent_process() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let killed = process_manager
.kill_process("127.0.0.1:8000".to_string(), "public".to_string(), 999)
.await
.unwrap();
assert!(!killed);
}
#[tokio::test]
async fn test_kill_process_nonexistent_catalog() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let killed = process_manager
.kill_process("127.0.0.1:8000".to_string(), "nonexistent".to_string(), 1)
.await
.unwrap();
assert!(!killed);
}
#[tokio::test]
async fn test_process_info_fields() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
let _ticket = process_manager.clone().register_query(
"test_catalog".to_string(),
vec!["schema1".to_string(), "schema2".to_string()],
"SELECT COUNT(*) FROM users WHERE age > 18".to_string(),
"test_client".to_string(),
Some(42),
);
let processes = process_manager.local_processes(None).unwrap();
assert_eq!(processes.len(), 1);
let process = &processes[0];
assert_eq!(process.id, 42);
assert_eq!(&process.catalog, "test_catalog");
assert_eq!(process.schemas, vec!["schema1", "schema2"]);
assert_eq!(&process.query, "SELECT COUNT(*) FROM users WHERE age > 18");
assert_eq!(&process.client, "test_client");
assert_eq!(&process.frontend, "127.0.0.1:8000");
assert!(process.start_timestamp > 0);
}
#[tokio::test]
async fn test_ticket_drop_deregisters_process() {
let process_manager = Arc::new(ProcessManager::new("127.0.0.1:8000".to_string(), None));
{
let _ticket = process_manager.clone().register_query(
"public".to_string(),
vec!["test".to_string()],
"SELECT * FROM table".to_string(),
"client1".to_string(),
None,
);
// Process should be registered
assert_eq!(process_manager.local_processes(None).unwrap().len(), 1);
} // ticket goes out of scope here
// Process should be automatically deregistered
assert_eq!(process_manager.local_processes(None).unwrap().len(), 0);
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::flow::{DirtyWindowRequest, DirtyWindowRequests, FlowRequest, FlowResponse};
use api::v1::region::InsertRequests;
use common_error::ext::BoxedError;
use common_meta::node_manager::Flownode;
@@ -44,6 +44,16 @@ impl Flownode for FlowRequester {
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
async fn handle_mark_window_dirty(
&self,
req: DirtyWindowRequest,
) -> common_meta::error::Result<FlowResponse> {
self.handle_mark_window_dirty(req)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
}
impl FlowRequester {
@@ -91,4 +101,20 @@ impl FlowRequester {
.into_inner();
Ok(response)
}
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
let (addr, mut client) = self.client.raw_flow_client()?;
let response = client
.handle_mark_dirty_time_window(DirtyWindowRequests {
requests: vec![req],
})
.await
.or_else(|e| {
let code = e.code();
let err: crate::error::Error = e.into();
Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
})?
.into_inner();
Ok(response)
}
}

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::Path;
use std::time::Duration;
use cmd::options::GreptimeOptions;
@@ -58,12 +57,7 @@ fn test_load_datanode_example_config() {
metadata_cache_tti: Duration::from_secs(300),
}),
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some(
Path::new(DEFAULT_DATA_HOME)
.join(WAL_DIR)
.to_string_lossy()
.to_string(),
),
dir: Some(format!("{}/{}", DEFAULT_DATA_HOME, WAL_DIR)),
sync_period: Some(Duration::from_secs(10)),
recovery_parallelism: 2,
..Default::default()
@@ -86,10 +80,7 @@ fn test_load_datanode_example_config() {
],
logging: LoggingOptions {
level: Some("info".to_string()),
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
@@ -132,10 +123,7 @@ fn test_load_frontend_example_config() {
}),
logging: LoggingOptions {
level: Some("info".to_string()),
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
@@ -182,10 +170,7 @@ fn test_load_metasrv_example_config() {
..Default::default()
},
logging: LoggingOptions {
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
level: Some("info".to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
@@ -220,12 +205,7 @@ fn test_load_standalone_example_config() {
component: StandaloneOptions {
default_timezone: Some("UTC".to_string()),
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some(
Path::new(DEFAULT_DATA_HOME)
.join(WAL_DIR)
.to_string_lossy()
.to_string(),
),
dir: Some(format!("{}/{}", DEFAULT_DATA_HOME, WAL_DIR)),
sync_period: Some(Duration::from_secs(10)),
recovery_parallelism: 2,
..Default::default()
@@ -248,10 +228,7 @@ fn test_load_standalone_example_config() {
},
logging: LoggingOptions {
level: Some("info".to_string()),
dir: Path::new(DEFAULT_DATA_HOME)
.join(DEFAULT_LOGGING_DIR)
.to_string_lossy()
.to_string(),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()

View File

@@ -0,0 +1,240 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! [CancellationHandle] is used to compose with manual implementation of [futures::future::Future]
//! or [futures::stream::Stream] to facilitate cancellation.
//! See example in [frontend::stream_wrapper::CancellableStreamWrapper] and [CancellableFuture].
use std::fmt::{Debug, Display, Formatter};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use futures::task::AtomicWaker;
use pin_project::pin_project;
#[derive(Default)]
pub struct CancellationHandle {
waker: AtomicWaker,
cancelled: AtomicBool,
}
impl Debug for CancellationHandle {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CancellationHandle")
.field("cancelled", &self.is_cancelled())
.finish()
}
}
impl CancellationHandle {
pub fn waker(&self) -> &AtomicWaker {
&self.waker
}
/// Cancels a future or stream.
pub fn cancel(&self) {
if self
.cancelled
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
self.waker.wake();
}
}
/// Is this handle cancelled.
pub fn is_cancelled(&self) -> bool {
self.cancelled.load(Ordering::Relaxed)
}
}
#[pin_project]
#[derive(Debug, Clone)]
pub struct CancellableFuture<T> {
#[pin]
fut: T,
handle: Arc<CancellationHandle>,
}
impl<T> CancellableFuture<T> {
pub fn new(fut: T, handle: Arc<CancellationHandle>) -> Self {
Self { fut, handle }
}
}
impl<T> Future for CancellableFuture<T>
where
T: Future,
{
type Output = Result<T::Output, Cancelled>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project();
// Check if the task has been aborted
if this.handle.is_cancelled() {
return Poll::Ready(Err(Cancelled));
}
if let Poll::Ready(x) = this.fut.poll(cx) {
return Poll::Ready(Ok(x));
}
this.handle.waker().register(cx.waker());
if this.handle.is_cancelled() {
return Poll::Ready(Err(Cancelled));
}
Poll::Pending
}
}
#[derive(Copy, Clone, Debug)]
pub struct Cancelled;
impl Display for Cancelled {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Future has been cancelled")
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use tokio::time::{sleep, timeout};
use crate::cancellation::{CancellableFuture, CancellationHandle, Cancelled};
#[tokio::test]
async fn test_cancellable_future_completes_normally() {
let handle = Arc::new(CancellationHandle::default());
let future = async { 42 };
let cancellable = CancellableFuture::new(future, handle);
let result = cancellable.await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), 42);
}
#[tokio::test]
async fn test_cancellable_future_cancelled_before_start() {
let handle = Arc::new(CancellationHandle::default());
handle.cancel();
let future = async { 42 };
let cancellable = CancellableFuture::new(future, handle);
let result = cancellable.await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), Cancelled));
}
#[tokio::test]
async fn test_cancellable_future_cancelled_during_execution() {
let handle = Arc::new(CancellationHandle::default());
let handle_clone = handle.clone();
// Create a future that sleeps for a long time
let future = async {
sleep(Duration::from_secs(10)).await;
42
};
let cancellable = CancellableFuture::new(future, handle);
// Cancel the future after a short delay
tokio::spawn(async move {
sleep(Duration::from_millis(50)).await;
handle_clone.cancel();
});
let result = cancellable.await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), Cancelled));
}
#[tokio::test]
async fn test_cancellable_future_completes_before_cancellation() {
let handle = Arc::new(CancellationHandle::default());
let handle_clone = handle.clone();
// Create a future that completes quickly
let future = async {
sleep(Duration::from_millis(10)).await;
42
};
let cancellable = CancellableFuture::new(future, handle);
// Try to cancel after the future should have completed
tokio::spawn(async move {
sleep(Duration::from_millis(100)).await;
handle_clone.cancel();
});
let result = cancellable.await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), 42);
}
#[tokio::test]
async fn test_cancellation_handle_is_cancelled() {
let handle = CancellationHandle::default();
assert!(!handle.is_cancelled());
handle.cancel();
assert!(handle.is_cancelled());
}
#[tokio::test]
async fn test_multiple_cancellable_futures_with_same_handle() {
let handle = Arc::new(CancellationHandle::default());
let future1 = CancellableFuture::new(async { 1 }, handle.clone());
let future2 = CancellableFuture::new(async { 2 }, handle.clone());
// Cancel before starting
handle.cancel();
let (result1, result2) = tokio::join!(future1, future2);
assert!(result1.is_err());
assert!(result2.is_err());
assert!(matches!(result1.unwrap_err(), Cancelled));
assert!(matches!(result2.unwrap_err(), Cancelled));
}
#[tokio::test]
async fn test_cancellable_future_with_timeout() {
let handle = Arc::new(CancellationHandle::default());
let future = async {
sleep(Duration::from_secs(1)).await;
42
};
let cancellable = CancellableFuture::new(future, handle.clone());
// Use timeout to ensure the test doesn't hang
let result = timeout(Duration::from_millis(100), cancellable).await;
// Should timeout because the future takes 1 second but we timeout after 100ms
assert!(result.is_err());
}
#[tokio::test]
async fn test_cancelled_display() {
let cancelled = Cancelled;
assert_eq!(format!("{}", cancelled), "Future has been cancelled");
}
}

View File

@@ -14,6 +14,7 @@
pub mod bit_vec;
pub mod bytes;
pub mod cancellation;
pub mod plugins;
pub mod range_read;
#[allow(clippy::all)]

View File

@@ -42,8 +42,8 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to invoke list process service"))]
ListProcess {
#[snafu(display("Failed to invoke frontend service"))]
InvokeFrontend {
#[snafu(source)]
error: tonic::Status,
#[snafu(implicit)]
@@ -67,7 +67,7 @@ impl ErrorExt for Error {
External { source, .. } => source.status_code(),
Meta { source, .. } => source.status_code(),
ParseProcessId { .. } => StatusCode::InvalidArguments,
ListProcess { .. } => StatusCode::External,
InvokeFrontend { .. } => StatusCode::Unexpected,
CreateChannel { source, .. } => source.status_code(),
}
}

View File

@@ -16,9 +16,13 @@ use std::time::Duration;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::cluster::{ClusterInfo, NodeInfo, Role};
use greptime_proto::v1::frontend::{frontend_client, ListProcessRequest, ListProcessResponse};
use greptime_proto::v1::frontend::{
frontend_client, KillProcessRequest, KillProcessResponse, ListProcessRequest,
ListProcessResponse,
};
use meta_client::MetaClientRef;
use snafu::ResultExt;
use tonic::Response;
use crate::error;
use crate::error::{MetaSnafu, Result};
@@ -28,18 +32,28 @@ pub type FrontendClientPtr = Box<dyn FrontendClient>;
#[async_trait::async_trait]
pub trait FrontendClient: Send {
async fn list_process(&mut self, req: ListProcessRequest) -> Result<ListProcessResponse>;
async fn kill_process(&mut self, req: KillProcessRequest) -> Result<KillProcessResponse>;
}
#[async_trait::async_trait]
impl FrontendClient for frontend_client::FrontendClient<tonic::transport::channel::Channel> {
async fn list_process(&mut self, req: ListProcessRequest) -> Result<ListProcessResponse> {
let response: ListProcessResponse = frontend_client::FrontendClient::<
tonic::transport::channel::Channel,
>::list_process(self, req)
frontend_client::FrontendClient::<tonic::transport::channel::Channel>::list_process(
self, req,
)
.await
.context(error::ListProcessSnafu)?
.into_inner();
Ok(response)
.context(error::InvokeFrontendSnafu)
.map(Response::into_inner)
}
async fn kill_process(&mut self, req: KillProcessRequest) -> Result<KillProcessResponse> {
frontend_client::FrontendClient::<tonic::transport::channel::Channel>::kill_process(
self, req,
)
.await
.context(error::InvokeFrontendSnafu)
.map(Response::into_inner)
}
}

View File

@@ -23,7 +23,8 @@ use std::sync::Arc;
use build::BuildFunction;
use database::{
CurrentSchemaFunction, DatabaseFunction, ReadPreferenceFunction, SessionUserFunction,
ConnectionIdFunction, CurrentSchemaFunction, DatabaseFunction, PgBackendPidFunction,
ReadPreferenceFunction, SessionUserFunction,
};
use pg_catalog::PGCatalogFunction;
use procedure_state::ProcedureStateFunction;
@@ -42,6 +43,8 @@ impl SystemFunction {
registry.register_scalar(DatabaseFunction);
registry.register_scalar(SessionUserFunction);
registry.register_scalar(ReadPreferenceFunction);
registry.register_scalar(PgBackendPidFunction);
registry.register_scalar(ConnectionIdFunction);
registry.register_scalar(TimezoneFunction);
registry.register_async(Arc::new(ProcedureStateFunction));
PGCatalogFunction::register(registry);

View File

@@ -18,7 +18,8 @@ use std::sync::Arc;
use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
use datatypes::prelude::{ConcreteDataType, ScalarVector};
use datatypes::vectors::{StringVector, VectorRef};
use datatypes::vectors::{StringVector, UInt64Vector, VectorRef};
use derive_more::Display;
use crate::function::{Function, FunctionContext};
@@ -32,10 +33,20 @@ pub struct SessionUserFunction;
pub struct ReadPreferenceFunction;
#[derive(Display)]
#[display("{}", self.name())]
pub struct PgBackendPidFunction;
#[derive(Display)]
#[display("{}", self.name())]
pub struct ConnectionIdFunction;
const DATABASE_FUNCTION_NAME: &str = "database";
const CURRENT_SCHEMA_FUNCTION_NAME: &str = "current_schema";
const SESSION_USER_FUNCTION_NAME: &str = "session_user";
const READ_PREFERENCE_FUNCTION_NAME: &str = "read_preference";
const PG_BACKEND_PID: &str = "pg_backend_pid";
const CONNECTION_ID: &str = "connection_id";
impl Function for DatabaseFunction {
fn name(&self) -> &str {
@@ -117,6 +128,46 @@ impl Function for ReadPreferenceFunction {
}
}
impl Function for PgBackendPidFunction {
fn name(&self) -> &str {
PG_BACKEND_PID
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::uint64_datatype())
}
fn signature(&self) -> Signature {
Signature::nullary(Volatility::Immutable)
}
fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
let pid = func_ctx.query_ctx.process_id();
Ok(Arc::new(UInt64Vector::from_slice([pid])) as _)
}
}
impl Function for ConnectionIdFunction {
fn name(&self) -> &str {
CONNECTION_ID
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::uint64_datatype())
}
fn signature(&self) -> Signature {
Signature::nullary(Volatility::Immutable)
}
fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
let pid = func_ctx.query_ctx.process_id();
Ok(Arc::new(UInt64Vector::from_slice([pid])) as _)
}
}
impl fmt::Display for DatabaseFunction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "DATABASE")

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use api::region::RegionResponse;
use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
use api::v1::region::{InsertRequests, RegionRequest};
pub use common_base::AffectedRows;
use common_query::request::QueryRequest;
@@ -42,6 +42,9 @@ pub trait Flownode: Send + Sync {
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>;
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse>;
/// Handles requests to mark time window as dirty.
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse>;
}
pub type FlownodeRef = Arc<dyn Flownode>;

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use api::region::RegionResponse;
use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
use api::v1::region::{InsertRequests, RegionRequest};
pub use common_base::AffectedRows;
use common_query::request::QueryRequest;
@@ -67,6 +67,14 @@ pub trait MockFlownodeHandler: Sync + Send + Clone {
) -> Result<FlowResponse> {
unimplemented!()
}
async fn handle_mark_window_dirty(
&self,
_peer: &Peer,
_req: DirtyWindowRequest,
) -> Result<FlowResponse> {
unimplemented!()
}
}
/// A mock struct implements [NodeManager] only implement the `datanode` method.
@@ -134,6 +142,10 @@ impl<T: MockFlownodeHandler> Flownode for MockNode<T> {
async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
self.handler.handle_inserts(&self.peer, requests).await
}
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
self.handler.handle_mark_window_dirty(&self.peer, req).await
}
}
#[async_trait::async_trait]

View File

@@ -173,6 +173,7 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Stream timeout"))]
StreamTimeout {
#[snafu(implicit)]
@@ -180,6 +181,7 @@ pub enum Error {
#[snafu(source)]
error: tokio::time::error::Elapsed,
},
#[snafu(display("RecordBatch slice index overflow: {visit_index} > {size}"))]
RecordBatchSliceIndexOverflow {
#[snafu(implicit)]
@@ -187,6 +189,12 @@ pub enum Error {
size: usize,
visit_index: usize,
},
#[snafu(display("Stream has been cancelled"))]
StreamCancelled {
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -221,6 +229,8 @@ impl ErrorExt for Error {
}
Error::StreamTimeout { .. } => StatusCode::Cancelled,
Error::StreamCancelled { .. } => StatusCode::Cancelled,
}
}

View File

@@ -316,7 +316,7 @@ impl StreamingEngine {
);
METRIC_FLOW_ROWS
.with_label_values(&["out"])
.with_label_values(&["out-streaming"])
.inc_by(total_rows as u64);
let now = self.tick_manager.tick();

View File

@@ -31,6 +31,7 @@ use common_runtime::JoinHandle;
use common_telemetry::{error, info, trace, warn};
use datatypes::value::Value;
use futures::TryStreamExt;
use greptime_proto::v1::flow::DirtyWindowRequest;
use itertools::Itertools;
use session::context::QueryContextBuilder;
use snafu::{ensure, IntoError, OptionExt, ResultExt};
@@ -46,7 +47,7 @@ use crate::error::{
IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
};
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
use crate::repr::{self, DiffRow};
use crate::{Error, FlowId};
@@ -689,6 +690,9 @@ impl FlowEngine for FlowDualEngine {
let mut to_stream_engine = Vec::with_capacity(request.requests.len());
let mut to_batch_engine = request.requests;
let mut batching_row_cnt = 0;
let mut streaming_row_cnt = 0;
{
// not locking this, or recover flows will be starved when also handling flow inserts
let src_table2flow = self.src_table2flow.read().await;
@@ -698,9 +702,11 @@ impl FlowEngine for FlowDualEngine {
let is_in_stream = src_table2flow.in_stream(table_id);
let is_in_batch = src_table2flow.in_batch(table_id);
if is_in_stream {
streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
to_stream_engine.push(req.clone());
}
if is_in_batch {
batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
return true;
}
if !is_in_batch && !is_in_stream {
@@ -713,6 +719,14 @@ impl FlowEngine for FlowDualEngine {
// can't use drop due to https://github.com/rust-lang/rust/pull/128846
}
METRIC_FLOW_ROWS
.with_label_values(&["in-streaming"])
.inc_by(streaming_row_cnt as u64);
METRIC_FLOW_ROWS
.with_label_values(&["in-batching"])
.inc_by(batching_row_cnt as u64);
let streaming_engine = self.streaming_engine.clone();
let stream_handler: JoinHandle<Result<(), Error>> =
common_runtime::spawn_global(async move {
@@ -819,6 +833,10 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
.map(|_| Default::default())
.map_err(to_meta_err(snafu::location!()))
}
async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
unreachable!()
}
}
/// return a function to convert `crate::error::Error` to `common_meta::error::Error`
@@ -926,6 +944,10 @@ impl common_meta::node_manager::Flownode for StreamingEngine {
.map(|_| Default::default())
.map_err(to_meta_err(snafu::location!()))
}
async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
unreachable!()
}
}
impl FlowEngine for StreamingEngine {

View File

@@ -17,6 +17,7 @@
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use api::v1::flow::{DirtyWindowRequests, FlowResponse};
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
@@ -29,8 +30,7 @@ use common_telemetry::{debug, info};
use common_time::TimeToLive;
use query::QueryEngineRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::metadata::TableId;
use store_api::storage::{RegionId, TableId};
use tokio::sync::{oneshot, RwLock};
use crate::batching_mode::frontend_client::FrontendClient;
@@ -42,6 +42,7 @@ use crate::error::{
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
UnexpectedSnafu, UnsupportedSnafu,
};
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW;
use crate::{CreateFlowArgs, Error, FlowId, TableName};
/// Batching mode Engine, responsible for driving all the batching mode tasks
@@ -77,6 +78,116 @@ impl BatchingEngine {
}
}
pub async fn handle_mark_dirty_time_window(
&self,
reqs: DirtyWindowRequests,
) -> Result<FlowResponse, Error> {
let table_info_mgr = self.table_meta.table_info_manager();
let mut group_by_table_id: HashMap<u32, Vec<_>> = HashMap::new();
for r in reqs.requests {
let tid = TableId::from(r.table_id);
let entry = group_by_table_id.entry(tid).or_default();
entry.extend(r.timestamps);
}
let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
let table_infos =
table_info_mgr
.batch_get(&tids)
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: format!("Failed to get table info for table ids: {:?}", tids),
})?;
let group_by_table_name = group_by_table_id
.into_iter()
.filter_map(|(id, timestamps)| {
let table_name = table_infos.get(&id).map(|info| info.table_name());
let Some(table_name) = table_name else {
warn!("Failed to get table infos for table id: {:?}", id);
return None;
};
let table_name = [
table_name.catalog_name,
table_name.schema_name,
table_name.table_name,
];
let schema = &table_infos.get(&id).unwrap().table_info.meta.schema;
let time_index_unit = schema.column_schemas[schema.timestamp_index.unwrap()]
.data_type
.as_timestamp()
.unwrap()
.unit();
Some((table_name, (timestamps, time_index_unit)))
})
.collect::<HashMap<_, _>>();
let group_by_table_name = Arc::new(group_by_table_name);
let mut handles = Vec::new();
let tasks = self.tasks.read().await;
for (_flow_id, task) in tasks.iter() {
let src_table_names = &task.config.source_table_names;
if src_table_names
.iter()
.all(|name| !group_by_table_name.contains_key(name))
{
continue;
}
let group_by_table_name = group_by_table_name.clone();
let task = task.clone();
let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
let src_table_names = &task.config.source_table_names;
let mut all_dirty_windows = vec![];
for src_table_name in src_table_names {
if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
let Some(expr) = &task.config.time_window_expr else {
continue;
};
for timestamp in timestamps {
let align_start = expr
.eval(common_time::Timestamp::new(*timestamp, *unit))?
.0
.context(UnexpectedSnafu {
reason: "Failed to eval start value",
})?;
all_dirty_windows.push(align_start);
}
}
}
let mut state = task.state.write().unwrap();
let flow_id_label = task.config.flow_id.to_string();
for timestamp in all_dirty_windows {
state.dirty_time_windows.add_window(timestamp, None);
}
METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW
.with_label_values(&[&flow_id_label])
.set(state.dirty_time_windows.len() as f64);
Ok(())
});
handles.push(handle);
}
drop(tasks);
for handle in handles {
match handle.await {
Err(e) => {
warn!("Failed to handle inserts: {e}");
}
Ok(Ok(())) => (),
Ok(Err(e)) => {
warn!("Failed to handle inserts: {e}");
}
}
}
Ok(Default::default())
}
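Each timestamp above is passed through the task's `time_window_expr`, which aligns it to the start of its time window before it is recorded as dirty, so timestamps that fall into the same window map to the same window start. A minimal standalone sketch of that alignment idea, using a hypothetical fixed-size tumbling window in place of the real expression evaluator:

fn align_to_window_start(ts_millis: i64, window_millis: i64) -> i64 {
    // Floor the timestamp to its window boundary (also correct for negative timestamps).
    ts_millis - ts_millis.rem_euclid(window_millis)
}

fn main() {
    let window = 5 * 60 * 1000; // hypothetical 5-minute window, in milliseconds
    let mut dirty = std::collections::BTreeSet::new();
    for ts in [1_000_i64, 200_000, 310_000] {
        dirty.insert(align_to_window_start(ts, window));
    }
    // 1_000 and 200_000 share a window, so only two window starts are marked dirty.
    assert_eq!(dirty.len(), 2);
}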
pub async fn handle_inserts_inner(
&self,
request: api::v1::region::InsertRequests,


@@ -156,6 +156,11 @@ impl DirtyTimeWindows {
self.windows.clear();
}
/// Number of dirty windows.
pub fn len(&self) -> usize {
self.windows.len()
}
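With `len()` exposed, the usual Rust convention (and clippy's `len_without_is_empty` lint) suggests a matching `is_empty()`; a minimal companion sketch, assuming the same `windows` collection used by `len()` above:

/// Returns true if there are no dirty windows.
pub fn is_empty(&self) -> bool {
    self.windows.is_empty()
}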
/// Generate all filter expressions consuming all time windows
///
/// there are two limits:


@@ -61,7 +61,9 @@ use crate::error::{
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
};
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
METRIC_FLOW_ROWS,
};
use crate::{Error, FlowId};
@@ -371,6 +373,9 @@ impl BatchingTask {
"Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
elapsed
);
METRIC_FLOW_ROWS
.with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
.inc_by(*affected_rows as _);
} else if let Err(err) = &res {
warn!(
"Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
@@ -410,6 +415,7 @@ impl BatchingTask {
engine: QueryEngineRef,
frontend_client: Arc<FrontendClient>,
) {
let flow_id_str = self.config.flow_id.to_string();
loop {
// first check if shutdown signal is received
// if so, break the loop
@@ -427,6 +433,9 @@ impl BatchingTask {
Err(TryRecvError::Empty) => (),
}
}
METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
.with_label_values(&[&flow_id_str])
.inc();
let new_query = match self.gen_insert_plan(&engine).await {
Ok(new_query) => new_query,
@@ -473,6 +482,9 @@ impl BatchingTask {
}
// TODO(discord9): this error should have a better place to go, but for now just print it; more context is also needed
Err(err) => {
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
.with_label_values(&[&flow_id_str])
.inc();
match new_query {
Some(query) => {
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)


@@ -58,11 +58,32 @@ lazy_static! {
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW: GaugeVec =
register_gauge_vec!(
"greptime_flow_batching_engine_bulk_mark_time_window",
"flow batching engine query time window count marked by bulk inserts",
&["flow_id"],
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec =
register_int_counter_vec!(
"greptime_flow_batching_start_query_count",
"flow batching engine started query count",
&["flow_id"],
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT: IntCounterVec =
register_int_counter_vec!(
"greptime_flow_batching_error_count",
"flow batching engine error count per flow id",
&["flow_id"],
)
.unwrap();
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
"greptime_flow_processed_rows",
"Count of rows flowing through the system",
"Count of rows flowing through the system.",
&["direction"]
)
.unwrap();


@@ -17,6 +17,7 @@
use std::net::SocketAddr;
use std::sync::Arc;
use api::v1::flow::DirtyWindowRequests;
use api::v1::{RowDeleteRequests, RowInsertRequests};
use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
use catalog::CatalogManagerRef;
@@ -136,6 +137,18 @@ impl flow_server::Flow for FlowService {
.map(Response::new)
.map_err(to_status_with_last_err)
}
async fn handle_mark_dirty_time_window(
&self,
reqs: Request<DirtyWindowRequests>,
) -> Result<Response<FlowResponse>, Status> {
self.dual_engine
.batching_engine()
.handle_mark_dirty_time_window(reqs.into_inner())
.await
.map(Response::new)
.map_err(to_status_with_last_err)
}
}
#[derive(Clone)]
@@ -578,6 +591,7 @@ impl FrontendInvoker {
layered_cache_registry.clone(),
inserter.clone(),
table_route_cache,
None,
));
let invoker = FrontendInvoker::new(inserter, deleter, statement_executor);


@@ -70,6 +70,7 @@ store-api.workspace = true
substrait.workspace = true
table.workspace = true
tokio.workspace = true
tokio-util.workspace = true
toml.workspace = true
tonic.workspace = true


@@ -357,6 +357,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Query has been cancelled"))]
Cancelled {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -435,6 +441,8 @@ impl ErrorExt for Error {
Error::InFlightWriteBytesExceeded { .. } => StatusCode::RateLimited,
Error::DataFusion { error, .. } => datafusion_status_code::<Self>(error, None),
Error::Cancelled { .. } => StatusCode::Cancelled,
}
}


@@ -33,6 +33,7 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::process_manager::ProcessManagerRef;
use catalog::CatalogManagerRef;
use client::OutputData;
use common_base::cancellation::CancellableFuture;
use common_base::Plugins;
use common_config::KvBackendConfig;
use common_error::ext::{BoxedError, ErrorExt};
@@ -81,7 +82,7 @@ use crate::error::{
};
use crate::limiter::LimiterRef;
use crate::slow_query_recorder::SlowQueryRecorder;
use crate::stream_wrapper::StreamWrapper;
use crate::stream_wrapper::CancellableStreamWrapper;
/// The frontend instance contains necessary components, and implements many
/// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`],
@@ -183,67 +184,75 @@ impl Instance {
query_ctx.current_catalog().to_string(),
vec![query_ctx.current_schema()],
stmt.to_string(),
"unknown".to_string(),
query_ctx.conn_info().to_string(),
None,
);
let output = match stmt {
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
// TODO: remove this when format is supported in datafusion
if let Statement::Explain(explain) = &stmt {
if let Some(format) = explain.format() {
query_ctx.set_explain_format(format.to_string());
let query_fut = async {
match stmt {
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
// TODO: remove this when format is supported in datafusion
if let Statement::Explain(explain) = &stmt {
if let Some(format) = explain.format() {
query_ctx.set_explain_format(format.to_string());
}
}
let stmt = QueryStatement::Sql(stmt);
let plan = self
.statement_executor
.plan(&stmt, query_ctx.clone())
.await?;
let QueryStatement::Sql(stmt) = stmt else {
unreachable!()
};
query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
self.statement_executor
.exec_plan(plan, query_ctx)
.await
.context(TableOperationSnafu)
}
Statement::Tql(tql) => {
let plan = self
.statement_executor
.plan_tql(tql.clone(), &query_ctx)
.await?;
let stmt = QueryStatement::Sql(stmt);
let plan = self
.statement_executor
.plan(&stmt, query_ctx.clone())
.await?;
let QueryStatement::Sql(stmt) = stmt else {
unreachable!()
};
query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
self.statement_executor.exec_plan(plan, query_ctx).await
}
Statement::Tql(tql) => {
let plan = self
.statement_executor
.plan_tql(tql.clone(), &query_ctx)
.await?;
query_interceptor.pre_execute(
&Statement::Tql(tql),
Some(&plan),
query_ctx.clone(),
)?;
self.statement_executor.exec_plan(plan, query_ctx).await
}
_ => {
query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
self.statement_executor.execute_sql(stmt, query_ctx).await
query_interceptor.pre_execute(
&Statement::Tql(tql),
Some(&plan),
query_ctx.clone(),
)?;
self.statement_executor
.exec_plan(plan, query_ctx)
.await
.context(TableOperationSnafu)
}
_ => {
query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
self.statement_executor
.execute_sql(stmt, query_ctx)
.await
.context(TableOperationSnafu)
}
}
};
match output {
Ok(output) => {
CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
.await
.map_err(|_| error::CancelledSnafu.build())?
.map(|output| {
let Output { meta, data } = output;
let data = match data {
OutputData::Stream(stream) => {
OutputData::Stream(Box::pin(StreamWrapper::new(stream, ticket)))
OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
}
other => other,
};
Ok(Output { data, meta })
}
Err(e) => Err(e).context(TableOperationSnafu),
}
Output { data, meta }
})
}
}
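`CancellableFuture` races the planning/execution future against the ticket's cancellation handle, so killing the process surfaces a `Cancelled` error instead of letting the statement run to completion. A standalone sketch of that race, using `tokio::select!` with a `tokio_util::sync::CancellationToken` as a stand-in for the crate's own future and handle (the names and error type here are illustrative, not the actual API):

use tokio_util::sync::CancellationToken;

async fn run_cancellable<T>(
    fut: impl std::future::Future<Output = T>,
    token: CancellationToken,
) -> Result<T, &'static str> {
    tokio::select! {
        // If the token fires first, surface a cancellation error.
        _ = token.cancelled() => Err("query cancelled"),
        out = fut => Ok(out),
    }
}

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let handle = token.clone();
    tokio::spawn(async move { handle.cancel() });
    // The never-ending "query" loses the race once `cancel` is called.
    let res = run_cancellable(std::future::pending::<()>(), token).await;
    assert!(res.is_err());
}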
@@ -605,6 +614,8 @@ pub fn check_permission(
}
// cursor operations are always allowed once it's created
Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
// Users can only kill processes in their own catalog.
Statement::Kill(_) => {}
}
Ok(())
}


@@ -180,6 +180,7 @@ impl FrontendBuilder {
local_cache_invalidator,
inserter.clone(),
table_route_cache,
Some(process_manager.clone()),
));
let pipeline_operator = Arc::new(PipelineOperator::new(


@@ -35,8 +35,8 @@ use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableId;
use table::table_name::TableName;
use table::TableRef;
use crate::error::{
CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
@@ -235,34 +235,33 @@ impl GrpcQueryHandler for Instance {
async fn put_record_batch(
&self,
table: &TableName,
table_id: &mut Option<TableId>,
table_name: &TableName,
table_ref: &mut Option<TableRef>,
decoder: &mut FlightDecoder,
data: FlightData,
) -> Result<AffectedRows> {
let table_id = if let Some(table_id) = table_id {
*table_id
let table = if let Some(table) = table_ref {
table.clone()
} else {
let table = self
.catalog_manager()
.table(
&table.catalog_name,
&table.schema_name,
&table.table_name,
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
None,
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table.to_string(),
table_name: table_name.to_string(),
})?;
let id = table.table_info().table_id();
*table_id = Some(id);
id
*table_ref = Some(table.clone());
table
};
self.inserter
.handle_bulk_insert(table_id, decoder, data)
.handle_bulk_insert(table, decoder, data)
.await
.context(TableOperationSnafu)
}


@@ -15,37 +15,52 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use catalog::process_manager::Ticket;
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use datatypes::schema::SchemaRef;
use futures::Stream;
pub struct StreamWrapper<T> {
pub struct CancellableStreamWrapper {
inner: SendableRecordBatchStream,
_attachment: T,
ticket: Ticket,
}
impl<T> Unpin for StreamWrapper<T> {}
impl Unpin for CancellableStreamWrapper {}
impl<T> StreamWrapper<T> {
pub fn new(stream: SendableRecordBatchStream, attachment: T) -> Self {
impl CancellableStreamWrapper {
pub fn new(stream: SendableRecordBatchStream, ticket: Ticket) -> Self {
Self {
inner: stream,
_attachment: attachment,
ticket,
}
}
}
impl<T> Stream for StreamWrapper<T> {
impl Stream for CancellableStreamWrapper {
type Item = common_recordbatch::error::Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = &mut *self;
Pin::new(&mut this.inner).poll_next(cx)
if this.ticket.cancellation_handle.is_cancelled() {
return Poll::Ready(Some(common_recordbatch::error::StreamCancelledSnafu.fail()));
}
if let Poll::Ready(res) = Pin::new(&mut this.inner).poll_next(cx) {
return Poll::Ready(res);
}
// On Pending, register the cancellation waker.
this.ticket.cancellation_handle.waker().register(cx.waker());
// Check again in case cancellation raced with the waker registration.
if this.ticket.cancellation_handle.is_cancelled() {
return Poll::Ready(Some(common_recordbatch::error::StreamCancelledSnafu.fail()));
}
Poll::Pending
}
}
impl<T> RecordBatchStream for StreamWrapper<T> {
impl RecordBatchStream for CancellableStreamWrapper {
fn schema(&self) -> SchemaRef {
self.inner.schema()
}
@@ -58,3 +73,295 @@ impl<T> RecordBatchStream for StreamWrapper<T> {
self.inner.metrics()
}
}
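`poll_next` above follows the standard check, poll, register-waker, re-check sequence: without the second `is_cancelled` check, a cancellation landing after the first check but before the waker is registered would never wake the task. A self-contained sketch of the same pattern for a plain future, with an `AtomicBool` plus `futures::task::AtomicWaker` standing in for the ticket's cancellation handle (illustrative only):

use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

use futures::task::AtomicWaker;

struct CancelFlag {
    cancelled: AtomicBool,
    waker: AtomicWaker,
}

impl CancelFlag {
    fn cancel(&self) {
        // Set the flag first, then wake, so a parked task observes the cancellation.
        self.cancelled.store(true, Ordering::Release);
        self.waker.wake();
    }
}

struct Cancellable<F> {
    inner: F,
    flag: Arc<CancelFlag>,
}

impl<F: Future + Unpin> Future for Cancellable<F> {
    type Output = Result<F::Output, &'static str>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        // 1. Fast path: already cancelled.
        if this.flag.cancelled.load(Ordering::Acquire) {
            return Poll::Ready(Err("cancelled"));
        }
        // 2. Drive the inner future.
        if let Poll::Ready(out) = Pin::new(&mut this.inner).poll(cx) {
            return Poll::Ready(Ok(out));
        }
        // 3. Park: register our waker with the cancellation side.
        this.flag.waker.register(cx.waker());
        // 4. Re-check to close the race with a concurrent `cancel()`.
        if this.flag.cancelled.load(Ordering::Acquire) {
            return Poll::Ready(Err("cancelled"));
        }
        Poll::Pending
    }
}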
#[cfg(test)]
mod tests {
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use catalog::process_manager::ProcessManager;
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream};
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::Int32Vector;
use futures::{Stream, StreamExt};
use tokio::time::{sleep, timeout};
use super::CancellableStreamWrapper;
// Mock stream for testing
struct MockRecordBatchStream {
schema: SchemaRef,
batches: Vec<common_recordbatch::error::Result<RecordBatch>>,
current: usize,
delay: Option<Duration>,
}
impl MockRecordBatchStream {
fn new(batches: Vec<common_recordbatch::error::Result<RecordBatch>>) -> Self {
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"test_col",
ConcreteDataType::int32_datatype(),
false,
)]));
Self {
schema,
batches,
current: 0,
delay: None,
}
}
fn with_delay(mut self, delay: Duration) -> Self {
self.delay = Some(delay);
self
}
}
impl Stream for MockRecordBatchStream {
type Item = common_recordbatch::error::Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(delay) = self.delay {
// Simulate async delay
let waker = cx.waker().clone();
let delay_clone = delay;
tokio::spawn(async move {
sleep(delay_clone).await;
waker.wake();
});
self.delay = None; // Only delay once
return Poll::Pending;
}
if self.current >= self.batches.len() {
return Poll::Ready(None);
}
let batch = self.batches[self.current].as_ref().unwrap().clone();
self.current += 1;
Poll::Ready(Some(Ok(batch)))
}
}
impl RecordBatchStream for MockRecordBatchStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn output_ordering(&self) -> Option<&[OrderOption]> {
None
}
fn metrics(&self) -> Option<RecordBatchMetrics> {
None
}
}
fn create_test_batch() -> RecordBatch {
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"test_col",
ConcreteDataType::int32_datatype(),
false,
)]));
RecordBatch::new(
schema,
vec![Arc::new(Int32Vector::from_values(0..3)) as VectorRef],
)
.unwrap()
}
#[tokio::test]
async fn test_stream_completes_normally() {
let batch = create_test_batch();
let mock_stream = MockRecordBatchStream::new(vec![Ok(batch.clone())]);
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
let ticket = process_manager.register_query(
"catalog".to_string(),
vec![],
"query".to_string(),
"client".to_string(),
None,
);
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
let result = cancellable_stream.next().await;
assert!(result.is_some());
assert!(result.unwrap().is_ok());
let end_result = cancellable_stream.next().await;
assert!(end_result.is_none());
}
#[tokio::test]
async fn test_stream_cancelled_before_start() {
let batch = create_test_batch();
let mock_stream = MockRecordBatchStream::new(vec![Ok(batch)]);
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
let ticket = process_manager.register_query(
"catalog".to_string(),
vec![],
"query".to_string(),
"client".to_string(),
None,
);
// Cancel before creating the wrapper
ticket.cancellation_handle.cancel();
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
let result = cancellable_stream.next().await;
assert!(result.is_some());
assert!(result.unwrap().is_err());
}
#[tokio::test]
async fn test_stream_cancelled_during_execution() {
let batch = create_test_batch();
let mock_stream =
MockRecordBatchStream::new(vec![Ok(batch)]).with_delay(Duration::from_millis(100));
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
let ticket = process_manager.register_query(
"catalog".to_string(),
vec![],
"query".to_string(),
"client".to_string(),
None,
);
let cancellation_handle = ticket.cancellation_handle.clone();
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
// Cancel after a short delay
tokio::spawn(async move {
sleep(Duration::from_millis(50)).await;
cancellation_handle.cancel();
});
let result = cancellable_stream.next().await;
assert!(result.is_some());
assert!(result.unwrap().is_err());
}
#[tokio::test]
async fn test_stream_completes_before_cancellation() {
let batch = create_test_batch();
let mock_stream = MockRecordBatchStream::new(vec![Ok(batch.clone())]);
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
let ticket = process_manager.register_query(
"catalog".to_string(),
vec![],
"query".to_string(),
"client".to_string(),
None,
);
let cancellation_handle = ticket.cancellation_handle.clone();
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
// Try to cancel after the stream should have completed
tokio::spawn(async move {
sleep(Duration::from_millis(100)).await;
cancellation_handle.cancel();
});
let result = cancellable_stream.next().await;
assert!(result.is_some());
assert!(result.unwrap().is_ok());
}
#[tokio::test]
async fn test_multiple_batches() {
let batch1 = create_test_batch();
let batch2 = create_test_batch();
let mock_stream = MockRecordBatchStream::new(vec![Ok(batch1), Ok(batch2)]);
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
let ticket = process_manager.register_query(
"catalog".to_string(),
vec![],
"query".to_string(),
"client".to_string(),
None,
);
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
// First batch
let result1 = cancellable_stream.next().await;
assert!(result1.is_some());
assert!(result1.unwrap().is_ok());
// Second batch
let result2 = cancellable_stream.next().await;
assert!(result2.is_some());
assert!(result2.unwrap().is_ok());
// End of stream
let end_result = cancellable_stream.next().await;
assert!(end_result.is_none());
}
#[tokio::test]
async fn test_record_batch_stream_methods() {
let batch = create_test_batch();
let mock_stream = MockRecordBatchStream::new(vec![Ok(batch)]);
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
let ticket = process_manager.register_query(
"catalog".to_string(),
vec![],
"query".to_string(),
"client".to_string(),
None,
);
let cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
// Test schema method
let schema = cancellable_stream.schema();
assert_eq!(schema.column_schemas().len(), 1);
assert_eq!(schema.column_schemas()[0].name, "test_col");
// Test output_ordering method
assert!(cancellable_stream.output_ordering().is_none());
// Test metrics method
assert!(cancellable_stream.metrics().is_none());
}
#[tokio::test]
async fn test_cancellation_during_pending_poll() {
let batch = create_test_batch();
let mock_stream =
MockRecordBatchStream::new(vec![Ok(batch)]).with_delay(Duration::from_millis(200));
let process_manager = Arc::new(ProcessManager::new("".to_string(), None));
let ticket = process_manager.register_query(
"catalog".to_string(),
vec![],
"query".to_string(),
"client".to_string(),
None,
);
let cancellation_handle = ticket.cancellation_handle.clone();
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
// Cancel while the stream is pending
tokio::spawn(async move {
sleep(Duration::from_millis(50)).await;
cancellation_handle.cancel();
});
let result = timeout(Duration::from_millis(300), cancellable_stream.next()).await;
assert!(result.is_ok());
let stream_result = result.unwrap();
assert!(stream_result.is_some());
assert!(stream_result.unwrap().is_err());
}
}


@@ -27,6 +27,7 @@ futures-util.workspace = true
humantime-serde.workspace = true
itertools.workspace = true
lazy_static = "1.4"
mito-codec.workspace = true
mito2.workspace = true
mur3 = "0.1"
object-store.workspace = true


@@ -115,7 +115,7 @@ pub enum Error {
#[snafu(display("Failed to encode primary key"))]
EncodePrimaryKey {
source: mito2::error::Error,
source: mito_codec::error::Error,
#[snafu(implicit)]
location: Location,
},


@@ -18,7 +18,7 @@ use std::hash::Hash;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
use datatypes::value::ValueRef;
use mito2::row_converter::SparsePrimaryKeyCodec;
use mito_codec::row_converter::SparsePrimaryKeyCodec;
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::codec::PrimaryKeyEncoding;

src/mito-codec/Cargo.toml (new file)

@@ -0,0 +1,30 @@
[package]
name = "mito-codec"
version.workspace = true
edition.workspace = true
license.workspace = true
[features]
default = []
testing = []
[dependencies]
api.workspace = true
bytes.workspace = true
common-base.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-recordbatch.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datatypes.workspace = true
memcomparable = "0.2"
paste.workspace = true
serde.workspace = true
snafu.workspace = true
store-api.workspace = true
[dev-dependencies]
datafusion-common.workspace = true
datafusion-expr.workspace = true


@@ -0,0 +1,95 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datatypes::prelude::ConcreteDataType;
use snafu::{Location, Snafu};
/// Error definitions for mito encoding.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Row value mismatches field data type"))]
FieldTypeMismatch {
// Box the source to reduce the size of the error.
#[snafu(source(from(datatypes::error::Error, Box::new)))]
source: Box<datatypes::error::Error>,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serialize field"))]
SerializeField {
#[snafu(source)]
error: memcomparable::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Data type: {} does not support serialization/deserialization",
data_type,
))]
NotSupportedField {
data_type: ConcreteDataType,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to deserialize field"))]
DeserializeField {
#[snafu(source)]
error: memcomparable::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Operation not supported: {}", err_msg))]
UnsupportedOperation {
err_msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Encode null value"))]
IndexEncodeNull {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
FieldTypeMismatch { source, .. } => source.status_code(),
SerializeField { .. } | DeserializeField { .. } | IndexEncodeNull { .. } => {
StatusCode::InvalidArguments
}
NotSupportedField { .. } | UnsupportedOperation { .. } => StatusCode::Unsupported,
}
}
fn as_any(&self) -> &dyn Any {
self
}
}


@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Index codec utilities.
use std::collections::HashMap;
use std::sync::Arc;
@@ -47,7 +49,7 @@ impl IndexValueCodec {
) -> Result<()> {
ensure!(!value.is_null(), IndexEncodeNullSnafu);
if matches!(field.data_type, ConcreteDataType::String(_)) {
if matches!(field.data_type(), ConcreteDataType::String(_)) {
let value = value
.as_string()
.context(FieldTypeMismatchSnafu)?


@@ -31,7 +31,7 @@ pub struct KeyValues {
///
/// This mutation must be a valid mutation and rows in the mutation
/// must not be `None`.
pub(crate) mutation: Mutation,
pub mutation: Mutation,
/// Key value read helper.
helper: SparseReadRowHelper,
/// Primary key encoding hint.
@@ -333,8 +333,7 @@ mod tests {
use api::v1::{self, ColumnDataType, SemanticType};
use super::*;
use crate::test_util::i64_value;
use crate::test_util::meta_util::TestRegionMetadataBuilder;
use crate::test_util::{i64_value, TestRegionMetadataBuilder};
const TS_NAME: &str = "ts";
const START_SEQ: SequenceNumber = 100;

src/mito-codec/src/lib.rs (new file)

@@ -0,0 +1,24 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Codec utilities for the Mito protocol.
pub mod error;
pub mod index;
pub mod key_values;
pub mod primary_key_filter;
pub mod row_converter;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;


@@ -19,12 +19,17 @@ use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use datatypes::value::Value;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::storage::ColumnId;
use crate::error::Result;
use crate::memtable::partition_tree::partition::Partition;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyFilter, SparsePrimaryKeyCodec};
/// Returns true if this is a partition column for metrics in the memtable.
pub fn is_partition_column(name: &str) -> bool {
name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME
}
#[derive(Clone)]
struct PrimaryKeyFilterInner {
metadata: RegionMetadataRef,
@@ -42,7 +47,7 @@ impl PrimaryKeyFilterInner {
let mut result = true;
for filter in self.filters.iter() {
if Partition::is_partition_column(filter.column_name()) {
if is_partition_column(filter.column_name()) {
continue;
}
let Some(column) = self.metadata.column_by_name(filter.column_name()) else {
@@ -149,9 +154,8 @@ mod tests {
use std::sync::Arc;
use api::v1::SemanticType;
use datafusion::logical_expr::BinaryExpr;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::{Expr, Operator};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::ValueRef;


@@ -0,0 +1,164 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod dense;
pub mod sparse;
use std::fmt::Debug;
use std::sync::Arc;
use common_recordbatch::filter::SimpleFilterEvaluator;
use datatypes::value::{Value, ValueRef};
pub use dense::{DensePrimaryKeyCodec, SortField};
pub use sparse::{SparsePrimaryKeyCodec, SparseValues, COLUMN_ID_ENCODE_SIZE};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::Result;
use crate::key_values::KeyValue;
/// Row value encoder/decoder.
pub trait PrimaryKeyCodecExt {
/// Encodes rows to bytes.
/// # Note
/// Ensure the length of row iterator matches the length of fields.
fn encode<'a, I>(&self, row: I) -> Result<Vec<u8>>
where
I: Iterator<Item = ValueRef<'a>>,
{
let mut buffer = Vec::new();
self.encode_to_vec(row, &mut buffer)?;
Ok(buffer)
}
/// Encodes rows to specific vec.
/// # Note
/// Ensure the length of row iterator matches the length of fields.
fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
where
I: Iterator<Item = ValueRef<'a>>;
}
pub trait PrimaryKeyFilter: Send + Sync {
/// Returns true if the primary key matches the filter.
fn matches(&mut self, pk: &[u8]) -> bool;
}
/// Composite values decoded from primary key bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CompositeValues {
Dense(Vec<(ColumnId, Value)>),
Sparse(SparseValues),
}
impl CompositeValues {
/// Extends the composite values with the given values.
pub fn extend(&mut self, values: &[(ColumnId, Value)]) {
match self {
CompositeValues::Dense(dense_values) => {
for (column_id, value) in values {
dense_values.push((*column_id, value.clone()));
}
}
CompositeValues::Sparse(sparse_value) => {
for (column_id, value) in values {
sparse_value.insert(*column_id, value.clone());
}
}
}
}
}
#[cfg(any(test, feature = "testing"))]
impl CompositeValues {
pub fn into_sparse(self) -> SparseValues {
match self {
CompositeValues::Sparse(v) => v,
_ => panic!("CompositeValues is not sparse"),
}
}
pub fn into_dense(self) -> Vec<Value> {
match self {
CompositeValues::Dense(v) => v.into_iter().map(|(_, v)| v).collect(),
_ => panic!("CompositeValues is not dense"),
}
}
}
pub trait PrimaryKeyCodec: Send + Sync + Debug {
/// Encodes a key value to bytes.
fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()>;
/// Encodes values to bytes.
fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()>;
/// Encodes value references to bytes.
fn encode_value_refs(
&self,
values: &[(ColumnId, ValueRef)],
buffer: &mut Vec<u8>,
) -> Result<()>;
/// Returns the number of fields in the primary key.
fn num_fields(&self) -> Option<usize>;
/// Returns a primary key filter factory.
fn primary_key_filter(
&self,
metadata: &RegionMetadataRef,
filters: Arc<Vec<SimpleFilterEvaluator>>,
) -> Box<dyn PrimaryKeyFilter>;
/// Returns the estimated size of the primary key.
fn estimated_size(&self) -> Option<usize> {
None
}
/// Returns the encoding type of the primary key.
fn encoding(&self) -> PrimaryKeyEncoding;
/// Decodes the primary key from the given bytes.
///
/// Returns a [`CompositeValues`] that follows the primary key ordering.
fn decode(&self, bytes: &[u8]) -> Result<CompositeValues>;
/// Decode the leftmost value from bytes.
fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>>;
}
/// Builds a primary key codec from region metadata.
pub fn build_primary_key_codec(region_metadata: &RegionMetadata) -> Arc<dyn PrimaryKeyCodec> {
let fields = region_metadata.primary_key_columns().map(|col| {
(
col.column_id,
SortField::new(col.column_schema.data_type.clone()),
)
});
build_primary_key_codec_with_fields(region_metadata.primary_key_encoding, fields)
}
/// Builds a primary key codec from the given encoding and primary key fields.
pub fn build_primary_key_codec_with_fields(
encoding: PrimaryKeyEncoding,
fields: impl Iterator<Item = (ColumnId, SortField)>,
) -> Arc<dyn PrimaryKeyCodec> {
match encoding {
PrimaryKeyEncoding::Dense => Arc::new(DensePrimaryKeyCodec::with_fields(fields.collect())),
PrimaryKeyEncoding::Sparse => {
Arc::new(SparsePrimaryKeyCodec::with_fields(fields.collect()))
}
}
}


@@ -35,15 +35,16 @@ use store_api::storage::ColumnId;
use crate::error::{
self, FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, SerializeFieldSnafu,
};
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::DensePrimaryKeyFilter;
use crate::key_values::KeyValue;
use crate::primary_key_filter::DensePrimaryKeyFilter;
use crate::row_converter::{
CompositeValues, PrimaryKeyCodec, PrimaryKeyCodecExt, PrimaryKeyFilter,
};
/// Field to serialize and deserialize value in memcomparable format.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
pub(crate) data_type: ConcreteDataType,
data_type: ConcreteDataType,
}
impl SortField {
@@ -51,6 +52,11 @@ impl SortField {
Self { data_type }
}
/// Returns the data type of the field.
pub fn data_type(&self) -> &ConcreteDataType {
&self.data_type
}
pub fn estimated_size(&self) -> usize {
match &self.data_type {
ConcreteDataType::Boolean(_) => 2,
@@ -75,10 +81,9 @@ impl SortField {
| ConcreteDataType::Dictionary(_) => 0,
}
}
}
impl SortField {
pub(crate) fn serialize(
/// Serialize a value to the serializer.
pub fn serialize(
&self,
serializer: &mut Serializer<&mut Vec<u8>>,
value: &ValueRef,
@@ -163,7 +168,8 @@ impl SortField {
Ok(())
}
pub(crate) fn deserialize<B: Buf>(&self, deserializer: &mut Deserializer<B>) -> Result<Value> {
/// Deserialize a value from the deserializer.
pub fn deserialize<B: Buf>(&self, deserializer: &mut Deserializer<B>) -> Result<Value> {
macro_rules! deserialize_and_build_value {
(
$self: ident;
@@ -525,7 +531,7 @@ mod tests {
let value = encoder.decode_value_at(&result, i, &mut offsets).unwrap();
decoded.push(value);
}
assert_eq!(data_types.len(), offsets.len(), "offsets: {:?}", offsets);
assert_eq!(data_types.len(), offsets.len(), "offsets: {offsets:?}");
assert_eq!(decoded, row);
}
}

View File

@@ -27,8 +27,8 @@ use store_api::storage::consts::ReservedColumnId;
use store_api::storage::ColumnId;
use crate::error::{DeserializeFieldSnafu, Result, SerializeFieldSnafu, UnsupportedOperationSnafu};
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::SparsePrimaryKeyFilter;
use crate::key_values::KeyValue;
use crate::primary_key_filter::SparsePrimaryKeyFilter;
use crate::row_converter::dense::SortField;
use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
@@ -205,7 +205,7 @@ impl SparsePrimaryKeyCodec {
}
/// Returns the offset of the given column id in the given primary key.
pub(crate) fn has_column(
pub fn has_column(
&self,
pk: &[u8],
offsets_map: &mut HashMap<u32, usize>,
@@ -233,12 +233,7 @@ impl SparsePrimaryKeyCodec {
}
/// Decode value at `offset` in `pk`.
pub(crate) fn decode_value_at(
&self,
pk: &[u8],
offset: usize,
column_id: ColumnId,
) -> Result<Value> {
pub fn decode_value_at(&self, pk: &[u8], offset: usize, column_id: ColumnId) -> Result<Value> {
let mut deserializer = Deserializer::new(pk);
deserializer.advance(offset);
// Safety: checked by `has_column`
@@ -300,6 +295,40 @@ impl PrimaryKeyCodec for SparsePrimaryKeyCodec {
}
}
/// Field with column id.
pub struct FieldWithId {
pub field: SortField,
pub column_id: ColumnId,
}
/// A special encoder for memtable.
pub struct SparseEncoder {
fields: Vec<FieldWithId>,
}
impl SparseEncoder {
pub fn new(fields: Vec<FieldWithId>) -> Self {
Self { fields }
}
pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
where
I: Iterator<Item = ValueRef<'a>>,
{
let mut serializer = Serializer::new(buffer);
for (value, field) in row.zip(self.fields.iter()) {
if !value.is_null() {
field
.column_id
.serialize(&mut serializer)
.context(SerializeFieldSnafu)?;
field.field.serialize(&mut serializer, &value)?;
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;


@@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities to create a [RegionMetadata](store_api::metadata::RegionMetadata).
//! Test utilities for mito codec.
use api::greptime_proto::v1;
use api::v1::value::ValueData;
use api::v1::SemanticType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
@@ -105,3 +107,10 @@ impl TestRegionMetadataBuilder {
builder.build().unwrap()
}
}
/// Creates value for i64.
pub fn i64_value(data: i64) -> v1::Value {
v1::Value {
value_data: Some(ValueData::I64Value(data)),
}
}


@@ -48,6 +48,7 @@ itertools.workspace = true
lazy_static = "1.4"
log-store = { workspace = true }
memcomparable = "0.2"
mito-codec.workspace = true
moka = { workspace = true, features = ["sync", "future"] }
object-store.workspace = true
parquet = { workspace = true, features = ["async"] }
@@ -82,6 +83,7 @@ common-test-util.workspace = true
criterion = "0.4"
dotenv.workspace = true
log-store.workspace = true
mito-codec = { workspace = true, features = ["testing"] }
object-store = { workspace = true, features = ["services-memory"] }
rskafka.workspace = true
rstest.workspace = true


@@ -25,8 +25,8 @@ use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable
use mito2::memtable::time_series::TimeSeriesMemtable;
use mito2::memtable::{KeyValues, Memtable};
use mito2::region::options::MergeMode;
use mito2::row_converter::DensePrimaryKeyCodec;
use mito2::test_util::memtable_util::{self, region_metadata_to_row_schema};
use mito_codec::row_converter::DensePrimaryKeyCodec;
use rand::rngs::ThreadRng;
use rand::seq::IndexedRandom;
use rand::Rng;


@@ -18,8 +18,9 @@
use common_base::readable_size::ReadableSize;
use common_base::BitVec;
use common_time::Timestamp;
use smallvec::{smallvec, SmallVec};
use crate::sst::file::FileHandle;
use crate::sst::file::{FileHandle, FileId};
/// Default max compaction output file size when not specified.
const DEFAULT_MAX_OUTPUT_SIZE: u64 = ReadableSize::gb(2).as_bytes();
@@ -125,17 +126,68 @@ pub trait Item: Ranged + Clone {
fn size(&self) -> usize;
}
impl Ranged for FileHandle {
type BoundType = Timestamp;
/// A group of files that are created by the same compaction task.
#[derive(Debug, Clone)]
pub struct FileGroup {
files: SmallVec<[FileHandle; 2]>,
size: usize,
num_rows: usize,
min_timestamp: Timestamp,
max_timestamp: Timestamp,
}
fn range(&self) -> (Self::BoundType, Self::BoundType) {
self.time_range()
impl FileGroup {
pub(crate) fn new_with_file(file: FileHandle) -> Self {
let size = file.size() as usize;
let (min_timestamp, max_timestamp) = file.time_range();
let num_rows = file.num_rows();
Self {
files: smallvec![file],
size,
num_rows,
min_timestamp,
max_timestamp,
}
}
pub(crate) fn num_rows(&self) -> usize {
self.num_rows
}
pub(crate) fn add_file(&mut self, file: FileHandle) {
self.size += file.size() as usize;
self.num_rows += file.num_rows();
let (min_timestamp, max_timestamp) = file.time_range();
self.min_timestamp = self.min_timestamp.min(min_timestamp);
self.max_timestamp = self.max_timestamp.max(max_timestamp);
self.files.push(file);
}
#[cfg(test)]
pub(crate) fn files(&self) -> &[FileHandle] {
&self.files[..]
}
pub(crate) fn file_ids(&self) -> SmallVec<[FileId; 2]> {
SmallVec::from_iter(self.files.iter().map(|f| f.file_id()))
}
pub(crate) fn into_files(self) -> impl Iterator<Item = FileHandle> {
self.files.into_iter()
}
}
impl Item for FileHandle {
impl Ranged for FileGroup {
type BoundType = Timestamp;
fn range(&self) -> (Self::BoundType, Self::BoundType) {
(self.min_timestamp, self.max_timestamp)
}
}
impl Item for FileGroup {
fn size(&self) -> usize {
self.size() as usize
self.size
}
}


@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::num::NonZeroU64;
use common_time::Timestamp;
use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
@@ -23,6 +25,23 @@ pub fn new_file_handle(
start_ts_millis: i64,
end_ts_millis: i64,
level: Level,
) -> FileHandle {
new_file_handle_with_sequence(
file_id,
start_ts_millis,
end_ts_millis,
level,
start_ts_millis as u64,
)
}
/// Test util to create file handles with a given sequence.
pub fn new_file_handle_with_sequence(
file_id: FileId,
start_ts_millis: i64,
end_ts_millis: i64,
level: Level,
sequence: u64,
) -> FileHandle {
let file_purger = new_noop_file_purger();
FileHandle::new(
@@ -39,7 +58,7 @@ pub fn new_file_handle(
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
sequence: NonZeroU64::new(sequence),
},
file_purger,
)


@@ -15,6 +15,7 @@
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::num::NonZeroU64;
use common_base::readable_size::ReadableSize;
use common_telemetry::info;
@@ -26,7 +27,9 @@ use store_api::storage::RegionId;
use crate::compaction::buckets::infer_time_bucket;
use crate::compaction::compactor::CompactionRegion;
use crate::compaction::picker::{Picker, PickerOutput};
use crate::compaction::run::{find_sorted_runs, merge_seq_files, reduce_runs};
use crate::compaction::run::{
find_sorted_runs, merge_seq_files, reduce_runs, FileGroup, Item, Ranged,
};
use crate::compaction::{get_expired_ssts, CompactionOutput};
use crate::sst::file::{overlaps, FileHandle, Level};
use crate::sst::version::LevelMeta;
@@ -60,7 +63,8 @@ impl TwcsPicker {
if files.files.is_empty() {
continue;
}
let sorted_runs = find_sorted_runs(&mut files.files);
let mut files_to_merge: Vec<_> = files.files().cloned().collect();
let sorted_runs = find_sorted_runs(&mut files_to_merge);
let found_runs = sorted_runs.len();
// We only remove deletion markers if we found fewer than 2 runs and are not in append mode,
// because after compaction there will be no overlapping files.
@@ -90,7 +94,7 @@ impl TwcsPicker {
);
output.push(CompactionOutput {
output_level: LEVEL_COMPACTED, // always compact to l1
inputs,
inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
filter_deleted,
output_time_range: None, // we do not enforce output time range in twcs compactions.
});
@@ -109,21 +113,21 @@ fn log_pick_result(
file_num: usize,
max_output_file_size: Option<u64>,
filter_deleted: bool,
inputs: &[FileHandle],
inputs: &[FileGroup],
) {
let input_file_str: Vec<String> = inputs
.iter()
.map(|f| {
let range = f.time_range();
let range = f.range();
let start = range.0.to_iso8601_string();
let end = range.1.to_iso8601_string();
let num_rows = f.num_rows();
format!(
"SST{{id: {}, range: ({}, {}), size: {}, num rows: {} }}",
f.file_id(),
"FileGroup{{id: {:?}, range: ({}, {}), size: {}, num rows: {} }}",
f.file_ids(),
start,
end,
ReadableSize(f.size()),
ReadableSize(f.size() as u64),
num_rows
)
})
@@ -198,7 +202,9 @@ impl Picker for TwcsPicker {
struct Window {
start: Timestamp,
end: Timestamp,
files: Vec<FileHandle>,
// Mapping from file sequence to file groups. Files with the same sequence are considered
// to be created from the same compaction task.
files: HashMap<Option<NonZeroU64>, FileGroup>,
time_window: i64,
overlapping: bool,
}
@@ -207,10 +213,11 @@ impl Window {
/// Creates a new [Window] with given file.
fn new_with_file(file: FileHandle) -> Self {
let (start, end) = file.time_range();
let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
Self {
start,
end,
files: vec![file],
files,
time_window: 0,
overlapping: false,
}
@@ -226,7 +233,19 @@ impl Window {
let (start, end) = file.time_range();
self.start = self.start.min(start);
self.end = self.end.max(end);
self.files.push(file);
match self.files.entry(file.meta_ref().sequence) {
Entry::Occupied(mut o) => {
o.get_mut().add_file(file);
}
Entry::Vacant(v) => {
v.insert(FileGroup::new_with_file(file));
}
}
}
fn files(&self) -> impl Iterator<Item = &FileGroup> {
self.files.values()
}
}
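As the comment on `files` says, handles that share a sequence come from one compaction task and are merged into a single `FileGroup`, while handles without a sequence all share the `None` bucket. A standalone sketch of that bucketing step with plain types; the real code keys the map by `Option<NonZeroU64>` exactly as shown above:

use std::collections::HashMap;
use std::num::NonZeroU64;

#[derive(Debug, Default)]
struct Group {
    file_ids: Vec<u64>,
}

fn group_by_sequence(files: &[(u64, Option<NonZeroU64>)]) -> HashMap<Option<NonZeroU64>, Group> {
    let mut groups: HashMap<Option<NonZeroU64>, Group> = HashMap::new();
    for (file_id, sequence) in files {
        // Files produced by the same compaction task share a sequence and land in one group.
        groups.entry(*sequence).or_default().file_ids.push(*file_id);
    }
    groups
}

fn main() {
    let seq = NonZeroU64::new(1);
    let groups = group_by_sequence(&[(10, seq), (11, seq), (12, NonZeroU64::new(2))]);
    assert_eq!(groups.len(), 2);
    assert_eq!(groups[&seq].file_ids, vec![10, 11]);
}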
@@ -311,7 +330,7 @@ mod tests {
use std::collections::HashSet;
use super::*;
use crate::compaction::test_util::new_file_handle;
use crate::compaction::test_util::{new_file_handle, new_file_handle_with_sequence};
use crate::sst::file::{FileId, Level};
#[test]
@@ -371,7 +390,9 @@ mod tests {
.iter(),
3,
);
assert_eq!(5, windows.get(&0).unwrap().files.len());
let fgs = &windows.get(&0).unwrap().files;
assert_eq!(1, fgs.len());
assert_eq!(fgs.values().map(|f| f.files().len()).sum::<usize>(), 5);
let files = [FileId::random(); 3];
let windows = assign_to_windows(
@@ -385,15 +406,56 @@ mod tests {
);
assert_eq!(
files[0],
windows.get(&0).unwrap().files.first().unwrap().file_id()
windows.get(&0).unwrap().files().next().unwrap().files()[0].file_id()
);
assert_eq!(
files[1],
windows.get(&3).unwrap().files.first().unwrap().file_id()
windows.get(&3).unwrap().files().next().unwrap().files()[0].file_id()
);
assert_eq!(
files[2],
windows.get(&12).unwrap().files.first().unwrap().file_id()
windows.get(&12).unwrap().files().next().unwrap().files()[0].file_id()
);
}
#[test]
fn test_assign_file_groups_to_windows() {
let files = [
FileId::random(),
FileId::random(),
FileId::random(),
FileId::random(),
];
let windows = assign_to_windows(
[
new_file_handle_with_sequence(files[0], 0, 999, 0, 1),
new_file_handle_with_sequence(files[1], 0, 999, 0, 1),
new_file_handle_with_sequence(files[2], 0, 999, 0, 2),
new_file_handle_with_sequence(files[3], 0, 999, 0, 2),
]
.iter(),
3,
);
assert_eq!(windows.len(), 1);
let fgs = &windows.get(&0).unwrap().files;
assert_eq!(2, fgs.len());
assert_eq!(
fgs.get(&NonZeroU64::new(1))
.unwrap()
.files()
.iter()
.map(|f| f.file_id())
.collect::<HashSet<_>>(),
[files[0], files[1]].into_iter().collect()
);
assert_eq!(
fgs.get(&NonZeroU64::new(2))
.unwrap()
.files()
.iter()
.map(|f| f.file_id())
.collect::<HashSet<_>>(),
[files[2], files[3]].into_iter().collect()
);
}
@@ -408,8 +470,22 @@ mod tests {
];
files[0].set_compacting(true);
files[2].set_compacting(true);
let windows = assign_to_windows(files.iter(), 3);
assert_eq!(3, windows.get(&0).unwrap().files.len());
let mut windows = assign_to_windows(files.iter(), 3);
let window0 = windows.remove(&0).unwrap();
assert_eq!(1, window0.files.len());
let candidates = window0
.files
.into_values()
.flat_map(|fg| fg.into_files())
.map(|f| f.file_id())
.collect::<HashSet<_>>();
assert_eq!(candidates.len(), 3);
assert_eq!(
candidates,
[files[1].file_id(), files[3].file_id(), files[4].file_id()]
.into_iter()
.collect::<HashSet<_>>()
);
}
/// (Window value, overlapping, files' time ranges in window)
@@ -438,9 +514,11 @@ mod tests {
let mut file_ranges = actual_window
.files
.iter()
.map(|f| {
let (s, e) = f.time_range();
(s.value(), e.value())
.flat_map(|(_, f)| {
f.files().iter().map(|f| {
let (s, e) = f.time_range();
(s.value(), e.value())
})
})
.collect::<Vec<_>>();
file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
@@ -607,10 +685,10 @@ mod tests {
CompactionPickerTestCase {
window_size: 3,
input_files: [
new_file_handle(file_ids[0], -2000, -3, 0),
new_file_handle(file_ids[1], -3000, -100, 0),
new_file_handle(file_ids[2], 0, 2999, 0), //active windows
new_file_handle(file_ids[3], 50, 2998, 0), //active windows
new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3), //active windows
new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4), //active windows
]
.to_vec(),
expected_outputs: vec![
@@ -636,11 +714,11 @@ mod tests {
CompactionPickerTestCase {
window_size: 3,
input_files: [
new_file_handle(file_ids[0], -2000, -3, 0),
new_file_handle(file_ids[1], -3000, -100, 0),
new_file_handle(file_ids[2], 0, 2999, 0),
new_file_handle(file_ids[3], 50, 2998, 0),
new_file_handle(file_ids[4], 11, 2990, 0),
new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 5),
]
.to_vec(),
expected_outputs: vec![
@@ -655,6 +733,27 @@ mod tests {
],
}
.check();
// Case 3:
// A compaction may split its output into several files with overlapping time ranges and the same sequence;
// we should treat these files as one FileGroup.
let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
CompactionPickerTestCase {
window_size: 3,
input_files: [
new_file_handle_with_sequence(file_ids[0], 0, 2999, 1, 1),
new_file_handle_with_sequence(file_ids[1], 0, 2998, 1, 1),
new_file_handle_with_sequence(file_ids[2], 3000, 5999, 1, 2),
new_file_handle_with_sequence(file_ids[3], 3000, 5000, 1, 2),
new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 3),
]
.to_vec(),
expected_outputs: vec![ExpectedOutput {
input_files: vec![0, 1, 4],
output_level: 1,
}],
}
.check();
}
// TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.


@@ -42,6 +42,13 @@ use crate::worker::WorkerId;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Unexpected data type"))]
DataTypeMismatch {
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("External error, context: {}", context))]
External {
source: BoxedError,
@@ -291,35 +298,6 @@ pub enum Error {
#[snafu(display("Failed to write region"))]
WriteGroup { source: Arc<Error> },
#[snafu(display("Row value mismatches field data type"))]
FieldTypeMismatch { source: datatypes::error::Error },
#[snafu(display("Failed to serialize field"))]
SerializeField {
#[snafu(source)]
error: memcomparable::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Data type: {} does not support serialization/deserialization",
data_type,
))]
NotSupportedField {
data_type: ConcreteDataType,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to deserialize field"))]
DeserializeField {
#[snafu(source)]
error: memcomparable::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid parquet SST file {}, reason: {}", file, reason))]
InvalidParquet {
file: String,
@@ -1028,6 +1006,20 @@ pub enum Error {
location: Location,
source: common_grpc::Error,
},
#[snafu(display("Failed to encode"))]
Encode {
#[snafu(implicit)]
location: Location,
source: mito_codec::error::Error,
},
#[snafu(display("Failed to decode"))]
Decode {
#[snafu(implicit)]
location: Location,
source: mito_codec::error::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1052,6 +1044,7 @@ impl ErrorExt for Error {
use Error::*;
match self {
DataTypeMismatch { source, .. } => source.status_code(),
OpenDal { .. } | ReadParquet { .. } => StatusCode::StorageUnavailable,
WriteWal { source, .. } | ReadWal { source, .. } | DeleteWal { source, .. } => {
source.status_code()
@@ -1095,7 +1088,6 @@ impl ErrorExt for Error {
| BiErrors { .. }
| StopScheduler { .. }
| ComputeVector { .. }
| SerializeField { .. }
| EncodeMemtable { .. }
| CreateDir { .. }
| ReadDataPart { .. }
@@ -1107,9 +1099,7 @@ impl ErrorExt for Error {
WriteParquet { .. } => StatusCode::StorageUnavailable,
WriteGroup { source, .. } => source.status_code(),
FieldTypeMismatch { source, .. } => source.status_code(),
NotSupportedField { .. } => StatusCode::Unsupported,
DeserializeField { .. } | EncodeSparsePrimaryKey { .. } => StatusCode::Unexpected,
EncodeSparsePrimaryKey { .. } => StatusCode::Unexpected,
InvalidBatch { .. } => StatusCode::InvalidArguments,
InvalidRecordBatch { .. } => StatusCode::InvalidArguments,
ConvertVector { source, .. } => source.status_code(),
@@ -1181,7 +1171,9 @@ impl ErrorExt for Error {
ScanSeries { source, .. } => source.status_code(),
ScanMultiTimes { .. } => StatusCode::InvalidArguments,
Error::ConvertBulkWalEntry { source, .. } => source.status_code(),
ConvertBulkWalEntry { source, .. } => source.status_code(),
Encode { source, .. } | Decode { source, .. } => source.status_code(),
}
}


@@ -41,7 +41,6 @@ pub mod read;
pub mod region;
mod region_write_ctx;
pub mod request;
pub mod row_converter;
pub mod schedule;
pub mod sst;
mod time_provider;


@@ -21,6 +21,8 @@ use std::sync::Arc;
pub use bulk::part::EncodedBulkPart;
use common_time::Timestamp;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
@@ -29,8 +31,6 @@ use table::predicate::Predicate;
use crate::config::MitoConfig;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
pub use crate::memtable::key_values::KeyValues;
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
@@ -42,7 +42,6 @@ use crate::sst::file::FileTimeRange;
mod builder;
pub mod bulk;
pub mod key_values;
pub mod partition_tree;
mod simple_bulk_memtable;
mod stats;


@@ -16,13 +16,13 @@
use std::sync::{Arc, RwLock};
use mito_codec::key_values::KeyValue;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;
use crate::error::Result;
use crate::memtable::bulk::part::{BulkPart, EncodedBulkPart};
use crate::memtable::key_values::KeyValue;
use crate::memtable::{
BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef,
MemtableStats, PredicateGroup,

View File

@@ -17,12 +17,12 @@
use std::collections::VecDeque;
use std::sync::Arc;
use mito_codec::row_converter::{build_primary_key_codec, DensePrimaryKeyCodec};
use parquet::file::metadata::ParquetMetaData;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use crate::row_converter::{build_primary_key_codec, DensePrimaryKeyCodec};
use crate::sst::parquet::file_range::RangeBase;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::SimpleFilterContext;

View File

@@ -38,6 +38,8 @@ use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::Value;
use datatypes::vectors::Helper;
use mito_codec::key_values::{KeyValue, KeyValuesRef};
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
use parquet::arrow::ArrowWriter;
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
@@ -47,13 +49,12 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;
use table::predicate::Predicate;
use crate::error;
use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu, Result};
use crate::error::{
self, ComputeArrowSnafu, EncodeMemtableSnafu, EncodeSnafu, NewRecordBatchSnafu, Result,
};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::BulkPartIter;
use crate::memtable::key_values::{KeyValue, KeyValuesRef};
use crate::memtable::BoxedBatchIterator;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;
@@ -354,7 +355,9 @@ fn mutations_to_record_batch(
for row in key_values.iter() {
pk_buffer.clear();
pk_encoder.encode_to_vec(row.primary_keys(), &mut pk_buffer)?;
pk_encoder
.encode_to_vec(row.primary_keys(), &mut pk_buffer)
.context(EncodeSnafu)?;
pk_builder.append_value(pk_buffer.as_bytes());
ts_vector.push_value_ref(row.timestamp());
sequence_builder.append_value(row.sequence());
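Throughout these hunks the primary-key encoders now live in `mito_codec`, so their errors no longer flow into the storage engine's error type through `?` alone; each call site attaches them to the engine's `Encode`/`Decode` variants with snafu's `context`, as seen above. A minimal standalone sketch of that pattern, using stand-in error types rather than the crates' actual definitions:

use snafu::{ResultExt, Snafu};

// Stand-in for mito_codec's error type.
#[derive(Debug, Snafu)]
enum CodecError {
    #[snafu(display("buffer too small"))]
    BufferTooSmall,
}

// Stand-in for the storage engine's error type; the derive generates `EncodeSnafu`.
#[derive(Debug, Snafu)]
enum EngineError {
    #[snafu(display("Failed to encode primary key"))]
    Encode { source: CodecError },
}

// Stand-in for an encoder call that now returns the codec crate's error.
fn encode_to_vec(buffer: &mut Vec<u8>) -> Result<(), CodecError> {
    buffer.push(0);
    Ok(())
}

fn write_primary_key(buffer: &mut Vec<u8>) -> Result<(), EngineError> {
    // Same shape as the call sites above: wrap the codec error into Encode.
    encode_to_vec(buffer).context(EncodeSnafu)?;
    Ok(())
}

fn main() {
    let mut buf = Vec::new();
    write_primary_key(&mut buf).unwrap();
}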

View File

@@ -19,7 +19,6 @@ mod dedup;
mod dict;
mod merger;
mod partition;
mod primary_key_filter;
mod shard;
mod shard_builder;
mod tree;
@@ -29,7 +28,8 @@ use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use common_base::readable_size::ReadableSize;
pub(crate) use primary_key_filter::{DensePrimaryKeyFilter, SparsePrimaryKeyFilter};
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
@@ -38,7 +38,6 @@ use table::predicate::Predicate;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::tree::PartitionTree;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
@@ -47,7 +46,6 @@ use crate::memtable::{
PredicateGroup,
};
use crate::region::options::MergeMode;
use crate::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
@@ -368,11 +366,11 @@ mod tests {
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use datatypes::vectors::Int64Vector;
use mito_codec::row_converter::DensePrimaryKeyCodec;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use super::*;
use crate::row_converter::DensePrimaryKeyCodec;
use crate::test_util::memtable_util::{
self, collect_iter_timestamps, region_metadata_to_row_schema,
};

View File

@@ -33,6 +33,7 @@ use datatypes::vectors::{
TimestampSecondVector, UInt16Vector, UInt16VectorBuilder, UInt64Vector, UInt64VectorBuilder,
UInt8Vector, UInt8VectorBuilder,
};
use mito_codec::key_values::KeyValue;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
@@ -44,7 +45,6 @@ use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
use crate::error;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::merger::{DataBatchKey, DataNode, DataSource, Merger};
use crate::memtable::partition_tree::PkIndex;
use crate::metrics::{

View File

@@ -22,13 +22,16 @@ use std::time::{Duration, Instant};
use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use mito_codec::key_values::KeyValue;
use mito_codec::primary_key_filter::is_partition_column;
use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
use snafu::ResultExt;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::storage::ColumnId;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::error::{EncodeSnafu, Result};
use crate::memtable::partition_tree::data::{DataBatch, DataParts, DATA_INIT_CAP};
use crate::memtable::partition_tree::dedup::DedupReader;
use crate::memtable::partition_tree::shard::{
@@ -39,7 +42,6 @@ use crate::memtable::partition_tree::{PartitionTreeConfig, PkId};
use crate::memtable::stats::WriteMetrics;
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
use crate::read::{Batch, BatchBuilder};
use crate::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
/// Key of a partition.
pub type PartitionKey = u32;
@@ -91,7 +93,9 @@ impl Partition {
// `primary_key` is sparse; re-encode the full primary key.
let sparse_key = primary_key.clone();
primary_key.clear();
row_codec.encode_key_value(&key_value, primary_key)?;
row_codec
.encode_key_value(&key_value, primary_key)
.context(EncodeSnafu)?;
let pk_id = inner.shard_builder.write_with_key(
primary_key,
Some(&sparse_key),
@@ -304,11 +308,6 @@ impl Partition {
.map(|meta| meta.column_schema.name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
.unwrap_or(false)
}
/// Returns true if this is a partition column.
pub(crate) fn is_partition_column(name: &str) -> bool {
name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME
}
}
pub(crate) struct PartitionStats {
@@ -446,7 +445,7 @@ impl ReadPartitionContext {
fn need_prune_key(metadata: &RegionMetadataRef, filters: &[SimpleFilterEvaluator]) -> bool {
for filter in filters {
// We already pruned partitions before so we skip the partition column.
if Partition::is_partition_column(filter.column_name()) {
if is_partition_column(filter.column_name()) {
continue;
}
let Some(column) = metadata.column_by_name(filter.column_name()) else {

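`is_partition_column` is now imported from `mito_codec::primary_key_filter` both here and in `tree.rs` below. Judging from the removed `Partition::is_partition_column`, the shared helper is presumably just the name check sketched here (not verified against the codec crate):

use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;

/// Returns true if `name` is the metric engine's internal table-id column,
/// which is the column the partition tree partitions by.
pub fn is_partition_column(name: &str) -> bool {
    name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME
}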
View File

@@ -17,10 +17,11 @@
use std::cmp::Ordering;
use std::time::{Duration, Instant};
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::PrimaryKeyFilter;
use store_api::metadata::RegionMetadataRef;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::data::{
DataBatch, DataParts, DataPartsReader, DataPartsReaderBuilder, DATA_INIT_CAP,
};
@@ -29,7 +30,6 @@ use crate::memtable::partition_tree::merger::{Merger, Node};
use crate::memtable::partition_tree::shard_builder::ShardBuilderReader;
use crate::memtable::partition_tree::{PkId, PkIndex, ShardId};
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
use crate::row_converter::PrimaryKeyFilter;
/// Shard stores data related to the same key dictionary.
pub struct Shard {

View File

@@ -18,10 +18,11 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::PrimaryKeyFilter;
use store_api::metadata::RegionMetadataRef;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::data::{
DataBatch, DataBuffer, DataBufferReader, DataBufferReaderBuilder, DataParts, DATA_INIT_CAP,
};
@@ -30,7 +31,6 @@ use crate::memtable::partition_tree::shard::Shard;
use crate::memtable::partition_tree::{PartitionTreeConfig, PkId, PkIndex, ShardId};
use crate::memtable::stats::WriteMetrics;
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
use crate::row_converter::PrimaryKeyFilter;
/// Builder to write keys and data to a shard whose key dictionary
/// is still active.

View File

@@ -23,8 +23,10 @@ use common_recordbatch::filter::SimpleFilterEvaluator;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::prelude::ValueRef;
use memcomparable::Serializer;
use serde::Serialize;
use mito_codec::key_values::KeyValue;
use mito_codec::primary_key_filter::is_partition_column;
use mito_codec::row_converter::sparse::{FieldWithId, SparseEncoder};
use mito_codec::row_converter::{PrimaryKeyCodec, SortField};
use snafu::{ensure, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
@@ -32,10 +34,9 @@ use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;
use crate::error::{
EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result, SerializeFieldSnafu,
EncodeSnafu, EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::partition::{
Partition, PartitionKey, PartitionReader, PartitionRef, ReadPartitionContext,
};
@@ -46,7 +47,6 @@ use crate::metrics::{PARTITION_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_ST
use crate::read::dedup::LastNonNullIter;
use crate::read::Batch;
use crate::region::options::MergeMode;
use crate::row_converter::{PrimaryKeyCodec, SortField};
/// The partition tree.
pub struct PartitionTree {
@@ -73,15 +73,15 @@ impl PartitionTree {
config: &PartitionTreeConfig,
write_buffer_manager: Option<WriteBufferManagerRef>,
) -> Self {
let sparse_encoder = SparseEncoder {
fields: metadata
let sparse_encoder = SparseEncoder::new(
metadata
.primary_key_columns()
.map(|c| FieldWithId {
field: SortField::new(c.column_schema.data_type.clone()),
column_id: c.column_id,
})
.collect(),
};
);
let is_partitioned = Partition::has_multi_partitions(&metadata);
let mut config = config.clone();
if config.merge_mode == MergeMode::LastNonNull {
@@ -129,7 +129,8 @@ impl PartitionTree {
} else {
// For compatibility, use the sparse encoder for dense primary key.
self.sparse_encoder
.encode_to_vec(kv.primary_keys(), buffer)?;
.encode_to_vec(kv.primary_keys(), buffer)
.context(EncodeSnafu)?;
}
Ok(())
}
@@ -166,7 +167,9 @@ impl PartitionTree {
if self.is_partitioned {
self.encode_sparse_primary_key(&kv, pk_buffer)?;
} else {
self.row_codec.encode_key_value(&kv, pk_buffer)?;
self.row_codec
.encode_key_value(&kv, pk_buffer)
.context(EncodeSnafu)?;
}
// Write rows with
@@ -208,7 +211,9 @@ impl PartitionTree {
if self.is_partitioned {
self.encode_sparse_primary_key(&kv, pk_buffer)?;
} else {
self.row_codec.encode_key_value(&kv, pk_buffer)?;
self.row_codec
.encode_key_value(&kv, pk_buffer)
.context(EncodeSnafu)?;
}
// Write rows with
@@ -415,7 +420,7 @@ impl PartitionTree {
for (key, partition) in partitions.iter() {
let mut is_needed = true;
for filter in filters {
if !Partition::is_partition_column(filter.column_name()) {
if !is_partition_column(filter.column_name()) {
continue;
}
@@ -436,34 +441,6 @@ impl PartitionTree {
}
}
struct FieldWithId {
field: SortField,
column_id: ColumnId,
}
struct SparseEncoder {
fields: Vec<FieldWithId>,
}
impl SparseEncoder {
fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
where
I: Iterator<Item = ValueRef<'a>>,
{
let mut serializer = Serializer::new(buffer);
for (value, field) in row.zip(self.fields.iter()) {
if !value.is_null() {
field
.column_id
.serialize(&mut serializer)
.context(SerializeFieldSnafu)?;
field.field.serialize(&mut serializer, &value)?;
}
}
Ok(())
}
}
#[derive(Default)]
struct TreeIterMetrics {
iter_elapsed: Duration,

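`SparseEncoder` and `FieldWithId` also moved into `mito_codec::row_converter::sparse`, which is why their local definitions above are removed and the construction site switches from a struct literal to `SparseEncoder::new(...)`. A rough stand-in (local types, for illustration only) of the constructor shape implied by that call site:

// Local stand-ins; the real types live in mito_codec::row_converter::sparse.
pub struct SortField;

pub struct FieldWithId {
    pub field: SortField,
    pub column_id: u32,
}

pub struct SparseEncoder {
    fields: Vec<FieldWithId>,
}

impl SparseEncoder {
    /// Assumed constructor, matching the `SparseEncoder::new(fields)` call above.
    pub fn new(fields: Vec<FieldWithId>) -> Self {
        Self { fields }
    }
}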
View File

@@ -19,6 +19,7 @@ use std::sync::{Arc, RwLock};
use api::v1::OpType;
use datatypes::vectors::Helper;
use mito_codec::key_values::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
@@ -26,7 +27,6 @@ use table::predicate::Predicate;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::time_series::{Series, Values};
use crate::memtable::{

View File

@@ -29,6 +29,7 @@ use datatypes::arrow::array::{
};
use datatypes::arrow::buffer::{BooleanBuffer, MutableBuffer};
use datatypes::arrow::datatypes::{DataType, Int64Type};
use mito_codec::key_values::KeyValue;
use smallvec::{smallvec, SmallVec};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
@@ -36,7 +37,6 @@ use store_api::metadata::RegionMetadataRef;
use crate::error;
use crate::error::{InvalidRequestSnafu, Result};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::version::SmallMemtableVec;
use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};

View File

@@ -35,17 +35,19 @@ use datatypes::vectors::{
Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
TimestampSecondVector, UInt64Vector, UInt8Vector,
};
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;
use crate::error;
use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result};
use crate::error::{
self, ComputeArrowSnafu, ConvertVectorSnafu, EncodeSnafu, PrimaryKeyLengthMismatchSnafu, Result,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::builder::{FieldBuilder, StringBuilder};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
@@ -57,7 +59,6 @@ use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::dedup::LastNonNullIter;
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::region::options::MergeMode;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
/// Initial vector builder capacity.
const INITIAL_BUILDER_CAPACITY: usize = 4;
@@ -176,7 +177,10 @@ impl TimeSeriesMemtable {
}
);
let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
let primary_key_encoded = self
.row_codec
.encode(kv.primary_keys())
.context(EncodeSnafu)?;
let (key_allocated, value_allocated) =
self.series_set.push_to_series(primary_key_encoded, &kv);
@@ -1107,11 +1111,11 @@ mod tests {
use datatypes::schema::ColumnSchema;
use datatypes::value::{OrderedFloat, Value};
use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector};
use mito_codec::row_converter::SortField;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use super::*;
use crate::row_converter::SortField;
use crate::test_util::column_metadata_to_column_schema;
fn schema_for_test() -> RegionMetadataRef {

View File

@@ -50,16 +50,17 @@ use datatypes::vectors::{
};
use futures::stream::BoxStream;
use futures::TryStreamExt;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::{ColumnId, SequenceNumber};
use crate::error::{
ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result,
ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
Result,
};
use crate::memtable::BoxedBatchIterator;
use crate::read::prune::PruneReader;
use crate::row_converter::{CompositeValues, PrimaryKeyCodec};
/// Storage internal representation of a batch of rows for a primary key (time series).
///
@@ -612,7 +613,7 @@ impl Batch {
column_id: ColumnId,
) -> Result<Option<&Value>> {
if self.pk_values.is_none() {
self.pk_values = Some(codec.decode(&self.primary_key)?);
self.pk_values = Some(codec.decode(&self.primary_key).context(DecodeSnafu)?);
}
let pk_values = self.pk_values.as_ref().unwrap();
@@ -1026,12 +1027,12 @@ pub(crate) struct ScannerMetrics {
#[cfg(test)]
mod tests {
use mito_codec::row_converter::{self, build_primary_key_codec_with_fields};
use store_api::codec::PrimaryKeyEncoding;
use store_api::storage::consts::ReservedColumnId;
use super::*;
use crate::error::Error;
use crate::row_converter::{self, build_primary_key_codec_with_fields};
use crate::test_util::new_batch_builder;
fn new_batch(

View File

@@ -20,17 +20,17 @@ use std::sync::Arc;
use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use mito_codec::row_converter::{
build_primary_key_codec, build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec,
SortField,
};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, Result};
use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu, Result};
use crate::read::projection::ProjectionMapper;
use crate::read::{Batch, BatchColumn, BatchReader};
use crate::row_converter::{
build_primary_key_codec, build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec,
SortField,
};
/// Reader to adapt schema of underlying reader to expected schema.
pub struct CompatReader<R> {
@@ -155,7 +155,9 @@ impl CompatPrimaryKey {
batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(),
);
buffer.extend_from_slice(batch.primary_key());
self.converter.encode_values(&self.values, &mut buffer)?;
self.converter
.encode_values(&self.values, &mut buffer)
.context(EncodeSnafu)?;
batch.set_primary_key(buffer);
@@ -405,7 +407,10 @@ impl RewritePrimaryKey {
let values = if let Some(pk_values) = batch.pk_values() {
pk_values
} else {
let new_pk_values = self.original.decode(batch.primary_key())?;
let new_pk_values = self
.original
.decode(batch.primary_key())
.context(DecodeSnafu)?;
batch.set_pk_values(new_pk_values);
// Safety: We ensure pk_values is not None.
batch.pk_values().as_ref().unwrap()
@@ -416,7 +421,9 @@ impl RewritePrimaryKey {
);
match values {
CompositeValues::Dense(values) => {
self.new.encode_values(values.as_slice(), &mut buffer)?;
self.new
.encode_values(values.as_slice(), &mut buffer)
.context(EncodeSnafu)?;
}
CompositeValues::Sparse(values) => {
let values = self
@@ -427,7 +434,9 @@ impl RewritePrimaryKey {
(*id, value.as_value_ref())
})
.collect::<Vec<_>>();
self.new.encode_value_refs(&values, &mut buffer)?;
self.new
.encode_value_refs(&values, &mut buffer)
.context(EncodeSnafu)?;
}
}
batch.set_primary_key(buffer);
@@ -445,12 +454,14 @@ mod tests {
use datatypes::schema::ColumnSchema;
use datatypes::value::ValueRef;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
use mito_codec::row_converter::{
DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use super::*;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec};
use crate::test_util::{check_reader_result, VecBatchReader};
/// Creates a new [RegionMetadata].

View File

@@ -26,6 +26,7 @@ use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use mito_codec::row_converter::{build_primary_key_codec, CompositeValues, PrimaryKeyCodec};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
@@ -33,7 +34,6 @@ use store_api::storage::ColumnId;
use crate::cache::CacheStrategy;
use crate::error::{InvalidRequestSnafu, Result};
use crate::read::Batch;
use crate::row_converter::{build_primary_key_codec, CompositeValues, PrimaryKeyCodec};
/// Only cache vector when its length `<=` this value.
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
@@ -320,12 +320,12 @@ mod tests {
use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
use datatypes::arrow::util::pretty;
use datatypes::value::ValueRef;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use mito_codec::test_util::TestRegionMetadataBuilder;
use super::*;
use crate::cache::CacheManager;
use crate::read::BatchBuilder;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use crate::test_util::meta_util::TestRegionMetadataBuilder;
fn new_batch(
ts_start: i64,

View File

@@ -962,19 +962,22 @@ impl StreamContext {
}
}
}
if verbose {
write!(f, "{{")?;
}
write!(
f,
"partition_count={} ({} memtable ranges, {} file {} ranges)",
"\"partition_count\":{{\"count\":{}, \"mem_ranges\":{}, \"files\":{}, \"file_ranges\":{}}}",
self.ranges.len(),
num_mem_ranges,
self.input.num_files(),
num_file_ranges,
)?;
if let Some(selector) = &self.input.series_row_selector {
write!(f, ", selector={}", selector)?;
write!(f, ", \"selector\":\"{}\"", selector)?;
}
if let Some(distribution) = &self.input.distribution {
write!(f, ", distribution={}", distribution)?;
write!(f, ", \"distribution\":\"{}\"", distribution)?;
}
if verbose {
@@ -991,14 +994,15 @@ impl StreamContext {
impl fmt::Debug for FileWrapper<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let (start, end) = self.file.time_range();
write!(
f,
"[file={}, time_range=({}::{}, {}::{}), rows={}, size={}, index_size={}]",
r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
self.file.file_id(),
self.file.time_range().0.value(),
self.file.time_range().0.unit(),
self.file.time_range().1.value(),
self.file.time_range().1.unit(),
start.value(),
start.unit(),
end.value(),
end.unit(),
self.file.num_rows(),
self.file.size(),
self.file.index_size()
@@ -1014,25 +1018,22 @@ impl StreamContext {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let output_schema = self.input.mapper.output_schema();
if !output_schema.is_empty() {
write!(f, ", projection=")?;
f.debug_list()
.entries(output_schema.column_schemas().iter().map(|col| &col.name))
.finish()?;
let names: Vec<_> = output_schema
.column_schemas()
.iter()
.map(|col| &col.name)
.collect();
write!(f, ", \"projection\": {:?}", names)?;
}
if let Some(predicate) = &self.input.predicate.predicate() {
if !predicate.exprs().is_empty() {
write!(f, ", filters=[")?;
for (i, expr) in predicate.exprs().iter().enumerate() {
if i == predicate.exprs().len() - 1 {
write!(f, "{}]", expr)?;
} else {
write!(f, "{}, ", expr)?;
}
}
let exprs: Vec<_> =
predicate.exprs().iter().map(|e| e.to_string()).collect();
write!(f, ", \"filters\": {:?}", exprs)?;
}
}
if !self.input.files.is_empty() {
write!(f, ", files=")?;
write!(f, ", \"files\": ")?;
f.debug_list()
.entries(self.input.files.iter().map(|file| FileWrapper { file }))
.finish()?;
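For illustration only (the numbers and expressions are made up, and the file list is truncated), the verbose explain output assembled by these format strings now reads as a single JSON-like object instead of the old free-form text:

{"partition_count":{"count":8, "mem_ranges":2, "files":3, "file_ranges":12}, "selector":"last_row", "projection": ["ts", "host", "cpu"], "filters": ["host = a"], "files": [...]}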

View File

@@ -142,36 +142,36 @@ impl fmt::Debug for ScanMetricsSet {
write!(
f,
"{{prepare_scan_cost={prepare_scan_cost:?}, \
build_reader_cost={build_reader_cost:?}, \
scan_cost={scan_cost:?}, \
convert_cost={convert_cost:?}, \
yield_cost={yield_cost:?}, \
total_cost={total_cost:?}, \
num_rows={num_rows}, \
num_batches={num_batches}, \
num_mem_ranges={num_mem_ranges}, \
num_file_ranges={num_file_ranges}, \
build_parts_cost={build_parts_cost:?}, \
rg_total={rg_total}, \
rg_fulltext_filtered={rg_fulltext_filtered}, \
rg_inverted_filtered={rg_inverted_filtered}, \
rg_minmax_filtered={rg_minmax_filtered}, \
rg_bloom_filtered={rg_bloom_filtered}, \
rows_before_filter={rows_before_filter}, \
rows_fulltext_filtered={rows_fulltext_filtered}, \
rows_inverted_filtered={rows_inverted_filtered}, \
rows_bloom_filtered={rows_bloom_filtered}, \
rows_precise_filtered={rows_precise_filtered}, \
num_sst_record_batches={num_sst_record_batches}, \
num_sst_batches={num_sst_batches}, \
num_sst_rows={num_sst_rows}, \
first_poll={first_poll:?}, \
num_series_send_timeout={num_series_send_timeout}, \
num_distributor_rows={num_distributor_rows}, \
num_distributor_batches={num_distributor_batches}, \
distributor_scan_cost={distributor_scan_cost:?}, \
distributor_yield_cost={distributor_yield_cost:?}}},"
"{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
\"build_reader_cost\":\"{build_reader_cost:?}\", \
\"scan_cost\":\"{scan_cost:?}\", \
\"convert_cost\":\"{convert_cost:?}\", \
\"yield_cost\":\"{yield_cost:?}\", \
\"total_cost\":\"{total_cost:?}\", \
\"num_rows\":{num_rows}, \
\"num_batches\":{num_batches}, \
\"num_mem_ranges\":{num_mem_ranges}, \
\"num_file_ranges\":{num_file_ranges}, \
\"build_parts_cost\":\"{build_parts_cost:?}\", \
\"rg_total\":{rg_total}, \
\"rg_fulltext_filtered\":{rg_fulltext_filtered}, \
\"rg_inverted_filtered\":{rg_inverted_filtered}, \
\"rg_minmax_filtered\":{rg_minmax_filtered}, \
\"rg_bloom_filtered\":{rg_bloom_filtered}, \
\"rows_before_filter\":{rows_before_filter}, \
\"rows_fulltext_filtered\":{rows_fulltext_filtered}, \
\"rows_inverted_filtered\":{rows_inverted_filtered}, \
\"rows_bloom_filtered\":{rows_bloom_filtered}, \
\"rows_precise_filtered\":{rows_precise_filtered}, \
\"num_sst_record_batches\":{num_sst_record_batches}, \
\"num_sst_batches\":{num_sst_batches}, \
\"num_sst_rows\":{num_sst_rows}, \
\"first_poll\":\"{first_poll:?}\", \
\"num_series_send_timeout\":{num_series_send_timeout}, \
\"num_distributor_rows\":{num_distributor_rows}, \
\"num_distributor_batches\":{num_distributor_batches}, \
\"distributor_scan_cost\":\"{distributor_scan_cost:?}\", \
\"distributor_yield_cost\":\"{distributor_yield_cost:?}\"}}"
)
}
}
@@ -390,10 +390,11 @@ impl PartitionMetricsList {
/// Format verbose metrics for each partition for explain.
pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
let list = self.0.lock().unwrap();
write!(f, ", metrics_per_partition: ")?;
write!(f, ", \"metrics_per_partition\": ")?;
f.debug_list()
.entries(list.iter().filter_map(|p| p.as_ref()))
.finish()
.finish()?;
write!(f, "}}")
}
}
@@ -488,7 +489,11 @@ impl PartitionMetrics {
impl fmt::Debug for PartitionMetrics {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let metrics = self.0.metrics.lock().unwrap();
write!(f, "[partition={}, {:?}]", self.0.partition, metrics)
write!(
f,
r#"{{"partition":{}, "metrics":{:?}}}"#,
self.0.partition, metrics
)
}
}
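The quoting convention above follows from what `Debug` produces: durations print like `42ms`, which is not a valid JSON token, so they are wrapped in quotes while plain counters stay bare. A minimal standalone check of that formatting pattern:

use std::time::Duration;

fn main() {
    let scan_cost = Duration::from_millis(42);
    let num_rows = 1024_u64;
    // Same captured-identifier formatting style as the Debug impls above.
    let fragment = format!("{{\"scan_cost\":\"{scan_cost:?}\", \"num_rows\":{num_rows}}}");
    assert_eq!(fragment, r#"{"scan_cost":"42ms", "num_rows":1024}"#);
}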

View File

@@ -922,11 +922,12 @@ mod tests {
use api::v1::{Row, SemanticType};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnDefaultConstraint;
use mito_codec::test_util::i64_value;
use store_api::metadata::RegionMetadataBuilder;
use super::*;
use crate::error::Error;
use crate::test_util::{i64_value, ts_ms_value};
use crate::test_util::ts_ms_value;
fn new_column_schema(
name: &str,

View File

@@ -20,13 +20,13 @@ use std::sync::Arc;
use common_recordbatch::filter::SimpleFilterEvaluator;
use datatypes::value::{Value, ValueRef};
pub use dense::{DensePrimaryKeyCodec, SortField};
use mito_codec::key_values::KeyValue;
pub use sparse::{SparsePrimaryKeyCodec, SparseValues, COLUMN_ID_ENCODE_SIZE};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
/// Row value encoder/decoder.
pub trait PrimaryKeyCodecExt {

View File

@@ -13,7 +13,6 @@
// limitations under the License.
pub(crate) mod bloom_filter;
mod codec;
pub(crate) mod fulltext_index;
mod indexer;
pub mod intermediate;

View File

@@ -22,6 +22,8 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use index::bloom_filter::applier::InListPredicate;
use index::Bytes;
use mito_codec::index::IndexValueCodec;
use mito_codec::row_converter::SortField;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use snafu::{OptionExt, ResultExt};
@@ -30,10 +32,8 @@ use store_api::storage::ColumnId;
use crate::cache::file_cache::FileCacheRef;
use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, Result};
use crate::row_converter::SortField;
use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result};
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplier;
use crate::sst::index::codec::IndexValueCodec;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
pub struct BloomFilterIndexApplierBuilder<'a> {
@@ -322,7 +322,8 @@ fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Bytes> {
let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
let mut bytes = vec![];
let field = SortField::new(data_type);
IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)?;
IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
.context(EncodeSnafu)?;
Ok(bytes)
}

View File

@@ -19,6 +19,8 @@ use std::sync::Arc;
use common_telemetry::{debug, warn};
use datatypes::schema::SkippingIndexType;
use index::bloom_filter::creator::BloomFilterCreator;
use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
use mito_codec::row_converter::SortField;
use puffin::puffin_manager::{PuffinWriter, PutOptions};
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
@@ -26,14 +28,12 @@ use store_api::storage::ColumnId;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use crate::error::{
BiErrorsSnafu, BloomFilterFinishSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu,
PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result,
BiErrorsSnafu, BloomFilterFinishSnafu, EncodeSnafu, IndexOptionsSnafu,
OperateAbortedIndexSnafu, PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result,
};
use crate::read::Batch;
use crate::row_converter::SortField;
use crate::sst::file::FileId;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec};
use crate::sst::index::intermediate::{
IntermediateLocation, IntermediateManager, TempFileProvider,
};
@@ -210,7 +210,8 @@ impl BloomFilterIndexer {
v.as_value_ref(),
field,
&mut buf,
)?;
)
.context(EncodeSnafu)?;
Ok(buf)
})
.transpose()?;
@@ -234,11 +235,8 @@ impl BloomFilterIndexer {
let elems = (!value.is_null())
.then(|| {
let mut buf = vec![];
IndexValueCodec::encode_nonnull_value(
value,
&sort_field,
&mut buf,
)?;
IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)
.context(EncodeSnafu)?;
Ok(buf)
})
.transpose()?;
@@ -353,6 +351,7 @@ pub(crate) mod tests {
use datatypes::value::ValueRef;
use datatypes::vectors::{UInt64Vector, UInt8Vector};
use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use object_store::services::Memory;
use object_store::ObjectStore;
use puffin::puffin_manager::{PuffinManager, PuffinReader};
@@ -362,7 +361,6 @@ pub(crate) mod tests {
use super::*;
use crate::access_layer::FilePathProvider;
use crate::read::BatchColumn;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::index::puffin_manager::PuffinManagerFactory;
pub fn mock_object_store() -> ObjectStore {

View File

@@ -29,7 +29,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, ConcreteDataType, RegionId};
use crate::error::{
CastVectorSnafu, CreateFulltextCreatorSnafu, FieldTypeMismatchSnafu, FulltextFinishSnafu,
CastVectorSnafu, CreateFulltextCreatorSnafu, DataTypeMismatchSnafu, FulltextFinishSnafu,
FulltextPushTextSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu, Result,
};
use crate::read::Batch;
@@ -259,7 +259,7 @@ impl SingleCreator {
let data = data.get_ref(i);
let text = data
.as_string()
.context(FieldTypeMismatchSnafu)?
.context(DataTypeMismatchSnafu)?
.unwrap_or_default();
self.inner.push_text(text).await?;
}

View File

@@ -27,6 +27,8 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use index::inverted_index::search::index_apply::PredicatesIndexApplier;
use index::inverted_index::search::predicate::Predicate;
use mito_codec::index::IndexValueCodec;
use mito_codec::row_converter::SortField;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use snafu::{OptionExt, ResultExt};
@@ -35,9 +37,9 @@ use store_api::storage::ColumnId;
use crate::cache::file_cache::FileCacheRef;
use crate::cache::index::inverted_index::InvertedIndexCacheRef;
use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result};
use crate::row_converter::SortField;
use crate::sst::index::codec::IndexValueCodec;
use crate::error::{
BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result,
};
use crate::sst::index::inverted_index::applier::InvertedIndexApplier;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
@@ -230,7 +232,8 @@ impl<'a> InvertedIndexApplierBuilder<'a> {
let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
let mut bytes = vec![];
let field = SortField::new(data_type);
IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)?;
IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
.context(EncodeSnafu)?;
Ok(bytes)
}
}

View File

@@ -194,7 +194,7 @@ mod tests {
};
let res = builder.collect_between(&between);
assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
assert!(matches!(res, Err(Error::Encode { .. })));
assert!(builder.output.is_empty());
}

View File

@@ -264,7 +264,7 @@ mod tests {
);
let res = builder.collect_comparison_expr(&tag_column(), &Operator::Lt, &int64_lit(10));
assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
assert!(matches!(res, Err(Error::Encode { .. })));
assert!(builder.output.is_empty());
}

View File

@@ -226,7 +226,7 @@ mod tests {
);
let res = builder.collect_eq(&tag_column(), &int64_lit(1));
assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
assert!(matches!(res, Err(Error::Encode { .. })));
assert!(builder.output.is_empty());
}

View File

@@ -167,7 +167,7 @@ mod tests {
};
let res = builder.collect_inlist(&in_list);
assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
assert!(matches!(res, Err(Error::Encode { .. })));
assert!(builder.output.is_empty());
}

View File

@@ -22,6 +22,8 @@ use index::inverted_index::create::sort::external_sort::ExternalSorter;
use index::inverted_index::create::sort_create::SortIndexCreator;
use index::inverted_index::create::InvertedIndexCreator;
use index::inverted_index::format::writer::InvertedIndexBlobWriter;
use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
use mito_codec::row_converter::SortField;
use puffin::puffin_manager::{PuffinWriter, PutOptions};
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
@@ -30,13 +32,11 @@ use tokio::io::duplex;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use crate::error::{
BiErrorsSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu,
BiErrorsSnafu, EncodeSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu,
PushIndexValueSnafu, Result,
};
use crate::read::Batch;
use crate::row_converter::SortField;
use crate::sst::file::FileId;
use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec};
use crate::sst::index::intermediate::{
IntermediateLocation, IntermediateManager, TempFileProvider,
};
@@ -205,7 +205,8 @@ impl InvertedIndexer {
v.as_value_ref(),
field,
&mut self.value_buf,
)?;
)
.context(EncodeSnafu)?;
Ok(self.value_buf.as_slice())
})
.transpose()?;
@@ -238,7 +239,8 @@ impl InvertedIndexer {
value,
&sort_field,
&mut self.value_buf,
)?;
)
.context(EncodeSnafu)?;
self.index_creator
.push_with_name(col_id_str, Some(&self.value_buf))
.await
@@ -334,6 +336,7 @@ mod tests {
use datatypes::value::ValueRef;
use datatypes::vectors::{UInt64Vector, UInt8Vector};
use futures::future::BoxFuture;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use object_store::services::Memory;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCache;
@@ -346,7 +349,6 @@ mod tests {
use crate::cache::index::inverted_index::InvertedIndexCache;
use crate::metrics::CACHE_BYTES;
use crate::read::BatchColumn;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::puffin_manager::PuffinManagerFactory;

View File

@@ -22,18 +22,19 @@ use api::v1::{OpType, SemanticType};
use common_telemetry::error;
use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::buffer::BooleanBuffer;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use parquet::arrow::arrow_reader::RowSelection;
use snafu::{OptionExt, ResultExt};
use store_api::storage::TimeSeriesRowSelector;
use crate::error::{
DecodeStatsSnafu, FieldTypeMismatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu,
DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, RecordBatchSnafu, Result,
StatsNotPresentSnafu,
};
use crate::read::compat::CompatBatch;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::prune::PruneReader;
use crate::read::Batch;
use crate::row_converter::{CompositeValues, PrimaryKeyCodec};
use crate::sst::file::FileHandle;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{
@@ -270,7 +271,11 @@ impl RangeBase {
let pk_values = if let Some(pk_values) = input.pk_values() {
pk_values
} else {
input.set_pk_values(self.codec.decode(input.primary_key())?);
input.set_pk_values(
self.codec
.decode(input.primary_key())
.context(DecodeSnafu)?,
);
input.pk_values().unwrap()
};
let pk_value = match pk_values {
@@ -284,12 +289,12 @@ impl RangeBase {
v[pk_index]
.1
.try_to_scalar_value(filter_ctx.data_type())
.context(FieldTypeMismatchSnafu)?
.context(DataTypeMismatchSnafu)?
}
CompositeValues::Sparse(v) => {
let v = v.get_or_null(filter_ctx.column_id());
v.try_to_scalar_value(filter_ctx.data_type())
.context(FieldTypeMismatchSnafu)?
.context(DataTypeMismatchSnafu)?
}
};
if filter

View File

@@ -38,6 +38,7 @@ use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, Vector};
use mito_codec::row_converter::{build_primary_key_codec_with_fields, SortField};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::statistics::Statistics;
use snafu::{ensure, OptionExt, ResultExt};
@@ -48,7 +49,6 @@ use crate::error::{
ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::row_converter::{build_primary_key_codec_with_fields, SortField};
use crate::sst::file::{FileMeta, FileTimeRange};
use crate::sst::to_sst_arrow_schema;

View File

@@ -26,6 +26,7 @@ use datafusion_expr::Expr;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
@@ -48,7 +49,6 @@ use crate::metrics::{
};
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::row_converter::build_primary_key_codec;
use crate::sst::file::{FileHandle, FileId};
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;

View File

@@ -16,7 +16,6 @@
pub mod batch_util;
pub mod memtable_util;
pub mod meta_util;
pub mod scheduler_util;
pub mod sst_util;
pub mod version_util;
@@ -841,13 +840,6 @@ impl CreateRequestBuilder {
}
}
/// Creates value for i64.
pub(crate) fn i64_value(data: i64) -> v1::Value {
v1::Value {
value_data: Some(ValueData::I64Value(data)),
}
}
/// Creates value for timestamp millis.
pub(crate) fn ts_ms_value(data: i64) -> v1::Value {
v1::Value {

View File

@@ -25,20 +25,20 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::schema::ColumnSchema;
use datatypes::vectors::TimestampMillisecondVector;
use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::storage::{ColumnId, RegionId, SequenceNumber};
use table::predicate::Predicate;
use crate::error::Result;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer};
use crate::memtable::{
BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRanges,
MemtableRef, MemtableStats,
};
use crate::read::scan_region::PredicateGroup;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
/// Empty memtable for test.
#[derive(Debug, Default)]

View File

@@ -22,6 +22,7 @@ use datatypes::arrow::array::{BinaryArray, TimestampMillisecondArray, UInt64Arra
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::ValueRef;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use parquet::file::metadata::ParquetMetaData;
use store_api::metadata::{
ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
@@ -29,7 +30,6 @@ use store_api::metadata::{
use store_api::storage::RegionId;
use crate::read::{Batch, BatchBuilder, Source};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::test_util::{new_batch_builder, new_noop_file_purger, VecBatchReader};

View File

@@ -15,6 +15,7 @@
//! Utilities to mock version.
use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::Arc;
use api::v1::value::ValueData;
@@ -103,7 +104,7 @@ impl VersionControlBuilder {
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
sequence: NonZeroU64::new(start_ms as u64),
},
);
self
@@ -196,7 +197,7 @@ pub(crate) fn apply_edit(
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
sequence: NonZeroU64::new(*start_ms as u64),
}
})
.collect();
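Both test helpers now derive the file's sequence from `start_ms`. Since `NonZeroU64::new` returns `None` for zero, a file whose range starts at 0 ms still gets `sequence: None`, so the previous behavior is preserved in that edge case:

use std::num::NonZeroU64;

fn main() {
    assert_eq!(NonZeroU64::new(0), None);
    assert_eq!(NonZeroU64::new(42).map(NonZeroU64::get), Some(42));
}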

View File

@@ -26,6 +26,7 @@ common-base.workspace = true
common-catalog.workspace = true
common-datasource.workspace = true
common-error.workspace = true
common-frontend.workspace = true
common-function.workspace = true
common-grpc.workspace = true
common-grpc-expr.workspace = true

View File

@@ -12,18 +12,29 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use ahash::{HashMap, HashMapExt};
use api::v1::flow::DirtyWindowRequest;
use api::v1::region::{
bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader,
};
use api::v1::ArrowIpc;
use arrow::array::{
Array, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
};
use arrow::datatypes::{DataType, Int64Type, TimeUnit};
use arrow::record_batch::RecordBatch;
use common_base::AffectedRows;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_grpc::FlightData;
use common_telemetry::error;
use common_telemetry::tracing_context::TracingContext;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::metadata::TableId;
use table::metadata::TableInfoRef;
use table::TableRef;
use crate::insert::Inserter;
use crate::{error, metrics};
@@ -32,10 +43,12 @@ impl Inserter {
/// Handle bulk insert request.
pub async fn handle_bulk_insert(
&self,
table_id: TableId,
table: TableRef,
decoder: &mut FlightDecoder,
data: FlightData,
) -> error::Result<AffectedRows> {
let table_info = table.table_info();
let table_id = table_info.table_id();
let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["decode_request"])
.start_timer();
@@ -48,6 +61,10 @@ impl Inserter {
return Ok(0);
};
decode_timer.observe_duration();
// Notify flownodes to mark their time windows dirty if any flow is configured on this table.
self.maybe_update_flow_dirty_window(table_info, record_batch.clone());
metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
metrics::BULK_REQUEST_ROWS
.with_label_values(&["raw"])
@@ -216,4 +233,103 @@ impl Inserter {
crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows_inserted as u64);
Ok(rows_inserted)
}
fn maybe_update_flow_dirty_window(&self, table_info: TableInfoRef, record_batch: RecordBatch) {
let table_id = table_info.table_id();
let table_flownode_set_cache = self.table_flownode_set_cache.clone();
let node_manager = self.node_manager.clone();
common_runtime::spawn_global(async move {
let result = table_flownode_set_cache
.get(table_id)
.await
.context(error::RequestInsertsSnafu);
let flownodes = match result {
Ok(flownodes) => flownodes.unwrap_or_default(),
Err(e) => {
error!(e; "Failed to get flownodes for table id: {}", table_id);
return;
}
};
let peers: HashSet<_> = flownodes.values().cloned().collect();
if peers.is_empty() {
return;
}
let Ok(timestamps) = extract_timestamps(
&record_batch,
&table_info
.meta
.schema
.timestamp_column()
.as_ref()
.unwrap()
.name,
)
.inspect_err(|e| {
error!(e; "Failed to extract timestamps from record batch");
}) else {
return;
};
for peer in peers {
let node_manager = node_manager.clone();
let timestamps = timestamps.clone();
common_runtime::spawn_global(async move {
if let Err(e) = node_manager
.flownode(&peer)
.await
.handle_mark_window_dirty(DirtyWindowRequest {
table_id,
timestamps,
})
.await
.context(error::RequestInsertsSnafu)
{
error!(e; "Failed to mark timestamps as dirty, table: {}", table_id);
}
});
}
});
}
}
/// Extracts the timestamp values (as i64) of the record batch's time index column.
/// Returns an empty vector if the record batch has no rows.
fn extract_timestamps(rb: &RecordBatch, timestamp_index_name: &str) -> error::Result<Vec<i64>> {
let ts_col = rb
.column_by_name(timestamp_index_name)
.context(error::ColumnNotFoundSnafu {
msg: timestamp_index_name,
})?;
if rb.num_rows() == 0 {
return Ok(vec![]);
}
let primitive = match ts_col.data_type() {
DataType::Timestamp(unit, _) => match unit {
TimeUnit::Second => ts_col
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
TimeUnit::Millisecond => ts_col
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
TimeUnit::Microsecond => ts_col
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
TimeUnit::Nanosecond => ts_col
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
},
t => {
return error::InvalidTimeIndexTypeSnafu { ty: t.clone() }.fail();
}
};
Ok(primitive.iter().flatten().collect())
}
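A standalone sketch (arrow crate only) of the downcast-and-reinterpret step above for the millisecond case: the timestamp column is viewed as plain `i64` values without copying, and `flatten` drops nulls so only present timestamps are shipped to the flownodes:

use arrow::array::TimestampMillisecondArray;
use arrow::datatypes::Int64Type;

fn main() {
    let ts = TimestampMillisecondArray::from(vec![Some(1_000), None, Some(2_000)]);
    let values: Vec<i64> = ts.reinterpret_cast::<Int64Type>().iter().flatten().collect();
    assert_eq!(values, vec![1_000, 2_000]);
}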

View File

@@ -837,6 +837,22 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid time index type: {}", ty))]
InvalidTimeIndexType {
ty: arrow::datatypes::DataType,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid process id: {}", id))]
InvalidProcessId { id: String },
#[snafu(display("ProcessManager is not present; this may be caused by misconfiguration."))]
ProcessManagerMissing {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -964,6 +980,9 @@ impl ErrorExt for Error {
Error::ColumnOptions { source, .. } => source.status_code(),
Error::DecodeFlightData { source, .. } => source.status_code(),
Error::ComputeArrow { .. } => StatusCode::Internal,
Error::InvalidTimeIndexType { .. } => StatusCode::InvalidArguments,
Error::InvalidProcessId { .. } => StatusCode::InvalidArguments,
Error::ProcessManagerMissing { .. } => StatusCode::Unexpected,
}
}

View File

@@ -78,7 +78,7 @@ pub struct Inserter {
catalog_manager: CatalogManagerRef,
pub(crate) partition_manager: PartitionRuleManagerRef,
pub(crate) node_manager: NodeManagerRef,
table_flownode_set_cache: TableFlownodeSetCacheRef,
pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
}
pub type InserterRef = Arc<Inserter>;

View File

@@ -21,6 +21,7 @@ mod cursor;
mod ddl;
mod describe;
mod dml;
mod kill;
mod set;
mod show;
mod tql;
@@ -32,6 +33,7 @@ use std::time::Duration;
use async_stream::stream;
use catalog::kvbackend::KvBackendCatalogManager;
use catalog::process_manager::ProcessManagerRef;
use catalog::CatalogManagerRef;
use client::{OutputData, RecordBatches};
use common_error::ext::BoxedError;
@@ -94,11 +96,13 @@ pub struct StatementExecutor {
partition_manager: PartitionRuleManagerRef,
cache_invalidator: CacheInvalidatorRef,
inserter: InserterRef,
process_manager: Option<ProcessManagerRef>,
}
pub type StatementExecutorRef = Arc<StatementExecutor>;
impl StatementExecutor {
#[allow(clippy::too_many_arguments)]
pub fn new(
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
@@ -107,6 +111,7 @@ impl StatementExecutor {
cache_invalidator: CacheInvalidatorRef,
inserter: InserterRef,
table_route_cache: TableRouteCacheRef,
process_manager: Option<ProcessManagerRef>,
) -> Self {
Self {
catalog_manager,
@@ -118,6 +123,7 @@ impl StatementExecutor {
partition_manager: Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)),
cache_invalidator,
inserter,
process_manager,
}
}
@@ -363,6 +369,7 @@ impl StatementExecutor {
Statement::ShowSearchPath(_) => self.show_search_path(query_ctx).await,
Statement::Use(db) => self.use_database(db, query_ctx).await,
Statement::Admin(admin) => self.execute_admin_command(admin, query_ctx).await,
Statement::Kill(id) => self.execute_kill(query_ctx, id).await,
}
}

View File

@@ -0,0 +1,46 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_frontend::DisplayProcessId;
use common_query::Output;
use common_telemetry::error;
use session::context::QueryContextRef;
use snafu::ResultExt;
use crate::error;
use crate::statement::StatementExecutor;
impl StatementExecutor {
pub async fn execute_kill(
&self,
query_ctx: QueryContextRef,
process_id: String,
) -> crate::error::Result<Output> {
let Some(process_manager) = self.process_manager.as_ref() else {
error!("Process manager is not initialized");
return error::ProcessManagerMissingSnafu.fail();
};
let display_id = DisplayProcessId::try_from(process_id.as_str())
.map_err(|_| error::InvalidProcessIdSnafu { id: process_id }.build())?;
let current_user_catalog = query_ctx.current_catalog().to_string();
process_manager
.kill_process(display_id.server_addr, current_user_catalog, display_id.id)
.await
.context(error::CatalogSnafu)?;
Ok(Output::new_with_affected_rows(0))
}
}
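`DisplayProcessId::try_from` reconstructs the `(server_addr, id)` pair from the user-supplied string; the actual encoding lives in `common_frontend` and is not shown in this diff. A purely hypothetical stand-in, only to illustrate why parsing can fail and surface as `InvalidProcessId`:

// Hypothetical stand-in; the real type and its string format are defined in common_frontend.
struct DisplayProcessId {
    server_addr: String,
    id: u32,
}

impl TryFrom<&str> for DisplayProcessId {
    type Error = ();

    fn try_from(value: &str) -> Result<Self, Self::Error> {
        // Assume "<server_addr>/<id>" purely for illustration.
        let (server_addr, id) = value.rsplit_once('/').ok_or(())?;
        Ok(Self {
            server_addr: server_addr.to_string(),
            id: id.parse().map_err(|_| ())?,
        })
    }
}

fn main() {
    assert!(DisplayProcessId::try_from("10.0.0.1:4001/42").is_ok());
    assert!(DisplayProcessId::try_from("not-a-process-id").is_err());
}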

View File

@@ -478,6 +478,21 @@ impl QueryEngine for DatafusionQueryEngine {
fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext {
let mut state = self.state.session_state();
state.config_mut().set_extension(query_ctx.clone());
// Note that hints in the "x-greptime-hints" header are automatically parsed
// and stored as extensions in the query context, so we can read them from there.
if let Some(parallelism) = query_ctx.extension("query_parallelism") {
if let Ok(n) = parallelism.parse::<u64>() {
if n > 0 {
let new_cfg = state.config().clone().with_target_partitions(n as usize);
*state.config_mut() = new_cfg;
}
} else {
common_telemetry::warn!(
"Failed to parse query_parallelism: {}, using default value",
parallelism
);
}
}
QueryEngineContext::new(state, query_ctx)
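The hint ultimately just overrides DataFusion's `target_partitions` for the session; a minimal sketch of the effect using the DataFusion API alone:

use datafusion::prelude::SessionConfig;

fn main() {
    // A parsed, positive query_parallelism hint caps planned parallelism.
    let config = SessionConfig::new().with_target_partitions(4);
    assert_eq!(config.target_partitions(), 4);
}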
}

View File

@@ -41,8 +41,9 @@ use datafusion::prelude as df_prelude;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datafusion_common::DFSchema;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, lit, SortExpr};
use datafusion_expr::{col, lit, ExprSchemable, SortExpr};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
@@ -336,7 +337,7 @@ impl PromPlanner {
let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;
let val = Self::get_param_value_as_f64(*op, param)?;
let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;
// convert op and value columns to window exprs.
let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;
@@ -354,7 +355,7 @@ impl PromPlanner {
let predicate = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(col(rank)),
op: Operator::LtEq,
right: Box::new(lit(val)),
right: Box::new(val.clone()),
});
match expr {
@@ -1946,8 +1947,9 @@ impl PromPlanner {
let aggr = match op.id() {
token::T_SUM => sum_udaf(),
token::T_QUANTILE => {
let q = Self::get_param_value_as_f64(op, param)?;
non_col_args.push(lit(q));
let q =
Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
non_col_args.push(q);
quantile_udaf()
}
token::T_AVG => avg_udaf(),
@@ -2027,20 +2029,50 @@ impl PromPlanner {
Ok(val)
}
fn get_param_value_as_f64(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<f64> {
let param = param
.as_deref()
.with_context(|| FunctionInvalidArgumentSnafu {
fn_name: op.to_string(),
})?;
let PromExpr::NumberLiteral(NumberLiteral { val }) = param else {
return FunctionInvalidArgumentSnafu {
fn_name: op.to_string(),
fn get_param_as_literal_expr(
param: &Option<Box<PromExpr>>,
op: Option<TokenType>,
expected_type: Option<ArrowDataType>,
) -> Result<DfExpr> {
let prom_param = param.as_deref().with_context(|| {
if let Some(op) = op {
FunctionInvalidArgumentSnafu {
fn_name: op.to_string(),
}
} else {
FunctionInvalidArgumentSnafu {
fn_name: "unknown".to_string(),
}
}
.fail();
};
})?;
Ok(*val)
let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
if let Some(op) = op {
FunctionInvalidArgumentSnafu {
fn_name: op.to_string(),
}
} else {
FunctionInvalidArgumentSnafu {
fn_name: "unknown".to_string(),
}
}
})?;
// check if the type is expected
if let Some(expected_type) = expected_type {
// A literal must not reference any column, so an empty schema suffices to resolve its type.
let expr_type = expr
.get_type(&DFSchema::empty())
.context(DataFusionPlanningSnafu)?;
if expected_type != expr_type {
return FunctionInvalidArgumentSnafu {
fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
}
.fail();
}
}
Ok(expr)
}
/// Create [DfExpr::WindowFunction] expr for each value column with given window function.
@@ -2096,6 +2128,28 @@ impl PromPlanner {
Ok(normalized_exprs)
}
/// Try to build a [f64] from [PromExpr].
#[deprecated(
note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
)]
fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
match expr {
PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
PromExpr::Unary(UnaryExpr { expr, .. }) => {
Self::try_build_float_literal(expr).map(|f| -f)
}
PromExpr::StringLiteral(_)
| PromExpr::Binary(_)
| PromExpr::VectorSelector(_)
| PromExpr::MatrixSelector(_)
| PromExpr::Call(_)
| PromExpr::Extension(_)
| PromExpr::Aggregate(_)
| PromExpr::Subquery(_) => None,
}
}
/// Create a [SPECIAL_HISTOGRAM_QUANTILE] plan.
async fn create_histogram_plan(
&mut self,
@@ -2108,11 +2162,13 @@ impl PromPlanner {
}
.fail();
}
#[allow(deprecated)]
let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
FunctionInvalidArgumentSnafu {
fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
}
})?;
let input = args.args[1].as_ref().clone();
let input_plan = self.prom_expr_to_plan(&input, session_state).await?;
@@ -2163,11 +2219,7 @@ impl PromPlanner {
}
.fail();
}
let lit = Self::try_build_float_literal(&args.args[0]).with_context(|| {
FunctionInvalidArgumentSnafu {
fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
}
})?;
let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;
// reuse `SPECIAL_TIME_FUNCTION` as name of time index column
self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
@@ -2182,7 +2234,7 @@ impl PromPlanner {
self.ctx.interval,
SPECIAL_TIME_FUNCTION.to_string(),
GREPTIME_VALUE.to_string(),
Some(DfExpr::Literal(ScalarValue::Float64(Some(lit)))),
Some(lit),
)
.context(DataFusionPlanningSnafu)?,
),
@@ -2301,25 +2353,6 @@ impl PromPlanner {
}
}
/// Try to build a [f64] from [PromExpr].
fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
match expr {
PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
PromExpr::Unary(UnaryExpr { expr, .. }) => {
Self::try_build_float_literal(expr).map(|f| -f)
}
PromExpr::StringLiteral(_)
| PromExpr::Binary(_)
| PromExpr::VectorSelector(_)
| PromExpr::MatrixSelector(_)
| PromExpr::Call(_)
| PromExpr::Extension(_)
| PromExpr::Aggregate(_)
| PromExpr::Subquery(_) => None,
}
}
/// Return a lambda to build binary expression from token.
/// Because some binary operator are function in DataFusion like `atan2` or `^`.
#[allow(clippy::type_complexity)]

View File

@@ -88,7 +88,10 @@ opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "a1fb4d
opentelemetry-proto.workspace = true
otel-arrow-rust.workspace = true
parking_lot.workspace = true
pgwire = { version = "0.30", default-features = false, features = ["server-api-ring"] }
#pgwire = { version = "0.30", default-features = false, features = ["server-api-ring"] }
pgwire = { git = "https://github.com/sunng87/pgwire", rev = "127573d997228cfb70c7699881c568eae8131270", default-features = false, features = [
"server-api-ring",
] }
pin-project = "1.0"
pipeline.workspace = true
postgres-types = { version = "0.2", features = ["with-chrono-0_4", "with-serde_json-1"] }

View File

@@ -1 +1 @@
v0.9.2
v0.9.3

View File

@@ -13,7 +13,9 @@
// limitations under the License.
use api::v1::frontend::frontend_server::Frontend;
use api::v1::frontend::{ListProcessRequest, ListProcessResponse};
use api::v1::frontend::{
KillProcessRequest, KillProcessResponse, ListProcessRequest, ListProcessResponse,
};
use catalog::process_manager::ProcessManagerRef;
use common_telemetry::error;
use tonic::{Code, Request, Response, Status};
@@ -41,12 +43,27 @@ impl Frontend for FrontendGrpcHandler {
} else {
Some(list_process_request.catalog.as_str())
};
match self.process_manager.local_processes(catalog) {
Ok(processes) => Ok(Response::new(ListProcessResponse { processes })),
Err(e) => {
error!(e; "Failed to handle list process request");
Err(Status::new(Code::Internal, e.to_string()))
}
}
let processes = self.process_manager.local_processes(catalog).map_err(|e| {
error!(e; "Failed to handle list process request");
Status::new(Code::Internal, e.to_string())
})?;
Ok(Response::new(ListProcessResponse { processes }))
}
async fn kill_process(
&self,
request: Request<KillProcessRequest>,
) -> Result<Response<KillProcessResponse>, Status> {
let req = request.into_inner();
let success = self
.process_manager
.kill_process(req.server_addr, req.catalog, req.process_id)
.await
.map_err(|e| {
error!(e; "Failed to handle kill process request");
Status::new(Code::Internal, e.to_string())
})?;
Ok(Response::new(KillProcessResponse { success }))
}
}
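For reference, a client-side sketch of calling the new `KillProcess` RPC. This assumes the tonic-generated client follows the usual `frontend_client::FrontendClient` naming, that `server_addr` and `catalog` are strings, and that `process_id` is numeric; none of those details appear in this diff, and the endpoint and values are placeholders:

```rust
use api::v1::frontend::frontend_client::FrontendClient; // assumed generated client path
use api::v1::frontend::KillProcessRequest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder endpoint; the real frontend gRPC address depends on deployment.
    let mut client = FrontendClient::connect("http://127.0.0.1:4001").await?;
    let request = KillProcessRequest {
        server_addr: "127.0.0.1:4001".to_string(), // assumed string field
        catalog: "greptime".to_string(),           // assumed string field
        process_id: 42,                            // assumed numeric id
        ..Default::default()
    };
    let response = client.kill_process(request).await?;
    println!("killed: {}", response.into_inner().success);
    Ok(())
}
```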

View File

@@ -40,7 +40,7 @@ use futures_util::StreamExt;
use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
use session::hints::READ_PREFERENCE_HINT;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
use table::TableRef;
use tokio::sync::mpsc;
use crate::error::Error::UnsupportedAuthScheme;
@@ -149,8 +149,8 @@ impl GreptimeRequestHandler {
.clone()
.unwrap_or_else(common_runtime::global_runtime);
runtime.spawn(async move {
// Cached table id
let mut table_id: Option<TableId> = None;
// Cached table ref
let mut table_ref: Option<TableRef> = None;
let mut decoder = FlightDecoder::default();
while let Some(request) = stream.next().await {
@@ -169,7 +169,7 @@ impl GreptimeRequestHandler {
let timer = metrics::GRPC_BULK_INSERT_ELAPSED.start_timer();
let result = handler
.put_record_batch(&table_name, &mut table_id, &mut decoder, data)
.put_record_batch(&table_name, &mut table_ref, &mut decoder, data)
.await
.inspect_err(|e| error!(e; "Failed to handle flight record batches"));
timer.observe_duration();
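The handler now caches a `TableRef` (an `Arc`-based handle) rather than a bare `TableId`, so later messages on the same Flight stream skip the catalog lookup. A minimal standalone sketch of that caching pattern, using hypothetical stand-in types instead of the real catalog API:

```rust
use std::sync::Arc;

// Stand-in for the catalog lookup that yields a TableRef in the real handler.
fn resolve_table(name: &str) -> Arc<String> {
    Arc::new(format!("table handle for {name}"))
}

fn main() {
    // Resolved once on the first message, then reused for the rest of the stream.
    let mut cached: Option<Arc<String>> = None;
    for message in ["batch-1", "batch-2", "batch-3"] {
        let table = cached
            .get_or_insert_with(|| resolve_table("my_table"))
            .clone();
        println!("{message} handled with {table}");
    }
}
```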

View File

@@ -33,6 +33,7 @@ use common_telemetry::{error, warn};
use datatypes::value::column_data_to_json;
use headers::ContentType;
use lazy_static::lazy_static;
use mime_guess::mime;
use pipeline::util::to_pipeline_version;
use pipeline::{
ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, Value as PipelineValue,
@@ -47,7 +48,9 @@ use crate::error::{
status_code_to_http_status, Error, InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu, Result,
};
use crate::http::header::constants::GREPTIME_PIPELINE_PARAMS_HEADER;
use crate::http::header::{CONTENT_TYPE_NDJSON_STR, CONTENT_TYPE_PROTOBUF_STR};
use crate::http::header::{
CONTENT_TYPE_NDJSON_STR, CONTENT_TYPE_NDJSON_SUBTYPE_STR, CONTENT_TYPE_PROTOBUF_STR,
};
use crate::http::result::greptime_manage_resp::GreptimedbManageResponse;
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::http::HttpResponse;
@@ -665,12 +668,13 @@ impl TryFrom<&ContentType> for EventPayloadResolverInner {
type Error = Error;
fn try_from(content_type: &ContentType) -> Result<Self> {
match content_type {
x if *x == *JSON_CONTENT_TYPE => Ok(EventPayloadResolverInner::Json),
x if *x == *NDJSON_CONTENT_TYPE => Ok(EventPayloadResolverInner::Ndjson),
x if *x == *TEXT_CONTENT_TYPE || *x == *TEXT_UTF8_CONTENT_TYPE => {
Ok(EventPayloadResolverInner::Text)
let mime: mime_guess::Mime = content_type.clone().into();
match (mime.type_(), mime.subtype()) {
(mime::APPLICATION, mime::JSON) => Ok(EventPayloadResolverInner::Json),
(mime::APPLICATION, subtype) if subtype == CONTENT_TYPE_NDJSON_SUBTYPE_STR => {
Ok(EventPayloadResolverInner::Ndjson)
}
(mime::TEXT, mime::PLAIN) => Ok(EventPayloadResolverInner::Text),
_ => InvalidParameterSnafu {
reason: format!(
"invalid content type: {}, expected: one of {}",

Some files were not shown because too many files have changed in this diff.
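For context on the content-type refactor in the last hunk above: matching on `(type_, subtype)` lets a single arm cover `text/plain` with or without a `charset` parameter, which previously required two equality checks. A standalone sketch, assuming the NDJSON subtype constant resolves to `x-ndjson` (the actual value lives in the crate's header module, not in this diff):

```rust
use mime_guess::mime;

fn classify(content_type: &str) -> &'static str {
    // `mime_guess` re-exports the `mime` crate; parse errors are ignored here
    // for brevity, unlike the real handler which returns an error response.
    let parsed: mime::Mime = content_type.parse().expect("valid MIME type");
    match (parsed.type_(), parsed.subtype()) {
        (mime::APPLICATION, mime::JSON) => "json",
        (mime::APPLICATION, subtype) if subtype == "x-ndjson" => "ndjson", // assumed subtype value
        (mime::TEXT, mime::PLAIN) => "text",
        _ => "unsupported",
    }
}

fn main() {
    assert_eq!(classify("application/json"), "json");
    assert_eq!(classify("application/x-ndjson"), "ndjson");
    // Parameters such as charset no longer need a separate arm.
    assert_eq!(classify("text/plain; charset=utf-8"), "text");
}
```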