Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-05 12:52:57 +00:00

Compare commits — 10 Commits on branch ci/update-... by daviderli6
| Author | SHA1 | Date |
|---|---|---|
| | e631d0c4ae | |
| | ede82331b2 | |
| | 56e696bd55 | |
| | bc0cdf62ba | |
| | eaf7b4b9dd | |
| | 7ae0e150e5 | |
| | 43c30b55ae | |
| | 153e80450a | |
| | 1624dc41c5 | |
| | 300262562b | |
4 changes: .github/ISSUE_TEMPLATE/config.yml (vendored)
@@ -2,7 +2,7 @@ blank_issues_enabled: false
contact_links:
- name: Greptime Community Slack
url: https://greptime.com/slack
about: Get free help from the Greptime community
about: Get free help from the Greptime community.
- name: Greptime Community Discussion
url: https://github.com/greptimeTeam/discussions
about: Get free help from the Greptime community
about: Get free help from the Greptime community.

@@ -22,7 +22,6 @@ datanode:
[wal]
provider = "kafka"
broker_endpoints = ["kafka.kafka-cluster.svc.cluster.local:9092"]
linger = "2ms"
overwrite_entry_start_id = true
frontend:
configData: |-
@@ -4,7 +4,7 @@ DEV_BUILDER_IMAGE_TAG=$1

update_dev_builder_version() {
if [ -z "$DEV_BUILDER_IMAGE_TAG" ]; then
echo "Error: Should specify the dev-builder image tag"
echo "Error: Should specify the dev-builder image tag"
exit 1
fi

@@ -17,7 +17,7 @@ update_dev_builder_version() {
git checkout -b $BRANCH_NAME

# Update the dev-builder image tag in the Makefile.
gsed -i "s/DEV_BUILDER_IMAGE_TAG ?=.*/DEV_BUILDER_IMAGE_TAG ?= ${DEV_BUILDER_IMAGE_TAG}/g" Makefile
sed -i "s/DEV_BUILDER_IMAGE_TAG ?=.*/DEV_BUILDER_IMAGE_TAG ?= ${DEV_BUILDER_IMAGE_TAG}/g" Makefile

# Commit the changes.
git add Makefile
2 changes: Makefile
@@ -8,7 +8,7 @@ CARGO_BUILD_OPTS := --locked
IMAGE_REGISTRY ?= docker.io
IMAGE_NAMESPACE ?= greptime
IMAGE_TAG ?= latest
DEV_BUILDER_IMAGE_TAG ?= 2025-04-15-1a517ec8-20250428023155
DEV_BUILDER_IMAGE_TAG ?= 2025-05-19-b2377d4b-20250520045554
BUILDX_MULTI_PLATFORM_BUILD ?= false
BUILDX_BUILDER_NAME ?= gtbuilder
BASE_IMAGE ?= ubuntu
10 changes: README.md
@@ -121,7 +121,7 @@ docker pull greptime/greptimedb

```shell
docker run -p 127.0.0.1:4000-4003:4000-4003 \
-v "$(pwd)/greptimedb:/greptimedb_data" \
-v "$(pwd)/greptimedb_data:/greptimedb_data" \
--name greptime --rm \
greptime/greptimedb:latest standalone start \
--http-addr 0.0.0.0:4000 \
@@ -129,7 +129,7 @@ docker run -p 127.0.0.1:4000-4003:4000-4003 \
--mysql-addr 0.0.0.0:4002 \
--postgres-addr 0.0.0.0:4003
```
Dashboard: [http://localhost:4000/dashboard](http://localhost:4000/dashboard)
Dashboard: [http://localhost:4000/dashboard](http://localhost:4000/dashboard)
[Full Install Guide](https://docs.greptime.com/getting-started/installation/overview)

**Troubleshooting:**
@@ -167,7 +167,7 @@ cargo run -- standalone start

## Project Status

> **Status:** Beta.
> **Status:** Beta.
> **GA (v1.0):** Targeted for mid 2025.

- Being used in production by early adopters
@@ -197,8 +197,8 @@ GreptimeDB is licensed under the [Apache License 2.0](https://apache.org/license

## Commercial Support

Running GreptimeDB in your organization?
We offer enterprise add-ons, services, training, and consulting.
Running GreptimeDB in your organization?
We offer enterprise add-ons, services, training, and consulting.
[Contact us](https://greptime.com/contactus) for details.

## Contributing
File diff suppressed because it is too large
@@ -46,6 +46,7 @@
| Ingest Rows per Instance | `sum by(instance, pod)(rate(greptime_table_operator_ingest_rows{instance=~"$frontend"}[$__rate_interval]))` | `timeseries` | Ingestion rate by row as in each frontend | `prometheus` | `rowsps` | `[{{instance}}]-[{{pod}}]` |
| Region Call QPS per Instance | `sum by(instance, pod, request_type) (rate(greptime_grpc_region_request_count{instance=~"$frontend"}[$__rate_interval]))` | `timeseries` | Region Call QPS per Instance. | `prometheus` | `ops` | `[{{instance}}]-[{{pod}}]-[{{request_type}}]` |
| Region Call P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, request_type) (rate(greptime_grpc_region_request_bucket{instance=~"$frontend"}[$__rate_interval])))` | `timeseries` | Region Call P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{request_type}}]` |
| Frontend Handle Bulk Insert Elapsed Time | `sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_sum[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_count[$__rate_interval]))`<br/>`histogram_quantile(0.99, sum by(instance, pod, stage, le) (rate(greptime_table_operator_handle_bulk_insert_bucket[$__rate_interval])))` | `timeseries` | Per-stage time for frontend to handle bulk insert requests | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG` |
# Mito Engine
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
@@ -67,6 +68,8 @@
| WAL sync duration seconds | `histogram_quantile(0.99, sum by(le, type, node, instance, pod) (rate(raft_engine_sync_log_duration_seconds_bucket[$__rate_interval])))` | `timeseries` | Raft engine (local disk) log store sync latency, p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-p99` |
| Log Store op duration seconds | `histogram_quantile(0.99, sum by(le,logstore,optype,instance, pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))` | `timeseries` | Write-ahead log operations latency at p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{logstore}}]-[{{optype}}]-p99` |
| Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` |
| Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction oinput output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` |
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
# OpenDAL
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
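The new bulk-insert panels above follow a common Prometheus histogram pattern: the average is derived from the histogram's `_sum` and `_count` series, while the tail latency comes from `histogram_quantile` over the `_bucket` series. A minimal PromQL sketch of that pattern, reusing the `greptime_table_operator_handle_bulk_insert` histogram and Grafana's `$__rate_interval` variable from the table:

```promql
# Average per-stage latency: rate of summed elapsed time divided by rate of sample count.
sum by (instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_sum[$__rate_interval]))
  /
sum by (instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_count[$__rate_interval]))

# p99 per-stage latency: quantile estimated from the cumulative histogram buckets.
histogram_quantile(
  0.99,
  sum by (instance, pod, stage, le) (rate(greptime_table_operator_handle_bulk_insert_bucket[$__rate_interval]))
)
```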
@@ -371,6 +371,21 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{request_type}}]'
- title: 'Frontend Handle Bulk Insert Elapsed Time '
type: timeseries
description: Per-stage time for frontend to handle bulk insert requests
unit: s
queries:
- expr: sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_sum[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_count[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
- expr: histogram_quantile(0.99, sum by(instance, pod, stage, le) (rate(greptime_table_operator_handle_bulk_insert_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- title: Mito Engine
panels:
- title: Request OPS per Instance
@@ -562,6 +577,36 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]'
- title: Compaction Input/Output Bytes
type: timeseries
description: Compaction oinput output bytes
unit: bytes
queries:
- expr: sum by(instance, pod) (greptime_mito_compaction_input_bytes)
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-input'
- expr: sum by(instance, pod) (greptime_mito_compaction_output_bytes)
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-output'
- title: Region Worker Handle Bulk Insert Requests
type: timeseries
description: Per-stage elapsed time for region worker to handle bulk insert region requests.
unit: s
queries:
- expr: histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- expr: sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
- title: OpenDAL
panels:
- title: QPS per Instance
File diff suppressed because it is too large
@@ -46,6 +46,7 @@
| Ingest Rows per Instance | `sum by(instance, pod)(rate(greptime_table_operator_ingest_rows{}[$__rate_interval]))` | `timeseries` | Ingestion rate by row as in each frontend | `prometheus` | `rowsps` | `[{{instance}}]-[{{pod}}]` |
| Region Call QPS per Instance | `sum by(instance, pod, request_type) (rate(greptime_grpc_region_request_count{}[$__rate_interval]))` | `timeseries` | Region Call QPS per Instance. | `prometheus` | `ops` | `[{{instance}}]-[{{pod}}]-[{{request_type}}]` |
| Region Call P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, request_type) (rate(greptime_grpc_region_request_bucket{}[$__rate_interval])))` | `timeseries` | Region Call P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{request_type}}]` |
| Frontend Handle Bulk Insert Elapsed Time | `sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_sum[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_count[$__rate_interval]))`<br/>`histogram_quantile(0.99, sum by(instance, pod, stage, le) (rate(greptime_table_operator_handle_bulk_insert_bucket[$__rate_interval])))` | `timeseries` | Per-stage time for frontend to handle bulk insert requests | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG` |
# Mito Engine
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
@@ -67,6 +68,8 @@
| WAL sync duration seconds | `histogram_quantile(0.99, sum by(le, type, node, instance, pod) (rate(raft_engine_sync_log_duration_seconds_bucket[$__rate_interval])))` | `timeseries` | Raft engine (local disk) log store sync latency, p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-p99` |
| Log Store op duration seconds | `histogram_quantile(0.99, sum by(le,logstore,optype,instance, pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))` | `timeseries` | Write-ahead log operations latency at p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{logstore}}]-[{{optype}}]-p99` |
| Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` |
| Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction oinput output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` |
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
# OpenDAL
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
@@ -371,6 +371,21 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{request_type}}]'
- title: 'Frontend Handle Bulk Insert Elapsed Time '
type: timeseries
description: Per-stage time for frontend to handle bulk insert requests
unit: s
queries:
- expr: sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_sum[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_count[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
- expr: histogram_quantile(0.99, sum by(instance, pod, stage, le) (rate(greptime_table_operator_handle_bulk_insert_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- title: Mito Engine
panels:
- title: Request OPS per Instance
@@ -562,6 +577,36 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]'
- title: Compaction Input/Output Bytes
type: timeseries
description: Compaction oinput output bytes
unit: bytes
queries:
- expr: sum by(instance, pod) (greptime_mito_compaction_input_bytes)
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-input'
- expr: sum by(instance, pod) (greptime_mito_compaction_output_bytes)
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-output'
- title: Region Worker Handle Bulk Insert Requests
type: timeseries
description: Per-stage elapsed time for region worker to handle bulk insert region requests.
unit: s
queries:
- expr: histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- expr: sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
- title: OpenDAL
panels:
- title: QPS per Instance
@@ -256,6 +256,11 @@ impl DatanodeTableManager {
})?
.and_then(|r| DatanodeTableValue::try_from_raw_value(&r.value))?
.region_info;

// If the region options are the same, we don't need to update it.
if region_info.region_options == new_region_options {
return Ok(Txn::new());
}
// substitute region options only.
region_info.region_options = new_region_options;

@@ -158,12 +158,7 @@ mod tests {
provider = "kafka"
broker_endpoints = ["127.0.0.1:9092"]
max_batch_bytes = "1MB"
linger = "200ms"
consumer_wait_timeout = "100ms"
backoff_init = "500ms"
backoff_max = "10s"
backoff_base = 2
backoff_deadline = "5mins"
num_topics = 32
num_partitions = 1
selector_type = "round_robin"
@@ -596,7 +596,7 @@ impl FrontendInvoker {
.start_timer();

self.inserter
.handle_row_inserts(requests, ctx, &self.statement_executor)
.handle_row_inserts(requests, ctx, &self.statement_executor, false)
.await
.map_err(BoxedError::new)
.context(common_frontend::error::ExternalSnafu)

@@ -75,7 +75,10 @@ impl GrpcQueryHandler for Instance {

let output = match request {
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
Request::RowInserts(requests) => self.handle_row_inserts(requests, ctx.clone()).await?,
Request::RowInserts(requests) => {
self.handle_row_inserts(requests, ctx.clone(), false)
.await?
}
Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
Request::Query(query_request) => {
@@ -416,9 +419,15 @@ impl Instance {
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
accommodate_existing_schema: bool,
) -> Result<Output> {
self.inserter
.handle_row_inserts(requests, ctx, self.statement_executor.as_ref())
.handle_row_inserts(
requests,
ctx,
self.statement_executor.as_ref(),
accommodate_existing_schema,
)
.await
.context(TableOperationSnafu)
}

@@ -430,7 +439,7 @@ impl Instance {
ctx: QueryContextRef,
) -> Result<Output> {
self.inserter
.handle_last_non_null_inserts(requests, ctx, self.statement_executor.as_ref())
.handle_last_non_null_inserts(requests, ctx, self.statement_executor.as_ref(), true)
.await
.context(TableOperationSnafu)
}

@@ -53,7 +53,7 @@ impl OpentsdbProtocolHandler for Instance {
};

let output = self
.handle_row_inserts(requests, ctx)
.handle_row_inserts(requests, ctx, true)
.await
.map_err(BoxedError::new)
.context(servers::error::ExecuteGrpcQuerySnafu)?;

@@ -63,7 +63,7 @@ impl OpenTelemetryProtocolHandler for Instance {
None
};

self.handle_row_inserts(requests, ctx)
self.handle_row_inserts(requests, ctx, false)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)

@@ -195,7 +195,7 @@ impl PromStoreProtocolHandler for Instance {
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?
} else {
self.handle_row_inserts(request, ctx.clone())
self.handle_row_inserts(request, ctx.clone(), true)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?

@@ -233,7 +233,7 @@ impl SlowQueryEventHandler {
.into();

self.inserter
.handle_row_inserts(requests, query_ctx, &self.statement_executor)
.handle_row_inserts(requests, query_ctx, &self.statement_executor, false)
.await
.context(TableOperationSnafu)?;
@@ -33,7 +33,7 @@ use crate::error::{
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{
FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_ERRORS_TOTAL, FLUSH_REQUESTS_TOTAL,
FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL,
INFLIGHT_FLUSH_COUNT,
};
use crate::read::Source;

@@ -601,7 +601,7 @@ impl FlushScheduler {
pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);

FLUSH_ERRORS_TOTAL.inc();
FLUSH_FAILURE_TOTAL.inc();

// Remove this region.
let Some(flush_status) = self.region_status.remove(&region_id) else {
@@ -70,8 +70,8 @@ lazy_static! {
)
.unwrap();
/// Counter of scheduled failed flush jobs.
pub static ref FLUSH_ERRORS_TOTAL: IntCounter =
register_int_counter!("greptime_mito_flush_errors_total", "mito flush errors total").unwrap();
pub static ref FLUSH_FAILURE_TOTAL: IntCounter =
register_int_counter!("greptime_mito_flush_failure_total", "mito flush failure total").unwrap();
/// Elapsed time of a flush job.
pub static ref FLUSH_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_mito_flush_elapsed",
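Because the flush counter is renamed from `greptime_mito_flush_errors_total` to `greptime_mito_flush_failure_total`, any dashboard or alert that tracked the old series would need to query the new name. A hedged PromQL sketch of such a query (the 5m rate window is an arbitrary choice for illustration):

```promql
# Flush failure rate per instance/pod, based on the renamed counter.
sum by (instance, pod) (rate(greptime_mito_flush_failure_total[5m]))
```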
@@ -68,5 +68,6 @@ tokio-util.workspace = true
tonic.workspace = true

[dev-dependencies]
common-meta = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
path-slash = "0.2"
@@ -147,7 +147,7 @@ impl Inserter {
statement_executor: &StatementExecutor,
) -> Result<Output> {
let row_inserts = ColumnToRow::convert(requests)?;
self.handle_row_inserts(row_inserts, ctx, statement_executor)
self.handle_row_inserts(row_inserts, ctx, statement_executor, false)
.await
}

@@ -157,6 +157,7 @@ impl Inserter {
mut requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
) -> Result<Output> {
preprocess_row_insert_requests(&mut requests.inserts)?;
self.handle_row_inserts_with_create_type(
@@ -164,6 +165,7 @@ impl Inserter {
ctx,
statement_executor,
AutoCreateTableType::Physical,
accommodate_existing_schema,
)
.await
}
@@ -180,6 +182,7 @@ impl Inserter {
ctx,
statement_executor,
AutoCreateTableType::Log,
false,
)
.await
}
@@ -195,6 +198,7 @@ impl Inserter {
ctx,
statement_executor,
AutoCreateTableType::Trace,
false,
)
.await
}
@@ -205,12 +209,14 @@ impl Inserter {
requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
) -> Result<Output> {
self.handle_row_inserts_with_create_type(
requests,
ctx,
statement_executor,
AutoCreateTableType::LastNonNull,
accommodate_existing_schema,
)
.await
}
@@ -222,6 +228,7 @@ impl Inserter {
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
create_type: AutoCreateTableType,
accommodate_existing_schema: bool,
) -> Result<Output> {
// remove empty requests
requests.inserts.retain(|req| {
@@ -236,7 +243,13 @@ impl Inserter {
instant_table_ids,
table_infos,
} = self
.create_or_alter_tables_on_demand(&requests, &ctx, create_type, statement_executor)
.create_or_alter_tables_on_demand(
&mut requests,
&ctx,
create_type,
statement_executor,
accommodate_existing_schema,
)
.await?;

let name_to_info = table_infos
@@ -281,10 +294,11 @@ impl Inserter {
table_infos,
} = self
.create_or_alter_tables_on_demand(
&requests,
&mut requests,
&ctx,
AutoCreateTableType::Logical(physical_table.to_string()),
statement_executor,
true,
)
.await?;
let name_to_info = table_infos
@@ -448,12 +462,18 @@ impl Inserter {
///
/// Returns a mapping from table name to table id, where table name is the table name involved in the requests.
/// This mapping is used in the conversion of RowToRegion.
///
/// `accommodate_existing_schema` is used to determine if the existing schema should override the new schema.
/// It only works for TIME_INDEX and VALUE columns. This is for the case where the user creates a table with
/// custom schema, and then inserts data with endpoints that have default schema setting, like prometheus
/// remote write. This will modify the `RowInsertRequests` in place.
async fn create_or_alter_tables_on_demand(
&self,
requests: &RowInsertRequests,
requests: &mut RowInsertRequests,
ctx: &QueryContextRef,
auto_create_table_type: AutoCreateTableType,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
) -> Result<CreateAlterTableResult> {
let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
.with_label_values(&[auto_create_table_type.as_str()])
@@ -504,7 +524,7 @@ impl Inserter {
let mut alter_tables = vec![];
let mut instant_table_ids = HashSet::new();

for req in &requests.inserts {
for req in &mut requests.inserts {
match self.get_table(catalog, &schema, &req.table_name).await? {
Some(table) => {
let table_info = table.table_info();
@@ -512,9 +532,12 @@ impl Inserter {
instant_table_ids.insert(table_info.table_id());
}
table_infos.insert(table_info.table_id(), table.table_info());
if let Some(alter_expr) =
self.get_alter_table_expr_on_demand(req, &table, ctx)?
{
if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
req,
&table,
ctx,
accommodate_existing_schema,
)? {
alter_tables.push(alter_expr);
}
}
@@ -788,12 +811,16 @@ impl Inserter {
}

/// Returns an alter table expression if it finds new columns in the request.
/// It always adds columns if not exist.
/// When `accommodate_existing_schema` is false, it always adds columns if not exist.
/// When `accommodate_existing_schema` is true, it may modify the input `req` to
/// accommodate it with existing schema. See [`create_or_alter_tables_on_demand`](Self::create_or_alter_tables_on_demand)
/// for more details.
fn get_alter_table_expr_on_demand(
&self,
req: &RowInsertRequest,
req: &mut RowInsertRequest,
table: &TableRef,
ctx: &QueryContextRef,
accommodate_existing_schema: bool,
) -> Result<Option<AlterTableExpr>> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
@@ -802,10 +829,64 @@ impl Inserter {
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let column_exprs = ColumnExpr::from_column_schemas(request_schema);
let add_columns = expr_helper::extract_add_columns_expr(&table.schema(), column_exprs)?;
let Some(add_columns) = add_columns else {
let Some(mut add_columns) = add_columns else {
return Ok(None);
};

// If accommodate_existing_schema is true, update request schema for Timestamp/Field columns
if accommodate_existing_schema {
let table_schema = table.schema();
// Find timestamp column name
let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
// Find field column name if there is only one
let mut field_col_name = None;
let mut multiple_field_cols = false;
table.field_columns().for_each(|col| {
if field_col_name.is_none() {
field_col_name = Some(col.name.clone());
} else {
multiple_field_cols = true;
}
});
if multiple_field_cols {
field_col_name = None;
}

// Update column name in request schema for Timestamp/Field columns
if let Some(rows) = req.rows.as_mut() {
for col in &mut rows.schema {
match col.semantic_type {
x if x == SemanticType::Timestamp as i32 => {
if let Some(ref ts_name) = ts_col_name {
if col.column_name != *ts_name {
col.column_name = ts_name.clone();
}
}
}
x if x == SemanticType::Field as i32 => {
if let Some(ref field_name) = field_col_name {
if col.column_name != *field_name {
col.column_name = field_name.clone();
}
}
}
_ => {}
}
}
}

// Remove from add_columns any column that is timestamp or field (if there is only one field column)
add_columns.add_columns.retain(|col| {
let def = col.column_def.as_ref().unwrap();
def.semantic_type != SemanticType::Timestamp as i32
&& (def.semantic_type != SemanticType::Field as i32 && field_col_name.is_some())
});

if add_columns.add_columns.is_empty() {
return Ok(None);
}
}

Ok(Some(AlterTableExpr {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
@@ -1039,3 +1120,124 @@ impl FlowMirrorTask {
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

use api::v1::{ColumnSchema as GrpcColumnSchema, RowInsertRequest, Rows, SemanticType, Value};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::cache::new_table_flownode_set_cache;
use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
use common_meta::test_util::MockDatanodeManager;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use moka::future::Cache;
use session::context::QueryContext;
use table::dist_table::DummyDataSource;
use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};
use table::TableRef;

use super::*;
use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};

fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![
ColumnSchema::new(
ts_name,
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new(field_name, ConcreteDataType::float64_datatype(), true),
])
.unwrap()
.build()
.unwrap();
let meta = TableMetaBuilder::empty()
.schema(Arc::new(schema))
.primary_key_indices(vec![])
.value_indices(vec![1])
.engine("mito")
.next_column_id(0)
.options(Default::default())
.created_on(Default::default())
.region_numbers(vec![0])
.build()
.unwrap();
let info = Arc::new(
TableInfoBuilder::default()
.table_id(1)
.table_version(0)
.name("test_table")
.schema_name(DEFAULT_SCHEMA_NAME)
.catalog_name(DEFAULT_CATALOG_NAME)
.desc(None)
.table_type(TableType::Base)
.meta(meta)
.build()
.unwrap(),
);
Arc::new(table::Table::new(
info,
table::metadata::FilterPushDownType::Unsupported,
Arc::new(DummyDataSource),
))
}

#[tokio::test]
async fn test_accommodate_existing_schema_logic() {
let ts_name = "my_ts";
let field_name = "my_field";
let table = make_table_ref_with_schema(ts_name, field_name);

// The request uses different names for timestamp and field columns
let mut req = RowInsertRequest {
table_name: "test_table".to_string(),
rows: Some(Rows {
schema: vec![
GrpcColumnSchema {
column_name: "ts_wrong".to_string(),
datatype: api::v1::ColumnDataType::TimestampMillisecond as i32,
semantic_type: SemanticType::Timestamp as i32,
..Default::default()
},
GrpcColumnSchema {
column_name: "field_wrong".to_string(),
datatype: api::v1::ColumnDataType::Float64 as i32,
semantic_type: SemanticType::Field as i32,
..Default::default()
},
],
rows: vec![api::v1::Row {
values: vec![Value::default(), Value::default()],
}],
}),
};
let ctx = Arc::new(QueryContext::with(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
));

let kv_backend = prepare_mocked_backend().await;
let inserter = Inserter::new(
catalog::memory::MemoryCatalogManager::new(),
create_partition_rule_manager(kv_backend.clone()).await,
Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
Arc::new(new_table_flownode_set_cache(
String::new(),
Cache::new(100),
kv_backend.clone(),
)),
);
let alter_expr = inserter
.get_alter_table_expr_on_demand(&mut req, &table, &ctx, true)
.unwrap();
assert!(alter_expr.is_none());

// The request's schema should have updated names for timestamp and field columns
let req_schema = req.rows.as_ref().unwrap().schema.clone();
assert_eq!(req_schema[0].column_name, ts_name);
assert_eq!(req_schema[1].column_name, field_name);
}
}
@@ -14,6 +14,7 @@

#![feature(assert_matches)]
#![feature(if_let_guard)]
#![feature(let_chains)]

mod bulk_insert;
pub mod delete;
@@ -57,33 +57,13 @@ mod tests {
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType, Value};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use datatypes::vectors::{Int32Vector, VectorRef};
use store_api::storage::RegionId;

use super::*;
use crate::tests::{create_partition_rule_manager, new_test_table_info};

async fn prepare_mocked_backend() -> KvBackendRef {
let backend = Arc::new(MemoryKvBackend::default());

let catalog_manager = CatalogManager::new(backend.clone());
let schema_manager = SchemaManager::new(backend.clone());

catalog_manager
.create(CatalogNameKey::default(), false)
.await
.unwrap();
schema_manager
.create(SchemaNameKey::default(), None, false)
.await
.unwrap();

backend
}
use crate::tests::{
create_partition_rule_manager, new_test_table_info, prepare_mocked_backend,
};

#[tokio::test]
async fn test_delete_request_table_to_region() {
@@ -73,33 +73,13 @@ mod tests {
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType, Value};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use datatypes::vectors::{Int32Vector, VectorRef};
use store_api::storage::RegionId;

use super::*;
use crate::tests::{create_partition_rule_manager, new_test_table_info};

async fn prepare_mocked_backend() -> KvBackendRef {
let backend = Arc::new(MemoryKvBackend::default());

let catalog_manager = CatalogManager::new(backend.clone());
let schema_manager = SchemaManager::new(backend.clone());

catalog_manager
.create(CatalogNameKey::default(), false)
.await
.unwrap();
schema_manager
.create(SchemaNameKey::default(), None, false)
.await
.unwrap();

backend
}
use crate::tests::{
create_partition_rule_manager, new_test_table_info, prepare_mocked_backend,
};

#[tokio::test]
async fn test_insert_request_table_to_region() {
@@ -843,8 +843,46 @@ impl StatementExecutor {
}
);

self.alter_logical_tables_procedure(alter_table_exprs, query_context)
.await?;
// group by physical table id
let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
for expr in alter_table_exprs {
// Get table_id from catalog_manager
let catalog = if expr.catalog_name.is_empty() {
query_context.current_catalog()
} else {
&expr.catalog_name
};
let schema = if expr.schema_name.is_empty() {
query_context.current_schema()
} else {
expr.schema_name.to_string()
};
let table_name = &expr.table_name;
let table = self
.catalog_manager
.table(catalog, &schema, table_name, Some(&query_context))
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(catalog, &schema, table_name),
})?;
let table_id = table.table_info().ident.table_id;
let physical_table_id = self
.table_metadata_manager
.table_route_manager()
.get_physical_table_id(table_id)
.await
.context(TableMetadataManagerSnafu)?;
groups.entry(physical_table_id).or_default().push(expr);
}

// Submit procedure for each physical table
let mut handles = Vec::with_capacity(groups.len());
for (_physical_table_id, exprs) in groups {
let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
handles.push(fut);
}
let _results = futures::future::try_join_all(handles).await?;

Ok(Output::new_with_affected_rows(0))
}
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod kv_backend;
mod partition_manager;

pub(crate) use kv_backend::prepare_mocked_backend;
pub(crate) use partition_manager::{create_partition_rule_manager, new_test_table_info};
38 changes: src/operator/src/tests/kv_backend.rs (new file)
@@ -0,0 +1,38 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;

pub async fn prepare_mocked_backend() -> KvBackendRef {
let backend = Arc::new(MemoryKvBackend::default());

let catalog_manager = CatalogManager::new(backend.clone());
let schema_manager = SchemaManager::new(backend.clone());

catalog_manager
.create(CatalogNameKey::default(), false)
.await
.unwrap();
schema_manager
.create(SchemaNameKey::default(), None, false)
.await
.unwrap();

backend
}
@@ -249,6 +249,7 @@ impl PipelineTable {
requests,
Self::query_ctx(&table_info),
&self.statement_executor,
false,
)
.await
.context(InsertPipelineSnafu)?;
@@ -1 +1 @@
v0.9.0
v0.9.2
@@ -475,6 +475,38 @@ pub fn mock_timeseries() -> Vec<TimeSeries> {
]
}

/// Add new labels to the mock timeseries.
pub fn mock_timeseries_new_label() -> Vec<TimeSeries> {
let ts_demo_metrics = TimeSeries {
labels: vec![
new_label(METRIC_NAME_LABEL.to_string(), "demo_metrics".to_string()),
new_label("idc".to_string(), "idc3".to_string()),
new_label("new_label1".to_string(), "foo".to_string()),
],
samples: vec![Sample {
value: 42.0,
timestamp: 3000,
}],
..Default::default()
};
let ts_multi_labels = TimeSeries {
labels: vec![
new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
new_label("idc".to_string(), "idc4".to_string()),
new_label("env".to_string(), "prod".to_string()),
new_label("host".to_string(), "host9".to_string()),
new_label("new_label2".to_string(), "bar".to_string()),
],
samples: vec![Sample {
value: 99.0,
timestamp: 4000,
}],
..Default::default()
};

vec![ts_demo_metrics, ts_multi_labels]
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -43,7 +43,7 @@ use servers::http::result::greptime_result_v1::GreptimedbV1Response;
use servers::http::result::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response};
use servers::http::test_helpers::{TestClient, TestResponse};
use servers::http::GreptimeQueryOutput;
use servers::prom_store;
use servers::prom_store::{self, mock_timeseries_new_label};
use table::table_name::TableName;
use tests_integration::test_util::{
setup_test_http_app, setup_test_http_app_with_frontend,
@@ -1270,6 +1270,24 @@ pub async fn test_prometheus_remote_write(store_type: StorageType) {
)
.await;

// Write snappy encoded data with new labels
let write_request = WriteRequest {
timeseries: mock_timeseries_new_label(),
..Default::default()
};
let serialized_request = write_request.encode_to_vec();
let compressed_request =
prom_store::snappy_compress(&serialized_request).expect("failed to encode snappy");

let res = client
.post("/v1/prometheus/write")
.header("Content-Encoding", "snappy")
.body(compressed_request)
.send()
.await;

assert_eq!(res.status(), StatusCode::NO_CONTENT);

guard.remove_all().await;
}
@@ -14,7 +14,6 @@ sync_write = false
{{ else }}
provider = "kafka"
broker_endpoints = {kafka_wal_broker_endpoints | unescaped}
linger = "5ms"
overwrite_entry_start_id = true
{{ endif }}

@@ -13,7 +13,6 @@ sync_write = false
{{ else }}
provider = "kafka"
broker_endpoints = {kafka_wal_broker_endpoints | unescaped}
linger = "5ms"
{{ endif }}

[storage]