Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2025-12-23 14:40:01 +00:00)

Compare commits: feature/df...v0.17.0

10 Commits
| Author | SHA1 | Date |
| --- | --- | --- |
|  | 8bf772fb50 |  |
|  | 9c1240921d |  |
|  | eb52129a91 |  |
|  | a0a2b40cbe |  |
|  | 067c4458d6 |  |
|  | 4e9c31bf5c |  |
|  | 9320a6ddaa |  |
|  | 4c9fcb7dee |  |
|  | 9dc16772fe |  |
|  | 6ee91f6af4 |  |
@@ -1,3 +1,8 @@
+logging:
+  level: "info"
+  format: "json"
+  filters:
+    - log_store=debug
 meta:
   configData: |-
     [runtime]
@@ -402,8 +402,8 @@
 | `event_recorder` | -- | -- | Configuration options for the event recorder. |
 | `event_recorder.ttl` | String | `90d` | TTL for the events table that will be used to store the events. Default is `90d`. |
 | `stats_persistence` | -- | -- | Configuration options for the stats persistence. |
-| `stats_persistence.ttl` | String | `30d` | TTL for the stats table that will be used to store the stats. Default is `30d`.<br/>Set to `0s` to disable stats persistence. |
-| `stats_persistence.interval` | String | `60s` | The interval to persist the stats. Default is `60s`.<br/>The minimum value is `60s`, if the value is less than `60s`, it will be overridden to `60s`. |
+| `stats_persistence.ttl` | String | `0s` | TTL for the stats table that will be used to store the stats.<br/>Set to `0s` to disable stats persistence.<br/>Default is `0s`.<br/>If you want to enable stats persistence, set the TTL to a value greater than 0.<br/>It is recommended to set a small value, e.g., `3h`. |
+| `stats_persistence.interval` | String | `10m` | The interval to persist the stats. Default is `10m`.<br/>The minimum value is `10m`, if the value is less than `10m`, it will be overridden to `10m`. |
 | `logging` | -- | -- | The logging options. |
 | `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
 | `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
@@ -274,12 +274,15 @@ ttl = "90d"

 ## Configuration options for the stats persistence.
 [stats_persistence]
-## TTL for the stats table that will be used to store the stats. Default is `30d`.
+## TTL for the stats table that will be used to store the stats.
 ## Set to `0s` to disable stats persistence.
-ttl = "30d"
-## The interval to persist the stats. Default is `60s`.
-## The minimum value is `60s`, if the value is less than `60s`, it will be overridden to `60s`.
-interval = "60s"
+## Default is `0s`.
+## If you want to enable stats persistence, set the TTL to a value greater than 0.
+## It is recommended to set a small value, e.g., `3h`.
+ttl = "0s"
+## The interval to persist the stats. Default is `10m`.
+## The minimum value is `10m`, if the value is less than `10m`, it will be overridden to `10m`.
+interval = "10m"

 ## The logging options.
 [logging]
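For reference, a minimal metasrv snippet that re-enables stats persistence under the new defaults might look like this sketch (illustrative values only, taken from the comments in the hunk above):

```toml
## Illustrative sketch: any TTL greater than 0 enables stats persistence;
## the hunk above recommends a small value such as `3h`.
[stats_persistence]
ttl = "3h"
## Intervals below the new `10m` minimum are overridden to `10m`.
interval = "10m"
```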
File diff suppressed because it is too large.
@@ -87,6 +87,13 @@
 | Other Request P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, scheme, operation) (rate(opendal_operation_duration_seconds_bucket{instance=~"$datanode", operation!~"read\|write\|list\|Writer::write\|Writer::close\|Reader::read"}[$__rate_interval])))` | `timeseries` | Other Request P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]` |
 | Opendal traffic | `sum by(instance, pod, scheme, operation) (rate(opendal_operation_bytes_sum{instance=~"$datanode"}[$__rate_interval]))` | `timeseries` | Total traffic as in bytes by instance and operation | `prometheus` | `decbytes` | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]` |
 | OpenDAL errors per Instance | `sum by(instance, pod, scheme, operation, error) (rate(opendal_operation_errors_total{instance=~"$datanode", error!="NotFound"}[$__rate_interval]))` | `timeseries` | OpenDAL error counts per Instance. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]-[{{error}}]` |
+# Remote WAL
+| Title | Query | Type | Description | Datasource | Unit | Legend Format |
+| --- | --- | --- | --- | --- | --- | --- |
+| Triggered region flush total | `meta_triggered_region_flush_total` | `timeseries` | Triggered region flush total | `prometheus` | `none` | `{{pod}}-{{topic_name}}` |
+| Triggered region checkpoint total | `meta_triggered_region_checkpoint_total` | `timeseries` | Triggered region checkpoint total | `prometheus` | `none` | `{{pod}}-{{topic_name}}` |
+| Topic estimated replay size | `meta_topic_estimated_replay_size` | `timeseries` | Topic estimated max replay size | `prometheus` | `bytes` | `{{pod}}-{{topic_name}}` |
+| Kafka logstore's bytes traffic | `rate(greptime_logstore_kafka_client_bytes_total[$__rate_interval])` | `timeseries` | Kafka logstore's bytes traffic | `prometheus` | `bytes` | `{{pod}}-{{logstore}}` |
 # Metasrv
 | Title | Query | Type | Description | Datasource | Unit | Legend Format |
 | --- | --- | --- | --- | --- | --- | --- |
@@ -103,6 +110,8 @@
 | Meta KV Ops Latency | `histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}-{{op}} p99` |
 | Rate of meta KV Ops | `rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `{{pod}}-{{op}} p99` |
 | DDL Latency | `histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `CreateLogicalTables-{{step}} p90` |
+| Reconciliation stats | `greptime_meta_reconciliation_stats` | `timeseries` | Reconciliation stats | `prometheus` | `s` | `{{pod}}-{{table_type}}-{{type}}` |
+| Reconciliation steps | `histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)` | `timeseries` | Elapsed of Reconciliation steps | `prometheus` | `s` | `{{procedure_name}}-{{step}}-P90` |
 # Flownode
 | Title | Query | Type | Description | Datasource | Unit | Legend Format |
 | --- | --- | --- | --- | --- | --- | --- |
@@ -802,6 +802,48 @@ groups:
             type: prometheus
             uid: ${metrics}
           legendFormat: '[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]-[{{error}}]'
+  - title: Remote WAL
+    panels:
+      - title: Triggered region flush total
+        type: timeseries
+        description: Triggered region flush total
+        unit: none
+        queries:
+          - expr: meta_triggered_region_flush_total
+            datasource:
+              type: prometheus
+              uid: ${metrics}
+            legendFormat: '{{pod}}-{{topic_name}}'
+      - title: Triggered region checkpoint total
+        type: timeseries
+        description: Triggered region checkpoint total
+        unit: none
+        queries:
+          - expr: meta_triggered_region_checkpoint_total
+            datasource:
+              type: prometheus
+              uid: ${metrics}
+            legendFormat: '{{pod}}-{{topic_name}}'
+      - title: Topic estimated replay size
+        type: timeseries
+        description: Topic estimated max replay size
+        unit: bytes
+        queries:
+          - expr: meta_topic_estimated_replay_size
+            datasource:
+              type: prometheus
+              uid: ${metrics}
+            legendFormat: '{{pod}}-{{topic_name}}'
+      - title: Kafka logstore's bytes traffic
+        type: timeseries
+        description: Kafka logstore's bytes traffic
+        unit: bytes
+        queries:
+          - expr: rate(greptime_logstore_kafka_client_bytes_total[$__rate_interval])
+            datasource:
+              type: prometheus
+              uid: ${metrics}
+            legendFormat: '{{pod}}-{{logstore}}'
   - title: Metasrv
     panels:
       - title: Region migration datanode
@@ -948,6 +990,26 @@ groups:
             type: prometheus
             uid: ${metrics}
           legendFormat: AlterTable-{{step}} p90
+      - title: Reconciliation stats
+        type: timeseries
+        description: Reconciliation stats
+        unit: s
+        queries:
+          - expr: greptime_meta_reconciliation_stats
+            datasource:
+              type: prometheus
+              uid: ${metrics}
+            legendFormat: '{{pod}}-{{table_type}}-{{type}}'
+      - title: Reconciliation steps
+        type: timeseries
+        description: 'Elapsed of Reconciliation steps '
+        unit: s
+        queries:
+          - expr: histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)
+            datasource:
+              type: prometheus
+              uid: ${metrics}
+            legendFormat: '{{procedure_name}}-{{step}}-P90'
   - title: Flownode
     panels:
       - title: Flow Ingest / Output Rate
File diff suppressed because it is too large.
@@ -87,6 +87,13 @@
 | Other Request P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, scheme, operation) (rate(opendal_operation_duration_seconds_bucket{ operation!~"read\|write\|list\|Writer::write\|Writer::close\|Reader::read"}[$__rate_interval])))` | `timeseries` | Other Request P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]` |
 | Opendal traffic | `sum by(instance, pod, scheme, operation) (rate(opendal_operation_bytes_sum{}[$__rate_interval]))` | `timeseries` | Total traffic as in bytes by instance and operation | `prometheus` | `decbytes` | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]` |
 | OpenDAL errors per Instance | `sum by(instance, pod, scheme, operation, error) (rate(opendal_operation_errors_total{ error!="NotFound"}[$__rate_interval]))` | `timeseries` | OpenDAL error counts per Instance. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]-[{{error}}]` |
+# Remote WAL
+| Title | Query | Type | Description | Datasource | Unit | Legend Format |
+| --- | --- | --- | --- | --- | --- | --- |
+| Triggered region flush total | `meta_triggered_region_flush_total` | `timeseries` | Triggered region flush total | `prometheus` | `none` | `{{pod}}-{{topic_name}}` |
+| Triggered region checkpoint total | `meta_triggered_region_checkpoint_total` | `timeseries` | Triggered region checkpoint total | `prometheus` | `none` | `{{pod}}-{{topic_name}}` |
+| Topic estimated replay size | `meta_topic_estimated_replay_size` | `timeseries` | Topic estimated max replay size | `prometheus` | `bytes` | `{{pod}}-{{topic_name}}` |
+| Kafka logstore's bytes traffic | `rate(greptime_logstore_kafka_client_bytes_total[$__rate_interval])` | `timeseries` | Kafka logstore's bytes traffic | `prometheus` | `bytes` | `{{pod}}-{{logstore}}` |
 # Metasrv
 | Title | Query | Type | Description | Datasource | Unit | Legend Format |
 | --- | --- | --- | --- | --- | --- | --- |
@@ -103,6 +110,8 @@
 | Meta KV Ops Latency | `histogram_quantile(0.99, sum by(pod, le, op, target) (greptime_meta_kv_request_elapsed_bucket))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `{{pod}}-{{op}} p99` |
 | Rate of meta KV Ops | `rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `{{pod}}-{{op}} p99` |
 | DDL Latency | `histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `CreateLogicalTables-{{step}} p90` |
+| Reconciliation stats | `greptime_meta_reconciliation_stats` | `timeseries` | Reconciliation stats | `prometheus` | `s` | `{{pod}}-{{table_type}}-{{type}}` |
+| Reconciliation steps | `histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)` | `timeseries` | Elapsed of Reconciliation steps | `prometheus` | `s` | `{{procedure_name}}-{{step}}-P90` |
 # Flownode
 | Title | Query | Type | Description | Datasource | Unit | Legend Format |
 | --- | --- | --- | --- | --- | --- | --- |
@@ -802,6 +802,48 @@ groups:
             type: prometheus
             uid: ${metrics}
           legendFormat: '[{{instance}}]-[{{pod}}]-[{{scheme}}]-[{{operation}}]-[{{error}}]'
+  - title: Remote WAL
+    panels:
+      - title: Triggered region flush total
+        type: timeseries
+        description: Triggered region flush total
+        unit: none
+        queries:
+          - expr: meta_triggered_region_flush_total
+            datasource:
+              type: prometheus
+              uid: ${metrics}
+            legendFormat: '{{pod}}-{{topic_name}}'
+      - title: Triggered region checkpoint total
+        type: timeseries
+        description: Triggered region checkpoint total
+        unit: none
+        queries:
+          - expr: meta_triggered_region_checkpoint_total
+            datasource:
+              type: prometheus
+              uid: ${metrics}
+            legendFormat: '{{pod}}-{{topic_name}}'
+      - title: Topic estimated replay size
+        type: timeseries
+        description: Topic estimated max replay size
+        unit: bytes
+        queries:
+          - expr: meta_topic_estimated_replay_size
+            datasource:
+              type: prometheus
+              uid: ${metrics}
+            legendFormat: '{{pod}}-{{topic_name}}'
+      - title: Kafka logstore's bytes traffic
+        type: timeseries
+        description: Kafka logstore's bytes traffic
+        unit: bytes
+        queries:
+          - expr: rate(greptime_logstore_kafka_client_bytes_total[$__rate_interval])
+            datasource:
+              type: prometheus
+              uid: ${metrics}
+            legendFormat: '{{pod}}-{{logstore}}'
   - title: Metasrv
     panels:
       - title: Region migration datanode
@@ -948,6 +990,26 @@ groups:
             type: prometheus
             uid: ${metrics}
           legendFormat: AlterTable-{{step}} p90
+      - title: Reconciliation stats
+        type: timeseries
+        description: Reconciliation stats
+        unit: s
+        queries:
+          - expr: greptime_meta_reconciliation_stats
+            datasource:
+              type: prometheus
+              uid: ${metrics}
+            legendFormat: '{{pod}}-{{table_type}}-{{type}}'
+      - title: Reconciliation steps
+        type: timeseries
+        description: 'Elapsed of Reconciliation steps '
+        unit: s
+        queries:
+          - expr: histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)
+            datasource:
+              type: prometheus
+              uid: ${metrics}
+            legendFormat: '{{procedure_name}}-{{step}}-P90'
   - title: Flownode
     panels:
       - title: Flow Ingest / Output Rate
@@ -251,7 +251,6 @@ macro_rules! define_from_tonic_status {
             .get(key)
             .and_then(|v| String::from_utf8(v.as_bytes().to_vec()).ok())
     }

     let code = metadata_value(&e, $crate::GREPTIME_DB_HEADER_ERROR_CODE)
         .and_then(|s| {
             if let Ok(code) = s.parse::<u32>() {
@@ -108,10 +108,6 @@ pub struct OpenRegion {
     pub region_wal_options: HashMap<RegionNumber, String>,
     #[serde(default)]
     pub skip_wal_replay: bool,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub replay_entry_id: Option<u64>,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub metadata_replay_entry_id: Option<u64>,
 }

 impl OpenRegion {
@@ -128,22 +124,8 @@ impl OpenRegion {
             region_options,
             region_wal_options,
             skip_wal_replay,
-            replay_entry_id: None,
-            metadata_replay_entry_id: None,
         }
     }
-
-    /// Sets the replay entry id.
-    pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
-        self.replay_entry_id = replay_entry_id;
-        self
-    }
-
-    /// Sets the metadata replay entry id.
-    pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
-        self.metadata_replay_entry_id = metadata_replay_entry_id;
-        self
-    }
 }

 /// The instruction of downgrading leader region.
@@ -169,7 +151,7 @@ impl Display for DowngradeRegion {
 }

 /// Upgrades a follower region to leader region.
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
 pub struct UpgradeRegion {
     /// The [RegionId].
     pub region_id: RegionId,
@@ -186,6 +168,24 @@ pub struct UpgradeRegion {
     /// The hint for replaying memtable.
     #[serde(default)]
     pub location_id: Option<u64>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub replay_entry_id: Option<u64>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub metadata_replay_entry_id: Option<u64>,
+}
+
+impl UpgradeRegion {
+    /// Sets the replay entry id.
+    pub fn with_replay_entry_id(mut self, replay_entry_id: Option<u64>) -> Self {
+        self.replay_entry_id = replay_entry_id;
+        self
+    }
+
+    /// Sets the metadata replay entry id.
+    pub fn with_metadata_replay_entry_id(mut self, metadata_replay_entry_id: Option<u64>) -> Self {
+        self.metadata_replay_entry_id = metadata_replay_entry_id;
+        self
+    }
 }

 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
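Read together with the OpenRegion hunks above, the replay entry ids move from `OpenRegion` to `UpgradeRegion`, which now derives `Default` and gains builder-style setters. A minimal sketch of the resulting call pattern (hypothetical values; `region_id` assumed in scope):

```rust
// Sketch only: remaining fields come from the newly derived `Default`.
let upgrade = UpgradeRegion {
    region_id,
    ..Default::default()
}
.with_replay_entry_id(Some(42)) // hypothetical data replay entry id
.with_metadata_replay_entry_id(Some(7)); // hypothetical metadata entry id
```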
@@ -370,8 +370,6 @@ mod tests {
             region_options,
             region_wal_options: HashMap::new(),
             skip_wal_replay: false,
-            replay_entry_id: None,
-            metadata_replay_entry_id: None,
         };
         assert_eq!(expected, deserialized);
     }
@@ -46,7 +46,7 @@ pub struct TopicRegionValue {
     pub checkpoint: Option<ReplayCheckpoint>,
 }

-#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord)]
 pub struct ReplayCheckpoint {
     #[serde(default)]
     pub entry_id: u64,
@@ -24,6 +24,7 @@ use datatypes::schema::ColumnSchema;
 use futures::future::{join_all, try_join_all};
 use snafu::{ensure, OptionExt, ResultExt};
 use store_api::metadata::{ColumnMetadata, RegionMetadata};
+use store_api::storage::consts::ReservedColumnId;
 use store_api::storage::{RegionId, TableId};
 use table::metadata::{RawTableInfo, RawTableMeta};
 use table::table_name::TableName;
@@ -384,6 +385,7 @@ pub(crate) fn build_table_meta_from_column_metadatas(

     *next_column_id = column_ids
         .iter()
+        .filter(|id| !ReservedColumnId::is_reserved(**id))
         .max()
         .map(|max| max + 1)
         .unwrap_or(*next_column_id)
@@ -1039,9 +1041,13 @@ mod tests {
     fn test_build_table_info_from_column_metadatas() {
         let mut column_metadatas = new_test_column_metadatas();
         column_metadatas.push(ColumnMetadata {
-            column_schema: ColumnSchema::new("col3", ConcreteDataType::string_datatype(), true),
+            column_schema: ColumnSchema::new(
+                "__table_id",
+                ConcreteDataType::string_datatype(),
+                true,
+            ),
             semantic_type: SemanticType::Tag,
-            column_id: 3,
+            column_id: ReservedColumnId::table_id(),
         });

         let table_id = 1;
@@ -1066,8 +1072,11 @@ mod tests {
         assert_eq!(new_table_meta.partition_key_indices, vec![2]);
         assert_eq!(new_table_meta.value_indices, vec![1, 2]);
         assert_eq!(new_table_meta.schema.timestamp_index, Some(1));
-        assert_eq!(new_table_meta.column_ids, vec![0, 1, 2, 3]);
-        assert_eq!(new_table_meta.next_column_id, 4);
+        assert_eq!(
+            new_table_meta.column_ids,
+            vec![0, 1, 2, ReservedColumnId::table_id()]
+        );
+        assert_eq!(new_table_meta.next_column_id, table_meta.next_column_id);
     }

     #[test]
@@ -238,10 +238,7 @@ mod tests {
         // Upgrade region
         let instruction = Instruction::UpgradeRegion(UpgradeRegion {
             region_id,
-            last_entry_id: None,
-            metadata_last_entry_id: None,
-            replay_timeout: None,
-            location_id: None,
+            ..Default::default()
         });
         assert!(
             heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction)))
@@ -16,7 +16,7 @@ use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
 use common_meta::wal_options_allocator::prepare_wal_options;
 use futures_util::future::BoxFuture;
 use store_api::path_utils::table_dir;
-use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest, ReplayCheckpoint};
+use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest};

 use crate::heartbeat::handler::HandlerContext;

@@ -29,31 +29,18 @@ impl HandlerContext {
             mut region_options,
             region_wal_options,
             skip_wal_replay,
-            replay_entry_id,
-            metadata_replay_entry_id,
         }: OpenRegion,
     ) -> BoxFuture<'static, Option<InstructionReply>> {
         Box::pin(async move {
             let region_id = Self::region_ident_to_region_id(&region_ident);
             prepare_wal_options(&mut region_options, region_id, &region_wal_options);
-            let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
-                (Some(replay_entry_id), Some(metadata_replay_entry_id)) => Some(ReplayCheckpoint {
-                    entry_id: replay_entry_id,
-                    metadata_entry_id: Some(metadata_replay_entry_id),
-                }),
-                (Some(replay_entry_id), None) => Some(ReplayCheckpoint {
-                    entry_id: replay_entry_id,
-                    metadata_entry_id: None,
-                }),
-                _ => None,
-            };
             let request = RegionRequest::Open(RegionOpenRequest {
                 engine: region_ident.engine,
                 table_dir: table_dir(&region_storage_path, region_id.table_id()),
                 path_type: PathType::Bare,
                 options: region_options,
                 skip_wal_replay,
-                checkpoint,
+                checkpoint: None,
             });
             let result = self.region_server.handle_request(region_id, request).await;
             let success = result.is_ok();
@@ -15,7 +15,7 @@
 use common_meta::instruction::{InstructionReply, UpgradeRegion, UpgradeRegionReply};
 use common_telemetry::{info, warn};
 use futures_util::future::BoxFuture;
-use store_api::region_request::{RegionCatchupRequest, RegionRequest};
+use store_api::region_request::{RegionCatchupRequest, RegionRequest, ReplayCheckpoint};

 use crate::heartbeat::handler::HandlerContext;
 use crate::heartbeat::task_tracker::WaitResult;
@@ -29,6 +29,8 @@ impl HandlerContext {
             metadata_last_entry_id,
             replay_timeout,
             location_id,
+            replay_entry_id,
+            metadata_replay_entry_id,
         }: UpgradeRegion,
     ) -> BoxFuture<'static, Option<InstructionReply>> {
         Box::pin(async move {
@@ -50,6 +52,14 @@ impl HandlerContext {

         let region_server_moved = self.region_server.clone();

+        let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
+            (Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint {
+                entry_id,
+                metadata_entry_id,
+            }),
+            _ => None,
+        };
+
         // The catchup task is almost zero cost if the inside region is writable.
         // Therefore, it always registers a new catchup task.
         let register_result = self
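The match above yields a checkpoint only when a data replay entry id is present; a metadata id on its own produces none. A self-contained toy restatement of that rule (the `build` helper and tuple shape are hypothetical, not from the diff):

```rust
// Toy model: (data entry id, optional metadata entry id) -> optional checkpoint.
fn build(replay: Option<u64>, metadata: Option<u64>) -> Option<(u64, Option<u64>)> {
    match (replay, metadata) {
        (Some(entry_id), metadata_entry_id) => Some((entry_id, metadata_entry_id)),
        _ => None,
    }
}

fn main() {
    assert_eq!(build(None, Some(7)), None); // metadata alone is not enough
    assert_eq!(build(Some(42), None), Some((42, None)));
    assert_eq!(build(Some(42), Some(7)), Some((42, Some(7))));
}
```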
@@ -66,6 +76,7 @@ impl HandlerContext {
                     entry_id: last_entry_id,
                     metadata_entry_id: metadata_last_entry_id,
                     location_id,
+                    checkpoint,
                 }),
             )
             .await?;
@@ -148,10 +159,8 @@ mod tests {
             .clone()
             .handle_upgrade_region_instruction(UpgradeRegion {
                 region_id,
-                last_entry_id: None,
-                metadata_last_entry_id: None,
                 replay_timeout,
-                location_id: None,
+                ..Default::default()
             })
             .await;
         assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
@@ -187,10 +196,8 @@ mod tests {
             .clone()
             .handle_upgrade_region_instruction(UpgradeRegion {
                 region_id,
-                last_entry_id: None,
-                metadata_last_entry_id: None,
                 replay_timeout,
-                location_id: None,
+                ..Default::default()
             })
             .await;
         assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
@@ -227,10 +234,8 @@ mod tests {
             .clone()
             .handle_upgrade_region_instruction(UpgradeRegion {
                 region_id,
-                last_entry_id: None,
-                metadata_last_entry_id: None,
                 replay_timeout,
-                location_id: None,
+                ..Default::default()
             })
             .await;
         assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
@@ -271,9 +276,7 @@ mod tests {
             .handle_upgrade_region_instruction(UpgradeRegion {
                 region_id,
                 replay_timeout,
-                last_entry_id: None,
-                metadata_last_entry_id: None,
-                location_id: None,
+                ..Default::default()
             })
             .await;
         assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
@@ -289,10 +292,8 @@ mod tests {
         let reply = handler_context
             .handle_upgrade_region_instruction(UpgradeRegion {
                 region_id,
-                last_entry_id: None,
-                metadata_last_entry_id: None,
                 replay_timeout: Some(Duration::from_millis(500)),
-                location_id: None,
+                ..Default::default()
             })
             .await;
         assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
@@ -332,10 +333,7 @@ mod tests {
             .clone()
             .handle_upgrade_region_instruction(UpgradeRegion {
                 region_id,
-                last_entry_id: None,
-                metadata_last_entry_id: None,
-                replay_timeout: None,
-                location_id: None,
+                ..Default::default()
             })
             .await;
         assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
@@ -351,10 +349,8 @@ mod tests {
             .clone()
             .handle_upgrade_region_instruction(UpgradeRegion {
                 region_id,
-                last_entry_id: None,
-                metadata_last_entry_id: None,
                 replay_timeout: Some(Duration::from_millis(200)),
-                location_id: None,
+                ..Default::default()
             })
             .await;
         assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
@@ -29,10 +29,15 @@ use common_runtime::JoinHandle;
 use common_telemetry::tracing::warn;
 use common_telemetry::{debug, info};
 use common_time::TimeToLive;
+use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
+use datafusion_expr::LogicalPlan;
+use datatypes::prelude::ConcreteDataType;
 use query::QueryEngineRef;
+use session::context::QueryContext;
 use snafu::{ensure, OptionExt, ResultExt};
 use sql::parsers::utils::is_tql;
 use store_api::storage::{RegionId, TableId};
+use table::table_reference::TableReference;
 use tokio::sync::{oneshot, RwLock};

 use crate::batching_mode::frontend_client::FrontendClient;
@@ -42,8 +47,8 @@ use crate::batching_mode::utils::sql_to_df_plan;
 use crate::batching_mode::BatchingModeOptions;
 use crate::engine::FlowEngine;
 use crate::error::{
-    CreateFlowSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, InvalidQuerySnafu,
-    TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu,
+    CreateFlowSnafu, DatafusionSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu,
+    InvalidQuerySnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu,
 };
 use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW;
 use crate::{CreateFlowArgs, Error, FlowId, TableName};
@@ -151,9 +156,11 @@ impl BatchingEngine {
         let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
             let src_table_names = &task.config.source_table_names;
             let mut all_dirty_windows = HashSet::new();
+            let mut is_dirty = false;
             for src_table_name in src_table_names {
                 if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
                     let Some(expr) = &task.config.time_window_expr else {
+                        is_dirty = true;
                         continue;
                     };
                     for timestamp in timestamps {
@@ -168,6 +175,9 @@ impl BatchingEngine {
                 }
             }
             let mut state = task.state.write().unwrap();
+            if is_dirty {
+                state.dirty_time_windows.set_dirty();
+            }
             let flow_id_label = task.config.flow_id.to_string();
             for timestamp in all_dirty_windows {
                 state.dirty_time_windows.add_window(timestamp, None);
@@ -269,9 +279,12 @@ impl BatchingEngine {
         let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
             let src_table_names = &task.config.source_table_names;

+            let mut is_dirty = false;
+
             for src_table_name in src_table_names {
                 if let Some(entry) = group_by_table_name.get(src_table_name) {
                     let Some(expr) = &task.config.time_window_expr else {
+                        is_dirty = true;
                         continue;
                     };
                     let involved_time_windows = expr.handle_rows(entry.clone()).await?;
@@ -281,6 +294,10 @@ impl BatchingEngine {
                         .add_lower_bounds(involved_time_windows.into_iter());
                 }
             }
+            if is_dirty {
+                task.state.write().unwrap().dirty_time_windows.set_dirty();
+            }
+
             Ok(())
         });
         handles.push(handle);
@@ -370,13 +387,12 @@ impl BatchingEngine {
             }
         })?;
         let query_ctx = Arc::new(query_ctx);
+        let is_tql = is_tql(query_ctx.sql_dialect(), &sql)
+            .map_err(BoxedError::new)
+            .context(CreateFlowSnafu { sql: &sql })?;
+
         // optionally set a eval interval for the flow
-        if eval_interval.is_none()
-            && is_tql(query_ctx.sql_dialect(), &sql)
-                .map_err(BoxedError::new)
-                .context(CreateFlowSnafu { sql: &sql })?
-        {
+        if eval_interval.is_none() && is_tql {
             InvalidQuerySnafu {
                 reason: "TQL query requires EVAL INTERVAL to be set".to_string(),
             }
@@ -418,6 +434,11 @@ impl BatchingEngine {
         let (tx, rx) = oneshot::channel();

         let plan = sql_to_df_plan(query_ctx.clone(), self.query_engine.clone(), &sql, true).await?;
+
+        if is_tql {
+            self.check_is_tql_table(&plan, &query_ctx).await?;
+        }
+
         let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
             &plan,
             self.query_engine.engine_state().catalog_manager().clone(),
@@ -484,6 +505,131 @@
         Ok(Some(flow_id))
     }

+    async fn check_is_tql_table(
+        &self,
+        query: &LogicalPlan,
+        query_ctx: &QueryContext,
+    ) -> Result<(), Error> {
+        struct CollectTableRef {
+            table_refs: HashSet<datafusion_common::TableReference>,
+        }
+
+        impl TreeNodeVisitor<'_> for CollectTableRef {
+            type Node = LogicalPlan;
+            fn f_down(
+                &mut self,
+                node: &Self::Node,
+            ) -> datafusion_common::Result<TreeNodeRecursion> {
+                if let LogicalPlan::TableScan(scan) = node {
+                    self.table_refs.insert(scan.table_name.clone());
+                }
+                Ok(TreeNodeRecursion::Continue)
+            }
+        }
+        let mut table_refs = CollectTableRef {
+            table_refs: HashSet::new(),
+        };
+        query
+            .visit_with_subqueries(&mut table_refs)
+            .context(DatafusionSnafu {
+                context: "Checking if all source tables are TQL tables",
+            })?;
+
+        let default_catalog = query_ctx.current_catalog();
+        let default_schema = query_ctx.current_schema();
+        let default_schema = &default_schema;
+
+        for table_ref in table_refs.table_refs {
+            let table_ref = match &table_ref {
+                datafusion_common::TableReference::Bare { table } => {
+                    TableReference::full(default_catalog, default_schema, table)
+                }
+                datafusion_common::TableReference::Partial { schema, table } => {
+                    TableReference::full(default_catalog, schema, table)
+                }
+                datafusion_common::TableReference::Full {
+                    catalog,
+                    schema,
+                    table,
+                } => TableReference::full(catalog, schema, table),
+            };
+
+            let table_id = self
+                .table_meta
+                .table_name_manager()
+                .get(table_ref.into())
+                .await
+                .map_err(BoxedError::new)
+                .context(ExternalSnafu)?
+                .with_context(|| UnexpectedSnafu {
+                    reason: format!("Failed to get table id for table: {}", table_ref),
+                })?
+                .table_id();
+            let table_info =
+                get_table_info(self.table_meta.table_info_manager(), &table_id).await?;
+            // first check if it's only one f64 value column
+            let value_cols = table_info
+                .table_info
+                .meta
+                .schema
+                .column_schemas
+                .iter()
+                .filter(|col| col.data_type == ConcreteDataType::float64_datatype())
+                .collect::<Vec<_>>();
+            ensure!(
+                value_cols.len() == 1,
+                InvalidQuerySnafu {
+                    reason: format!(
+                        "TQL query only supports one f64 value column, table `{}`(id={}) has {} f64 value columns, columns are: {:?}",
+                        table_ref,
+                        table_id,
+                        value_cols.len(),
+                        value_cols
+                    ),
+                }
+            );
+            // TODO(discord9): do need to check rest columns is string and is tag column?
+            let pk_idxs = table_info
+                .table_info
+                .meta
+                .primary_key_indices
+                .iter()
+                .collect::<HashSet<_>>();
+
+            for (idx, col) in table_info
+                .table_info
+                .meta
+                .schema
+                .column_schemas
+                .iter()
+                .enumerate()
+            {
+                // three cases:
+                // 1. val column
+                // 2. timestamp column
+                // 3. tag column (string)
+
+                let is_pk: bool = pk_idxs.contains(&&idx);
+
+                ensure!(
+                    col.data_type == ConcreteDataType::float64_datatype()
+                        || col.data_type.is_timestamp()
+                        || (col.data_type == ConcreteDataType::string_datatype() && is_pk),
+                    InvalidQuerySnafu {
+                        reason: format!(
+                            "TQL query only supports f64 value column, timestamp column and string tag columns, table `{}`(id={}) has column `{}` with type {:?} which is not supported",
+                            table_ref,
+                            table_id,
+                            col.name,
+                            col.data_type
+                        ),
+                    }
+                );
+            }
+        }
+        Ok(())
+    }
+
     pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
         if self.tasks.write().await.remove(&flow_id).is_none() {
             warn!("Flow {flow_id} not found in tasks");
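`check_is_tql_table` collects every `TableScan` in the plan and validates each source table's schema. A self-contained toy restatement of the schema rule it enforces (`Kind` and `tql_compatible` are hypothetical names, not from the diff):

```rust
// Toy model: a table is TQL-compatible when it has exactly one f64 value
// column and every column is f64, a timestamp, or a string primary-key (tag).
#[derive(PartialEq)]
enum Kind {
    F64,
    Timestamp,
    Str,
}

fn tql_compatible(cols: &[(Kind, bool)]) -> bool {
    let f64_count = cols.iter().filter(|(k, _)| *k == Kind::F64).count();
    f64_count == 1
        && cols.iter().all(|(k, is_pk)| match k {
            Kind::F64 | Kind::Timestamp => true,
            Kind::Str => *is_pk, // strings must be tag (primary key) columns
        })
}

fn main() {
    // f64 value + timestamp + string tag: accepted.
    assert!(tql_compatible(&[
        (Kind::F64, false),
        (Kind::Timestamp, false),
        (Kind::Str, true)
    ]));
    // non-tag string column: rejected.
    assert!(!tql_compatible(&[
        (Kind::F64, false),
        (Kind::Timestamp, false),
        (Kind::Str, false)
    ]));
}
```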
@@ -203,11 +203,21 @@ impl DirtyTimeWindows {
         self.windows.clear();
     }

+    /// Set windows to be dirty, only useful for full aggr without time window
+    /// to mark some new data is inserted
+    pub fn set_dirty(&mut self) {
+        self.windows.insert(Timestamp::new_second(0), None);
+    }
+
     /// Number of dirty windows.
     pub fn len(&self) -> usize {
         self.windows.len()
     }

+    pub fn is_empty(&self) -> bool {
+        self.windows.is_empty()
+    }
+
     /// Get the effective count of time windows, which is the number of time windows that can be
     /// used for query, compute from total time window range divided by `window_size`.
     pub fn effective_count(&self, window_size: &Duration) -> usize {
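A sketch of how the two new helpers are meant to be used together (assumes `DirtyTimeWindows` implements `Default`, which this diff does not show):

```rust
// Sketch: a flow with no time-window expression marks itself dirty with a
// sentinel window at the epoch; callers then test for pending work.
let mut windows = DirtyTimeWindows::default();
assert!(windows.is_empty());
windows.set_dirty(); // inserts Timestamp::new_second(0) -> None
assert!(!windows.is_empty());
assert_eq!(windows.len(), 1);
```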
@@ -48,8 +48,8 @@ use crate::batching_mode::frontend_client::FrontendClient;
 use crate::batching_mode::state::{FilterExprInfo, TaskState};
 use crate::batching_mode::time_window::TimeWindowExpr;
 use crate::batching_mode::utils::{
-    get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter,
-    FindGroupByFinalName,
+    gen_plan_with_matching_schema, get_table_info_df_schema, sql_to_df_plan, AddFilterRewriter,
+    ColumnMatcherRewriter, FindGroupByFinalName,
 };
 use crate::batching_mode::BatchingModeOptions;
 use crate::df_optimizer::apply_df_optimizer;
@@ -618,41 +618,62 @@ impl BatchingTask {
             .map(|expr| expr.eval(low_bound))
             .transpose()?;
 
-        let (Some((Some(l), Some(u))), QueryType::Sql) =
-            (expire_time_window_bound, &self.config.query_type)
-        else {
-            // either no time window or not a sql query, then just use the original query
-            // use sink_table_meta to add to query the `update_at` and `__ts_placeholder` column's value too for compatibility reason
-            debug!(
-                "Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.config.flow_id
-            );
-            // clean dirty time window too, this could be from create flow's check_execute
-            self.state.write().unwrap().dirty_time_windows.clean();
-
-            // TODO(discord9): not add auto column for tql query?
-            let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());
-            let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false)
-                .await?;
-            let plan = plan
-                .clone()
-                .rewrite(&mut add_auto_column)
-                .with_context(|_| DatafusionSnafu {
-                    context: format!("Failed to rewrite plan:\n {}\n", plan),
-                })?
-                .data;
-
-            // since no time window lower/upper bound is found, just return the original query(with auto columns)
-            return Ok(Some(PlanInfo { plan, filter: None }));
-        };
+        let (expire_lower_bound, expire_upper_bound) =
+            match (expire_time_window_bound, &self.config.query_type) {
+                (Some((Some(l), Some(u))), QueryType::Sql) => (l, u),
+                (None, QueryType::Sql) => {
+                    // if it's sql query and no time window lower/upper bound is found, just return the original query(with auto columns)
+                    // use sink_table_meta to add to query the `update_at` and `__ts_placeholder` column's value too for compatibility reason
+                    debug!(
+                        "Flow id = {:?}, no time window, using the same query",
+                        self.config.flow_id
+                    );
+                    // clean dirty time window too, this could be from create flow's check_execute
+                    let is_dirty = !self.state.read().unwrap().dirty_time_windows.is_empty();
+                    self.state.write().unwrap().dirty_time_windows.clean();
+
+                    if !is_dirty {
+                        // no dirty data, hence no need to update
+                        debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
+                        return Ok(None);
+                    }
+
+                    let plan = gen_plan_with_matching_schema(
+                        &self.config.query,
+                        query_ctx,
+                        engine,
+                        sink_table_schema.clone(),
+                    )
+                    .await?;
+
+                    return Ok(Some(PlanInfo { plan, filter: None }));
+                }
+                _ => {
+                    // clean for tql have no use for time window
+                    self.state.write().unwrap().dirty_time_windows.clean();
+
+                    let plan = gen_plan_with_matching_schema(
+                        &self.config.query,
+                        query_ctx,
+                        engine,
+                        sink_table_schema.clone(),
+                    )
+                    .await?;
+
+                    return Ok(Some(PlanInfo { plan, filter: None }));
+                }
+            };
 
         debug!(
             "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
-            self.config.flow_id, l, u, self.state.read().unwrap().dirty_time_windows
+            self.config.flow_id, expire_lower_bound, expire_upper_bound, self.state.read().unwrap().dirty_time_windows
         );
-        let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
-            reason: format!("Can't get window size from {u:?} - {l:?}"),
-        })?;
+        let window_size = expire_upper_bound
+            .sub(&expire_lower_bound)
+            .with_context(|| UnexpectedSnafu {
+                reason: format!(
+                    "Can't get window size from {expire_upper_bound:?} - {expire_lower_bound:?}"
+                ),
+            })?;
         let col_name = self
             .config
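The hunk above replaces a single `let … else` binding with a three-way `match` on the time window bound and the query type. A minimal standalone sketch of that decision flow; `QueryType`, `plan_for`, and the string outcomes are illustrative stand-ins, not the crate's API:

```rust
#[derive(Debug, PartialEq)]
enum QueryType {
    Sql,
    Tql,
}

/// `bound` mirrors `expire_time_window_bound`: `Some((lower, upper))`
/// when a precise time window is known for the flow's source table.
fn plan_for(bound: Option<(i64, i64)>, query_type: QueryType, dirty: bool) -> &'static str {
    match (bound, query_type) {
        // SQL with a concrete window: fall through to windowed filtering.
        (Some(_), QueryType::Sql) => "windowed query",
        // SQL without a window: skip entirely when nothing is dirty.
        (None, QueryType::Sql) if !dirty => "no-op (no new data)",
        (None, QueryType::Sql) => "full query with matched schema",
        // TQL never uses dirty time windows.
        (_, QueryType::Tql) => "full query with matched schema",
    }
}

fn main() {
    assert_eq!(plan_for(Some((0, 60)), QueryType::Sql, true), "windowed query");
    assert_eq!(plan_for(None, QueryType::Sql, false), "no-op (no new data)");
    assert_eq!(plan_for(None, QueryType::Tql, false), "full query with matched schema");
}
```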
@@ -673,7 +694,7 @@ impl BatchingTask {
             .dirty_time_windows
             .gen_filter_exprs(
                 &col_name,
-                Some(l),
+                Some(expire_lower_bound),
                 window_size,
                 max_window_cnt
                     .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query),
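`gen_filter_exprs` above turns dirty time windows into at most N per-window filters, bounded by `experimental_max_filter_num_per_query`. A hedged sketch of the underlying window-alignment arithmetic, assuming second-resolution timestamps; `windows_for` and its parameters are illustrative, not the real signature:

```rust
/// Align each dirty timestamp down to its window start and emit up to
/// `max_filters` half-open [start, end) ranges, merging adjacent windows.
/// A simplified model of what the filter generation produces.
fn windows_for(dirty: &[i64], lower_bound: i64, window_secs: i64, max_filters: usize) -> Vec<(i64, i64)> {
    let mut starts: Vec<i64> = dirty
        .iter()
        // Ignore anything older than the expiry lower bound.
        .filter(|&&ts| ts >= lower_bound)
        // Align down to the window grid.
        .map(|&ts| ts - ts.rem_euclid(window_secs))
        .collect();
    starts.sort_unstable();
    starts.dedup();

    let mut ranges: Vec<(i64, i64)> = Vec::new();
    for start in starts.into_iter().take(max_filters) {
        match ranges.last_mut() {
            // Extend the previous range when windows are contiguous.
            Some((_, end)) if *end == start => *end = start + window_secs,
            _ => ranges.push((start, start + window_secs)),
        }
    }
    ranges
}

fn main() {
    // 60s windows; 130 and 140 share one window, 250 is separate.
    assert_eq!(
        windows_for(&[130, 140, 250], 0, 60, 10),
        vec![(120, 180), (240, 300)]
    );
}
```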
@@ -701,7 +722,7 @@ impl BatchingTask {
         };
 
         let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
-        let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());
+        let mut add_auto_column = ColumnMatcherRewriter::new(sink_table_schema.clone());
 
         let plan =
             sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?;
@@ -732,7 +753,25 @@ fn create_table_with_expr(
     sink_table_name: &[String; 3],
     query_type: &QueryType,
 ) -> Result<CreateTableExpr, Error> {
-    let (first_time_stamp, primary_keys) = build_primary_key_constraint(plan)?;
+    let table_def = match query_type {
+        &QueryType::Sql => {
+            if let Some(def) = build_pk_from_aggr(plan)? {
+                def
+            } else {
+                build_by_sql_schema(plan)?
+            }
+        }
+        QueryType::Tql => {
+            // first try build from aggr, then from tql schema because tql query might not have aggr node
+            if let Some(table_def) = build_pk_from_aggr(plan)? {
+                table_def
+            } else {
+                build_by_tql_schema(plan)?
+            }
+        }
+    };
+    let first_time_stamp = table_def.ts_col;
+    let primary_keys = table_def.pks;
 
     let mut column_schemas = Vec::new();
     for field in plan.schema().fields() {
@@ -755,7 +794,7 @@ fn create_table_with_expr(
         let is_val_column = !is_tag_column && first_time_stamp.as_ref() != Some(name);
         if is_val_column {
             let col_schema =
-                ColumnSchema::new("val", ConcreteDataType::float64_datatype(), true);
+                ColumnSchema::new(name, ConcreteDataType::float64_datatype(), true);
             column_schemas.push(col_schema);
         } else if is_tag_column {
             let col_schema =
@@ -809,15 +848,63 @@ fn create_table_with_expr(
     })
 }
 
+/// simply build by schema, return first timestamp column and no primary key
+fn build_by_sql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
+    let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
+        if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
+            Some(f.name().clone())
+        } else {
+            None
+        }
+    });
+    Ok(TableDef {
+        ts_col: first_time_stamp,
+        pks: vec![],
+    })
+}
+
+/// Return first timestamp column found in output schema and all string columns
+fn build_by_tql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
+    let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
+        if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
+            Some(f.name().clone())
+        } else {
+            None
+        }
+    });
+    let string_columns = plan
+        .schema()
+        .fields()
+        .iter()
+        .filter_map(|f| {
+            if ConcreteDataType::from_arrow_type(f.data_type()).is_string() {
+                Some(f.name().clone())
+            } else {
+                None
+            }
+        })
+        .collect::<Vec<_>>();
+
+    Ok(TableDef {
+        ts_col: first_time_stamp,
+        pks: string_columns,
+    })
+}
+
+struct TableDef {
+    ts_col: Option<String>,
+    pks: Vec<String>,
+}
+
 /// Return first timestamp column which is in group by clause and other columns which are also in group by clause
 ///
 /// # Returns
 ///
 /// * `Option<String>` - first timestamp column which is in group by clause
 /// * `Vec<String>` - other columns which are also in group by clause
-fn build_primary_key_constraint(
-    plan: &LogicalPlan,
-) -> Result<(Option<String>, Vec<String>), Error> {
+///
+/// if no aggregation found, return None
+fn build_pk_from_aggr(plan: &LogicalPlan) -> Result<Option<TableDef>, Error> {
     let fields = plan.schema().fields();
     let mut pk_names = FindGroupByFinalName::default();
@@ -827,13 +914,18 @@ fn build_primary_key_constraint(
     })?;
 
     // if no group by clause, return empty with first timestamp column found in output schema
-    let pk_final_names = pk_names.get_group_expr_names().unwrap_or_default();
+    let Some(pk_final_names) = pk_names.get_group_expr_names() else {
+        return Ok(None);
+    };
     if pk_final_names.is_empty() {
         let first_ts_col = fields
             .iter()
             .find(|f| ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp())
             .map(|f| f.name().clone());
-        return Ok((first_ts_col, Vec::new()));
+        return Ok(Some(TableDef {
+            ts_col: first_ts_col,
+            pks: vec![],
+        }));
     }
 
     let all_pk_cols: Vec<_> = fields
@@ -855,7 +947,10 @@ fn build_primary_key_constraint(
         .filter(|col| first_time_stamp != Some(col.to_string()))
         .collect();
 
-    Ok((first_time_stamp, all_pk_cols))
+    Ok(Some(TableDef {
+        ts_col: first_time_stamp,
+        pks: all_pk_cols,
+    }))
 }
 
 #[cfg(test)]
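The rewritten `create_table_with_expr` now funnels both query types through a `TableDef` (timestamp column plus primary keys), trying aggregation-derived keys first and falling back to the output schema. A self-contained sketch of that fallback, using toy stand-ins for the plan's fields rather than DataFusion types:

```rust
/// A simplified stand-in for the diff's `TableDef`.
#[derive(Debug, PartialEq)]
struct TableDef {
    ts_col: Option<String>,
    pks: Vec<String>,
}

/// A toy field: (name, is_timestamp, is_string, in_group_by).
type Field = (&'static str, bool, bool, bool);

/// Mirrors `build_pk_from_aggr`: `None` when the plan has no aggregation.
fn pk_from_aggr(fields: &[Field], has_aggr: bool) -> Option<TableDef> {
    if !has_aggr {
        return None;
    }
    let ts_col = fields.iter().find(|f| f.1 && f.3).map(|f| f.0.to_string());
    let pks = fields
        .iter()
        .filter(|f| f.3 && Some(f.0.to_string()) != ts_col)
        .map(|f| f.0.to_string())
        .collect();
    Some(TableDef { ts_col, pks })
}

/// Mirrors `build_by_tql_schema`: first timestamp column, all strings as keys.
fn from_tql_schema(fields: &[Field]) -> TableDef {
    TableDef {
        ts_col: fields.iter().find(|f| f.1).map(|f| f.0.to_string()),
        pks: fields.iter().filter(|f| f.2).map(|f| f.0.to_string()).collect(),
    }
}

fn main() {
    let fields: [Field; 3] = [
        ("ts", true, false, false),
        ("host", false, true, false),
        ("val", false, false, false),
    ];
    // A TQL plan without an aggregation node falls back to the schema.
    let def = pk_from_aggr(&fields, false).unwrap_or_else(|| from_tql_schema(&fields));
    assert_eq!(def, TableDef { ts_col: Some("ts".into()), pks: vec!["host".into()] });
}
```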
@@ -24,7 +24,7 @@ use datafusion::error::Result as DfResult;
 use datafusion::logical_expr::Expr;
 use datafusion::sql::unparser::Unparser;
 use datafusion_common::tree_node::{
-    Transformed, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor,
+    Transformed, TreeNode as _, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor,
 };
 use datafusion_common::{DFSchema, DataFusionError, ScalarValue};
 use datafusion_expr::{Distinct, LogicalPlan, Projection};
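`TreeNode as _` above is an anonymous trait import: it brings the trait's methods (here, `rewrite` on logical plans) into scope without binding the trait's name. A minimal stable-Rust illustration of the pattern:

```rust
mod shapes {
    pub trait Area {
        fn area(&self) -> f64;
    }
    pub struct Square(pub f64);
    impl Area for Square {
        fn area(&self) -> f64 {
            self.0 * self.0
        }
    }
}

// `as _` imports the trait's methods without binding the name `Area`,
// avoiding clashes with any other `Area` already in scope.
use shapes::Area as _;

fn main() {
    let s = shapes::Square(3.0);
    assert_eq!(s.area(), 9.0); // method available via the anonymous import
}
```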
@@ -135,6 +135,27 @@ pub async fn sql_to_df_plan(
     Ok(plan)
 }
 
+/// Generate a plan that matches the schema of the sink table
+/// from given sql by alias and adding auto columns
+pub(crate) async fn gen_plan_with_matching_schema(
+    sql: &str,
+    query_ctx: QueryContextRef,
+    engine: QueryEngineRef,
+    sink_table_schema: SchemaRef,
+) -> Result<LogicalPlan, Error> {
+    let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), sql, false).await?;
+
+    let mut add_auto_column = ColumnMatcherRewriter::new(sink_table_schema);
+    let plan = plan
+        .clone()
+        .rewrite(&mut add_auto_column)
+        .with_context(|_| DatafusionSnafu {
+            context: format!("Failed to rewrite plan:\n {}\n", plan),
+        })?
+        .data;
+    Ok(plan)
+}
+
 pub fn df_plan_to_sql(plan: &LogicalPlan) -> Result<String, Error> {
     /// A dialect that forces identifiers to be quoted when have uppercase
     struct ForceQuoteIdentifiers;
@@ -239,19 +260,19 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
     }
 }
 
-/// Add to the final select columns like `update_at`
+/// Optionally add to the final select columns like `update_at` if the sink table has such column
 /// (which doesn't necessary need to have exact name just need to be a extra timestamp column)
 /// and `__ts_placeholder`(this column need to have exact this name and be a timestamp)
 /// with values like `now()` and `0`
 ///
 /// it also give existing columns alias to column in sink table if needed
 #[derive(Debug)]
-pub struct AddAutoColumnRewriter {
+pub struct ColumnMatcherRewriter {
     pub schema: SchemaRef,
     pub is_rewritten: bool,
 }
 
-impl AddAutoColumnRewriter {
+impl ColumnMatcherRewriter {
     pub fn new(schema: SchemaRef) -> Self {
         Self {
             schema,
@@ -348,7 +369,7 @@ impl AddAutoColumnRewriter {
     }
 }
 
-impl TreeNodeRewriter for AddAutoColumnRewriter {
+impl TreeNodeRewriter for ColumnMatcherRewriter {
     type Node = LogicalPlan;
     fn f_down(&mut self, mut node: Self::Node) -> DfResult<Transformed<Self::Node>> {
         if self.is_rewritten {
@@ -696,7 +717,7 @@ mod test {
         let ctx = QueryContext::arc();
         for (before, after, column_schemas) in testcases {
             let schema = Arc::new(Schema::new(column_schemas));
-            let mut add_auto_column_rewriter = AddAutoColumnRewriter::new(schema);
+            let mut add_auto_column_rewriter = ColumnMatcherRewriter::new(schema);
 
             let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), before, false)
                 .await
@@ -302,6 +302,10 @@ impl LogStore for KafkaLogStore {
             },
         ))
         .await?;
+        debug!(
+            "Appended batch to Kafka, region_grouped_max_offset: {:?}",
+            region_grouped_max_offset
+        );
 
         Ok(AppendBatchResponse {
             last_entry_ids: region_grouped_max_offset.into_iter().collect(),
@@ -362,6 +366,17 @@ impl LogStore for KafkaLogStore {
             .context(GetOffsetSnafu {
                 topic: &provider.topic,
             })?;
+        let latest_offset = (end_offset as u64).saturating_sub(1);
+        self.topic_stats
+            .entry(provider.clone())
+            .and_modify(|stat| {
+                stat.latest_offset = stat.latest_offset.max(latest_offset);
+            })
+            .or_insert_with(|| TopicStat {
+                latest_offset,
+                record_size: 0,
+                record_num: 0,
+            });
 
         let region_indexes = if let (Some(index), Some(collector)) =
             (index, self.client_manager.global_index_collector())
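This hunk and the producer-worker change further down rely on the same convention: Kafka's high watermark points one past the last record, so the latest readable offset is the watermark minus one, with `saturating_sub` covering the empty-topic case. A standalone sketch of that arithmetic:

```rust
/// Kafka's high watermark (the "latest" offset) points one past the last
/// record, so the last readable offset is `watermark - 1`. `saturating_sub`
/// keeps an empty topic (watermark 0) at offset 0 instead of underflowing.
fn latest_offset(high_watermark: i64) -> u64 {
    (high_watermark as u64).saturating_sub(1)
}

fn main() {
    assert_eq!(latest_offset(0), 0); // empty topic
    assert_eq!(latest_offset(1), 0); // one record, at offset 0
    assert_eq!(latest_offset(100), 99); // 100 records, last one at offset 99
}
```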
@@ -550,6 +565,7 @@ mod tests {
     use futures::TryStreamExt;
     use rand::prelude::SliceRandom;
     use rand::Rng;
+    use rskafka::client::partition::OffsetAt;
     use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry};
     use store_api::logstore::provider::Provider;
     use store_api::logstore::LogStore;
@@ -713,8 +729,16 @@ mod tests {
                 .for_each(|entry| entry.set_entry_id(0));
             assert_eq!(expected_entries, actual_entries);
         }
-        let high_wathermark = logstore.latest_entry_id(&provider).unwrap();
-        assert_eq!(high_wathermark, 99);
+        let latest_entry_id = logstore.latest_entry_id(&provider).unwrap();
+        let client = logstore
+            .client_manager
+            .get_or_insert(provider.as_kafka_provider().unwrap())
+            .await
+            .unwrap();
+        assert_eq!(latest_entry_id, 99);
+        // The latest offset is the offset of the last record plus one.
+        let latest = client.client().get_offset(OffsetAt::Latest).await.unwrap();
+        assert_eq!(latest, 100);
     }
 
     #[tokio::test]
@@ -112,11 +112,11 @@ mod tests {
         let current_latest_offset = topic_stats.get(&provider).unwrap().latest_offset;
         assert_eq!(current_latest_offset, 0);
 
-        let record = vec![record()];
+        let record = vec![record(), record()];
         let region = RegionId::new(1, 1);
         producer.produce(region, record.clone()).await.unwrap();
         tokio::time::sleep(Duration::from_millis(150)).await;
         let current_latest_offset = topic_stats.get(&provider).unwrap().latest_offset;
-        assert_eq!(current_latest_offset, record.len() as u64);
+        assert_eq!(current_latest_offset, record.len() as u64 - 1);
     }
 }
@@ -33,9 +33,12 @@ impl BackgroundProducerWorker {
             .context(error::GetOffsetSnafu {
                 topic: &self.provider.topic,
             }) {
-            Ok(offset) => match self.topic_stats.entry(self.provider.clone()) {
+            Ok(highwatermark) => {
+                // The highwatermark is the offset of the last record plus one.
+                let offset = (highwatermark as u64).saturating_sub(1);
+
+                match self.topic_stats.entry(self.provider.clone()) {
                     dashmap::Entry::Occupied(mut occupied_entry) => {
-                        let offset = offset as u64;
                         let stat = occupied_entry.get_mut();
                         if stat.latest_offset < offset {
                             stat.latest_offset = offset;
@@ -47,7 +50,7 @@ impl BackgroundProducerWorker {
                     }
                     dashmap::Entry::Vacant(vacant_entry) => {
                         vacant_entry.insert(TopicStat {
-                            latest_offset: offset as u64,
+                            latest_offset: offset,
                             record_size: 0,
                             record_num: 0,
                         });
@@ -56,7 +59,8 @@ impl BackgroundProducerWorker {
                             self.provider.topic, offset
                         );
                     }
-            },
+                }
+            }
             Err(err) => {
                 error!(err; "Failed to get latest offset for topic {}", self.provider.topic);
             }
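The `topic_stats` update in the log store hunk above is a monotonic-max upsert through `dashmap`'s entry API. A minimal sketch of the same pattern (assumes the `dashmap` crate; names are illustrative):

```rust
use dashmap::DashMap;

/// Record a newly observed offset, keeping only the maximum per topic —
/// the same entry-API shape as the `topic_stats` update above.
fn observe(stats: &DashMap<String, u64>, topic: &str, offset: u64) {
    stats
        .entry(topic.to_string())
        .and_modify(|latest| *latest = (*latest).max(offset))
        .or_insert(offset);
}

fn main() {
    let stats = DashMap::new();
    observe(&stats, "wal_topic_0", 7);
    observe(&stats, "wal_topic_0", 3); // a stale observation does not regress
    assert_eq!(*stats.get("wal_topic_0").unwrap(), 7);
}
```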
@@ -152,13 +152,9 @@ fn align_ts(ts: i64, interval: Duration) -> i64 {
 impl PersistStatsHandler {
     /// Creates a new [`PersistStatsHandler`].
     pub fn new(inserter: Box<dyn Inserter>, mut persist_interval: Duration) -> Self {
-        if persist_interval < Duration::from_secs(60) {
-            warn!("persist_interval is less than 60 seconds, set to 60 seconds");
-            persist_interval = Duration::from_secs(60);
-        }
-        if persist_interval.as_millis() == 0 {
-            warn!("persist_interval as milliseconds is zero, set to 60 second");
-            persist_interval = Duration::from_secs(60);
+        if persist_interval < Duration::from_mins(10) {
+            warn!("persist_interval is less than 10 minutes, set to 10 minutes");
+            persist_interval = Duration::from_mins(10);
         }
 
         Self {
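`Duration::from_mins` used above is a nightly constructor, enabled by the `duration_constructors_lite` feature flag added in the next hunk. A stable-Rust sketch of the same ten-minute floor:

```rust
use std::time::Duration;

/// Minimum stats persist interval, mirroring the 10-minute floor above.
/// On stable Rust, `Duration::from_mins(10)` is spelled as 600 seconds.
const MIN_PERSIST_INTERVAL: Duration = Duration::from_secs(10 * 60);

fn clamp_persist_interval(requested: Duration) -> Duration {
    if requested < MIN_PERSIST_INTERVAL {
        // The real handler also logs a warning here.
        MIN_PERSIST_INTERVAL
    } else {
        requested
    }
}

fn main() {
    assert_eq!(clamp_persist_interval(Duration::from_secs(60)), MIN_PERSIST_INTERVAL);
    assert_eq!(clamp_persist_interval(Duration::from_secs(3600)), Duration::from_secs(3600));
}
```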
@@ -16,6 +16,7 @@
 #![feature(assert_matches)]
 #![feature(hash_set_entry)]
 #![feature(let_chains)]
+#![feature(duration_constructors_lite)]
 #![feature(duration_constructors)]
 
 pub mod bootstrap;
@@ -114,8 +114,8 @@ pub struct StatsPersistenceOptions {
 impl Default for StatsPersistenceOptions {
     fn default() -> Self {
         Self {
-            ttl: Duration::from_days(30),
-            interval: Duration::from_secs(60),
+            ttl: Duration::ZERO,
+            interval: Duration::from_mins(10),
         }
     }
 }
@@ -82,7 +82,7 @@ lazy_static! {
         .unwrap();
     /// The triggered region flush total counter.
     pub static ref METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL: IntCounterVec =
-        register_int_counter_vec!("meta_triggered_region_flush_total", "meta triggered region flush total", &["topic_name", "region_type"]).unwrap();
+        register_int_counter_vec!("meta_triggered_region_flush_total", "meta triggered region flush total", &["topic_name"]).unwrap();
 
     /// The triggered region checkpoint total counter.
     pub static ref METRIC_META_TRIGGERED_REGION_CHECKPOINT_TOTAL: IntCounterVec =
@@ -19,7 +19,6 @@ use api::v1::meta::MailboxMessage;
 use common_meta::distributed_time_constants::REGION_LEASE_SECS;
 use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
 use common_meta::key::datanode_table::RegionInfo;
-use common_meta::wal_options_allocator::extract_topic_from_wal_options;
 use common_meta::RegionIdent;
 use common_procedure::{Context as ProcedureContext, Status};
 use common_telemetry::info;
@@ -68,7 +67,6 @@ impl OpenCandidateRegion {
     async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
         let pc = &ctx.persistent_ctx;
         let table_id = pc.region_id.table_id();
-        let region_id = pc.region_id;
         let region_number = pc.region_id.region_number();
         let candidate_id = pc.to_peer.id;
         let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;
@@ -80,15 +78,7 @@ impl OpenCandidateRegion {
             engine,
         } = datanode_table_value.region_info.clone();
 
-        let checkpoint =
-            if let Some(topic) = extract_topic_from_wal_options(region_id, &region_wal_options) {
-                ctx.fetch_replay_checkpoint(&topic).await.ok().flatten()
-            } else {
-                None
-            };
-
-        let open_instruction = Instruction::OpenRegion(
-            OpenRegion::new(
+        let open_instruction = Instruction::OpenRegion(OpenRegion::new(
             RegionIdent {
                 datanode_id: candidate_id,
                 table_id,
@@ -99,12 +89,7 @@ impl OpenCandidateRegion {
             region_options,
             region_wal_options,
             true,
-            )
-            .with_replay_entry_id(checkpoint.map(|checkpoint| checkpoint.entry_id))
-            .with_metadata_replay_entry_id(
-                checkpoint.and_then(|checkpoint| checkpoint.metadata_entry_id),
-            ),
-        );
+        ));
 
         Ok(open_instruction)
     }
@@ -241,8 +226,6 @@ mod tests {
             region_options: Default::default(),
             region_wal_options: Default::default(),
             skip_wal_replay: true,
-            replay_entry_id: None,
-            metadata_replay_entry_id: None,
         })
     }
 
@@ -19,6 +19,7 @@ use api::v1::meta::MailboxMessage;
 use common_meta::ddl::utils::parse_region_wal_options;
 use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply};
 use common_meta::lock_key::RemoteWalLock;
+use common_meta::wal_options_allocator::extract_topic_from_wal_options;
 use common_procedure::{Context as ProcedureContext, Status};
 use common_telemetry::{error, warn};
 use common_wal::options::WalOptions;
@@ -111,23 +112,40 @@ impl UpgradeCandidateRegion {
     }
 
     /// Builds upgrade region instruction.
-    fn build_upgrade_region_instruction(
+    async fn build_upgrade_region_instruction(
         &self,
-        ctx: &Context,
+        ctx: &mut Context,
         replay_timeout: Duration,
-    ) -> Instruction {
+    ) -> Result<Instruction> {
         let pc = &ctx.persistent_ctx;
         let region_id = pc.region_id;
         let last_entry_id = ctx.volatile_ctx.leader_region_last_entry_id;
         let metadata_last_entry_id = ctx.volatile_ctx.leader_region_metadata_last_entry_id;
+        // Try our best to retrieve replay checkpoint.
+        let datanode_table_value = ctx.get_from_peer_datanode_table_value().await.ok();
+        let checkpoint = if let Some(topic) = datanode_table_value.as_ref().and_then(|v| {
+            extract_topic_from_wal_options(region_id, &v.region_info.region_wal_options)
+        }) {
+            ctx.fetch_replay_checkpoint(&topic).await.ok().flatten()
+        } else {
+            None
+        };
 
-        Instruction::UpgradeRegion(UpgradeRegion {
-            region_id,
-            last_entry_id,
-            metadata_last_entry_id,
-            replay_timeout: Some(replay_timeout),
-            location_id: Some(ctx.persistent_ctx.from_peer.id),
-        })
+        let upgrade_instruction = Instruction::UpgradeRegion(
+            UpgradeRegion {
+                region_id,
+                last_entry_id,
+                metadata_last_entry_id,
+                replay_timeout: Some(replay_timeout),
+                location_id: Some(ctx.persistent_ctx.from_peer.id),
+                replay_entry_id: None,
+                metadata_replay_entry_id: None,
+            }
+            .with_replay_entry_id(checkpoint.map(|c| c.entry_id))
+            .with_metadata_replay_entry_id(checkpoint.and_then(|c| c.metadata_entry_id)),
+        );
+
+        Ok(upgrade_instruction)
     }
 
     /// Tries to upgrade a candidate region.
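The instruction above threads an optional replay checkpoint through consuming `with_*` builders, so a missing checkpoint degrades to `None` fields rather than an error. A sketch of that shape with stand-in types (not the crate's actual definitions):

```rust
/// Stand-ins for the diff's `ReplayCheckpoint` and `UpgradeRegion`.
#[derive(Clone, Copy)]
struct Checkpoint {
    entry_id: u64,
    metadata_entry_id: Option<u64>,
}

#[derive(Debug, Default, PartialEq)]
struct UpgradeRegion {
    replay_entry_id: Option<u64>,
    metadata_replay_entry_id: Option<u64>,
}

impl UpgradeRegion {
    // Consuming builders, like the `with_*` methods in the hunk above.
    fn with_replay_entry_id(mut self, id: Option<u64>) -> Self {
        self.replay_entry_id = id;
        self
    }
    fn with_metadata_replay_entry_id(mut self, id: Option<u64>) -> Self {
        self.metadata_replay_entry_id = id;
        self
    }
}

fn main() {
    // A checkpoint without a metadata entry id leaves that field `None`.
    let checkpoint = Some(Checkpoint { entry_id: 42, metadata_entry_id: None });
    let region = UpgradeRegion::default()
        .with_replay_entry_id(checkpoint.map(|c| c.entry_id))
        .with_metadata_replay_entry_id(checkpoint.and_then(|c| c.metadata_entry_id));
    assert_eq!(
        region,
        UpgradeRegion { replay_entry_id: Some(42), metadata_replay_entry_id: None }
    );
}
```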
@@ -144,16 +162,19 @@ impl UpgradeCandidateRegion {
     /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply) (impossible).
     /// - [ExceededDeadline](error::Error::ExceededDeadline)
     /// - Invalid JSON (impossible).
-    async fn upgrade_region(&self, ctx: &Context) -> Result<()> {
-        let pc = &ctx.persistent_ctx;
-        let region_id = pc.region_id;
-        let candidate = &pc.to_peer;
+    async fn upgrade_region(&self, ctx: &mut Context) -> Result<()> {
         let operation_timeout =
             ctx.next_operation_timeout()
                 .context(error::ExceededDeadlineSnafu {
                     operation: "Upgrade region",
                 })?;
-        let upgrade_instruction = self.build_upgrade_region_instruction(ctx, operation_timeout);
+        let upgrade_instruction = self
+            .build_upgrade_region_instruction(ctx, operation_timeout)
+            .await?;
+
+        let pc = &ctx.persistent_ctx;
+        let region_id = pc.region_id;
+        let candidate = &pc.to_peer;
 
         let msg = MailboxMessage::json_message(
             &format!("Upgrade candidate region: {}", region_id),
@@ -283,8 +304,12 @@ impl UpgradeCandidateRegion {
 #[cfg(test)]
 mod tests {
     use std::assert_matches::assert_matches;
+    use std::collections::HashMap;
 
+    use common_meta::key::table_route::TableRouteValue;
+    use common_meta::key::test_utils::new_test_table_info;
     use common_meta::peer::Peer;
+    use common_meta::rpc::router::{Region, RegionRoute};
     use store_api::storage::RegionId;
 
     use super::*;
@@ -308,14 +333,33 @@ mod tests {
         }
     }
 
+    async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
+        let table_info =
+            new_test_table_info(ctx.persistent_ctx.region_id.table_id(), vec![1]).into();
+        let region_routes = vec![RegionRoute {
+            region: Region::new_test(ctx.persistent_ctx.region_id),
+            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
+            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
+            ..Default::default()
+        }];
+        ctx.table_metadata_manager
+            .create_table_metadata(
+                table_info,
+                TableRouteValue::physical(region_routes),
+                wal_options,
+            )
+            .await
+            .unwrap();
+    }
+
     #[tokio::test]
     async fn test_datanode_is_unreachable() {
         let state = UpgradeCandidateRegion::default();
         let persistent_context = new_persistent_context();
         let env = TestingEnv::new();
-        let ctx = env.context_factory().new_context(persistent_context);
-        let err = state.upgrade_region(&ctx).await.unwrap_err();
+        let mut ctx = env.context_factory().new_context(persistent_context);
+        prepare_table_metadata(&ctx, HashMap::default()).await;
+        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
 
         assert_matches!(err, Error::PusherNotFound { .. });
         assert!(!err.is_retryable());
@@ -328,7 +372,8 @@ mod tests {
         let to_peer_id = persistent_context.to_peer.id;
 
         let mut env = TestingEnv::new();
-        let ctx = env.context_factory().new_context(persistent_context);
+        let mut ctx = env.context_factory().new_context(persistent_context);
+        prepare_table_metadata(&ctx, HashMap::default()).await;
         let mailbox_ctx = env.mailbox_context();
 
         let (tx, rx) = tokio::sync::mpsc::channel(1);
@@ -339,7 +384,7 @@ mod tests {
 
         drop(rx);
 
-        let err = state.upgrade_region(&ctx).await.unwrap_err();
+        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
 
         assert_matches!(err, Error::PushMessage { .. });
         assert!(!err.is_retryable());
@@ -351,10 +396,11 @@ mod tests {
         let persistent_context = new_persistent_context();
         let env = TestingEnv::new();
         let mut ctx = env.context_factory().new_context(persistent_context);
+        prepare_table_metadata(&ctx, HashMap::default()).await;
         ctx.volatile_ctx.metrics.operations_elapsed =
             ctx.persistent_ctx.timeout + Duration::from_secs(1);
 
-        let err = state.upgrade_region(&ctx).await.unwrap_err();
+        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
 
         assert_matches!(err, Error::ExceededDeadline { .. });
         assert!(!err.is_retryable());
@@ -367,7 +413,8 @@ mod tests {
         let to_peer_id = persistent_context.to_peer.id;
 
         let mut env = TestingEnv::new();
-        let ctx = env.context_factory().new_context(persistent_context);
+        let mut ctx = env.context_factory().new_context(persistent_context);
+        prepare_table_metadata(&ctx, HashMap::default()).await;
         let mailbox_ctx = env.mailbox_context();
         let mailbox = mailbox_ctx.mailbox().clone();
 
@@ -379,7 +426,7 @@ mod tests {
 
         send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
 
-        let err = state.upgrade_region(&ctx).await.unwrap_err();
+        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
         assert_matches!(err, Error::UnexpectedInstructionReply { .. });
         assert!(!err.is_retryable());
     }
@@ -391,7 +438,8 @@ mod tests {
         let to_peer_id = persistent_context.to_peer.id;
 
         let mut env = TestingEnv::new();
-        let ctx = env.context_factory().new_context(persistent_context);
+        let mut ctx = env.context_factory().new_context(persistent_context);
+        prepare_table_metadata(&ctx, HashMap::default()).await;
         let mailbox_ctx = env.mailbox_context();
         let mailbox = mailbox_ctx.mailbox().clone();
 
@@ -411,7 +459,7 @@ mod tests {
             ))
         });
 
-        let err = state.upgrade_region(&ctx).await.unwrap_err();
+        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
 
         assert_matches!(err, Error::RetryLater { .. });
         assert!(err.is_retryable());
@@ -425,7 +473,8 @@ mod tests {
         let to_peer_id = persistent_context.to_peer.id;
 
         let mut env = TestingEnv::new();
-        let ctx = env.context_factory().new_context(persistent_context);
+        let mut ctx = env.context_factory().new_context(persistent_context);
+        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
 
@@ -439,7 +488,7 @@ mod tests {
             Ok(new_upgrade_region_reply(id, true, false, None))
         });
 
-        let err = state.upgrade_region(&ctx).await.unwrap_err();
+        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
 
         assert_matches!(err, Error::Unexpected { .. });
         assert!(!err.is_retryable());
@@ -457,7 +506,8 @@ mod tests {
         let to_peer_id = persistent_context.to_peer.id;
 
         let mut env = TestingEnv::new();
-        let ctx = env.context_factory().new_context(persistent_context);
+        let mut ctx = env.context_factory().new_context(persistent_context);
+        prepare_table_metadata(&ctx, HashMap::default()).await;
         let mailbox_ctx = env.mailbox_context();
         let mailbox = mailbox_ctx.mailbox().clone();
 
@@ -471,7 +521,7 @@ mod tests {
             Ok(new_upgrade_region_reply(id, false, true, None))
         });
 
-        let err = state.upgrade_region(&ctx).await.unwrap_err();
+        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
 
         assert_matches!(err, Error::RetryLater { .. });
         assert!(err.is_retryable());
@@ -491,7 +541,7 @@ mod tests {
             Ok(new_upgrade_region_reply(id, false, true, None))
         });
 
-        state.upgrade_region(&ctx).await.unwrap();
+        state.upgrade_region(&mut ctx).await.unwrap();
     }
 
     #[tokio::test]
@@ -503,6 +553,7 @@ mod tests {
 
         let mut env = TestingEnv::new();
         let mut ctx = env.context_factory().new_context(persistent_context);
+        prepare_table_metadata(&ctx, HashMap::default()).await;
         let mailbox_ctx = env.mailbox_context();
         let mailbox = mailbox_ctx.mailbox().clone();
 
@@ -563,6 +614,7 @@ mod tests {
 
         let mut env = TestingEnv::new();
         let mut ctx = env.context_factory().new_context(persistent_context);
+        prepare_table_metadata(&ctx, HashMap::default()).await;
         let mailbox_ctx = env.mailbox_context();
         let mailbox = mailbox_ctx.mailbox().clone();
 
@@ -621,6 +673,7 @@ mod tests {
 
         let mut env = TestingEnv::new();
         let mut ctx = env.context_factory().new_context(persistent_context);
+        prepare_table_metadata(&ctx, HashMap::default()).await;
         let mailbox_ctx = env.mailbox_context();
         let mailbox = mailbox_ctx.mailbox().clone();
         ctx.volatile_ctx.metrics.operations_elapsed =
@@ -29,7 +29,6 @@ use common_time::util::current_time_millis;
 use common_wal::config::kafka::common::{
     DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE,
 };
-use itertools::Itertools;
 use snafu::{OptionExt, ResultExt};
 use store_api::storage::RegionId;
 use tokio::sync::mpsc::{Receiver, Sender};
@@ -223,31 +222,34 @@ impl RegionFlushTrigger {
         &self,
         topic: &str,
         region_ids: &[RegionId],
+        topic_regions: &HashMap<RegionId, TopicRegionValue>,
         leader_regions: &HashMap<RegionId, LeaderRegion>,
     ) -> Result<()> {
-        if region_ids.is_empty() {
-            return Ok(());
-        }
-
         let regions = region_ids
             .iter()
             .flat_map(|region_id| match leader_regions.get(region_id) {
-                Some(leader_region) => {
-                    let entry_id = leader_region.manifest.replay_entry_id();
-                    let metadata_entry_id = leader_region.manifest.metadata_replay_entry_id();
-
-                    Some((
-                        TopicRegionKey::new(*region_id, topic),
-                        Some(TopicRegionValue::new(Some(ReplayCheckpoint::new(
-                            entry_id,
-                            metadata_entry_id,
-                        )))),
-                    ))
-                }
+                Some(leader_region) => should_persist_region_checkpoint(
+                    leader_region,
+                    topic_regions
+                        .get(region_id)
+                        .cloned()
+                        .and_then(|value| value.checkpoint),
+                )
+                .map(|checkpoint| {
+                    (
+                        TopicRegionKey::new(*region_id, topic),
+                        Some(TopicRegionValue::new(Some(checkpoint))),
+                    )
+                }),
                 None => None,
             })
             .collect::<Vec<_>>();
 
+        // The `chunks` will panic if chunks_size is zero, so we return early if there are no regions to persist.
+        if regions.is_empty() {
+            return Ok(());
+        }
+
         let max_txn_ops = self.table_metadata_manager.kv_backend().max_txn_ops();
         let batch_size = max_txn_ops.min(regions.len());
         for batch in regions.chunks(batch_size) {
@@ -271,14 +273,14 @@ impl RegionFlushTrigger {
         latest_entry_id: u64,
         avg_record_size: usize,
     ) -> Result<()> {
-        let region_ids = self
+        let topic_regions = self
             .table_metadata_manager
             .topic_region_manager()
             .regions(topic)
             .await
             .context(error::TableMetadataManagerSnafu)?;
 
-        if region_ids.is_empty() {
+        if topic_regions.is_empty() {
             debug!("No regions found for topic: {}", topic);
             return Ok(());
         }
@@ -286,7 +288,7 @@ impl RegionFlushTrigger {
         // Filters regions need to persist checkpoints.
         let regions_to_persist = filter_regions_by_replay_size(
             topic,
-            region_ids
+            topic_regions
                 .iter()
                 .map(|(region_id, value)| (*region_id, value.min_entry_id().unwrap_or_default())),
             avg_record_size as u64,
@@ -295,33 +297,25 @@ impl RegionFlushTrigger {
         );
         let region_manifests = self
             .leader_region_registry
-            .batch_get(region_ids.keys().cloned());
+            .batch_get(topic_regions.keys().cloned());
 
         if let Err(err) = self
-            .persist_region_checkpoints(topic, &regions_to_persist, &region_manifests)
+            .persist_region_checkpoints(
+                topic,
+                &regions_to_persist,
+                &topic_regions,
+                &region_manifests,
+            )
             .await
         {
             error!(err; "Failed to persist region checkpoints for topic: {}", topic);
         }
 
-        let (inactive_regions, active_regions): (Vec<_>, Vec<_>) = region_manifests
+        let regions = region_manifests
             .into_iter()
-            .partition_map(|(region_id, region)| {
-                if !region.manifest.is_inactive() {
-                    itertools::Either::Left((region_id, region.manifest.prunable_entry_id()))
-                } else {
-                    itertools::Either::Right((region_id, region.manifest.prunable_entry_id()))
-                }
-            });
-
-        let min_entry_id = inactive_regions
-            .iter()
-            .min_by_key(|(_, entry_id)| *entry_id);
-        let min_entry_id = active_regions
-            .iter()
-            .min_by_key(|(_, entry_id)| *entry_id)
-            .or(min_entry_id);
+            .map(|(region_id, region)| (region_id, region.manifest.prunable_entry_id()))
+            .collect::<Vec<_>>();
+        let min_entry_id = regions.iter().min_by_key(|(_, entry_id)| *entry_id);
 
         if let Some((_, min_entry_id)) = min_entry_id {
             let replay_size = (latest_entry_id.saturating_sub(*min_entry_id))
                 .saturating_mul(avg_record_size as u64);
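The replay-size estimate above is plain arithmetic: entries outstanding between a region's minimum (prunable) entry id and the topic's latest entry id, multiplied by the observed average record size. A standalone sketch with a worked example:

```rust
/// Estimated WAL replay cost for a region, as in the hunk above:
/// outstanding entries times the average record size, with saturating
/// arithmetic so stale inputs cannot underflow or overflow.
fn estimated_replay_size(latest_entry_id: u64, min_entry_id: u64, avg_record_size: u64) -> u64 {
    latest_entry_id
        .saturating_sub(min_entry_id)
        .saturating_mul(avg_record_size)
}

fn main() {
    // 10_000 outstanding entries at ~512 bytes each ≈ 5 MB of replay.
    assert_eq!(estimated_replay_size(110_000, 100_000, 512), 5_120_000);
    // A fully flushed region costs nothing to replay.
    assert_eq!(estimated_replay_size(100_000, 100_000, 512), 0);
}
```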
@@ -331,45 +325,28 @@ impl RegionFlushTrigger {
         }
 
         // Selects regions to flush from the set of active regions.
-        let mut regions_to_flush = filter_regions_by_replay_size(
+        let regions_to_flush = filter_regions_by_replay_size(
             topic,
-            active_regions.into_iter(),
+            regions.into_iter(),
             avg_record_size as u64,
             latest_entry_id,
             self.flush_trigger_size,
         );
-
-        let active_regions_num = regions_to_flush.len();
-        // Selects regions to flush from the set of inactive regions.
-        // For inactive regions, we use a lower flush trigger size (half of the normal size)
-        // to encourage more aggressive flushing to update the region's topic latest entry id.
-        let inactive_regions_to_flush = filter_regions_by_replay_size(
-            topic,
-            inactive_regions.into_iter(),
-            avg_record_size as u64,
-            latest_entry_id,
-            self.flush_trigger_size / 2,
-        );
-        let inactive_regions_num = inactive_regions_to_flush.len();
-        regions_to_flush.extend(inactive_regions_to_flush);
 
         // Sends flush instructions to datanodes.
         if !regions_to_flush.is_empty() {
             self.send_flush_instructions(&regions_to_flush).await?;
             debug!(
-                "Sent {} flush instructions to datanodes for topic: '{}' ({} inactive regions)",
+                "Sent {} flush instructions to datanodes for topic: '{}', regions: {:?}",
                 regions_to_flush.len(),
                 topic,
-                inactive_regions_num,
+                regions_to_flush,
             );
         }
 
         metrics::METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL
-            .with_label_values(&[topic, "active"])
-            .inc_by(active_regions_num as u64);
-        metrics::METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL
-            .with_label_values(&[topic, "inactive"])
-            .inc_by(inactive_regions_num as u64);
+            .with_label_values(&[topic])
+            .inc_by(regions_to_flush.len() as u64);
 
         Ok(())
     }
@@ -408,6 +385,26 @@ impl RegionFlushTrigger {
     }
 }
 
+/// Determines whether a region checkpoint should be persisted based on current and persisted state.
+fn should_persist_region_checkpoint(
+    current: &LeaderRegion,
+    persisted: Option<ReplayCheckpoint>,
+) -> Option<ReplayCheckpoint> {
+    let new_checkpoint = ReplayCheckpoint::new(
+        current.manifest.replay_entry_id(),
+        current.manifest.metadata_replay_entry_id(),
+    );
+
+    let Some(persisted) = persisted else {
+        return Some(new_checkpoint);
+    };
+
+    if new_checkpoint > persisted {
+        return Some(new_checkpoint);
+    }
+    None
+}
+
 /// Filter regions based on the estimated replay size.
 ///
 /// Returns the regions if its estimated replay size exceeds the given threshold.
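`should_persist_region_checkpoint` persists only strictly newer checkpoints. Assuming `ReplayCheckpoint` orders like the tuple `(entry_id, metadata_entry_id)` — which the tests below are consistent with — the subtle case is `Option` ordering, where `None` sorts below any `Some`. A sketch with a stand-in type:

```rust
/// Stand-in with the tuple-like ordering the tests below exercise.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Checkpoint {
    entry_id: u64,
    metadata_entry_id: Option<u64>, // None sorts below any Some(_)
}

fn main() {
    let persisted = Checkpoint { entry_id: 100, metadata_entry_id: None };
    let current = Checkpoint { entry_id: 100, metadata_entry_id: Some(10) };
    // Same entry id, but gaining a metadata entry id counts as progress.
    assert!(current > persisted);

    // A lower entry id is never newer, whatever the metadata id says.
    let stale = Checkpoint { entry_id: 80, metadata_entry_id: Some(11) };
    assert!(stale < Checkpoint { entry_id: 90, metadata_entry_id: Some(10) });
}
```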
@@ -496,6 +493,7 @@ fn is_recent(timestamp: i64, now: i64, duration: Duration) -> bool {
 #[cfg(test)]
 mod tests {
     use common_base::readable_size::ReadableSize;
+    use common_meta::region_registry::LeaderRegionManifestInfo;
     use store_api::storage::RegionId;
 
     use super::*;
@@ -626,4 +624,92 @@ mod tests {
|
|||||||
// Only regions 1,1 and 1,2 should be flushed
|
// Only regions 1,1 and 1,2 should be flushed
|
||||||
assert_eq!(result, vec![region_id(1, 1), region_id(1, 2)]);
|
assert_eq!(result, vec![region_id(1, 1), region_id(1, 2)]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn metric_leader_region(replay_entry_id: u64, metadata_replay_entry_id: u64) -> LeaderRegion {
|
||||||
|
LeaderRegion {
|
||||||
|
datanode_id: 1,
|
||||||
|
manifest: LeaderRegionManifestInfo::Metric {
|
||||||
|
data_manifest_version: 1,
|
||||||
|
data_flushed_entry_id: replay_entry_id,
|
||||||
|
+            data_topic_latest_entry_id: 0,
+            metadata_manifest_version: 1,
+            metadata_flushed_entry_id: metadata_replay_entry_id,
+            metadata_topic_latest_entry_id: 0,
+        },
+    }
+}
+
+fn mito_leader_region(replay_entry_id: u64) -> LeaderRegion {
+    LeaderRegion {
+        datanode_id: 1,
+        manifest: LeaderRegionManifestInfo::Mito {
+            manifest_version: 1,
+            flushed_entry_id: replay_entry_id,
+            topic_latest_entry_id: 0,
+        },
+    }
+}
+
+#[test]
+fn test_should_persist_region_checkpoint() {
+    // `persisted` is none
+    let current = metric_leader_region(100, 10);
+    let result = should_persist_region_checkpoint(&current, None).unwrap();
+    assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));
+
+    // `persisted.entry_id` is less than `current.manifest.replay_entry_id()`
+    let current = mito_leader_region(100);
+    let result =
+        should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, None)))
+            .unwrap();
+    assert_eq!(result, ReplayCheckpoint::new(100, None));
+
+    let current = metric_leader_region(100, 10);
+    let result =
+        should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))))
+            .unwrap();
+    assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));
+
+    // `persisted.metadata_entry_id` is less than `current.manifest.metadata_replay_entry_id()`
+    let current = metric_leader_region(100, 10);
+    let result =
+        should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(8))))
+            .unwrap();
+    assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));
+
+    // `persisted.metadata_entry_id` is none
+    let current = metric_leader_region(100, 10);
+    let result =
+        should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, None)))
+            .unwrap();
+    assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));
+
+    // `current.manifest.metadata_replay_entry_id()` is none
+    let current = mito_leader_region(100);
+    let result =
+        should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(8))))
+            .is_none();
+    assert!(result);
+
+    // `persisted.entry_id` is equal to `current.manifest.replay_entry_id()`
+    let current = metric_leader_region(100, 10);
+    let result =
+        should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(10))));
+    assert!(result.is_none());
+    let current = mito_leader_region(100);
+    let result =
+        should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, None)));
+    assert!(result.is_none());
+
+    // `persisted.entry_id` is greater than `current.manifest.replay_entry_id()`
+    // `persisted.metadata_entry_id` is less than `current.manifest.metadata_replay_entry_id()`
+    let current = metric_leader_region(80, 11);
+    let result =
+        should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
+    assert!(result.is_none());
+    let current = mito_leader_region(80);
+    let result =
+        should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
+    assert!(result.is_none());
+}
 }
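The cases above pin down the checkpoint-persistence contract end to end. As a reading aid, here is a minimal, self-contained sketch of a decision function that satisfies every case; the names and struct shape are simplifications for illustration, not the metasrv implementation:

```rust
// Sketch only: persist when the data entry id has not regressed and at least
// one of the two ids moved forward.
#[derive(Debug, PartialEq, Clone, Copy)]
struct Checkpoint {
    entry_id: u64,
    metadata_entry_id: Option<u64>,
}

fn should_persist(current: Checkpoint, persisted: Option<Checkpoint>) -> Option<Checkpoint> {
    let Some(p) = persisted else {
        // Nothing persisted yet: always persist.
        return Some(current);
    };
    if current.entry_id < p.entry_id {
        // The data entry id regressed; never move the checkpoint backwards.
        return None;
    }
    let entry_advanced = current.entry_id > p.entry_id;
    let metadata_advanced = match (current.metadata_entry_id, p.metadata_entry_id) {
        (Some(c), Some(old)) => c > old,
        (Some(_), None) => true,
        _ => false,
    };
    (entry_advanced || metadata_advanced).then_some(current)
}

fn main() {
    let cur = Checkpoint { entry_id: 100, metadata_entry_id: Some(10) };
    // No persisted checkpoint: persist.
    assert_eq!(should_persist(cur, None), Some(cur));
    // Nothing advanced: skip.
    assert_eq!(should_persist(cur, Some(cur)), None);
}
```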
@@ -97,7 +97,7 @@ impl store_server::Store for Metasrv {
         let req = req.into_inner();

         let _timer = METRIC_META_KV_REQUEST_ELAPSED
-            .with_label_values(&[self.kv_backend().name(), "batch_pub"])
+            .with_label_values(&[self.kv_backend().name(), "batch_put"])
             .start_timer();

         let req: BatchPutRequest = req.into();
@@ -15,7 +15,9 @@
 use common_telemetry::debug;
 use snafu::{OptionExt, ResultExt};
 use store_api::region_engine::RegionEngine;
-use store_api::region_request::{AffectedRows, RegionCatchupRequest, RegionRequest};
+use store_api::region_request::{
+    AffectedRows, RegionCatchupRequest, RegionRequest, ReplayCheckpoint,
+};
 use store_api::storage::RegionId;

 use crate::engine::MetricEngineInner;
@@ -59,6 +61,10 @@ impl MetricEngineInner {
                     entry_id: req.metadata_entry_id,
                     metadata_entry_id: None,
                     location_id: req.location_id,
+                    checkpoint: req.checkpoint.map(|c| ReplayCheckpoint {
+                        entry_id: c.metadata_entry_id.unwrap_or_default(),
+                        metadata_entry_id: None,
+                    }),
                 }),
             )
             .await
@@ -73,6 +79,10 @@ impl MetricEngineInner {
                     entry_id: req.entry_id,
                     metadata_entry_id: None,
                     location_id: req.location_id,
+                    checkpoint: req.checkpoint.map(|c| ReplayCheckpoint {
+                        entry_id: c.entry_id,
+                        metadata_entry_id: None,
+                    }),
                 }),
             )
             .await
@@ -189,6 +189,11 @@ impl AccessLayer {
         &self.puffin_manager_factory
     }

+    /// Returns the intermediate manager.
+    pub fn intermediate_manager(&self) -> &IntermediateManager {
+        &self.intermediate_manager
+    }
+
     /// Deletes a SST file (and its index file if it has one) with given file id.
     pub(crate) async fn delete_sst(&self, file_meta: &FileMeta) -> Result<()> {
         let path = location::sst_file_path(&self.table_dir, file_meta.file_id(), self.path_type);
@@ -1077,6 +1077,7 @@ mod tests {
         let staging_manifest_ctx = {
             let manager = RegionManifestManager::new(
                 version_control.current().version.metadata.clone(),
+                0,
                 RegionManifestOptions {
                     manifest_dir: "".to_string(),
                     object_store: env.access_layer.object_store().clone(),
@@ -127,8 +127,7 @@ async fn test_catchup_with_last_entry_id(factory: Option<LogStoreFactory>) {
             RegionRequest::Catchup(RegionCatchupRequest {
                 set_writable: false,
                 entry_id: last_entry_id,
-                metadata_entry_id: None,
-                location_id: None,
+                ..Default::default()
             }),
         )
         .await;
@@ -160,8 +159,7 @@ async fn test_catchup_with_last_entry_id(factory: Option<LogStoreFactory>) {
             RegionRequest::Catchup(RegionCatchupRequest {
                 set_writable: true,
                 entry_id: last_entry_id,
-                metadata_entry_id: None,
-                location_id: None,
+                ..Default::default()
             }),
         )
         .await;
@@ -251,8 +249,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option<LogStoreFactory>) {
             RegionRequest::Catchup(RegionCatchupRequest {
                 set_writable: false,
                 entry_id: incorrect_last_entry_id,
-                metadata_entry_id: None,
-                location_id: None,
+                ..Default::default()
             }),
         )
         .await
@@ -269,8 +266,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option<LogStoreFactory>) {
             RegionRequest::Catchup(RegionCatchupRequest {
                 set_writable: false,
                 entry_id: incorrect_last_entry_id,
-                metadata_entry_id: None,
-                location_id: None,
+                ..Default::default()
             }),
         )
         .await;
@@ -340,9 +336,7 @@ async fn test_catchup_without_last_entry_id(factory: Option<LogStoreFactory>) {
             region_id,
             RegionRequest::Catchup(RegionCatchupRequest {
                 set_writable: false,
-                entry_id: None,
-                metadata_entry_id: None,
-                location_id: None,
+                ..Default::default()
             }),
         )
         .await;
@@ -372,9 +366,7 @@ async fn test_catchup_without_last_entry_id(factory: Option<LogStoreFactory>) {
             region_id,
             RegionRequest::Catchup(RegionCatchupRequest {
                 set_writable: true,
-                entry_id: None,
-                metadata_entry_id: None,
-                location_id: None,
+                ..Default::default()
             }),
         )
         .await;
@@ -465,9 +457,7 @@ async fn test_catchup_with_manifest_update(factory: Option<LogStoreFactory>) {
             region_id,
             RegionRequest::Catchup(RegionCatchupRequest {
                 set_writable: false,
-                entry_id: None,
-                metadata_entry_id: None,
-                location_id: None,
+                ..Default::default()
             }),
         )
         .await;
@@ -503,9 +493,7 @@ async fn test_catchup_with_manifest_update(factory: Option<LogStoreFactory>) {
             region_id,
             RegionRequest::Catchup(RegionCatchupRequest {
                 set_writable: true,
-                entry_id: None,
-                metadata_entry_id: None,
-                location_id: None,
+                ..Default::default()
             }),
         )
         .await;
@@ -652,9 +640,7 @@ async fn test_local_catchup(factory: Option<LogStoreFactory>) {
             region_id,
             RegionRequest::Catchup(RegionCatchupRequest {
                 set_writable: true,
-                entry_id: None,
-                metadata_entry_id: None,
-                location_id: None,
+                ..Default::default()
             }),
         )
         .await;
@@ -715,9 +701,7 @@ async fn test_catchup_not_exist() {
             non_exist_region_id,
             RegionRequest::Catchup(RegionCatchupRequest {
                 set_writable: true,
-                entry_id: None,
-                metadata_entry_id: None,
-                location_id: None,
+                ..Default::default()
             }),
         )
         .await
@@ -27,8 +27,8 @@ use crate::error::{
     self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
 };
 use crate::manifest::action::{
-    RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction,
-    RegionMetaActionList,
+    RegionChange, RegionCheckpoint, RegionEdit, RegionManifest, RegionManifestBuilder,
+    RegionMetaAction, RegionMetaActionList,
 };
 use crate::manifest::checkpointer::Checkpointer;
 use crate::manifest::storage::{
@@ -150,6 +150,7 @@ impl RegionManifestManager {
     /// Constructs a region's manifest and persist it.
     pub async fn new(
         metadata: RegionMetadataRef,
+        flushed_entry_id: u64,
         options: RegionManifestOptions,
         total_manifest_size: Arc<AtomicU64>,
         manifest_version: Arc<AtomicU64>,
@@ -163,8 +164,8 @@ impl RegionManifestManager {
         );

         info!(
-            "Creating region manifest in {} with metadata {:?}",
-            options.manifest_dir, metadata
+            "Creating region manifest in {} with metadata {:?}, flushed_entry_id: {}",
+            options.manifest_dir, metadata, flushed_entry_id
         );

         let version = MIN_VERSION;
@@ -184,9 +185,21 @@ impl RegionManifestManager {
             options.manifest_dir, manifest
         );

+        let mut actions = vec![RegionMetaAction::Change(RegionChange { metadata })];
+        if flushed_entry_id > 0 {
+            actions.push(RegionMetaAction::Edit(RegionEdit {
+                files_to_add: vec![],
+                files_to_remove: vec![],
+                timestamp_ms: None,
+                compaction_time_window: None,
+                flushed_entry_id: Some(flushed_entry_id),
+                flushed_sequence: None,
+            }));
+        }
+
         // Persist region change.
-        let action_list =
-            RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { metadata }));
+        let action_list = RegionMetaActionList::new(actions);
         // New region is not in staging mode.
         // TODO(ruihang): add staging mode support if needed.
         store.save(version, &action_list.encode()?, false).await?;
@@ -1122,6 +1122,7 @@ mod tests {
         let staging_ctx = {
             let manager = RegionManifestManager::new(
                 version_control.current().version.metadata.clone(),
+                0,
                 RegionManifestOptions {
                     manifest_dir: "".to_string(),
                     object_store: env.access_layer.object_store().clone(),
@@ -1187,6 +1188,7 @@ mod tests {

         let manager = RegionManifestManager::new(
             metadata.clone(),
+            0,
             RegionManifestOptions {
                 manifest_dir: "".to_string(),
                 object_store: access_layer.object_store().clone(),
@@ -238,8 +238,11 @@ impl RegionOpener {
         // Create a manifest manager for this region and writes regions to the manifest file.
         let region_manifest_options =
             Self::manifest_options(config, &options, &region_dir, &self.object_store_manager)?;
+        // For remote WAL, we need to set flushed_entry_id to current topic's latest entry id.
+        let flushed_entry_id = provider.initial_flushed_entry_id::<S>(wal.store());
         let manifest_manager = RegionManifestManager::new(
             metadata.clone(),
+            flushed_entry_id,
             region_manifest_options,
             self.stats.total_manifest_size.clone(),
             self.stats.manifest_version.clone(),
@@ -439,7 +442,7 @@ impl RegionOpener {
             .build();
         let flushed_entry_id = version.flushed_entry_id;
         let version_control = Arc::new(VersionControl::new(version));
-        if !self.skip_wal_replay {
+        let topic_latest_entry_id = if !self.skip_wal_replay {
             let replay_from_entry_id = self
                 .replay_checkpoint
                 .unwrap_or_default()
@@ -461,14 +464,26 @@ impl RegionOpener {
                 on_region_opened,
             )
             .await?;
+            // For remote WAL, we need to set topic_latest_entry_id to current topic's latest entry id.
+            // Only set after the WAL replay is completed.
+            let topic_latest_entry_id = if provider.is_remote_wal()
+                && version_control.current().version.memtables.is_empty()
+            {
+                wal.store().latest_entry_id(&provider).unwrap_or(0)
+            } else {
+                0
+            };
+
+            topic_latest_entry_id
         } else {
             info!(
                 "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}",
                 region_id, manifest.manifest_version, flushed_entry_id
             );
-        }
-        let now = self.time_provider.current_time_millis();
+
+            0
+        };
+        let now = self.time_provider.current_time_millis();
         let region = MitoRegion {
             region_id: self.region_id,
             version_control,
@@ -483,7 +498,7 @@ impl RegionOpener {
             last_flush_millis: AtomicI64::new(now),
             last_compaction_millis: AtomicI64::new(now),
             time_provider: self.time_provider.clone(),
-            topic_latest_entry_id: AtomicU64::new(0),
+            topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
             write_bytes: Arc::new(AtomicU64::new(0)),
             memtable_builder,
             stats: self.stats.clone(),
@@ -713,8 +728,8 @@ where

     let series_count = version_control.current().series_count();
     info!(
-        "Replay WAL for region: {}, rows recovered: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
-        region_id, rows_replayed, last_entry_id, series_count, now.elapsed()
+        "Replay WAL for region: {}, provider: {:?}, rows recovered: {}, replay from entry id: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
+        region_id, provider, rows_replayed, replay_from_entry_id, last_entry_id, series_count, now.elapsed()
     );
     Ok(last_entry_id)
 }
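The two conditions guarding the new `topic_latest_entry_id` are easy to misread in diff form. A small sketch of the same rule, with plain values standing in for the real `VersionControl` and `LogStore` types:

```rust
// Sketch: the topic high-watermark is only trusted as "fully applied" when the
// region uses remote WAL and replay left no rows buffered in memtables.
fn seed_topic_latest_entry_id(
    is_remote_wal: bool,
    memtables_empty: bool,
    topic_high_watermark: u64,
) -> u64 {
    if is_remote_wal && memtables_empty {
        topic_high_watermark
    } else {
        0
    }
}

fn main() {
    assert_eq!(seed_topic_latest_entry_id(true, true, 500), 500);
    // Buffered rows mean entries up to the watermark are not yet flushed.
    assert_eq!(seed_topic_latest_entry_id(true, false, 500), 0);
    // Local WAL regions keep the default of 0.
    assert_eq!(seed_topic_latest_entry_id(false, true, 500), 0);
}
```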
@@ -371,7 +371,7 @@ impl VersionBuilder {
         self
     }

-    /// Sets truncated entty id.
+    /// Sets truncated entry id.
     pub(crate) fn truncated_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
         self.truncated_entry_id = entry_id;
         self
@@ -137,6 +137,14 @@ impl FilePurger for LocalFilePurger {
                     error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
                         file_meta.file_id(), file_meta.region_id);
                 }
+                let file_id = file_meta.file_id();
+                if let Err(e) = sst_layer
+                    .intermediate_manager()
+                    .prune_sst_dir(&file_id.region_id(), &file_id.file_id())
+                    .await
+                {
+                    error!(e; "Failed to prune intermediate sst directory, region_id: {}, file_id: {}", file_id.region_id(), file_id.file_id());
+                }
             })) {
                 error!(e; "Failed to schedule the file purge request");
             }
@@ -110,6 +110,7 @@ pub struct Indexer {
     last_mem_fulltext_index: usize,
     bloom_filter_indexer: Option<BloomFilterIndexer>,
     last_mem_bloom_filter: usize,
+    intermediate_manager: Option<IntermediateManager>,
 }

 impl Indexer {
@@ -196,6 +197,7 @@ impl IndexerBuilder for IndexerBuilderImpl {
         indexer.inverted_indexer = self.build_inverted_indexer(file_id);
         indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
         indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
+        indexer.intermediate_manager = Some(self.intermediate_manager.clone());
         if indexer.inverted_indexer.is_none()
             && indexer.fulltext_indexer.is_none()
             && indexer.bloom_filter_indexer.is_none()
@@ -21,6 +21,7 @@ impl Indexer {
         self.do_abort_inverted_index().await;
         self.do_abort_fulltext_index().await;
         self.do_abort_bloom_filter().await;
+        self.do_prune_intm_sst_dir().await;
         self.puffin_manager = None;
     }

@@ -54,6 +54,7 @@ impl Indexer {
             return IndexOutput::default();
         }

+        self.do_prune_intm_sst_dir().await;
         output.file_size = self.do_finish_puffin_writer(writer).await;
         output
     }
@@ -270,4 +271,12 @@ impl Indexer {
         output.row_count = row_count;
         output.columns = column_ids;
     }
+
+    pub(crate) async fn do_prune_intm_sst_dir(&mut self) {
+        if let Some(manager) = self.intermediate_manager.take() {
+            if let Err(e) = manager.prune_sst_dir(&self.region_id, &self.file_id).await {
+                warn!(e; "Failed to prune intermediate SST directory, region_id: {}, file_id: {}", self.region_id, self.file_id);
+            }
+        }
+    }
 }
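`do_prune_intm_sst_dir` is reachable from both the abort and the finish paths, so it leans on `Option::take` to stay idempotent. A stripped-down sketch of that pattern, with stub types rather than the mito structs:

```rust
// Sketch: take() consumes the manager on the first call, so a second call
// (e.g. finish followed by abort) is a harmless no-op.
struct Indexer {
    intermediate_manager: Option<String>,
}

impl Indexer {
    fn do_prune_intm_sst_dir(&mut self) {
        if let Some(manager) = self.intermediate_manager.take() {
            println!("pruning intermediate dir via {manager}");
        }
    }
}

fn main() {
    let mut indexer = Indexer { intermediate_manager: Some("intm".to_string()) };
    indexer.do_prune_intm_sst_dir(); // prunes once
    indexer.do_prune_intm_sst_dir(); // no-op
    assert!(indexer.intermediate_manager.is_none());
}
```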
@@ -54,14 +54,22 @@ impl IntermediateManager {
             aux_path.as_ref()
         );

+        // Remove the intermediate directory in the background
+        let aux_pb = PathBuf::from(aux_path.as_ref());
+        let intm_dir = aux_pb.join(INTERMEDIATE_DIR);
+        let deleted_dir = intm_dir.with_extension(format!("deleted-{}", Uuid::new_v4()));
+        if let Err(err) = tokio::fs::rename(&intm_dir, &deleted_dir).await {
+            warn!(err; "Failed to rename intermediate directory");
+        }
+        tokio::spawn(async move {
+            if let Err(err) = tokio::fs::remove_dir_all(deleted_dir).await {
+                warn!(err; "Failed to remove intermediate directory");
+            }
+        });
+
         let store = new_fs_cache_store(&normalize_dir(aux_path.as_ref())).await?;
         let store = InstrumentedStore::new(store);

-        // Remove all garbage intermediate files from previous runs.
-        if let Err(err) = store.remove_all(INTERMEDIATE_DIR).await {
-            warn!(err; "Failed to remove garbage intermediate files");
-        }
-
         Ok(Self {
             base_dir: PathBuf::from(aux_path.as_ref()),
             store,
@@ -94,6 +102,24 @@ impl IntermediateManager {
             .join(sst_file_id.to_string())
             .join(format!("fulltext-{column_id}-{uuid}"))
     }
+
+    /// Prunes the intermediate directory for SST files.
+    pub(crate) async fn prune_sst_dir(
+        &self,
+        region_id: &RegionId,
+        sst_file_id: &FileId,
+    ) -> Result<()> {
+        let region_id = region_id.as_u64();
+        let sst_dir = format!("{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/");
+        self.store.remove_all(&sst_dir).await
+    }
+
+    /// Prunes the intermediate directory for region files.
+    pub(crate) async fn prune_region_dir(&self, region_id: &RegionId) -> Result<()> {
+        let region_id = region_id.as_u64();
+        let region_dir = format!("{INTERMEDIATE_DIR}/{region_id}/");
+        self.store.remove_all(&region_dir).await
+    }
 }

 /// `IntermediateLocation` produces paths for intermediate files
@@ -268,6 +294,60 @@ mod tests {
             .unwrap());
     }

+    #[tokio::test]
+    async fn test_cleanup_dir() {
+        let temp_dir = temp_dir::create_temp_dir("test_cleanup_dir_");
+
+        let region_id = RegionId::new(0, 0);
+        let sst_file_id = FileId::random();
+        let region_dir = temp_dir
+            .path()
+            .join(INTERMEDIATE_DIR)
+            .join(region_id.as_u64().to_string());
+        let sst_dir = region_dir.join(sst_file_id.to_string());
+
+        let path = temp_dir.path().to_str().unwrap();
+        let manager = IntermediateManager::init_fs(path).await.unwrap();
+
+        let location = IntermediateLocation::new(&region_id, &sst_file_id);
+        let temp_file_provider = TempFileProvider::new(location, manager.clone());
+
+        let mut f1 = temp_file_provider
+            .create("sky", "000000000000")
+            .await
+            .unwrap();
+        f1.write_all(b"hello").await.unwrap();
+        f1.flush().await.unwrap();
+        f1.close().await.unwrap();
+
+        let mut f2 = temp_file_provider
+            .create("sky", "000000000001")
+            .await
+            .unwrap();
+        f2.write_all(b"world").await.unwrap();
+        f2.flush().await.unwrap();
+        f2.close().await.unwrap();
+
+        temp_file_provider.cleanup().await.unwrap();
+
+        // sst_dir and region_dir still exist
+        assert!(tokio::fs::try_exists(&sst_dir).await.unwrap());
+        assert!(tokio::fs::try_exists(&region_dir).await.unwrap());
+
+        // sst_dir should be deleted, region_dir still exists
+        manager
+            .prune_sst_dir(&region_id, &sst_file_id)
+            .await
+            .unwrap();
+        assert!(tokio::fs::try_exists(&region_dir).await.unwrap());
+        assert!(!tokio::fs::try_exists(&sst_dir).await.unwrap());
+
+        // sst_dir and region_dir should be deleted
+        manager.prune_region_dir(&region_id).await.unwrap();
+        assert!(!tokio::fs::try_exists(&sst_dir).await.unwrap());
+        assert!(!tokio::fs::try_exists(&region_dir).await.unwrap());
+    }
+
     #[test]
     fn test_intermediate_location() {
         let sst_file_id = FileId::random();
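The two prune methods depend on intermediate files being grouped first by region, then by SST file, which is exactly what `test_cleanup_dir` exercises. A sketch of the path construction under that assumed layout (the literal `INTERMEDIATE_DIR` value here is a placeholder, not the real constant from mito2):

```rust
// Sketch of the assumed on-disk hierarchy: pruning a region directory
// necessarily removes every SST directory nested under it.
const INTERMEDIATE_DIR: &str = "intermediate";

fn sst_dir(region_id: u64, sst_file_id: &str) -> String {
    format!("{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/")
}

fn region_dir(region_id: u64) -> String {
    format!("{INTERMEDIATE_DIR}/{region_id}/")
}

fn main() {
    let sst = sst_dir(42, "00000000-0000-0000-0000-000000000000");
    assert!(sst.starts_with(&region_dir(42)));
}
```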
@@ -563,6 +563,7 @@ impl TestEnv {
         if let Some(metadata) = initial_metadata {
             RegionManifestManager::new(
                 metadata,
+                0,
                 manifest_opts,
                 Default::default(),
                 Default::default(),
@@ -116,6 +116,7 @@ impl SchedulerEnv {
         Arc::new(ManifestContext::new(
             RegionManifestManager::new(
                 metadata,
+                0,
                 RegionManifestOptions {
                     manifest_dir: "".to_string(),
                     object_store: self.access_layer.object_store().clone(),
@@ -65,7 +65,12 @@ impl<S: LogStore> RegionWorkerLoop<S> {

         if region.provider.is_remote_wal() {
             let flushed_entry_id = region.version_control.current().last_entry_id;
-            info!("Trying to replay memtable for region: {region_id}, flushed entry id: {flushed_entry_id}");
+            let replay_from_entry_id = request
+                .checkpoint
+                .map(|c| c.entry_id)
+                .unwrap_or_default()
+                .max(flushed_entry_id);
+            info!("Trying to replay memtable for region: {region_id}, provider: {:?}, replay from entry id: {replay_from_entry_id}, flushed entry id: {flushed_entry_id}", region.provider);
             let timer = Instant::now();
             let wal_entry_reader =
                 self.wal
@@ -75,15 +80,16 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                 &region.provider,
                 wal_entry_reader,
                 region_id,
-                flushed_entry_id,
+                replay_from_entry_id,
                 &region.version_control,
                 self.config.allow_stale_entries,
                 on_region_opened,
             )
             .await?;
             info!(
-                "Elapsed: {:?}, region: {region_id} catchup finished. last entry id: {last_entry_id}, expected: {:?}.",
+                "Elapsed: {:?}, region: {region_id}, provider: {:?} catchup finished. replay from entry id: {replay_from_entry_id}, flushed entry id: {flushed_entry_id}, last entry id: {last_entry_id}, expected: {:?}.",
                 timer.elapsed(),
+                region.provider,
                 request.entry_id
             );
             if let Some(expected_last_entry_id) = request.entry_id {
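The `replay_from_entry_id` computation above is the heart of the change: the worker starts replay at whichever is larger, the persisted replay checkpoint or the region's flushed entry id. The same rule in isolation (a hypothetical helper, not the worker API):

```rust
// Sketch: a checkpoint can only move the replay start forward, never behind
// the flushed entry id already recorded for the region.
fn replay_start(checkpoint_entry_id: Option<u64>, flushed_entry_id: u64) -> u64 {
    checkpoint_entry_id.unwrap_or_default().max(flushed_entry_id)
}

fn main() {
    // No checkpoint yet: fall back to the flushed entry id.
    assert_eq!(replay_start(None, 10_000), 10_000);
    // A newer checkpoint skips entries that are known to be replayed already.
    assert_eq!(replay_start(Some(12_345), 10_000), 12_345);
    // A stale checkpoint never rewinds the replay start.
    assert_eq!(replay_start(Some(8_000), 10_000), 10_000);
}
```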
@@ -99,6 +99,7 @@ where
         let object_store = region.access_layer.object_store().clone();
         let dropping_regions = self.dropping_regions.clone();
         let listener = self.listener.clone();
+        let intm_manager = self.intermediate_manager.clone();
         common_runtime::spawn_global(async move {
             let gc_duration = listener
                 .on_later_drop_begin(region_id)
@@ -111,6 +112,9 @@ where
                 gc_duration,
             )
             .await;
+            if let Err(err) = intm_manager.prune_region_dir(&region_id).await {
+                warn!(err; "Failed to prune intermediate region directory, region_id: {}", region_id);
+            }
             listener.on_later_drop_end(region_id, removed);
         });

@@ -369,6 +369,9 @@ impl<H> BoundedStager<H> {
|
|||||||
/// Note: It can't recover the mapping between puffin files and keys, so TTL
|
/// Note: It can't recover the mapping between puffin files and keys, so TTL
|
||||||
/// is configured to purge the dangling files and directories.
|
/// is configured to purge the dangling files and directories.
|
||||||
async fn recover(&self) -> Result<()> {
|
async fn recover(&self) -> Result<()> {
|
||||||
|
let timer = std::time::Instant::now();
|
||||||
|
info!("Recovering the staging area, base_dir: {:?}", self.base_dir);
|
||||||
|
|
||||||
let mut read_dir = fs::read_dir(&self.base_dir).await.context(ReadSnafu)?;
|
let mut read_dir = fs::read_dir(&self.base_dir).await.context(ReadSnafu)?;
|
||||||
|
|
||||||
let mut elems = HashMap::new();
|
let mut elems = HashMap::new();
|
||||||
@@ -430,6 +433,7 @@ impl<H> BoundedStager<H> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let mut size = 0;
|
let mut size = 0;
|
||||||
|
let num_elems = elems.len();
|
||||||
for (key, value) in elems {
|
for (key, value) in elems {
|
||||||
size += value.size();
|
size += value.size();
|
||||||
self.cache.insert(key, value).await;
|
self.cache.insert(key, value).await;
|
||||||
@@ -440,6 +444,12 @@ impl<H> BoundedStager<H> {
|
|||||||
|
|
||||||
self.cache.run_pending_tasks().await;
|
self.cache.run_pending_tasks().await;
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"Recovered the staging area, num_entries: {}, num_bytes: {}, cost: {:?}",
|
||||||
|
num_elems,
|
||||||
|
size,
|
||||||
|
timer.elapsed()
|
||||||
|
);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -15,6 +15,7 @@
 use std::fmt::Display;
 use std::sync::Arc;

+use crate::logstore::LogStore;
 use crate::storage::RegionId;

 // The Provider of kafka log store
@@ -78,6 +79,18 @@ impl Display for Provider {
 }

 impl Provider {
+    /// Returns the initial flushed entry id of the provider.
+    /// This is used to initialize the flushed entry id of the region when creating the region from scratch.
+    ///
+    /// Currently only used for remote WAL.
+    /// For local WAL, the initial flushed entry id is 0.
+    pub fn initial_flushed_entry_id<S: LogStore>(&self, wal: &S) -> u64 {
+        if matches!(self, Provider::Kafka(_)) {
+            return wal.latest_entry_id(self).unwrap_or(0);
+        }
+        0
+    }
+
     pub fn raft_engine_provider(id: u64) -> Provider {
         Provider::RaftEngine(RaftEngineProvider { id })
     }
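The dispatch in `initial_flushed_entry_id` is small but load-bearing: only Kafka (remote WAL) providers consult the topic's latest entry id. A self-contained sketch with a stubbed store; the trait method here is a stand-in for the real `LogStore` API, not a copy of it:

```rust
// Sketch of the provider dispatch: remote WAL seeds from the topic's latest
// entry id, local WAL always starts from 0.
enum Provider {
    RaftEngine,
    Kafka,
}

trait LogStore {
    fn latest_entry_id(&self, provider: &Provider) -> Option<u64>;
}

struct StubStore(Option<u64>);

impl LogStore for StubStore {
    fn latest_entry_id(&self, _provider: &Provider) -> Option<u64> {
        self.0
    }
}

fn initial_flushed_entry_id<S: LogStore>(provider: &Provider, wal: &S) -> u64 {
    match provider {
        Provider::Kafka => wal.latest_entry_id(provider).unwrap_or(0),
        Provider::RaftEngine => 0,
    }
}

fn main() {
    assert_eq!(initial_flushed_entry_id(&Provider::Kafka, &StubStore(Some(77))), 77);
    assert_eq!(initial_flushed_entry_id(&Provider::Kafka, &StubStore(None)), 0);
    assert_eq!(initial_flushed_entry_id(&Provider::RaftEngine, &StubStore(Some(77))), 0);
}
```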
@@ -1358,7 +1358,7 @@ pub enum RegionTruncateRequest {
 ///
 /// Makes a readonly region to catch up to leader region changes.
 /// There is no effect if it operating on a leader region.
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, Default)]
 pub struct RegionCatchupRequest {
     /// Sets it to writable if it's available after it has caught up with all changes.
     pub set_writable: bool,
@@ -1371,6 +1371,8 @@ pub struct RegionCatchupRequest {
     pub metadata_entry_id: Option<entry::Id>,
     /// The hint for replaying memtable.
     pub location_id: Option<u64>,
+    /// Replay checkpoint.
+    pub checkpoint: Option<ReplayCheckpoint>,
 }

 /// Get sequences of regions by region ids.
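Deriving `Default` here is what lets the engine tests earlier in this change collapse three explicit `None` fields into `..Default::default()`: a field added later, such as `checkpoint`, no longer has to touch every construction site. A sketch with the same shape (field types simplified):

```rust
// Sketch mirroring the request struct's shape, not the store-api definitions.
#[derive(Debug, Clone, Copy, Default)]
struct ReplayCheckpoint {
    entry_id: u64,
    metadata_entry_id: Option<u64>,
}

#[derive(Debug, Clone, Copy, Default)]
struct RegionCatchupRequest {
    set_writable: bool,
    entry_id: Option<u64>,
    metadata_entry_id: Option<u64>,
    location_id: Option<u64>,
    checkpoint: Option<ReplayCheckpoint>,
}

fn main() {
    // Only the fields under test are spelled out; everything else defaults.
    let req = RegionCatchupRequest {
        set_writable: true,
        ..Default::default()
    };
    assert!(req.entry_id.is_none() && req.checkpoint.is_none());
}
```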
@@ -15,11 +15,11 @@ Affected Rows: 0

 SHOW CREATE TABLE cnt_reqs;

-+----------+-----------------------------------------+
++----------+-------------------------------------------+
 | Table | Create Table |
-+----------+-----------------------------------------+
++----------+-------------------------------------------+
 | cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
-| | "val" DOUBLE NULL, |
+| | "count(http_requests.val)" DOUBLE NULL, |
 | | "ts" TIMESTAMP(3) NOT NULL, |
 | | "status_code" STRING NULL, |
 | | TIME INDEX ("ts"), |
@@ -28,7 +28,13 @@ SHOW CREATE TABLE cnt_reqs;
 | | |
 | | ENGINE=mito |
 | | |
-+----------+-----------------------------------------+
++----------+-------------------------------------------+

+-- test if sink table is tql queryable
+TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs);
+
+++
+++
+
 INSERT INTO TABLE http_requests VALUES
 (now() - '17s'::interval, 'host1', 'idc1', 200),
@@ -80,6 +86,43 @@ DROP TABLE cnt_reqs;

 Affected Rows: 0

+CREATE TABLE http_requests_two_vals (
+    ts timestamp(3) time index,
+    host STRING,
+    idc STRING,
+    val DOUBLE,
+    valb DOUBLE,
+    PRIMARY KEY(host, idc),
+);
+
+Affected Rows: 0
+
+-- should fail with two value columns error
+CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
+TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_requests_two_vals);
+
+Error: 3001(EngineExecuteQuery), Unsupported expr type: count_values on multi-value input
+
+-- should fail with two value columns error
+-- SQLNESS REPLACE id=[0-9]+ id=[REDACTED]
+CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
+TQL EVAL (now() - '1m'::interval, now(), '5s') rate(http_requests_two_vals[5m]);
+
+Error: 3001(EngineExecuteQuery), Invalid query: TQL query only supports one f64 value column, table `greptime.public.http_requests_two_vals`(id=[REDACTED]) has 2 f64 value columns, columns are: [val Float64 null, valb Float64 null]
+
+SHOW TABLES;
+
++------------------------+
+| Tables |
++------------------------+
+| http_requests_two_vals |
+| numbers |
++------------------------+
+
+DROP TABLE http_requests_two_vals;
+
+Affected Rows: 0
+
 CREATE TABLE http_requests (
     ts timestamp(3) time index,
     host STRING,
@@ -114,11 +157,11 @@ Affected Rows: 0

 SHOW CREATE TABLE cnt_reqs;

-+----------+-----------------------------------------+
++----------+-------------------------------------------+
 | Table | Create Table |
-+----------+-----------------------------------------+
++----------+-------------------------------------------+
 | cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
-| | "val" DOUBLE NULL, |
+| | "count(http_requests.val)" DOUBLE NULL, |
 | | "ts" TIMESTAMP(3) NOT NULL, |
 | | "status_code" STRING NULL, |
 | | TIME INDEX ("ts"), |
@@ -127,7 +170,13 @@ SHOW CREATE TABLE cnt_reqs;
 | | |
 | | ENGINE=mito |
 | | |
-+----------+-----------------------------------------+
++----------+-------------------------------------------+

+-- test if sink table is tql queryable
+TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs);
+
+++
+++
+
 INSERT INTO TABLE http_requests VALUES
 (0::Timestamp, 'host1', 'idc1', 200),
@@ -158,11 +207,11 @@ ADMIN FLUSH_FLOW('calc_reqs');
 | FLOW_FLUSHED |
 +-------------------------------+

-SELECT val, ts, status_code FROM cnt_reqs ORDER BY ts, status_code;
+SELECT * FROM cnt_reqs ORDER BY ts, status_code;

-+-----+---------------------+-------------+
-| val | ts | status_code |
-+-----+---------------------+-------------+
++--------------------------+---------------------+-------------+
+| count(http_requests.val) | ts | status_code |
++--------------------------+---------------------+-------------+
 | 3.0 | 1970-01-01T00:00:00 | 200.0 |
 | 1.0 | 1970-01-01T00:00:00 | 401.0 |
 | 1.0 | 1970-01-01T00:00:05 | 401.0 |
@@ -171,7 +220,7 @@ SELECT val, ts, status_code FROM cnt_reqs ORDER BY ts, status_code;
 | 2.0 | 1970-01-01T00:00:10 | 200.0 |
 | 2.0 | 1970-01-01T00:00:10 | 201.0 |
 | 4.0 | 1970-01-01T00:00:15 | 500.0 |
-+-----+---------------------+-------------+
++--------------------------+---------------------+-------------+

 DROP FLOW calc_reqs;

@@ -199,18 +248,24 @@ Affected Rows: 0

 SHOW CREATE TABLE rate_reqs;

-+-----------+------------------------------------------+
++-----------+-----------------------------------------------------------+
 | Table | Create Table |
-+-----------+------------------------------------------+
++-----------+-----------------------------------------------------------+
 | rate_reqs | CREATE TABLE IF NOT EXISTS "rate_reqs" ( |
 | | "ts" TIMESTAMP(3) NOT NULL, |
-| | "val" DOUBLE NULL, |
+| | "prom_rate(ts_range,val,ts,Int64(300000))" DOUBLE NULL, |
 | | TIME INDEX ("ts") |
 | | ) |
 | | |
 | | ENGINE=mito |
 | | |
-+-----------+------------------------------------------+
++-----------+-----------------------------------------------------------+

+-- test if sink table is tql queryable
+TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", rate_reqs);
+
+++
+++
+
 INSERT INTO TABLE http_requests VALUES
 (now() - '1m'::interval, 0),
@@ -248,3 +303,84 @@ DROP TABLE rate_reqs;

 Affected Rows: 0
+
+CREATE TABLE http_requests_total (
+    host STRING,
+    job STRING,
+    instance STRING,
+    byte DOUBLE,
+    ts TIMESTAMP TIME INDEX,
+    PRIMARY KEY (host, job, instance)
+);
+
+Affected Rows: 0
+
+CREATE FLOW calc_rate
+SINK TO rate_reqs
+EVAL INTERVAL '1m' AS
+TQL EVAL (now() - '1m'::interval, now(), '30s') rate(http_requests_total{job="my_service"}[1m]);
+
+Affected Rows: 0
+
+SHOW CREATE TABLE rate_reqs;
+
++-----------+-----------------------------------------------------------+
+| Table | Create Table |
++-----------+-----------------------------------------------------------+
+| rate_reqs | CREATE TABLE IF NOT EXISTS "rate_reqs" ( |
+| | "ts" TIMESTAMP(3) NOT NULL, |
+| | "prom_rate(ts_range,byte,ts,Int64(60000))" DOUBLE NULL, |
+| | "host" STRING NULL, |
+| | "job" STRING NULL, |
+| | "instance" STRING NULL, |
+| | TIME INDEX ("ts"), |
+| | PRIMARY KEY ("host", "job", "instance") |
+| | ) |
+| | |
+| | ENGINE=mito |
+| | |
++-----------+-----------------------------------------------------------+
+
+-- test if sink table is tql queryable
+TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", rate_reqs);
+
+++
+++
+
+INSERT INTO TABLE http_requests_total VALUES
+    ('localhost', 'my_service', 'instance1', 100, now() - '1min'::interval),
+    ('localhost', 'my_service', 'instance1', 200, now() - '45s'::interval),
+    ('remotehost', 'my_service', 'instance1', 300, now() - '30s'::interval),
+    ('remotehost', 'their_service', 'instance1', 300, now() - '15s'::interval),
+    ('localhost', 'my_service', 'instance1', 400, now());
+
+Affected Rows: 5
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('calc_rate');
+
++-------------------------------+
+| ADMIN FLUSH_FLOW('calc_rate') |
++-------------------------------+
+| FLOW_FLUSHED |
++-------------------------------+
+
+SELECT count(*)>0 FROM rate_reqs;
+
++---------------------+
+| count(*) > Int64(0) |
++---------------------+
+| true |
++---------------------+
+
+DROP FLOW calc_rate;
+
+Affected Rows: 0
+
+DROP TABLE http_requests_total;
+
+Affected Rows: 0
+
+DROP TABLE rate_reqs;
+
+Affected Rows: 0
@@ -22,15 +22,17 @@ INSERT INTO test VALUES

 Affected Rows: 3

 -- SQLNESS SLEEP 3s
-SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size)
+-- For regions using different WAL implementations, the manifest size may vary.
+-- The remote WAL implementation additionally stores a flushed entry ID when creating the manifest.
+SELECT SUM(region_rows), SUM(memtable_size), SUM(sst_size), SUM(index_size)
 FROM INFORMATION_SCHEMA.REGION_STATISTICS WHERE table_id
 IN (SELECT TABLE_ID FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test' and table_schema = 'public');

-+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
-| sum(information_schema.region_statistics.region_rows) | sum(information_schema.region_statistics.disk_size) | sum(information_schema.region_statistics.sst_size) | sum(information_schema.region_statistics.index_size) |
-+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
-| 3 | 2699 | 0 | 0 |
-+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
++-------------------------------------------------------+---------------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
+| sum(information_schema.region_statistics.region_rows) | sum(information_schema.region_statistics.memtable_size) | sum(information_schema.region_statistics.sst_size) | sum(information_schema.region_statistics.index_size) |
++-------------------------------------------------------+---------------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
+| 3 | 78 | 0 | 0 |
++-------------------------------------------------------+---------------------------------------------------------+----------------------------------------------------+------------------------------------------------------+

 SELECT data_length, index_length, avg_row_length, table_rows FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test';
@@ -17,7 +17,9 @@ INSERT INTO test VALUES
 (21, 'c', 21);

 -- SQLNESS SLEEP 3s
-SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size)
+-- For regions using different WAL implementations, the manifest size may vary.
+-- The remote WAL implementation additionally stores a flushed entry ID when creating the manifest.
+SELECT SUM(region_rows), SUM(memtable_size), SUM(sst_size), SUM(index_size)
 FROM INFORMATION_SCHEMA.REGION_STATISTICS WHERE table_id
 IN (SELECT TABLE_ID FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test' and table_schema = 'public');

|||||||
@@ -15,11 +15,11 @@ Affected Rows: 0
|
|||||||
|
|
||||||
SHOW CREATE TABLE cnt_reqs;
|
SHOW CREATE TABLE cnt_reqs;
|
||||||
|
|
||||||
+----------+-----------------------------------------+
|
+----------+-------------------------------------------+
|
||||||
| Table | Create Table |
|
| Table | Create Table |
|
||||||
+----------+-----------------------------------------+
|
+----------+-------------------------------------------+
|
||||||
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
|
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
|
||||||
| | "val" DOUBLE NULL, |
|
| | "count(http_requests.val)" DOUBLE NULL, |
|
||||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||||
| | "status_code" STRING NULL, |
|
| | "status_code" STRING NULL, |
|
||||||
| | TIME INDEX ("ts"), |
|
| | TIME INDEX ("ts"), |
|
||||||
@@ -28,7 +28,13 @@ SHOW CREATE TABLE cnt_reqs;
|
|||||||
| | |
|
| | |
|
||||||
| | ENGINE=mito |
|
| | ENGINE=mito |
|
||||||
| | |
|
| | |
|
||||||
+----------+-----------------------------------------+
|
+----------+-------------------------------------------+
|
||||||
|
|
||||||
|
-- test if sink table is tql queryable
|
||||||
|
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs);
|
||||||
|
|
||||||
|
++
|
||||||
|
++
|
||||||
|
|
||||||
INSERT INTO TABLE http_requests VALUES
|
INSERT INTO TABLE http_requests VALUES
|
||||||
(now() - '17s'::interval, 'host1', 'idc1', 200),
|
(now() - '17s'::interval, 'host1', 'idc1', 200),
|
||||||
@@ -80,6 +86,43 @@ DROP TABLE cnt_reqs;
|
|||||||
|
|
||||||
Affected Rows: 0
|
Affected Rows: 0
|
||||||
|
|
||||||
|
CREATE TABLE http_requests_two_vals (
|
||||||
|
ts timestamp(3) time index,
|
||||||
|
host STRING,
|
||||||
|
idc STRING,
|
||||||
|
val DOUBLE,
|
||||||
|
valb DOUBLE,
|
||||||
|
PRIMARY KEY(host, idc),
|
||||||
|
);
|
||||||
|
|
||||||
|
Affected Rows: 0
|
||||||
|
|
||||||
|
-- should failed with two value columns error
|
||||||
|
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
|
||||||
|
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_requests_two_vals);
|
||||||
|
|
||||||
|
Error: 3001(EngineExecuteQuery), Unsupported expr type: count_values on multi-value input
|
||||||
|
|
||||||
|
-- should failed with two value columns error
|
||||||
|
-- SQLNESS REPLACE id=[0-9]+ id=[REDACTED]
|
||||||
|
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
|
||||||
|
TQL EVAL (now() - '1m'::interval, now(), '5s') rate(http_requests_two_vals[5m]);
|
||||||
|
|
||||||
|
Error: 3001(EngineExecuteQuery), Invalid query: TQL query only supports one f64 value column, table `greptime.public.http_requests_two_vals`(id=[REDACTED]) has 2 f64 value columns, columns are: [val Float64 null, valb Float64 null]
|
||||||
|
|
||||||
|
SHOW TABLES;
|
||||||
|
|
||||||
|
+------------------------+
|
||||||
|
| Tables |
|
||||||
|
+------------------------+
|
||||||
|
| http_requests_two_vals |
|
||||||
|
| numbers |
|
||||||
|
+------------------------+
|
||||||
|
|
||||||
|
DROP TABLE http_requests_two_vals;
|
||||||
|
|
||||||
|
Affected Rows: 0
|
||||||
|
|
||||||
CREATE TABLE http_requests (
|
CREATE TABLE http_requests (
|
||||||
ts timestamp(3) time index,
|
ts timestamp(3) time index,
|
||||||
host STRING,
|
host STRING,
|
||||||
@@ -114,11 +157,11 @@ Affected Rows: 0
|
|||||||
|
|
||||||
SHOW CREATE TABLE cnt_reqs;
|
SHOW CREATE TABLE cnt_reqs;
|
||||||
|
|
||||||
+----------+-----------------------------------------+
|
+----------+-------------------------------------------+
|
||||||
| Table | Create Table |
|
| Table | Create Table |
|
||||||
+----------+-----------------------------------------+
|
+----------+-------------------------------------------+
|
||||||
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
|
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
|
||||||
| | "val" DOUBLE NULL, |
|
| | "count(http_requests.val)" DOUBLE NULL, |
|
||||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||||
| | "status_code" STRING NULL, |
|
| | "status_code" STRING NULL, |
|
||||||
| | TIME INDEX ("ts"), |
|
 | | TIME INDEX ("ts"), |
@@ -127,7 +170,13 @@ SHOW CREATE TABLE cnt_reqs;
 | | |
 | | ENGINE=mito |
 | | |
-+----------+-----------------------------------------+
++----------+-------------------------------------------+
 
+-- test if sink table is tql queryable
+TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs);
+
+++
+++
+
 INSERT INTO TABLE http_requests VALUES
     (0::Timestamp, 'host1', 'idc1', 200),
@@ -158,11 +207,11 @@ ADMIN FLUSH_FLOW('calc_reqs');
 | FLOW_FLUSHED |
 +-------------------------------+
 
-SELECT val, ts, status_code FROM cnt_reqs ORDER BY ts, status_code;
+SELECT * FROM cnt_reqs ORDER BY ts, status_code;
 
-+-----+---------------------+-------------+
-| val | ts | status_code |
-+-----+---------------------+-------------+
++--------------------------+---------------------+-------------+
+| count(http_requests.val) | ts | status_code |
++--------------------------+---------------------+-------------+
 | 3.0 | 1970-01-01T00:00:00 | 200.0 |
 | 1.0 | 1970-01-01T00:00:00 | 401.0 |
 | 1.0 | 1970-01-01T00:00:05 | 401.0 |
@@ -171,7 +220,7 @@ SELECT val, ts, status_code FROM cnt_reqs ORDER BY ts, status_code;
 | 2.0 | 1970-01-01T00:00:10 | 200.0 |
 | 2.0 | 1970-01-01T00:00:10 | 201.0 |
 | 4.0 | 1970-01-01T00:00:15 | 500.0 |
-+-----+---------------------+-------------+
++--------------------------+---------------------+-------------+
 
 DROP FLOW calc_reqs;
 
@@ -199,18 +248,24 @@ Affected Rows: 0
 
 SHOW CREATE TABLE rate_reqs;
 
-+-----------+------------------------------------------+
++-----------+-----------------------------------------------------------+
 | Table | Create Table |
-+-----------+------------------------------------------+
++-----------+-----------------------------------------------------------+
 | rate_reqs | CREATE TABLE IF NOT EXISTS "rate_reqs" ( |
 | | "ts" TIMESTAMP(3) NOT NULL, |
-| | "val" DOUBLE NULL, |
+| | "prom_rate(ts_range,val,ts,Int64(300000))" DOUBLE NULL, |
 | | TIME INDEX ("ts") |
 | | ) |
 | | |
 | | ENGINE=mito |
 | | |
-+-----------+------------------------------------------+
++-----------+-----------------------------------------------------------+
 
+-- test if sink table is tql queryable
+TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", rate_reqs);
+
+++
+++
+
 INSERT INTO TABLE http_requests VALUES
     (now() - '1m'::interval, 0),
@@ -248,3 +303,84 @@ DROP TABLE rate_reqs;
 
 Affected Rows: 0
 
+CREATE TABLE http_requests_total (
+    host STRING,
+    job STRING,
+    instance STRING,
+    byte DOUBLE,
+    ts TIMESTAMP TIME INDEX,
+    PRIMARY KEY (host, job, instance)
+);
+
+Affected Rows: 0
+
+CREATE FLOW calc_rate
+SINK TO rate_reqs
+EVAL INTERVAL '1m' AS
+TQL EVAL (now() - '1m'::interval, now(), '30s') rate(http_requests_total{job="my_service"}[1m]);
+
+Affected Rows: 0
+
+SHOW CREATE TABLE rate_reqs;
+
++-----------+-----------------------------------------------------------+
+| Table | Create Table |
++-----------+-----------------------------------------------------------+
+| rate_reqs | CREATE TABLE IF NOT EXISTS "rate_reqs" ( |
+| | "ts" TIMESTAMP(3) NOT NULL, |
+| | "prom_rate(ts_range,byte,ts,Int64(60000))" DOUBLE NULL, |
+| | "host" STRING NULL, |
+| | "job" STRING NULL, |
+| | "instance" STRING NULL, |
+| | TIME INDEX ("ts"), |
+| | PRIMARY KEY ("host", "job", "instance") |
+| | ) |
+| | |
+| | ENGINE=mito |
+| | |
++-----------+-----------------------------------------------------------+
+
+-- test if sink table is tql queryable
+TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", rate_reqs);
+
+++
+++
+
+INSERT INTO TABLE http_requests_total VALUES
+    ('localhost', 'my_service', 'instance1', 100, now() - '1min'::interval),
+    ('localhost', 'my_service', 'instance1', 200, now() - '45s'::interval),
+    ('remotehost', 'my_service', 'instance1', 300, now() - '30s'::interval),
+    ('remotehost', 'their_service', 'instance1', 300, now() - '15s'::interval),
+    ('localhost', 'my_service', 'instance1', 400, now());
+
+Affected Rows: 5
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('calc_rate');
+
++-------------------------------+
+| ADMIN FLUSH_FLOW('calc_rate') |
++-------------------------------+
+| FLOW_FLUSHED                  |
++-------------------------------+
+
+SELECT count(*)>0 FROM rate_reqs;
+
++---------------------+
+| count(*) > Int64(0) |
++---------------------+
+| true                |
++---------------------+
+
+DROP FLOW calc_rate;
+
+Affected Rows: 0
+
+DROP TABLE http_requests_total;
+
+Affected Rows: 0
+
+DROP TABLE rate_reqs;
+
+Affected Rows: 0
 
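Taken together, the `.result` hunks above assert two behaviors: the sink table a TQL flow writes into is itself TQL-queryable (the new `count_values` probes over `cnt_reqs`/`rate_reqs` return an empty but well-formed `++` result), and a flow can evaluate PromQL `rate()` with a label selector over a multi-tag table, with the sink schema derived from the query (note the generated `prom_rate(...)` value column and the carried-over `host`/`job`/`instance` keys). A minimal sketch of the same pattern follows, assuming a GreptimeDB build with flow support; `demo_src`, `demo_sink`, and `demo_flow` are illustrative names, not taken from the diff:

    -- Source table: a single value column plus tag keys and a time index.
    CREATE TABLE demo_src (
        host STRING,
        byte DOUBLE,
        ts TIMESTAMP TIME INDEX,
        PRIMARY KEY (host)
    );

    -- Batching flow: every minute, re-evaluate the PromQL rate over the
    -- last minute at a 30s step and write the series into demo_sink.
    CREATE FLOW demo_flow
    SINK TO demo_sink
    EVAL INTERVAL '1m' AS
    TQL EVAL (now() - '1m'::interval, now(), '30s') rate(demo_src{host="a"}[1m]);

    -- Because the sink is an ordinary mito table with a time index, it can
    -- be queried through TQL again, which is what the new probes assert.
    TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("host", demo_sink);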
@@ -11,6 +11,9 @@ TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_
 
 SHOW CREATE TABLE cnt_reqs;
 
+-- test if sink table is tql queryable
+TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs);
+
 INSERT INTO TABLE http_requests VALUES
     (now() - '17s'::interval, 'host1', 'idc1', 200),
     (now() - '17s'::interval, 'host2', 'idc1', 200),
@@ -39,6 +42,28 @@ DROP FLOW calc_reqs;
 DROP TABLE http_requests;
 DROP TABLE cnt_reqs;
 
+CREATE TABLE http_requests_two_vals (
+    ts timestamp(3) time index,
+    host STRING,
+    idc STRING,
+    val DOUBLE,
+    valb DOUBLE,
+    PRIMARY KEY(host, idc),
+);
+
+-- should failed with two value columns error
+CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
+TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_requests_two_vals);
+
+-- should failed with two value columns error
+-- SQLNESS REPLACE id=[0-9]+ id=[REDACTED]
+CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
+TQL EVAL (now() - '1m'::interval, now(), '5s') rate(http_requests_two_vals[5m]);
+
+SHOW TABLES;
+
+DROP TABLE http_requests_two_vals;
+
 CREATE TABLE http_requests (
     ts timestamp(3) time index,
     host STRING,
@@ -64,6 +89,9 @@ TQL EVAL (now() - now(), now()-(now()-'15s'::interval), '5s') count_values("stat
 
 SHOW CREATE TABLE cnt_reqs;
 
+-- test if sink table is tql queryable
+TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs);
+
 INSERT INTO TABLE http_requests VALUES
     (0::Timestamp, 'host1', 'idc1', 200),
     (0::Timestamp, 'host2', 'idc1', 200),
@@ -85,7 +113,7 @@ INSERT INTO TABLE http_requests VALUES
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('calc_reqs');
 
-SELECT val, ts, status_code FROM cnt_reqs ORDER BY ts, status_code;
+SELECT * FROM cnt_reqs ORDER BY ts, status_code;
 
 DROP FLOW calc_reqs;
 DROP TABLE http_requests;
@@ -101,6 +129,9 @@ TQL EVAL (now() - '1m'::interval, now(), '30s') rate(http_requests[5m]);
 
 SHOW CREATE TABLE rate_reqs;
 
+-- test if sink table is tql queryable
+TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", rate_reqs);
+
 INSERT INTO TABLE http_requests VALUES
     (now() - '1m'::interval, 0),
     (now() - '30s'::interval, 1),
@@ -114,3 +145,38 @@ SELECT count(*) > 0 FROM rate_reqs;
 DROP FLOW calc_rate;
 DROP TABLE http_requests;
 DROP TABLE rate_reqs;
+
+CREATE TABLE http_requests_total (
+    host STRING,
+    job STRING,
+    instance STRING,
+    byte DOUBLE,
+    ts TIMESTAMP TIME INDEX,
+    PRIMARY KEY (host, job, instance)
+);
+
+CREATE FLOW calc_rate
+SINK TO rate_reqs
+EVAL INTERVAL '1m' AS
+TQL EVAL (now() - '1m'::interval, now(), '30s') rate(http_requests_total{job="my_service"}[1m]);
+
+SHOW CREATE TABLE rate_reqs;
+
+-- test if sink table is tql queryable
+TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", rate_reqs);
+
+INSERT INTO TABLE http_requests_total VALUES
+    ('localhost', 'my_service', 'instance1', 100, now() - '1min'::interval),
+    ('localhost', 'my_service', 'instance1', 200, now() - '45s'::interval),
+    ('remotehost', 'my_service', 'instance1', 300, now() - '30s'::interval),
+    ('remotehost', 'their_service', 'instance1', 300, now() - '15s'::interval),
+    ('localhost', 'my_service', 'instance1', 400, now());
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('calc_rate');
+
+SELECT count(*)>0 FROM rate_reqs;
+
+DROP FLOW calc_rate;
+DROP TABLE http_requests_total;
+DROP TABLE rate_reqs;
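One detail worth noting in the test source above: `ADMIN FLUSH_FLOW` returns the number of rows flushed, which varies from run to run, so the `-- SQLNESS REPLACE` directive rewrites the numeric result cell to the stable token `FLOW_FLUSHED` before the sqlness runner compares output against the `.result` file. A sketch of that normalization, where the raw count of `1` is an assumed, run-dependent value:

    -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
    ADMIN FLUSH_FLOW('calc_rate');

    -- Raw engine output (the count of 1 is illustrative):
    -- +-------------------------------+
    -- | ADMIN FLUSH_FLOW('calc_rate') |
    -- +-------------------------------+
    -- | 1                             |
    -- +-------------------------------+
    --
    -- After the regex rewrite the data row reads FLOW_FLUSHED, matching the
    -- expected output recorded in the .result hunks above.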