From 782276848070771f1996fe5919821008c44aa6c6 Mon Sep 17 00:00:00 2001 From: wangtq Date: Fri, 21 Oct 2022 14:35:16 +0800 Subject: [PATCH 01/14] fix: cmd exporter hangs while executing some commands --- dbmind/common/cmd_executor.py | 1 + .../components/cmd_exporter/core/service.py | 92 +++++++++++++++---- .../components/cmd_exporter/yamls/default.yml | 25 ++++- 3 files changed, 96 insertions(+), 22 deletions(-) diff --git a/dbmind/common/cmd_executor.py b/dbmind/common/cmd_executor.py index c61d830..77c06c5 100644 --- a/dbmind/common/cmd_executor.py +++ b/dbmind/common/cmd_executor.py @@ -363,6 +363,7 @@ def multiple_cmd_exec(cmdline, **communicate_kwargs): ) if communicate_kwargs.get('input'): _p.stdin.write(communicate_kwargs.pop('input')) + _p.stdin.close() # avoid hanging else: if require_stdin[index] and len(process_list): prev_process = process_list[-1] diff --git a/dbmind/components/cmd_exporter/core/service.py b/dbmind/components/cmd_exporter/core/service.py index 516fc26..ef5d675 100644 --- a/dbmind/components/cmd_exporter/core/service.py +++ b/dbmind/components/cmd_exporter/core/service.py @@ -107,8 +107,9 @@ def perform_shell_command(cmd, **kwargs): except subprocess.TimeoutExpired: output = b'' logging.warning( - 'Timed out after %d seconds while executing %s.', kwargs.get('timeout'), - cmd + 'Timed out after %d seconds while executing %s, input is %s.', + kwargs.get('timeout'), + cmd, kwargs['input'] ) exitcode = -1 except Exception as e: @@ -129,7 +130,7 @@ class QueryInstance: self.timeout = item.get('timeout') self.metrics = [] self.label_names = [] - self.label_obj = {} + self.label_objs = {} for m in item['metrics']: for name, metric_item in m.items(): @@ -143,7 +144,7 @@ class QueryInstance: self.metrics.append(metric) else: self.label_names.append(metric.name) - self.label_obj[metric.name] = metric + self.label_objs[metric.name] = metric # `global_labels` is required and must be added anytime. self.label_names.extend(global_labels.keys()) @@ -168,52 +169,105 @@ class QueryInstance: exitcode, query_result = perform_shell_command(self.query, timeout=self.timeout) if not query_result: - logging.warning("Fetched nothing for metric '%s'." % self.query) + logging.warning("Fetched nothing for query '%s'." % self.query) return # Update for all metrics in current query instance. # `global_labels` is the essential labels for each metric family. - labels = {} + label_query_results = [] for label_name in self.label_names: - if label_name in self.label_obj: - obj = self.label_obj[label_name] + if label_name in self.label_objs: + obj = self.label_objs[label_name] remaining_time = endtime - time.time() - _, label_value = perform_shell_command( + _, result = perform_shell_command( obj.subquery, input=query_result, timeout=remaining_time ) else: - label_value = global_labels.get(label_name, 'None') - labels[label_name] = label_value + result = global_labels.get(label_name, 'None') + label_query_results.append(result.split('\n')) + + if len(label_query_results) == 0: + logging.warning( + "Fetched nothing on label for the metric '%s'." 
% self.name + ) + return + + metric_query_results = [] for metric in self.metrics: - metric_family = metric.entity.labels(**labels) remaining_time = endtime - time.time() - _, value = perform_shell_command( + _, result = perform_shell_command( metric.subquery, input=query_result, timeout=remaining_time ) + metric_query_results.append(result.split('\n')) + if len(metric_query_results) == 0: + logging.warning( + "Fetched nothing on metric value for the metric '%s'." % self.name + ) + return + + # Check whether we can merge these label + # and metric query results into a 2-dim array. + if len(label_query_results[0]) != len(metric_query_results[0]): + logging.error('Cannot fetch the metric %s because the' + 'dimension between label and metric is not consistent.') + return + + def _get_or_create(list_object, index): + if index < len(list_object): + return list_object[index] + new_dict = dict() + list_object.append(new_dict) + return new_dict + + def construct_labels(): + r = [] + for label_idx, name in enumerate(self.label_names): + for row_idx, v in enumerate(label_query_results[label_idx]): + d = _get_or_create(r, row_idx) + d[name] = v + return r + + def construct_metric_values(): + r = [] + for metric_idx, metric_ in enumerate(self.metrics): + for row_idx, v in enumerate(metric_query_results[metric_idx]): + d = _get_or_create(r, row_idx) + d[metric_.name] = v + return r + + def set_metric_value(family_, value_): # None is equivalent to NaN instead of zero. - if len(value) == 0: + if len(value_) == 0: logging.warning( 'Not found field %s in the %s.', metric.name, self.name ) else: - value = cast_to_int_or_float(value) + value_ = cast_to_int_or_float(value_) # Different usages (Prometheus data type) have different setting methods. # Thus, we have to select to different if-branches according to metric's usage. if metric.usage == 'COUNTER': - metric_family.set(value) + family_.set(value_) elif metric.usage == 'GAUGE': - metric_family.set(value) + family_.set(value_) elif metric.usage == 'SUMMARY': - metric_family.observe(value) + family_.observe(value_) elif metric.usage == 'HISTOGRAM': - metric_family.observe(value) + family_.observe(value_) else: logging.error( 'Not supported metric %s due to usage %s.' % (metric.name, metric.usage) ) + labels_ = construct_labels() + values_ = construct_metric_values() + for l_, v_ in zip(labels_, values_): + for m in self.metrics: + metric_family = m.entity.labels(**l_) + metric_value = v_[m.name] + set_metric_value(metric_family, metric_value) + def config_collecting_params(parallel, constant_labels): global _thread_pool_executor diff --git a/dbmind/components/cmd_exporter/yamls/default.yml b/dbmind/components/cmd_exporter/yamls/default.yml index e1ffac0..80a8f8d 100644 --- a/dbmind/components/cmd_exporter/yamls/default.yml +++ b/dbmind/components/cmd_exporter/yamls/default.yml @@ -24,7 +24,7 @@ # ... 
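The reworked QueryInstance.update above splits every label and metric subquery result on newlines and then zips the per-row dictionaries together before setting the metric families. A minimal standalone sketch of that merge, using made-up label names and sample rows rather than real exporter output:

    # Sketch of the per-row merge performed by construct_labels/construct_metric_values;
    # the label names and rows below are illustrative only.
    label_names = ['primary', 'standby']
    label_query_results = [['node1', 'node2'],      # rows of the 'primary' label subquery
                           ['node3', 'node4']]      # rows of the 'standby' label subquery
    metric_names = ['state']
    metric_query_results = [['1', '0']]             # rows of the 'state' metric subquery

    def _get_or_create(list_object, index):
        # Grow the row list on demand so each row index maps to exactly one dict.
        if index < len(list_object):
            return list_object[index]
        new_dict = dict()
        list_object.append(new_dict)
        return new_dict

    labels, values = [], []
    for i, name in enumerate(label_names):
        for row, v in enumerate(label_query_results[i]):
            _get_or_create(labels, row)[name] = v
    for i, name in enumerate(metric_names):
        for row, v in enumerate(metric_query_results[i]):
            _get_or_create(values, row)[name] = v

    for label_row, value_row in zip(labels, values):
        print(label_row, value_row)
    # {'primary': 'node1', 'standby': 'node3'} {'state': '1'}
    # {'primary': 'node2', 'standby': 'node4'} {'state': '0'}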
-cluster: +gaussdb_cluster: query: "cm_ctl query -Cvi" timeout: 5 metrics: @@ -37,8 +37,27 @@ cluster: usage: "LABEL" description: "primary node list" - standby: - subquery: grep -E "Standby" | grep -v "|" | awk '{print $3}' + subquery: grep -E "Standby" | grep -v "|" | awk '{print $3}' | sed -z 's/\n/,/g;s/,$/\n/g' usage: "LABEL" description: "standby node list" - +gaussdb_progress: + query: "ps -aux | grep gaussdb" + timeout: 5 + metrics: + - cpu_usage: + subquery: awk '{print $3}' + usage: "GAUGE" + description: "cpu usage of gaussdb process" + - mem_usage: + subquery: awk '{print $4}' + usage: "GAUGE" + description: "mem usage of gaussdb process" + - user: + subquery: awk '{print $1}' + usage: "LABEL" + description: "all of users who have gaussdb process" + - pid: + subquery: awk '{print $2}' + usage: "LABEL" + description: "pid of gaussdb process" -- Gitee From 22129e3999edd0b05424fc22e1b7481bf2876964 Mon Sep 17 00:00:00 2001 From: yuchen Date: Fri, 21 Oct 2022 11:45:29 +0800 Subject: [PATCH 02/14] feat(add): add influxdb interface to tsdb_client --- dbmind/cmd/config_utils.py | 2 +- dbmind/common/tsdb/influxdb_client.py | 337 +++++++++++++++++- dbmind/common/tsdb/prometheus_client.py | 30 +- dbmind/common/tsdb/tsdb_client.py | 32 ++ dbmind/common/tsdb/tsdb_client_factory.py | 13 +- dbmind/components/anomaly_analysis.py | 32 +- .../correlation_analysis.py | 5 +- 7 files changed, 399 insertions(+), 52 deletions(-) diff --git a/dbmind/cmd/config_utils.py b/dbmind/cmd/config_utils.py index 55f5d9c..26b989b 100644 --- a/dbmind/cmd/config_utils.py +++ b/dbmind/cmd/config_utils.py @@ -54,7 +54,7 @@ ENCRYPTED_SIGNAL = 'Encrypted->' # Used by check_config_validity(). CONFIG_OPTIONS = { - 'TSDB-name': ['prometheus'], + 'TSDB-name': ['prometheus', 'influxdb'], 'METADATABASE-dbtype': ['sqlite', 'opengauss', 'postgresql'], 'WORKER-type': ['local', 'dist'], 'LOG-level': ['DEBUG', 'INFO', 'WARNING', 'ERROR'] diff --git a/dbmind/common/tsdb/influxdb_client.py b/dbmind/common/tsdb/influxdb_client.py index 1ad6c34..0518f99 100644 --- a/dbmind/common/tsdb/influxdb_client.py +++ b/dbmind/common/tsdb/influxdb_client.py @@ -10,4 +10,339 @@ # EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # See the Mulan PSL v2 for more details. -raise NotImplementedError() + +from collections import defaultdict +from datetime import datetime, timedelta +from urllib.parse import urlparse + +from dbmind.common.http.requests_utils import create_requests_session +from dbmind.common.utils import cached_property +from dbmind.common.tsdb.tsdb_client import cast_duration_to_seconds +from dbmind.service.utils import DISTINGUISHING_INSTANCE_LABEL + +from .tsdb_client import TsdbClient +from ..exceptions import ApiClientException +from ..types import Sequence +from ..types.ssl import SSLContext + + +def _influxql_generator( + metric_name: str, + node_name: str, + start_ts_in_s: float = None, + end_ts_in_s: float = None, + label_config: dict = None, + labels_like: dict = None, +): + r""" + The InfluxDB API only accepts query in influxql in params['q']. This method is to get the well-formed + influxql with given metric_name, node_name, start timestamp in seconds, end timestamp in seconds, + label configuration and regex labels. + :param metric_name: (str) The name of the metric. + :param node_name: (str) The exact node_name key of the metric_name. + :param start_ts_in_s: (float) A timestamp number in seconds that specifies the metric range start time. 
+ :param end_ts_in_s: (float) A timestamp number in seconds that specifies the metric range end time. + :param label_config: (dict) A dictionary contains the exact filting information + :param labels_like: (dict) A dictionary contains the patial filting information in form of regex. + :return: (list) A list of filted sequences of the specified metric_name between the given time range. + """ + + def time_filter_generator(start_ts_in_ms, end_ts_in_ms): + time_filter = list() + if start_ts_in_ms: + time_filter.append(f"time >= {start_ts_in_ms}ms") + if end_ts_in_ms: + time_filter.append(f"time <= {end_ts_in_ms}ms") + return time_filter + + def condition_filter_generator(label_config, labels_like): + filters = list() + if label_config: + for k, v in label_config.items(): + if k.endswith("!"): + filters.append(f"{k} !~ /^{v}$/") + else: + filters.append(f"{k} =~ /^{v}$/") + + if labels_like: + for k, v in labels_like.items(): + if k.endswith("!"): + filters.append(f"{k} !~ /{v}/") + else: + filters.append(f"{k} =~ /{v}/") + + return filters + + filters = condition_filter_generator(label_config, labels_like) + start_ts_in_ms = int(start_ts_in_s * 1000) if start_ts_in_s else None + end_ts_in_ms = int(end_ts_in_s * 1000) if end_ts_in_s else None + filters += time_filter_generator(start_ts_in_ms, end_ts_in_ms) + filters = "WHERE " + " AND ".join(filters) if filters else "" + group_by = f"GROUP BY {node_name}" + influxql = f"SELECT value, dbname, {node_name} FROM {metric_name} {filters} {group_by}" + + return influxql + +# Standardized the format of return value. +def _standardize(data): + results_dict = defaultdict(list) + for d in data["results"]: + if "error" in d: + raise ValueError(d.get("error")) + + if "series" not in d: + return list() + + for series in d["series"]: + for timestamp, value, dbname, node_name in series["values"]: + metric_name = series["name"] + results_dict[(metric_name, dbname, node_name)].append((timestamp, value)) + + results = list() + for (metric_name, dbname, node), time_value_list in results_dict.items(): + results.append( + Sequence( + timestamps=tuple(int(t_v[0]) for t_v in time_value_list), + values=tuple(float(t_v[1]) for t_v in time_value_list), + name=metric_name, + labels={"datname": dbname, DISTINGUISHING_INSTANCE_LABEL: node}, + ) + ) + + return results + + +class InfluxdbClient(TsdbClient): + def __init__( + self, + url: str, + username: str = None, + password: str = None, + ssl_context: SSLContext = None, + headers: dict = None, + dbname: str = None, + ): + """Functions as a Constructor for the class InfluxDB Connect.""" + if url is None: + raise TypeError("missing url") + + self.headers = headers + self.url = url + self.influxdb_host = urlparse(self.url).netloc + self._all_metrics = None + self._session = create_requests_session(username, password, ssl_context) + self.dbname = dbname if dbname else self._dbname + + def check_connection(self, params: dict = None) -> bool: + """ + Check InfluxDB connection. + :param params: (dict) Optional dictionary containing parameters to be + sent along with the API request. + :returns: (bool) True if the endpoint can be reached, False if cannot be reached. + """ + response = self._session.get( + f"{self.url}/ping", + headers=self.headers, + params=params + ) + return response.ok + + def get_current_metric_value( + self, metric_name: str, label_config: dict = None, params: dict = None + ): + r""" + Get the current metric target for the specified metric and label configuration. 
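For reference, the query string assembled by _influxql_generator above has the following shape; the metric name, tag key, address and timestamps in this sketch are hypothetical:

    # Hypothetical inputs; only the shape of the generated InfluxQL matters here.
    influxql = _influxql_generator(
        'gaussdb_state', 'nodename',
        start_ts_in_s=1666000000, end_ts_in_s=1666003600,
        label_config={'instance': '10.0.0.1'}, labels_like=None,
    )
    # SELECT value, dbname, nodename FROM gaussdb_state
    #     WHERE instance =~ /^10.0.0.1$/
    #     AND time >= 1666000000000ms AND time <= 1666003600000ms
    #     GROUP BY nodename
    # (returned as a single line; wrapped here for readability)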
+ :param metric_name: (str) The name of the metric + :param label_config: (dict) A dictionary that specifies metric labels and their + values + :param params: (dict) Optional dictionary containing GET parameters to be sent + along with the API request, such as "epoch", "db" + :returns: (list) A list of current metric values for the specified metric + :raises: + (RequestException) Raises an exception in case of a connection error + (ApiClientException) Raises in case of non 200 response status code + """ + params = params or {} + labels_like = params.pop("labels_like") if "labels_like" in params else {} + current_ts_in_s = params.pop("time") if "time" in params else datetime.timestamp(datetime.now()) + node_name = self._get_node_name(metric_name) + influxql = _influxql_generator( + metric_name, + node_name, + start_ts_in_s=None, + end_ts_in_s=current_ts_in_s, + label_config=label_config, + labels_like=labels_like + ) + influxql = influxql.replace("SELECT value", "SELECT last(value)") + # using the query API to get raw data + response = self._session.get( + f"{self.url}/query", + params={**params, **{"q": influxql, "epoch": "ms", "db": self.dbname}}, + headers=self.headers, + ) + if response.status_code == 200: + data = response.json() + else: + raise ApiClientException( + "HTTP Status Code {} ({!r})".format(response.status_code, response.content) + ) + + return _standardize(data) + + def get_metric_range_data( + self, + metric_name: str, + label_config: dict = None, + start_time: datetime = (datetime.now() - timedelta(minutes=30)), + end_time: datetime = datetime.now(), + chunk_size: timedelta = None, + step: str = None, + params: dict = None + ): + r""" + Get the current metric target for the specified metric and label configuration. + :param metric_name: (str) The name of the metric. + :param label_config: (dict) A dictionary specifying metric labels and their + values. + :param start_time: (datetime) A datetime object that specifies the metric range start time. + :param end_time: (datetime) A datetime object that specifies the metric range end time. + :param chunk_size: (timedelta) Duration of metric data downloaded in one request. 
For + example, setting it to timedelta(hours=3) will download 3 hours worth of data in each + request made to the InfluxDB host + :param step: (str) Query resolution step width in duration format or float number of seconds + :param params: (dict) Optional dictionary containing GET parameters to be + sent along with the API request, such as "time" + :return: (list) A list of metric data for the specified metric in the given time + range + :raises: + (RequestException) Raises an exception in case of a connection error + (ApiClientException) Raises in case of non 200 response status code + """ + params = params or {} + labels_like = params.pop("labels_like") if "labels_like" in params else {} + node_name = self._get_node_name(metric_name) + start_ts_in_s = datetime.timestamp(start_time) + end_ts_in_s = datetime.timestamp(end_time) + influxql = _influxql_generator( + metric_name, + node_name, + start_ts_in_s=start_ts_in_s, + end_ts_in_s=end_ts_in_s, + label_config=label_config, + labels_like=labels_like + ) + response = self._session.get( + f"{self.url}/query", + params={**params, **{"q": influxql, "epoch": "ms", "db": self.dbname}}, + headers=self.headers, + ) + if response.status_code == 200: + data = response.json() + else: + raise ApiClientException( + "HTTP Status Code {} ({!r})".format(response.status_code, response.content) + ) + + return _standardize(data) + + def custom_query(self, query: str, timeout=None, params: dict = None): + """ + Send an influxql to a InfluxDB Host. + This method takes as input a string which will be sent as a query to + the specified InfluxDB Host. This query is a influxql query. + :param query: (str) This is a influxql query + :param params: (dict) Optional dictionary containing GET parameters to be + sent along with the API request, such as "epoch", "db" + :returns: (list) A list of metric data received in response of the query sent + :raises: + (RequestException) Raises an exception in case of a connection error + (ApiClientException) Raises in case of non 200 response status code + """ + params = params or {} + query = str(query) + # using the query API to get raw data + response = self._session.get( + f"{self.url}/query", + params={**params, **{"q": query, "epoch": "ms", "db": self.dbname}}, + headers=self.headers, + timeout=timeout + ) + if response.status_code == 200: + data = response.json() + else: + raise ApiClientException( + "HTTP Status Code {} ({!r})".format(response.status_code, response.content) + ) + + return _standardize(data) + + def _get_node_name(self, metric_name): + response = self._session.get( + f"{self.url}/query", + params={ + "q": f"select value, nodename, node_name from {metric_name} limit 1", + "db": self.dbname + }, + ).json() + _, _, nodename, node_name = response["results"][0]["series"][0]["values"][0] + if nodename: + return "nodename" + elif node_name: + return "node_name" + + def timestamp(self): + influxql = "SHOW DIAGNOSTICS" + response = self._session.get( + f"{self.url}/query", + params={"q": influxql, "epoch": "ms", "db": self.dbname}, + ) + if response.status_code == 200 and "series" in response.json()['results'][0]: + data = response.json() + for d in data["results"][0]["series"]: + if d.get("name") == "system": + time_str = d.get("values")[0][d.get("columns").index("currentTime")] + ts = datetime.timestamp(datetime.strptime(time_str[:19], "%Y-%m-%dT%H:%M:%S")) + return (ts + 8 * 60 * 60) * 1000 + return 0 + + @cached_property + def scrape_interval(self): + influxql = "SHOW DIAGNOSTICS" + response = self._session.get( 
+ f"{self.url}/query", + params={"q": influxql, "epoch": "ms", "db": self.dbname}, + ) + if response.status_code == 200 and "series" in response.json()['results'][0]: + data = response.json() + for d in data["results"][0]["series"]: + if d.get("name") == "config-data": + interval = d.get("values")[0][d.get("columns").index("cache-snapshot-write-cold-duration")] + return cast_duration_to_seconds(interval) + return None + + @cached_property + def all_metrics(self): + influxql = "SHOW TAG KEYS" + response = self._session.get( + f"{self.url}/query", + params={"q": influxql, "db": self.dbname}, + headers=self.headers + ) + metrics = list() + if response.status_code == 200 and len(response.json()["results"]) > 0: + for series in response.json()["results"][0]["series"]: + if ["group"] in series["values"]: + continue + metrics.append(series["name"]) + return metrics + + @cached_property + def _dbname(self): + influxql = "SHOW DATABASES" + response = self._session.get( + f"{self.url}/query", + params={"q": influxql}, + ) + if response.status_code == 200 and len(response.json()["results"]) > 0: + return response.json()["results"][0]["series"][0]["values"][0][0] diff --git a/dbmind/common/tsdb/prometheus_client.py b/dbmind/common/tsdb/prometheus_client.py index fba8fea..1564e0c 100644 --- a/dbmind/common/tsdb/prometheus_client.py +++ b/dbmind/common/tsdb/prometheus_client.py @@ -13,9 +13,9 @@ import logging from datetime import datetime, timedelta from urllib.parse import urlparse -import re from dbmind.common.http.requests_utils import create_requests_session +from dbmind.common.tsdb.tsdb_client import cast_duration_to_seconds from dbmind.common.utils import cached_property from .tsdb_client import TsdbClient @@ -59,34 +59,6 @@ def _standardize(data, step=None): return rv -# We don't support year, month, week and ms. -_DURATION_RE = re.compile( - r'([0-9]+d)?([0-9]+h)?([0-9]+m)?([0-9]+s)?|0' -) - - -def cast_duration_to_seconds(duration_string): - r = re.match(_DURATION_RE, duration_string) - if r is None: - return None - - groups = r.groups() - seconds = 0 - for group in groups: - if group is None: - continue - if group.endswith('d'): - seconds += 24 * 60 * 60 * int(group[:-1]) - elif group.endswith('h'): - seconds += 60 * 60 * int(group[:-1]) - elif group.endswith('m'): - seconds += 60 * int(group[:-1]) - elif group.endswith('s'): - seconds += int(group[:-1]) - - return seconds - - class PrometheusClient(TsdbClient): """ A Class for collection of metrics from a Prometheus Host. diff --git a/dbmind/common/tsdb/tsdb_client.py b/dbmind/common/tsdb/tsdb_client.py index 55b1db8..99c2c72 100644 --- a/dbmind/common/tsdb/tsdb_client.py +++ b/dbmind/common/tsdb/tsdb_client.py @@ -10,9 +10,37 @@ # EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # See the Mulan PSL v2 for more details. +import re import time from datetime import datetime, timedelta +# We don't support year, month, week and ms. 
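The duration parser moved here from prometheus_client keeps its original behaviour (its body follows in the next added lines); a few expected results for quick orientation:

    # Expected behaviour of cast_duration_to_seconds: only d/h/m/s units are
    # recognised, as the comment above notes.
    assert cast_duration_to_seconds('30s') == 30
    assert cast_duration_to_seconds('15m') == 900
    assert cast_duration_to_seconds('1h30m') == 5400
    assert cast_duration_to_seconds('1d') == 86400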
+_DURATION_RE = re.compile( + r'([0-9]+d)?([0-9]+h)?([0-9]+m)?([0-9]+s)?|0' +) + + +def cast_duration_to_seconds(duration_string): + r = re.match(_DURATION_RE, duration_string) + if r is None: + return None + + groups = r.groups() + seconds = 0 + for group in groups: + if group is None: + continue + if group.endswith('d'): + seconds += 24 * 60 * 60 * int(group[:-1]) + elif group.endswith('h'): + seconds += 60 * 60 * int(group[:-1]) + elif group.endswith('m'): + seconds += 60 * int(group[:-1]) + elif group.endswith('s'): + seconds += int(group[:-1]) + + return seconds + class TsdbClient(object): """The common baseclass of various time series database @@ -49,6 +77,10 @@ class TsdbClient(object): """get metric target from tsdb""" pass + def custom_query(self, query: str, timeout=None, params: dict = None): + """use custom sql to query directly.""" + pass + def timestamp(self): """get the current unix-timestamp from the time-series database.""" return int(time.time() * 1000) diff --git a/dbmind/common/tsdb/tsdb_client_factory.py b/dbmind/common/tsdb/tsdb_client_factory.py index 83bf1f5..d402cc7 100644 --- a/dbmind/common/tsdb/tsdb_client_factory.py +++ b/dbmind/common/tsdb/tsdb_client_factory.py @@ -16,13 +16,14 @@ import time from dbmind.common.exceptions import ApiClientException from dbmind.common.tsdb.prometheus_client import PrometheusClient +from dbmind.common.tsdb.influxdb_client import InfluxdbClient from dbmind.common.tsdb.tsdb_client import TsdbClient from dbmind.common.types import SSLContext from dbmind.common.utils import dbmind_assert, raise_fatal_and_exit class TsdbClientFactory(object): - tsdb_name = host = port = None + tsdb_name = host = port = dbname = None username = password = None ssl_context = None @@ -41,12 +42,13 @@ class TsdbClientFactory(object): @classmethod def set_client_info(cls, tsdb_name, host, port, username=None, password=None, ssl_certfile=None, ssl_keyfile=None, - ssl_keyfile_password=None, ssl_ca_file=None): + ssl_keyfile_password=None, ssl_ca_file=None, dbname=None): cls.tsdb_name = tsdb_name cls.host = host cls.port = port cls.username = username cls.password = password + cls.dbname = dbname cls.ssl_context = SSLContext( ssl_certfile, @@ -71,6 +73,13 @@ class TsdbClientFactory(object): if not client.check_connection(): raise ApiClientException("Failed to connect TSDB url: %s" % url) cls.tsdb_client = client + elif cls.tsdb_name == 'influxdb': + client = InfluxdbClient(url=url, username=cls.username, password=cls.password, + ssl_context=cls.ssl_context, dbname=cls.dbname) + if not client.check_connection(): + raise ApiClientException("Failed to connect TSDB url: %s" % url) + cls.tsdb_client = client + if cls.tsdb_client is None: raise ApiClientException("Failed to init TSDB client, please check config file.") diff --git a/dbmind/components/anomaly_analysis.py b/dbmind/components/anomaly_analysis.py index f6d2bfb..af50a47 100644 --- a/dbmind/components/anomaly_analysis.py +++ b/dbmind/components/anomaly_analysis.py @@ -51,16 +51,22 @@ http.client.HTTPConnection._http_vsn_str = "HTTP/1.0" def get_sequences(arg): metric, host, start_datetime, end_datetime, length = arg result = [] - if ":" in host: - host_like = host.split(":")[0] + "(:[0-9]{4,5}|)" - else: - host_like = host + "(:[0-9]{4,5}|)" - seqs = dai.get_metric_sequence(metric, start_datetime, end_datetime).from_server_like(host_like).fetchall() + if global_vars.configs.get('TSDB', 'name') == "prometheus": + if ":" in host and not check_ip_valid(host.split(":")[0]) and 
check_port_valid(host.split(":")[1]): + host_like = {DISTINGUISHING_INSTANCE_LABEL: host.split(":")[0] + "(:[0-9]{4,5}|)"} + elif check_ip_valid(host): + host_like = {DISTINGUISHING_INSTANCE_LABEL: host + "(:[0-9]{4,5}|)"} + else: + raise ValueError(f"Invalid host: {host}.") + elif global_vars.configs.get('TSDB', 'name') == "influxdb": + host_like = {"dbname": host} + + seqs = dai.get_metric_sequence(metric, start_datetime, end_datetime).filter_like(**host_like).fetchall() for seq in seqs: if DISTINGUISHING_INSTANCE_LABEL not in seq.labels or len(seq) < 0.9 * length: continue - address = SequenceUtils.from_server(seq).strip() + from_instance = SequenceUtils.from_server(seq).strip() if seq.labels.get('event'): name = 'wait event-' + seq.labels.get('event') else: @@ -69,7 +75,7 @@ def get_sequences(arg): name += ' on ' + seq.labels.get('datname') elif seq.labels.get('device'): name += ' on ' + seq.labels.get('device') - name += ' from ' + address + name += ' from ' + from_instance result.append((name, seq)) @@ -159,13 +165,6 @@ def main(argv): end_time = args.end_time host = args.host - if ":" in host: - ip, port = host.split(":")[0], host.split(":")[1] - if not (check_ip_valid(ip) and check_port_valid(port)): - parser.exit(1, f"Invalid host: {host}.") - elif not check_ip_valid(host): - parser.exit(1, f"Invalid host: {host}.") - os.chdir(args.conf) global_vars.metric_map = utils.read_simple_config_file(constants.METRIC_MAP_CONFIG) global_vars.configs = load_sys_configs(constants.CONFILE_NAME) @@ -213,9 +212,8 @@ def main(argv): csv_path = os.path.join(args.csv_dump_path, new_name + ".csv") with open(csv_path, 'w+', newline='') as f: writer = csv.writer(f) - for _, name, corr, delay, values in sorted(result[this_name].values(), reverse=True): - writer.writerow((name, corr, delay)) # Discard the first element abs(corr) after sorting. - writer.writerow(values) + for _, name, corr, delay, values in sorted(result[this_name].values(), key=lambda t: t[3]): + writer.writerow((name, corr, delay) + values) # Discard the first element abs(corr) after sorting. 
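The Prometheus branch above widens a bare host into an instance pattern that also accepts an optional 4- or 5-digit port. A quick illustration with a made-up address, using plain re only to show what the pattern accepts (the real matching is done by the TSDB's regex filter):

    import re

    # '10.0.0.1' is a made-up address; the suffix is the pattern built above.
    pattern = '10.0.0.1' + '(:[0-9]{4,5}|)'
    assert re.fullmatch(pattern, '10.0.0.1')
    assert re.fullmatch(pattern, '10.0.0.1:9187')
    assert re.fullmatch(pattern, '10.0.0.1:12345')
    assert re.fullmatch(pattern, '10.0.0.1:123') is None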
if __name__ == '__main__': diff --git a/dbmind/components/correlation_analysis/correlation_analysis.py b/dbmind/components/correlation_analysis/correlation_analysis.py index 135b77e..954933d 100644 --- a/dbmind/components/correlation_analysis/correlation_analysis.py +++ b/dbmind/components/correlation_analysis/correlation_analysis.py @@ -28,6 +28,7 @@ from dbmind.common import utils from dbmind.common.tsdb import TsdbClientFactory from dbmind.common.utils.checking import date_type, path_type from dbmind.service import dai +from dbmind.service.utils import DISTINGUISHING_INSTANCE_LABEL from dbmind.common.algorithm.correlation import amplify_feature, max_cross_correlation @@ -96,7 +97,7 @@ def main(argv): # Gathering metric data metric_sequences = dai.get_metric_sequence(primary_metric, start_datetime, end_datetime).fetchall() for metric_sequence in metric_sequences: - address = metric_sequence.labels.get('from_instance') + address = metric_sequence.labels.get(DISTINGUISHING_INSTANCE_LABEL) host = address.split(':')[0] wait_events[host][primary_metric] = metric_sequence @@ -104,7 +105,7 @@ def main(argv): for metric in metrics: events = dai.get_metric_sequence(metric, start_datetime, end_datetime).fetchall() for event in events: - address = event.labels.get('from_instance') + address = event.labels.get(DISTINGUISHING_INSTANCE_LABEL) if ':' in address: host, port = address.split(':') else: -- Gitee From ca682af647183e29060ab0fb0a256d86170a943e Mon Sep 17 00:00:00 2001 From: wangtq Date: Tue, 25 Oct 2022 15:09:46 +0800 Subject: [PATCH 03/14] feat(dai): step auto estimate and error-tolerant in reading buffer --- dbmind/common/rpc/client.py | 5 +- dbmind/common/tsdb/tsdb_client.py | 2 +- .../opengauss_exporter/core/agent.py | 5 +- dbmind/service/dai.py | 53 +++++++++++++++---- tests/test_dai.py | 2 +- 5 files changed, 51 insertions(+), 16 deletions(-) diff --git a/dbmind/common/rpc/client.py b/dbmind/common/rpc/client.py index f3384a5..915f062 100644 --- a/dbmind/common/rpc/client.py +++ b/dbmind/common/rpc/client.py @@ -25,7 +25,7 @@ from .errors import RPCExecutionError, RPCConnectionError from .server import DEFAULT_URI, HEARTBEAT_FLAG, AUTH_FLAG standard_rpc_url_pattern = re.compile('(https?)://[-A-Za-z0-9+&@#%?=~_|!:,.;]+/[-A-Za-z0-9]+') -rpc_endpoint_pattern = re.compile('(https?)://[-A-Za-z0-9+&@#%?=~_|!:,.;]+$') +rpc_endpoint_pattern = re.compile('(https?)://[-A-Za-z0-9+&@#%?=~_|!:,.;]+/?$') class RPCClient: @@ -37,7 +37,8 @@ class RPCClient: if re.match(standard_rpc_url_pattern, url): self.url = url elif re.match(rpc_endpoint_pattern, url): - self.url = url + DEFAULT_URI + # The reason why we strip slash is to avoid wrong URL. + self.url = url.rstrip('/') + DEFAULT_URI else: raise ValueError('Invalid url format: %s.' % url) diff --git a/dbmind/common/tsdb/tsdb_client.py b/dbmind/common/tsdb/tsdb_client.py index 99c2c72..e02554d 100644 --- a/dbmind/common/tsdb/tsdb_client.py +++ b/dbmind/common/tsdb/tsdb_client.py @@ -86,7 +86,7 @@ class TsdbClient(object): return int(time.time() * 1000) def scrape_interval(self): - """get the scrape interval of tsdb""" + """get the scrape interval of tsdb. 
Unit is second.""" pass def all_metrics(self): diff --git a/dbmind/components/opengauss_exporter/core/agent.py b/dbmind/components/opengauss_exporter/core/agent.py index 46cfffa..12721bb 100644 --- a/dbmind/components/opengauss_exporter/core/agent.py +++ b/dbmind/components/opengauss_exporter/core/agent.py @@ -31,7 +31,10 @@ def query_in_postgres(stmt): @_rpc_service def query_in_database(stmt, database, return_tuples=False, fetch_all=False): - res = _agent_exclusive_driver.query(stmt, force_connection_db=database, return_tuples=return_tuples, fetch_all=fetch_all) + res = _agent_exclusive_driver.query( + stmt, force_connection_db=database, + return_tuples=return_tuples, + fetch_all=fetch_all) logging.info('[Agent] query_in_database (%s): %s; result: %s.', database, stmt, res) return res diff --git a/dbmind/service/dai.py b/dbmind/service/dai.py index 5c7a19e..605b320 100644 --- a/dbmind/service/dai.py +++ b/dbmind/service/dai.py @@ -52,7 +52,7 @@ class LazyFetcher: self.metric_name = _map_metric(metric_name) self.start_time = start_time self.end_time = end_time - self.step = step + self.step = step or estimate_appropriate_step_ms(start_time, end_time) self.labels = dict.copy(global_vars.must_filter_labels or {}) self.labels_like = dict() self.rv = None @@ -122,14 +122,18 @@ class LazyFetcher: start_time, end_time = datetime_to_timestamp(self.start_time), datetime_to_timestamp(self.end_time) step = self.step - buffered = buff.get( - metric_name=self.metric_name, - start_time=start_time, - end_time=end_time, - step=step, - labels=self.labels, - fetcher_func=self._fetch_sequence - ) + try: + buffered = buff.get( + metric_name=self.metric_name, + start_time=start_time, + end_time=end_time, + step=step, + labels=self.labels, + fetcher_func=self._fetch_sequence + ) + except Exception as e: + logging.error('SequenceBufferPool crashed.', exc_info=e) + return self._fetch_sequence(start_time, end_time, step) dbmind_assert(buffered is not None) return buffered @@ -139,6 +143,8 @@ class LazyFetcher: return self.rv def fetchone(self): + # Prometheus doesn't provide limit clause, so we + # still implement it as below. self.rv = self.rv or self._read_buffer() # If iterator has un-popped elements then return it, # otherwise return empty of the sequence. @@ -161,8 +167,33 @@ def _map_metric(metric_name, to_internal_name=True): return global_vars.metric_map.get(metric_name, metric_name).strip() -def estimate_appropriate_step(): - pass +def estimate_appropriate_step_ms(start_time, end_time): + """If we use a fixed step to fetch a metric sequence, + the response time will be very long while we obtain a + long-term sequence. So, we should estimate an appropriate + sampling step to fetch data. Here, we employ the + down-sampling logic of Prometheus. No matter how + long it takes to fetch, Prometheus always returns the data in + time. The data length is similar because Prometheus + uses a mapper mechanism to calculate a step for this + fetching. The mapping relationship is data in one + hour using the default scrape interval. For more than one hour + of data, increase the step according to + the above proportional relation. + """ + if None in (start_time, end_time): + return None + interval_second = TsdbClientFactory.get_tsdb_client().scrape_interval + if not interval_second: + # If returns None, it will only depend on TSDB's behavior. 
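    # A worked example of the step formula at the end of this function, with
    # assumed numbers (not part of the original code): a 6-hour window fetched
    # from a TSDB scraped every 15 seconds gives
    #     fetch_seconds * interval_second // ONE_HOUR * 1000
    #         = 21600 * 15 // 3600 * 1000 = 90000 ms,
    # i.e. a 90-second step, which keeps the returned sequence close to one
    # hour's worth of points at the original 15-second interval.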
+ return None + + ONE_HOUR = 3600 # unit: second + fetch_seconds = (end_time - start_time).seconds + if fetch_seconds <= ONE_HOUR: + return None + # return unit: microsecond + return fetch_seconds * interval_second // ONE_HOUR * 1000 or None def get_metric_sequence(metric_name, start_time, end_time, step=None): diff --git a/tests/test_dai.py b/tests/test_dai.py index 5371952..f34ff5c 100644 --- a/tests/test_dai.py +++ b/tests/test_dai.py @@ -103,4 +103,4 @@ def test_save_xxx(): def test_estimate_appropriate_step(): - dai.estimate_appropriate_step() + dai.estimate_appropriate_step_ms() -- Gitee From 8343af84b5fe5a72dc0b81c7f569e36b947326c0 Mon Sep 17 00:00:00 2001 From: yuchen Date: Tue, 25 Oct 2022 16:11:16 +0800 Subject: [PATCH 04/14] feat(increase_detector): rewrite the increase detector and add a new detector mad detector to anomaly detection --- dbmind/app/monitoring/generic_detection.py | 5 +- .../anomaly_detection/increase_detector.py | 149 ++++++------------ .../anomaly_detection/mad_detector.py | 45 ++++++ tests/test_detector.py | 31 ++-- 4 files changed, 108 insertions(+), 122 deletions(-) create mode 100644 dbmind/common/algorithm/anomaly_detection/mad_detector.py diff --git a/dbmind/app/monitoring/generic_detection.py b/dbmind/app/monitoring/generic_detection.py index b38f719..79b7af3 100644 --- a/dbmind/app/monitoring/generic_detection.py +++ b/dbmind/app/monitoring/generic_detection.py @@ -89,9 +89,8 @@ class AnomalyDetections(object): return anomalies @staticmethod - def do_increase_detect(sequence, window=50, max_coef=1, max_increase_rate=0.5): - increase_detector = IncreaseDetector(window=window, max_coef=max_coef, - max_increase_rate=max_increase_rate) + def do_increase_detect(sequence, side="positive", alpha=0.05): + increase_detector = IncreaseDetector(side=side, alpha=alpha) anomalies = increase_detector.fit_predict(sequence) return anomalies diff --git a/dbmind/common/algorithm/anomaly_detection/increase_detector.py b/dbmind/common/algorithm/anomaly_detection/increase_detector.py index 2fe2f96..bf3db32 100644 --- a/dbmind/common/algorithm/anomaly_detection/increase_detector.py +++ b/dbmind/common/algorithm/anomaly_detection/increase_detector.py @@ -11,113 +11,68 @@ # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # See the Mulan PSL v2 for more details. -from types import SimpleNamespace - import numpy as np +import scipy.stats from ._abstract_detector import AbstractDetector -from ._utils import merge_contiguous_anomalies_timestamps, over_max_coef -from .spike_detector import remove_spike from ...types import Sequence -def linear_fitting(values, value_mean=1): - if abs(value_mean) < 1: - value_mean = 1 - - axis_x = np.arange(1, 1 + len(values), 1) - axis_y = np.array(values, dtype='float') - axis_y /= abs(value_mean) - coef, intercept = np.polyfit(axis_x, axis_y, deg=1) - return coef, intercept - - -def is_stable(values, stable_threshold=0.15): - p0 = np.nanpercentile(values, 0) - p1 = np.nanpercentile(values, 25) - p3 = np.nanpercentile(values, 75) - p4 = np.nanpercentile(values, 100) - return (p3 - p1) / (p4 - p0) <= stable_threshold - - -gsfm_range = SimpleNamespace(samping_rate=0.2, step=5, stable_threshold=0.15) - - -def get_stable_fragment_mean(values, gsfm_range_): - """the stable_fragment_mean is used for zooming the size of data, - for example: values /= stable_fragment_mean - if no stable fragment found, return the average of the whole dataset. 
- """ - length = len(values) - window = int(length * gsfm_range_.samping_rate) - for i in range(0, length, gsfm_range_.step): - fragment = values[i:i + window] - if is_stable(fragment, stable_threshold=gsfm_range_.stable_threshold): - stable_fragment_mean = np.mean(fragment) - return stable_fragment_mean - - return np.mean(values) - - -def increase_condition(values, side, max_increase_rate): - length = len(values) - values = np.array(values) - if side == "positive": - increase_rate = (values[1:] - values[:-1] > 0).sum() / length - elif side == "negative": - increase_rate = (values[1:] - values[:-1] < 0).sum() / length - else: - raise ValueError(side) - - return increase_rate, increase_rate > max_increase_rate - - -def over_max_trend_count(predicted, max_trend_count): - length = len(predicted) - anomalies_timestamps = [predicted.timestamps[i] for i in range(length) if predicted.values[i]] - anomalies_timestamps_list = merge_contiguous_anomalies_timestamps( - predicted.timestamps, anomalies_timestamps - ) - return len(anomalies_timestamps_list) > max_trend_count - - class IncreaseDetector(AbstractDetector): - def __init__(self, side="positive", window=20, max_coef=0.3, max_increase_rate=0.7, - max_trend_count=5, gsfm_range_=gsfm_range): + """ + COX STUART TEST + Perform a Cox-Stuart test for data sequence. + In many measurement processes, it is desirable to detect the prescence of + trend. That is, if the data are assumed to be independent observations, + we are interested in knowing if there is in fact a time dependent trend + Given a set of ordered observations X1, X2, ..., Xn, let: half_n = n // 2, + Then pair the data as X1,X1+half_n, X2,X2+half_n, ..., Xn-half_n,Xn. + The Cox-Stuart test is then simply a sign test using the binomial distribution: + n_positive = sum(sign(Xi+half_n - Xn) > 0) + n_negative = sum(sign(Xi+half_n - Xn) < 0) + n_diff = n_positive + n_negative + p_value is a sum of n independent, identically distributed Bernoulli variables + with parameter p. + p_value_increase = binom.cdf(n_negative, n_diff, p=0.5) + p_value_decrease = binom.cdf(n_positive, n_diff, p=0.5) + (It's obvious that the sum of p_value_increase and p_value_decrease is always 1) + If p_value is higher than the significant level (default: 0.05), we may reject the + hypothesis that the sequence has a trend. + + parameters: + side (str, optional): "positive" to identify a increase trend in data. "negative" + to identify a decrease trend in data. Defaults to "positive". + alpha (float, optional): the significant level to accept the hypothesis that the + data sequence has a trend. Defaults to 0.05. 
+ """ + def __init__(self, side="positive", alpha=0.05): self.side = side - self.window = window - self.max_coef = max_coef - self.max_increase_rate = max_increase_rate - self.max_trend_count = max_trend_count - self.gsfm_range = gsfm_range_ - - def do_increase_detect(self, s: Sequence, predicted): - """ We only need one stable fragment.""" - length = len(s) - stable_fragment_mean = get_stable_fragment_mean(s.values, gsfm_range) - - for i in range(0, length, self.window): - fragment = s.values[i: i + self.window] - increase_rate, over_increase_rate = increase_condition(fragment, self.side, self.max_increase_rate) - coef, _ = linear_fitting(fragment, stable_fragment_mean) - if over_increase_rate or over_max_coef(coef, self.side, self.max_coef): - predicted[i: i + self.window] = [True] * len(fragment) - - return predicted + self.alpha = alpha def _fit(self, sequence: Sequence): """Nothing to impl""" def _predict(self, s: Sequence) -> Sequence: - sequence_len = len(s) - self.window = min(self.window, sequence_len) - normal_sequence = remove_spike(s) # Polish later: moving average - - predicted = [False] * sequence_len - predicted = self.do_increase_detect(normal_sequence, predicted) - - predicted = Sequence(timestamps=s.timestamps, values=predicted) - if over_max_trend_count(predicted, self.max_trend_count): - predicted = Sequence(timestamps=s.timestamps, values=[False] * sequence_len) - - return predicted + x, y = s.timestamps, s.values + coef = np.polyfit(x, y, deg=1)[0] + half_n = int(len(y) / 2) + n_pos = n_neg = 0 + for i in range(half_n): + diff = y[i + half_n] - y[i] + if diff > 0: + n_pos += 1 + elif diff < 0: + n_neg += 1 + + n_diff = n_pos + n_neg + if self.side == "positive": + p_value = 2 * scipy.stats.binom.cdf(n_neg, n_diff, 0.5) + if p_value < self.alpha and coef > 0: + return Sequence(timestamps=s.timestamps, values=[True] * len(y)) + + elif self.side == "negative": + p_value = 2 * scipy.stats.binom.cdf(n_pos, n_diff, 0.5) + if p_value < self.alpha and coef < 0: + return Sequence(timestamps=s.timestamps, values=[True] * len(y)) + + return Sequence(timestamps=s.timestamps, values=[False] * len(y)) diff --git a/dbmind/common/algorithm/anomaly_detection/mad_detector.py b/dbmind/common/algorithm/anomaly_detection/mad_detector.py new file mode 100644 index 0000000..c6c84ca --- /dev/null +++ b/dbmind/common/algorithm/anomaly_detection/mad_detector.py @@ -0,0 +1,45 @@ +# Copyright (c) 2020 Huawei Technologies Co.,Ltd. +# +# openGauss is licensed under Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# +# http://license.coscl.org.cn/MulanPSL2 +# +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. +import numpy as np + +from ._abstract_detector import AbstractDetector +from ...types import Sequence + + +class MadDetector(AbstractDetector): + def __init__(self, threshold=3, scale_factor=1.4826): + """Median Absolute Deviation Detector. + In statistics, the median absolute deviation (MAD) is a robust measure + of the variability of a univariate sample of quantitative data. 
+ For a univariate data set X1, X2, ..., Xn, the MAD is defined as the median + of the absolute deviations from the data's median: + MAD = abs(x - x.median).median * scale_factor + + parameters: + threshold (float, optional): threshold to decide a anomaly data. Defaults to 3. + scale_factor (float, optional): Multiple relationship between standard deviation and absolute + median difference under normal distribution. + """ + self.threshold = threshold + self.scale_factor = scale_factor + + def _fit(self, s: Sequence): + """Nothing to impl""" + + def _predict(self, s: Sequence) -> Sequence: + x = np.array(s.values) + x_median = np.median(x) + abs_diff_median = np.abs(x - x_median) + mad = self.scale_factor * np.median(abs_diff_median) + rel_median = abs_diff_median / mad + return Sequence(timestamps=s.timestamps, values=rel_median > self.threshold) diff --git a/tests/test_detector.py b/tests/test_detector.py index 5f0930c..42297e1 100644 --- a/tests/test_detector.py +++ b/tests/test_detector.py @@ -451,33 +451,20 @@ def test_volatility_shift_detector(): def test_increase_detector(): - input_data = [i for i in list(range(50))] + [i for i in list(range(50))][::-1] + random.seed(0) + # test case: the increasing sequence. + input_data = [i * 0.05 + 0.7 * random.randint(1, 5) for i in list(range(50))] raw_data = Sequence(timestamps=list(range(len(input_data))), values=input_data) - detector = anomaly_detection.IncreaseDetector(side="positive", window=20, max_coef=0.3) + detector = anomaly_detection.IncreaseDetector(side="positive", alpha=0.05) res = detector.fit_predict(raw_data) - correct_data = ( - True, True, True, True, True, True, True, True, True, True, - True, True, True, True, True, True, True, True, True, True, - True, True, True, True, True, True, True, True, True, True, - True, True, True, True, True, True, True, True, True, True, - False, False, False, False, False, False, False, False, False, - False, False, False, False, False, False, False, False, False, - False, False, False, False, False, False, False, False, False, - False, False, False, False, False, False, False, False, False, - False, False, False, False, False, False, False, False, False, - False, False, False, False, False, False, False, False, False, - False, False, False, False, False, False - ) + correct_data = (True,) * 50 assert res.values == correct_data - - # test case: the length of sequence cannot be divided evenly by the window. - input_data = (1, 2, 3, 4, 5, 6, 20, 30, 40) + # test case: the decreasing sequence. + input_data = [(50 - i) * 0.05 + 0.7 * random.randint(1, 5) for i in list(range(50))] sequence_case = Sequence(timestamps=tuple(range(len(input_data))), values=input_data) - detector = anomaly_detection.IncreaseDetector(side="positive", window=4, max_coef=1) + detector = anomaly_detection.IncreaseDetector(side="negative", alpha=0.05) res = detector.fit_predict(sequence_case) - expected = ( - True, True, True, True, True, True, True, True, True - ) + expected = (True,) * 50 assert res.values == expected -- Gitee From 7d9a80c6750c53ade9618547a89feef651067426 Mon Sep 17 00:00:00 2001 From: wangtq Date: Wed, 26 Oct 2022 14:24:49 +0800 Subject: [PATCH 05/14] fix(web): Fix the problem that the database list cannot be displayed on Web. 
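A small self-contained illustration of the two detectors added in patch 04 above, using toy numbers only; scipy.stats.binom drives the Cox-Stuart sign test and plain numpy the MAD rule:

    import numpy as np
    import scipy.stats

    # Toy series: a gentle upward ramp with a single spike; not real monitoring data.
    y = np.array([1.0, 1.1, 1.2, 1.1, 1.3, 1.4, 9.0, 1.5, 1.6, 1.7, 1.8, 1.9])

    # Cox-Stuart sign test, as in the rewritten IncreaseDetector:
    # pair y[i] with y[i + n//2] and run a binomial sign test on the differences.
    half_n = len(y) // 2
    diffs = y[half_n:] - y[:half_n]
    n_pos, n_neg = int((diffs > 0).sum()), int((diffs < 0).sum())
    p_value = 2 * scipy.stats.binom.cdf(n_neg, n_pos + n_neg, 0.5)
    print(p_value)                              # ~0.031 < 0.05, so the increasing trend is accepted

    # MAD rule, as in the new MadDetector: flag points far from the median.
    mad = 1.4826 * np.median(np.abs(y - np.median(y)))
    print(np.abs(y - np.median(y)) / mad > 3)   # only the 9.0 spike is flagged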
--- dbmind/service/web.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbmind/service/web.py b/dbmind/service/web.py index 500028f..9dd7082 100644 --- a/dbmind/service/web.py +++ b/dbmind/service/web.py @@ -274,7 +274,7 @@ def get_host_status(): @ttl_cache(seconds=60) def get_database_list(): - sequences = dai.get_latest_metric_value('pg_database_is_template').fetchall() + sequences = dai.get_latest_metric_value('pg_database_size_bytes').fetchall() rv = set() for seq in sequences: if seq.values[0] != 1: -- Gitee From e5a98e0a0b8836a1026efce2419641e16e8343bb Mon Sep 17 00:00:00 2001 From: yuchen Date: Wed, 26 Oct 2022 17:15:50 +0800 Subject: [PATCH 06/14] fix(basic algorithm): remove or replace the methods from package 'sklearn' --- .../forecasting/simple_forecasting.py | 80 ----------- dbmind/common/algorithm/preprocessing.py | 128 ++++++++++++++++++ .../algorithm/duration_time_model/dnn.py | 2 +- requirements-aarch64.txt | 1 - requirements-x86.txt | 2 +- tests/test_detection.py | 19 --- tests/test_preprocessing.py | 37 +++++ 7 files changed, 167 insertions(+), 102 deletions(-) create mode 100644 dbmind/common/algorithm/preprocessing.py create mode 100644 tests/test_preprocessing.py diff --git a/dbmind/common/algorithm/forecasting/simple_forecasting.py b/dbmind/common/algorithm/forecasting/simple_forecasting.py index 8777d46..320d04e 100644 --- a/dbmind/common/algorithm/forecasting/simple_forecasting.py +++ b/dbmind/common/algorithm/forecasting/simple_forecasting.py @@ -10,30 +10,11 @@ # EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # See the Mulan PSL v2 for more details. -import numpy as np -from sklearn.linear_model import LinearRegression -from sklearn.model_selection import train_test_split -from sklearn.preprocessing import PolynomialFeatures from dbmind.common.types import Sequence from .forcasting_algorithm import ForecastingAlgorithm -def series_to_supervised(sequence: Sequence, test_size=.1, poly_degree=None, - random_state=None, shuffle=False): - x, y = sequence.to_2d_array() - x_train, x_test, y_train, y_test = train_test_split( - x, y, test_size=test_size, shuffle=shuffle, random_state=random_state - ) - - if poly_degree: - poly = PolynomialFeatures(degree=poly_degree).fit(x) - x_train = poly.transform(x_train) - x_test = poly.transform(x_test) - - return x_train, x_test, y_train, y_test - - class SimpleLinearFitting(ForecastingAlgorithm): def __init__(self, avoid_repetitive_fitting=False): self._a = None @@ -91,64 +72,3 @@ class SimpleLinearFitting(ForecastingAlgorithm): @property def r2_score(self): return self._r2 - - -class SimpleLinearRegression(ForecastingAlgorithm): - def __init__(self): - self.model = LinearRegression(copy_X=False) - self.interval = None - self.last_x = None - - def fit(self, sequence: Sequence): - if sequence.length < 2: - raise ValueError('Unable to fit the sequence due to short length.') - - x, y = sequence.to_2d_array() - self.interval = x[1] - x[0] - self.last_x = x[-1] - x = np.reshape(x, newshape=(-1, 1)) - self.model.fit(x, y) - - def forecast(self, forecast_length): - future = np.arange(start=self.last_x + self.interval, - stop=self.last_x + self.interval * (forecast_length + 1), - step=self.interval).reshape(-1, 1) - result = self.model.predict(future) - return result.tolist() - - -class SupervisedModel(ForecastingAlgorithm): - def __init__(self, model=None, bias=False, poly_degree=None): - self.bias = bias - self.poly_degree = poly_degree - # 
Use the passed Model instance if exists. - if not model: - self.model = LinearRegression(normalize=True) - else: - self.model = model - self.predict_steps = None - self.sequence = None - - def fit(self, sequence: Sequence): - if sequence.length < 2: - raise ValueError('Unable to fit the sequence due to short length.') - - # dummy to fit - self.sequence = sequence - - def forecast(self, forecast_length): - if not isinstance(forecast_length, int): - raise ValueError('#2 forecasting_minutes must be an integer.') - - self.predict_steps = forecast_length if forecast_length > 1 else 1 - x_train, x_test, y_train, y_test = series_to_supervised(self.sequence) - x_pred = np.arange(start=self.sequence.length, - stop=self.sequence.length + self.predict_steps, - step=1).reshape(-1, 1) - self.model.fit(np.array(x_train).reshape(-1, 1), - np.array(y_train).reshape(-1, 1)) - y_pred = self.model.predict(X=x_pred) - if self.bias: - bias = y_pred.flatten()[0] - self.sequence.values[-1] - y_pred -= bias - return y_pred.flatten().tolist() diff --git a/dbmind/common/algorithm/preprocessing.py b/dbmind/common/algorithm/preprocessing.py new file mode 100644 index 0000000..3164daf --- /dev/null +++ b/dbmind/common/algorithm/preprocessing.py @@ -0,0 +1,128 @@ +# Copyright (c) 2020 Huawei Technologies Co.,Ltd. +# +# openGauss is licensed under Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# +# http://license.coscl.org.cn/MulanPSL2 +# +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. + +import numpy as np + + +class MinMaxScaler: + def __init__(self, feature_range=(0, 1)): + self.feature_range = feature_range + + def _reset(self): + # Checking one attribute is enough, because they are all set together + # in partial_fit + if hasattr(self, "scale_"): + del self.scale_ + del self.min_ + del self.data_min_ + del self.data_max_ + del self.data_range_ + + def fit(self, x): + # Reset internal state before fitting + self._reset() + return self.partial_fit(x) + + def partial_fit(self, x): + feature_range = self.feature_range + if feature_range[0] >= feature_range[1]: + raise ValueError( + "Minimum of desired feature range must be smaller than maximum. Got %s." 
+ % str(feature_range) + ) + + x = check_array(x, copy=True, dtype=np.float64) + data_min = np.nanmin(x, axis=0) + data_max = np.nanmax(x, axis=0) + if hasattr(self, "data_min_"): + data_min = np.minimum(self.data_min_, data_min) + if hasattr(self, "data_max_"): + data_max = np.maximum(self.data_max_, data_max) + + data_range = data_max - data_min + self.scale_ = (feature_range[1] - feature_range[0]) / _handle_zeros_in_scale( + data_range, copy=True + ) + self.min_ = feature_range[0] - data_min * self.scale_ + self.data_min_ = data_min + self.data_max_ = data_max + self.data_range_ = data_range + return self + + def transform(self, x): + self.check_is_fitted() + x = check_array(x, copy=True, dtype=np.float64) + x *= self.scale_ + x += self.min_ + return x + + def inverse_transform(self, x): + self.check_is_fitted() + x = check_array(x, copy=True, dtype=np.float64) + x -= self.min_ + x /= self.scale_ + return x + + def fit_transform(self, x): + self.fit(x) + return self.transform(x) + + def check_is_fitted(self): + for v in vars(self): + if v.endswith("_") and not v.startswith("__"): + return + + raise ValueError("This MinMaxScaler instance is not fitted yet. Call 'fit' " + "with appropriate arguments before using this MinMaxScaler.") + + +def _handle_zeros_in_scale(scale, copy=True, constant_mask=None): + """Set scales of near constant features to 1. + + The goal is to avoid division by very small or zero values. + + Near constant features are detected automatically by identifying + scales close to machine precision unless they are precomputed by + the caller and passed with the `constant_mask` kwarg. + + Typically for standard scaling, the scales are the standard + deviation while near constant features are better detected on the + computed variances which are closer to machine precision by + construction. + """ + # if we are fitting on 1D arrays, scale might be a scalar + if np.isscalar(scale): + if scale == 0.0: + scale = 1.0 + return scale + elif isinstance(scale, np.ndarray): + if constant_mask is None: + # Detect near constant values to avoid dividing by a very small + # value that could lead to surprising results and numerical + # stability issues. + constant_mask = scale < 10 * np.finfo(scale.dtype).eps + + if copy: + # New array to avoid side-effects + scale = scale.copy() + scale[constant_mask] = 1.0 + return scale + + +def check_array(array, dtype, copy=False): + if np.isinf(array).any(): + raise ValueError("Input contains Infinite value.") + if copy: + return np.array(array, dtype=dtype) + else: + return array.astype(dtype) diff --git a/dbmind/components/sqldiag/algorithm/duration_time_model/dnn.py b/dbmind/components/sqldiag/algorithm/duration_time_model/dnn.py index 7296821..1761d91 100644 --- a/dbmind/components/sqldiag/algorithm/duration_time_model/dnn.py +++ b/dbmind/components/sqldiag/algorithm/duration_time_model/dnn.py @@ -6,7 +6,6 @@ import sys from abc import ABC import numpy as np -from sklearn.preprocessing import MinMaxScaler from . 
import AbstractModel from ..word2vec import Word2Vector @@ -77,6 +76,7 @@ class DnnModel(AbstractModel, ABC): self.w2v.fit(self.data) def fit(self, data): + from sklearn.preprocessing import MinMaxScaler self.build_word2vector(data) list_vec = [] list_cost = [] diff --git a/requirements-aarch64.txt b/requirements-aarch64.txt index 3640e65..0a38b59 100644 --- a/requirements-aarch64.txt +++ b/requirements-aarch64.txt @@ -3,7 +3,6 @@ cryptography==2.5 # for paramiko on openEuler paramiko==2.7.2 numpy==1.16.5 # for openEuler aarch64 scipy==1.6.0 -scikit-learn requests sql-metadata sqlparse diff --git a/requirements-x86.txt b/requirements-x86.txt index 557a418..34135c3 100644 --- a/requirements-x86.txt +++ b/requirements-x86.txt @@ -1,7 +1,7 @@ ## For DBMind-core ## paramiko>=2.7.2 numpy -scikit-learn +scipy requests sql-metadata sqlparse diff --git a/tests/test_detection.py b/tests/test_detection.py index e71a7ea..5db5683 100644 --- a/tests/test_detection.py +++ b/tests/test_detection.py @@ -10,10 +10,8 @@ # EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # See the Mulan PSL v2 for more details. -from sklearn.svm import SVR from dbmind.common.algorithm.forecasting import ForecastingFactory -from dbmind.common.algorithm.forecasting.simple_forecasting import SupervisedModel from dbmind.common.types import Sequence linear_seq = Sequence(tuple(range(1, 10)), tuple(range(1, 10))) @@ -36,20 +34,3 @@ def test_linear_regression(): assert roughly_compare(result, range(10, 20)) assert ForecastingFactory.get_instance(linear_seq) is linear - - -def test_supervised_linear_regression(): - linear = SupervisedModel() - linear.fit(linear_seq) - result = linear.forecast(10) - assert len(result) == 10 - assert roughly_compare(result, range(9, 19)) - - -def test_supervised_svr(): - # WARNING: the SVR model with nonlinear kernel does not work. - svr = SupervisedModel(SVR(kernel='linear', verbose=True, max_iter=100)) - svr.fit(linear_seq) - result = svr.forecast(10) - assert len(result) == 10 - assert roughly_compare(result, range(9, 19)) diff --git a/tests/test_preprocessing.py b/tests/test_preprocessing.py new file mode 100644 index 0000000..7876a11 --- /dev/null +++ b/tests/test_preprocessing.py @@ -0,0 +1,37 @@ +# Copyright (c) 2020 Huawei Technologies Co.,Ltd. +# +# openGauss is licensed under Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# +# http://license.coscl.org.cn/MulanPSL2 +# +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. 
+ +import numpy as np + +from dbmind.common.algorithm import preprocessing + + +def test_min_max_scaler(): + scaler = preprocessing.MinMaxScaler(feature_range=(0, 1)) + x = [1, 2, 3, 4, np.nan, 5] + y = [2, 3, 4, 5, 6] + + scaler.fit(x) + scaled_y = scaler.transform(y) + assert all(scaled_y == [0.25, 0.5, 0.75, 1., 1.25]) + inversed_y = scaler.inverse_transform(scaled_y) + assert all(inversed_y == y) + + scaled_x = scaler.fit_transform(x) + assert ( + np.isnan(scaled_x).any() and + np.nanmax(scaled_x) == 1.0 and + np.nanmin(scaled_x) == 0.0 + ) + scaled_y = scaler.fit_transform(y) + assert all(scaled_y == [0.0, 0.25, 0.5, 0.75, 1.0]) -- Gitee From e0ad4961d10e352d6dbe132c289c8ca538869a67 Mon Sep 17 00:00:00 2001 From: yuchen Date: Fri, 28 Oct 2022 10:46:42 +0800 Subject: [PATCH 07/14] fix(forecast): remove trend from sequence when calculating periodicity and some other fixes --- .../forecasting/forcasting_algorithm.py | 18 +++++------- dbmind/common/algorithm/seasonal.py | 29 ++++++++++++++----- 2 files changed, 30 insertions(+), 17 deletions(-) diff --git a/dbmind/common/algorithm/forecasting/forcasting_algorithm.py b/dbmind/common/algorithm/forecasting/forcasting_algorithm.py index 317790e..8624708 100644 --- a/dbmind/common/algorithm/forecasting/forcasting_algorithm.py +++ b/dbmind/common/algorithm/forecasting/forcasting_algorithm.py @@ -119,21 +119,19 @@ def decompose_sequence(sequence): def compose_sequence(seasonal_data, train_sequence, forecast_values): forecast_length = len(forecast_values) if seasonal_data and seasonal_data.is_seasonal: - start_index = len(train_sequence) % seasonal_data.period seasonal = seasonal_data.seasonal resid = seasonal_data.resid + resid[np.abs(resid - np.mean(resid)) > np.std(resid) * 3] = np.mean(resid) dbmind_assert(len(seasonal) == len(resid)) - - seasonal, resid = seasonal[start_index:], resid[start_index:] - if len(seasonal) < forecast_length: # pad it. - padding_length = forecast_length - len(seasonal) - seasonal = np.pad(seasonal, (0, padding_length), mode='wrap') - resid = np.pad(resid, (0, padding_length), mode='wrap') + period = seasonal_data.period + latest_period = seasonal[-period:] + resid[-period:] + if len(latest_period) < forecast_length: # pad it. + padding_length = forecast_length - len(latest_period) + addition = np.pad(latest_period, (0, padding_length), mode='wrap') else: - seasonal, resid = seasonal[:forecast_length], resid[:forecast_length] + addition = latest_period[:forecast_length] - resid[np.abs(resid - np.mean(resid)) > np.std(resid) * 3] = np.mean(resid) - forecast_values = seasonal + forecast_values + resid + forecast_values = forecast_values + addition forecast_timestamps = [train_sequence.timestamps[-1] + train_sequence.step * i for i in range(1, forecast_length + 1)] diff --git a/dbmind/common/algorithm/seasonal.py b/dbmind/common/algorithm/seasonal.py index e6ad1658..1fbd18a 100644 --- a/dbmind/common/algorithm/seasonal.py +++ b/dbmind/common/algorithm/seasonal.py @@ -18,6 +18,8 @@ from scipy import signal warnings.filterwarnings("ignore") +MIN_WINDOW = 9 + def acovf(x): # auto-covariances function x = np.array(x) @@ -52,14 +54,27 @@ def is_seasonal_series(x, high_ac_threshold: float = 0.5, min_seasonal_freq=3): At last if the peaks found is fewer than 'min_seasonal_freq', The method thinks the input x sequence is not seasonal. """ + # The periodic analysis is independent of the trend. 
If the trend components + # are not separated, the autocorrelation will be significantly affected and + # it is difficult to identify the period. So we extract the trend components. + window = max(MIN_WINDOW, len(x) // (min_seasonal_freq + 1)) + detrended = x - decompose_trend(x, np.ones(window) / window) + + ac_coef = acf(detrended, nlags=len(x) - 1) # auto-correlation coefficient - ac_coef = acf(x, nlags=len(x) - 1) # auto-correlation coefficient high_ac_peak_pos = signal.find_peaks(ac_coef)[0] - beyond_threshold = np.argwhere(ac_coef >= high_ac_threshold).flatten()[1:-1] # exclude the first and last + beyond_threshold = np.argwhere(ac_coef >= high_ac_threshold).flatten() high_ac_peak_pos = np.intersect1d(high_ac_peak_pos, beyond_threshold) - - high_ac_peak_pos = high_ac_peak_pos[high_ac_peak_pos < len(ac_coef) // 2] + # Noise in autocorrelation coefficients may be mistaken for peaks. + # According to experience, we think that a sequence with obvious periodicity + # will have a minimum value of the autocorrelation coefficient when moving + # for half a period. Therefore, we think that the period can only appear + # after the first minimum value. + high_ac_peak_pos = high_ac_peak_pos[ + (high_ac_peak_pos < len(ac_coef) // 2) & + (high_ac_peak_pos > np.argmin(ac_coef)) + ] if len(high_ac_peak_pos) - 1 >= min_seasonal_freq: return True, int(high_ac_peak_pos[np.argmax(ac_coef[high_ac_peak_pos])]) @@ -73,7 +88,7 @@ def get_seasonal_period(values, high_ac_threshold: float = 0.5, min_seasonal_fre def _conv_kernel(period): """ If period is even, convolution kernel is [0.5, 1, 1, ... 1, 0.5] with the size of (period + 1) - else if period is od, convolution kernel is [1, 1, ... 1] with the size of period + else if period is odd, convolution kernel is [1, 1, ... 1] with the size of period Make sure the the size of convolution kernel is odd. """ @@ -86,10 +101,10 @@ def _conv_kernel(period): def extrapolate(x, head, tail, length): head_template = x[head:head + length] k = np.polyfit(np.arange(1, len(head_template) + 1), head_template, deg=1) - head = k[0] * np.arange(head) + x[head] - head * k[0] + head = k[0] * np.arange(head) + x[0] - head * k[0] tail_template = x[-tail - length:-tail] k = np.polyfit(np.arange(1, len(tail_template) + 1), tail_template, deg=1) - tail = k[0] * np.arange(tail) + x[-tail] + tail = k[0] * np.arange(tail) + x[-1] + k[0] x = np.r_[head, x, tail] return x -- Gitee From 104baa748f96f500295761967a6e710dc97f6c7a Mon Sep 17 00:00:00 2001 From: likun Date: Fri, 28 Oct 2022 17:30:11 +0800 Subject: [PATCH 08/14] feat(xtuner-recommend): Adapt the necessary parameters for recommend function. --- .../app/optimization/knob_recommendation.py | 276 ++++++++++-------- dbmind/service/utils.py | 2 +- 2 files changed, 161 insertions(+), 117 deletions(-) diff --git a/dbmind/app/optimization/knob_recommendation.py b/dbmind/app/optimization/knob_recommendation.py index 2396588..fef7913 100644 --- a/dbmind/app/optimization/knob_recommendation.py +++ b/dbmind/app/optimization/knob_recommendation.py @@ -12,74 +12,94 @@ # See the Mulan PSL v2 for more details. 
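+# Knob recommendation now reads metrics per instance address (host:port) from
+# the TSDB and, on the primary instance only, falls back to direct SQL through
+# the agent RPC (see _fetch_value_by_rpc below) for buffer hit-rate statistics.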
import logging -from dbmind.common.types import Sequence +from dbmind import global_vars from dbmind.components.xtuner.tuner.character import AbstractMetric from dbmind.components.xtuner.tuner.recommend import recommend_knobs as rk from dbmind.components.xtuner.tuner.utils import cached_property from dbmind.service import dai +from dbmind.service.utils import get_master_instance_address from dbmind.service.utils import SequenceUtils -def try_to_get_latest_metric_value(metric_name, **conditions): - seqs = dai.get_latest_metric_value(metric_name).filter(**conditions).fetchall() - if len(seqs) > 0: - return seqs - # If we can't get latest sequences, we expand retrieve interval. - max_attempts = 3 - starting_attempt_minutes = 5 # minutes - i = 0 - while len(seqs) == 0 and i < max_attempts: - seqs = dai.get_latest_metric_sequence( - metric_name, minutes=starting_attempt_minutes * (2 ** i) # double every time - ).filter(**conditions).fetchall() - i += 1 - if len(seqs) == 0: - return [] - - # aggregation - rv = [] +def _fetch_value_by_rpc(sql, database='postgres', default_val=0): + # Ensure that RPC collects correct data + try: + result = global_vars.agent_rpc_client.call('query_in_database', + sql, + database, + return_tuples=True) + return result[0][0] + except Exception: + return default_val + + +def _fetch_all_value(metric_name, from_server=None, **condition): + if from_server is not None: + seqs = dai.get_latest_metric_value(metric_name=metric_name).from_server( + f"{from_server}").filter(**condition).fetchall() + else: + seqs = dai.get_latest_metric_value(metric_name=metric_name).filter(**condition).fetchall() + return seqs + + +def _fetch_one_value(metric_name, from_server=None, default_val=-1, **condition): + if from_server is None: + seq = dai.get_latest_metric_value(metric_name=metric_name).from_server( + f"{from_server}").filter(**condition).fetchone() + else: + seq = dai.get_latest_metric_value(metric_name=metric_name).filter(**condition).fetchone() + if seq.values: + return seq.values[0] + return default_val + + +def _fetch_one_value_by_host(metric_name, host, default_val=0, **condition): + seqs = dai.get_latest_metric_value(metric_name=metric_name).filter(**condition).fetchall() for seq in seqs: - avg = sum(seq.values) / len(seq) - new_seq = Sequence( - timestamps=(seq.timestamps[-1],), - values=(avg,), - name=seq.name, - step=seq.step, - labels=seq.labels - ) - rv.append(new_seq) - return rv - - -def get_database_hosts(): - database_hosts = set() - seqs = try_to_get_latest_metric_value("pg_uptime") + if host in SequenceUtils.from_server(seq): + return seq.values[0] + return default_val + + +def get_database_addresses(): + database_addresses = set() + seqs = _fetch_all_value("pg_node_info_uptime") for seq in seqs: - host = SequenceUtils.from_server(seq).split(":")[0] - database_hosts.add(host) - return database_hosts + host, port = SequenceUtils.from_server(seq).split(':') + database_addresses.add((host, port)) + return database_addresses def recommend_knobs(): - metric = TSDBMetric() - hosts = get_database_hosts() + metric = RPCAndTSDBMetric() + addresses = get_database_addresses() result = dict() - for host in hosts: + primary_host, primary_port = get_master_instance_address() + for host, port in addresses: try: metric.set_host(host) + metric.set_port(port) + metric.set_address() + if host == primary_host and port == primary_port: + metric.is_rpc_valid = True knobs = rk("recommend", metric) - result[host] = [knobs, metric.to_dict()] + address = "%s:%s" % (host, port) + 
result[address] = [knobs, metric.to_dict()] except Exception as e: logging.warning( - 'Cannot recommend knobs for the host %s maybe because of information lack.' % host, exc_info=e + 'Cannot recommend knobs for the address %s maybe because of information lack.', f"{host}:{port}", + exc_info=e ) return result -class TSDBMetric(AbstractMetric): +class RPCAndTSDBMetric(AbstractMetric): def __init__(self): + self.is_rpc_valid = False self.database_host = None + self.database_port = None + self.database_address = None AbstractMetric.__init__(self) def __getitem__(self, item): @@ -92,23 +112,26 @@ class TSDBMetric(AbstractMetric): @cached_property def most_xact_db(self): - seqs = try_to_get_latest_metric_value("pg_db_xact_commit") + seqs = _fetch_all_value("pg_db_xact_commit", from_server=self.database_address) database = 'postgres' max_xact_commit = -float("inf") for seq in seqs: - host = SequenceUtils.from_server(seq).split(":")[0] - if self.database_host != host: - continue - xact_commit = seq.values[0] - if xact_commit > max_xact_commit: - database = seq.labels.get("datname") - max_xact_commit = xact_commit - + if seq.values: + xact_commit = seq.values[0] + if xact_commit > max_xact_commit: + database = seq.labels.get("datname") + max_xact_commit = xact_commit return database def set_host(self, database_host): self.database_host = database_host + def set_port(self, database_port): + self.database_port = database_port + + def set_address(self): + self.database_address = "%s:%s" % (self.database_host, self.database_port) + def get_one_value_from_seqs_according_to_database_host(self, seqs, default_val=None): val = default_val for seq in seqs: @@ -116,149 +139,168 @@ class TSDBMetric(AbstractMetric): val = seq.values[0] if self.database_host == host: return val - return val - def fetch_current_guc_value(self, guc_name, default_val=-1): - seqs = try_to_get_latest_metric_value("pg_settings_setting", name=guc_name) - return self.get_one_value_from_seqs_according_to_database_host(seqs, default_val=default_val) - - def fetch_metric_value_on_most_xact_database(self, metric_name, default_val=None): - seqs = try_to_get_latest_metric_value(metric_name, datname=self.most_xact_db) - return self.get_one_value_from_seqs_according_to_database_host(seqs, default_val=default_val) - - def fetch_metric_value(self, metric_name, default_val=None): - seqs = try_to_get_latest_metric_value(metric_name) - return self.get_one_value_from_seqs_according_to_database_host(seqs, default_val=default_val) - - def fetch_the_sum_of_metric_values(self, metric_name): - rv = list() - seqs = try_to_get_latest_metric_value(metric_name) - for seq in seqs: - host = SequenceUtils.from_server(seq).split(":")[0] - if self.database_host == host: - rv.append(seq.values[0]) - return sum(rv) + def fetch_current_guc_value(self, guc_name): + result = _fetch_one_value("pg_settings_setting", from_server=self.database_address, name=guc_name) + return result @property def cache_hit_rate(self): # You could define used internal state here. # this is a demo, cache_hit_rate, we will use it while tuning shared_buffer. 
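+        # For example (illustrative numbers only): blks_hit=900 and blks_read=100
+        # yield a hit rate of roughly 0.9; the 0.001 term only guards against
+        # division by zero on an idle database.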
- pg_db_blks_hit = self.fetch_metric_value_on_most_xact_database("pg_db_blks_hit") - pg_db_blks_read = self.fetch_metric_value_on_most_xact_database("pg_db_blks_read") + pg_db_blks_hit = _fetch_one_value("pg_db_blks_hit", from_server=self.database_address) + pg_db_blks_read = _fetch_one_value("pg_db_blks_read", from_server=self.database_address) cache_hit_rate = pg_db_blks_hit / (pg_db_blks_hit + pg_db_blks_read + 0.001) - return cache_hit_rate @cached_property def is_64bit(self): - seqs = try_to_get_latest_metric_value("node_uname_info", machine="x86_64") - return self.get_one_value_from_seqs_according_to_database_host(seqs) is not None + seq = _fetch_one_value_by_host("node_uname_info", host=self.database_host, machine="x86_64") + return seq != 0 @property def uptime(self): - return self.fetch_metric_value("pg_uptime") + return _fetch_one_value("pg_node_info_uptime", from_server=self.database_address) @property def current_connections(self): - return self.fetch_metric_value_on_most_xact_database("pg_activity_count") + result = 0 + seqs = _fetch_all_value("pg_stat_activity_count", from_server=self.database_address) + for seq in seqs: + value = seq.values[0] if seq.values else 0 + result += value + return result @property def average_connection_age(self): - return self.fetch_metric_value("pg_avg_time") # unit: second + return _fetch_one_value("pg_stat_activity_p95_state_duration", from_server=self.database_address) @property def all_database_size(self): - return self.fetch_the_sum_of_metric_values("pg_database_size_bytes") # unit: kB + return _fetch_one_value("pg_database_all_size", from_server=self.database_address) / 1024 # unit: kB @property def current_prepared_xacts_count(self): - return self.fetch_metric_value("pg_prepared_xacts_count") + return _fetch_one_value("pg_prepared_xacts_count", from_server=self.database_address) @property def current_locks_count(self): - return self.fetch_metric_value_on_most_xact_database("pg_lock_count") + return _fetch_one_value("pg_lock_count", from_server=self.database_address) @property def checkpoint_proactive_triggering_ratio(self): - return self.fetch_metric_value("pg_stat_bgwriter_checkpoint_proactive_triggering_ratio") + return _fetch_one_value("pg_stat_bgwriter_checkpoint_proactive_triggering_ratio", + from_server=self.database_address, default_val=0) @property def checkpoint_avg_sync_time(self): - return self.fetch_metric_value("pg_stat_bgwriter_checkpoint_avg_sync_time") + return _fetch_one_value("pg_stat_bgwriter_checkpoint_avg_sync_time", from_server=self.database_address) @property def shared_buffer_heap_hit_rate(self): - return self.fetch_metric_value("pg_statio_all_tables_shared_buffer_heap_hit_rate") + if self.is_rpc_valid: + stmt = "select pg_catalog.sum(heap_blks_hit)*100 / (pg_catalog.sum(heap_blks_read) + " \ + "pg_catalog.sum(heap_blks_hit)+1) from pg_statio_all_tables;" + return float(_fetch_value_by_rpc(stmt, database=self.most_xact_db, default_val=100)) + return 100.0 @property def shared_buffer_toast_hit_rate(self): - return self.fetch_metric_value("pg_statio_all_tables_shared_buffer_toast_hit_rate") + if self.is_rpc_valid: + stmt = "select pg_catalog.sum(toast_blks_hit)*100 / (pg_catalog.sum(toast_blks_read) + " \ + "pg_catalog.sum(toast_blks_hit)+1) from pg_statio_all_tables;" + return float(_fetch_value_by_rpc(stmt, database=self.most_xact_db, default_val=100)) + return 100.0 @property def shared_buffer_tidx_hit_rate(self): - return self.fetch_metric_value("pg_statio_all_tables_shared_buffer_tidx_hit_rate") + if 
self.is_rpc_valid: + stmt = "select pg_catalog.sum(tidx_blks_hit)*100 / (pg_catalog.sum(tidx_blks_read) + " \ + "pg_catalog.sum(tidx_blks_hit)+1) from pg_statio_all_tables;" + return float(_fetch_value_by_rpc(stmt, database=self.most_xact_db, default_val=100)) + return 100.0 @property def shared_buffer_idx_hit_rate(self): - return self.fetch_metric_value("pg_statio_all_tables_shared_buffer_idx_hit_rate") + if self.is_rpc_valid: + stmt = "select pg_catalog.sum(idx_blks_hit)*100/(pg_catalog.sum(idx_blks_read) + " \ + "pg_catalog.sum(idx_blks_hit)+1) from pg_statio_all_tables ;" + return float(_fetch_value_by_rpc(stmt, database=self.most_xact_db, default_val=100)) + return 100.0 @property def temp_file_size(self): - return self.fetch_metric_value("pg_stat_database_temp_file_size") # unit: kB + pg_db_temp_bytes = _fetch_one_value("pg_db_temp_bytes", from_server=self.database_address) + pg_db_temp_files = _fetch_one_value("pg_db_temp_files", from_server=self.database_address) + if pg_db_temp_files == 0: + return 0.0 + return (pg_db_temp_bytes / pg_db_temp_files) / 1024 # unit is kB @property def read_write_ratio(self): - tup_returned = self.fetch_metric_value_on_most_xact_database("pg_db_tup_returned") - tup_inserted = self.fetch_metric_value_on_most_xact_database("pg_db_tup_inserted") - tup_updated = self.fetch_metric_value_on_most_xact_database("pg_db_tup_updated") - tup_deleted = self.fetch_metric_value_on_most_xact_database("pg_db_tup_deleted") + tup_returned = _fetch_one_value("pg_db_tup_returned", from_server=self.database_address, + datname=self.most_xact_db) + tup_inserted = _fetch_one_value("pg_db_tup_inserted", from_server=self.database_address, + datname=self.most_xact_db) + tup_updated = _fetch_one_value("pg_db_tup_updated", from_server=self.database_address, + datname=self.most_xact_db) + tup_deleted = _fetch_one_value("pg_db_tup_deleted", from_server=self.database_address, + datname=self.most_xact_db) res = tup_returned / (tup_inserted + tup_updated + tup_deleted + 0.001) return res @property def search_modify_ratio(self): - tup_returned = self.fetch_metric_value_on_most_xact_database("pg_db_tup_returned") - tup_inserted = self.fetch_metric_value_on_most_xact_database("pg_db_tup_inserted") - tup_updated = self.fetch_metric_value_on_most_xact_database("pg_db_tup_updated") - tup_deleted = self.fetch_metric_value_on_most_xact_database("pg_db_tup_deleted") + tup_returned = _fetch_one_value("pg_db_tup_returned", from_server=self.database_address, + datname=self.most_xact_db) + tup_inserted = _fetch_one_value("pg_db_tup_inserted", from_server=self.database_address, + datname=self.most_xact_db) + tup_updated = _fetch_one_value("pg_db_tup_updated", from_server=self.database_address, + datname=self.most_xact_db) + tup_deleted = _fetch_one_value("pg_db_tup_deleted", from_server=self.database_address, + datname=self.most_xact_db) res = (tup_returned + tup_inserted) / (tup_updated + tup_deleted + 0.01) return res @property def fetched_returned_ratio(self): - tup_returned = self.fetch_metric_value_on_most_xact_database("pg_db_tup_returned") - tup_fetched = self.fetch_metric_value_on_most_xact_database("pg_db_tup_fetched") + tup_returned = _fetch_one_value("pg_db_tup_returned", from_server=self.database_address, + datname=self.most_xact_db) + tup_fetched = _fetch_one_value("pg_db_tup_fetched", from_server=self.database_address, + datname=self.most_xact_db) res = tup_fetched / (tup_returned + 0.01) return res @property def rollback_commit_ratio(self): - xact_commit = 
self.fetch_metric_value_on_most_xact_database("pg_db_xact_commit") - xact_rollback = self.fetch_metric_value_on_most_xact_database("pg_db_xact_rollback") + xact_commit = _fetch_one_value("pg_db_xact_commit", from_server=self.database_address, + datname=self.most_xact_db) + xact_rollback = _fetch_one_value("pg_db_xact_rollback", from_server=self.database_address, + datname=self.most_xact_db) res = xact_rollback / (xact_commit + 0.01) return res @cached_property def os_cpu_count(self): - cores = self.fetch_metric_value("os_cpu_processor_number") + cores = _fetch_one_value("os_cpu_processor_number", from_server=self.database_host, default_val=1) return int(cores) @property def current_free_mem(self): - return self.fetch_metric_value("node_memory_MemFree_bytes") # unit: kB + return _fetch_one_value_by_host("node_memory_MemFree_bytes", host=self.database_host) # unit is Kb @cached_property def os_mem_total(self): - return self.fetch_metric_value("node_memory_MemTotal_bytes") # unit: kB + return _fetch_one_value_by_host("node_memory_MemTotal_bytes", host=self.database_host) # unit is Kb @cached_property def dirty_background_bytes(self): - return self.fetch_metric_value("node_memory_Dirty_bytes") + return _fetch_one_value_by_host("node_memory_Dirty_bytes", host=self.database_host) # unit is Kb @cached_property def block_size(self): @@ -266,9 +308,9 @@ class TSDBMetric(AbstractMetric): @property def load_average(self): - load1 = self.fetch_metric_value("node_load1") # unit: kB - load5 = self.fetch_metric_value("node_load5") # unit: kB - load15 = self.fetch_metric_value("node_load15") # unit: kB + load1 = _fetch_one_value_by_host("node_load1", host=self.database_host) + load5 = _fetch_one_value_by_host("node_load5", host=self.database_host) + load15 = _fetch_one_value_by_host("node_load15", host=self.database_host) if load1: load1 = load1 / self.os_cpu_count if load5: @@ -280,14 +322,16 @@ class TSDBMetric(AbstractMetric): @cached_property def nb_gaussdb(self): - rv = list() - seqs = try_to_get_latest_metric_value("gaussdb_qps_by_instance") + number = 0 + seqs = _fetch_all_value("gaussdb_qps_by_instance") for seq in seqs: - host = SequenceUtils.from_server(seq).split(":")[0] - if self.database_host == host: - rv.append(seq.values[0]) - return len(rv) + if seq.labels and self.database_host in SequenceUtils.from_server(seq): + number += 1 + return number @cached_property def is_hdd(self): return False + + + diff --git a/dbmind/service/utils.py b/dbmind/service/utils.py index fe0c736..b4d547b 100644 --- a/dbmind/service/utils.py +++ b/dbmind/service/utils.py @@ -51,7 +51,7 @@ def get_master_instance_address(): return_tuples=True) instance_host, instance_port = rows[0][0], rows[0][1] except Exception as e: - logging.exception(e) + logging.warning("Maybe the RPC service isn't started.") instance_host, instance_port = None, None return instance_host, instance_port -- Gitee From 8de15d203adb36a9c8f58b9c9ebb897cc9ff523d Mon Sep 17 00:00:00 2001 From: likun Date: Sat, 29 Oct 2022 10:31:40 +0800 Subject: [PATCH 09/14] feat(opengauss_exporter): Add checkpoint_proactive_triggering_ratio and checkpoint_avg_sync_time in exporter. 
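Both new fields are derived from the pg_stat_bgwriter counters that the query
already reads. Roughly, in Python terms (a sketch of the SQL below, for
reference only):

    total = checkpoints_timed + checkpoints_req
    checkpoint_avg_sync_time = checkpoint_sync_time / total if total > 0 else 0
    checkpoint_proactive_triggering_ratio = checkpoints_req / total if total > 0 else 0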
--- .../components/opengauss_exporter/yamls/default.yml | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/dbmind/components/opengauss_exporter/yamls/default.yml b/dbmind/components/opengauss_exporter/yamls/default.yml index 287e6fa..994dfa4 100644 --- a/dbmind/components/opengauss_exporter/yamls/default.yml +++ b/dbmind/components/opengauss_exporter/yamls/default.yml @@ -759,7 +759,9 @@ pg_stat_bgwriter: query: - name: pg_stat_bgwriter sql: |- - select checkpoint_sync_time / 1000 as total_seconds from pg_stat_bgwriter; + select coalesce((checkpoint_sync_time/(case when (checkpoints_timed + checkpoints_req) >0 then (checkpoints_timed + checkpoints_req) else 1 end)),0) as checkpoint_avg_sync_time, + coalesce((checkpoints_req/(case when (checkpoints_timed + checkpoints_req) >0 then (checkpoints_timed + checkpoints_req) else 1 end)),0) as checkpoint_proactive_triggering_ratio, + checkpoint_sync_time / 1000 as total_seconds from pg_stat_bgwriter; version: '>=0.0.0' timeout: 1 ttl: 10 @@ -769,7 +771,12 @@ pg_stat_bgwriter: - name: total_seconds description: total sync time of checkpoints usage: GAUGE - + - name: checkpoint_avg_sync_time + description: checkpoint_avg_sync_time + usage: GAUGE + - name: checkpoint_proactive_triggering_ratio + description: checkpoint_proactive_triggering_ratio + usage: GAUGE pg_locker: name: pg_locker -- Gitee From f73908003291c7d53ac4dcc61afedb87003bc990 Mon Sep 17 00:00:00 2001 From: likun Date: Sat, 29 Oct 2022 14:17:48 +0800 Subject: [PATCH 10/14] feat(forecast): Add early-warning function in forecast. --- dbmind/components/forecast.py | 171 ++++++++++++++++++++++++++++++---- 1 file changed, 155 insertions(+), 16 deletions(-) diff --git a/dbmind/components/forecast.py b/dbmind/components/forecast.py index ba6dae4..dee694e 100644 --- a/dbmind/components/forecast.py +++ b/dbmind/components/forecast.py @@ -15,17 +15,79 @@ import csv import os import sys import time +import logging +from datetime import datetime +from math import inf import traceback from prettytable import PrettyTable from dbmind import constants from dbmind import global_vars +from dbmind.service import dai from dbmind.cmd.config_utils import load_sys_configs from dbmind.common.utils.cli import keep_inputting_until_correct -from dbmind.common.utils import write_to_terminal -from dbmind.common.utils.checking import path_type, date_type, positive_int_type +from dbmind.common.utils import write_to_terminal, read_simple_config_file +from dbmind.common.utils.checking import path_type, date_type +from dbmind.service.utils import SequenceUtils from dbmind.metadatabase.dao import forecasting_metrics +from dbmind.common.algorithm.forecasting import quickly_forecast +from dbmind.common.utils.exporter import KVPairAction + + +def _get_sequences(metric, host, labels, start_datetime, end_datetime): + result = [] + if labels is None: + sequences = dai.get_metric_sequence(metric, start_datetime, end_datetime).\ + from_server(host).fetchall() + else: + sequences = dai.get_metric_sequence(metric, start_datetime, end_datetime).\ + from_server(host).filter(**labels).fetchall() + for sequence in sequences: + name = metric + address = SequenceUtils.from_server(sequence).strip() + if labels is None: + name += str(sequence.labels) + else: + name += str(labels) + name += ' from ' + address + result.append((name, sequence)) + return result + + +def _initialize_tsdb_param(): + from dbmind.common.tsdb import TsdbClientFactory + global_vars.metric_map = 
read_simple_config_file(constants.METRIC_MAP_CONFIG)
+    try:
+        TsdbClientFactory.set_client_info(
+            global_vars.configs.get('TSDB', 'name'),
+            global_vars.configs.get('TSDB', 'host'),
+            global_vars.configs.get('TSDB', 'port'),
+            global_vars.configs.get('TSDB', 'username'),
+            global_vars.configs.get('TSDB', 'password'),
+            global_vars.configs.get('TSDB', 'ssl_certfile'),
+            global_vars.configs.get('TSDB', 'ssl_keyfile'),
+            global_vars.configs.get('TSDB', 'ssl_keyfile_password'),
+            global_vars.configs.get('TSDB', 'ssl_ca_file')
+        )
+        return True
+    except Exception as e:
+        logging.error(e)
+        return False
+
+
+def _save_forecast_result(result, save_path):
+    dirname = os.path.dirname(save_path)
+    os.makedirs(dirname, exist_ok=True)
+    with open(save_path, mode='w+', newline='') as fp:
+        writer = csv.writer(fp)
+        for name, value, timestamp in result:
+            writer.writerow([name])
+            writer.writerow([time.strftime("%Y-%m-%d %H:%M:%S",
+                                           time.localtime(int(item / 1000))) for item in timestamp])
+            writer.writerow(value)
+
+    os.chmod(save_path, 0o600)
 
 
 def show(metric, host, start_time, end_time):
@@ -75,27 +137,98 @@ def clean(retention_days):
     write_to_terminal('Success to delete redundant results.')
 
 
+def early_warning(metric, host, start_time, end_time, warning_hours, labels=None,
+                  upper=None, lower=None, save_path=None):
+    output_table = PrettyTable()
+    output_table.field_names = ('name', 'warning information')
+    output_table.align = "l"
+    if not _initialize_tsdb_param():
+        logging.error("TSDB initialization failed.")
+        return
+    upper = inf if upper is None else upper
+    lower = -inf if lower is None else lower
+    warning_minutes = warning_hours * 60  # convert hours to minutes
+    if end_time is None:
+        end_time = int(time.time() * 1000)
+    if start_time is None:
+        # The default historical sequence is 3 times the length of the predicted sequence
+        start_time = end_time - warning_minutes * 60 * 1000 * 3
+    start_datetime = datetime.fromtimestamp(start_time / 1000)
+    end_datetime = datetime.fromtimestamp(end_time / 1000)
+    sequences = _get_sequences(metric, host, labels, start_datetime, end_datetime)
+    rows = []
+    summary_sequence = []
+    for name, sequence in sequences:
+        if sequence.values[-1] >= upper:
+            warning_information = "metric has already exceeded the warning value."
+            rows.append((name, warning_information))
+            continue
+        if sequence.values[-1] <= lower:
+            warning_information = "metric has already fallen below the warning value."
+            rows.append((name, warning_information))
+            continue
+        forecast_result = quickly_forecast(sequence, warning_minutes)
+        if save_path is not None:
+            summary_sequence.append((name, sequence.values + forecast_result.values,
+                                     sequence.timestamps + forecast_result.timestamps))
+        if not forecast_result.values:
+            continue
+        lower_flag, upper_flag = False, False
+        for val, timestamp in zip(forecast_result.values, forecast_result.timestamps):
+            if lower_flag and upper_flag:
+                break
+            if not upper_flag and val >= upper:
+                current_timestamp = int(time.time() * 1000)
+                remaining_hours = round((timestamp - current_timestamp) / 1000 / 60 / 60, 4)
+                string_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(timestamp / 1000)))
+                warning_information = "will exceed the warning value %s at %s (remaining %s hours)." \
+                                      % (upper, string_time, remaining_hours)
+                rows.append((name, warning_information))
+                upper_flag = True
+            if not lower_flag and val <= lower:
+                current_timestamp = int(time.time() * 1000)
+                remaining_hours = round((timestamp - current_timestamp) / 1000 / 60 / 60, 4)
+                string_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(timestamp / 1000)))
+                warning_information = "will fall below the warning value %s at %s (remaining %s hours)." \
+                                      % (lower, string_time, remaining_hours)
+                rows.append((name, warning_information))
+                lower_flag = True
+    if not rows:
+        rows.append((metric, 'No warning found.'))
+    output_table.add_rows(rows)
+    print(output_table)
+    if save_path is not None:
+        _save_forecast_result(summary_sequence, save_path=save_path)
+
+
 def main(argv):
     parser = argparse.ArgumentParser(description='Workload Forecasting: Forecast monitoring metrics')
-    parser.add_argument('action', choices=('show', 'clean'), help='choose a functionality to perform')
+    parser.add_argument('action', choices=('show', 'clean', 'early-warning'), help='Choose a functionality to perform')
     parser.add_argument('-c', '--conf', metavar='DIRECTORY', required=True, type=path_type,
-                        help='set the directory of configuration files')
-
+                        help='Set the directory of configuration files')
     parser.add_argument('--metric-name', metavar='METRIC_NAME',
-                        help='set a metric name you want to retrieve')
+                        help='Set a metric name you want to retrieve')
     parser.add_argument('--host', metavar='HOST',
-                        help='set a host you want to retrieve')
+                        help="Set a host you want to retrieve. IP only or IP with port.")
+    parser.add_argument('--labels', metavar='LABELS', action=KVPairAction,
+                        help='A list of labels (format is label=name) separated by comma(,). '
+                             'Only used for early-warning.')
     parser.add_argument('--start-time', metavar='TIMESTAMP_IN_MICROSECONDS',
-                        type=date_type,
-                        help='set a start time for retrieving, '
-                             'supporting UNIX-timestamp with microsecond or datetime format')
+                        type=date_type, help='Set a start time for retrieving, '
                                             'supporting UNIX-timestamp with microsecond or datetime format')
     parser.add_argument('--end-time', metavar='TIMESTAMP_IN_MICROSECONDS',
-                        type=date_type,
-                        help='set an end time for retrieving, '
-                             'supporting UNIX-timestamp with microsecond or datetime format')
+                        type=date_type, help='Set an end time for retrieving, '
                                             'supporting UNIX-timestamp with microsecond or datetime format')
     parser.add_argument('--retention-days', metavar='DAYS', type=float,
-                        help='clear historical diagnosis results and set '
+                        help='Clear historical diagnosis results and set '
                              'the maximum number of days to retain data')
+    parser.add_argument('--upper', metavar='UPPER', type=float,
+                        help='The upper threshold of early-warning.')
+    parser.add_argument('--lower', metavar='LOWER', type=float,
+                        help='The lower threshold of early-warning.')
+    parser.add_argument('--warning-hours', metavar='WARNING-HOURS', type=int, help='Warning length in hours.')
+    parser.add_argument('--csv-dump-path', type=os.path.realpath,
+                        help='Dump the result CSV file to the path if it is specified. 
Use in warning.') args = parser.parse_args(argv) @@ -116,16 +249,22 @@ def main(argv): inputted_char = keep_inputting_until_correct('Press [A] to agree, press [Q] to quit:', ('A', 'Q')) if inputted_char == 'Q': parser.exit(0, "Quitting due to user's instruction.") - + elif args.action == 'early-warning': + if args.upper is None and args.lower is None: + parser.exit(1, 'You did not specify the upper or lower.') + if args.warning_hours is None: + parser.exit(1, 'You did not specify warning hours.') # Set the global_vars so that DAO can login the meta-database. os.chdir(args.conf) global_vars.configs = load_sys_configs(constants.CONFILE_NAME) - try: if args.action == 'show': show(args.metric_name, args.host, args.start_time, args.end_time) elif args.action == 'clean': clean(args.retention_days) + elif args.action == 'early-warning': + early_warning(args.metric_name, args.host, args.start_time, args.end_time, args.warning_hours, + labels=args.labels, upper=args.upper, lower=args.lower, save_path=args.csv_dump_path) except Exception as e: write_to_terminal('An error occurred probably due to database operations, ' 'please check database configurations. For details:\n' -- Gitee From e52c4d87eb32714ba297de9c5a608b833f8533ab Mon Sep 17 00:00:00 2001 From: yuchen Date: Sat, 29 Oct 2022 15:49:04 +0800 Subject: [PATCH 11/14] fix(is_seasonal_series): fix the acf peaks filter and add test case about it --- dbmind/common/algorithm/seasonal.py | 9 +++++---- tests/test_arima.py | 18 ++++++++++++++++++ 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/dbmind/common/algorithm/seasonal.py b/dbmind/common/algorithm/seasonal.py index 1fbd18a..91616eb 100644 --- a/dbmind/common/algorithm/seasonal.py +++ b/dbmind/common/algorithm/seasonal.py @@ -61,8 +61,9 @@ def is_seasonal_series(x, high_ac_threshold: float = 0.5, min_seasonal_freq=3): detrended = x - decompose_trend(x, np.ones(window) / window) ac_coef = acf(detrended, nlags=len(x) - 1) # auto-correlation coefficient - - high_ac_peak_pos = signal.find_peaks(ac_coef)[0] + valleys = signal.find_peaks(-ac_coef, height=(0, None))[0] + lower_bound = valleys[0] if valleys.size else 0 + high_ac_peak_pos = signal.find_peaks(ac_coef, height=(0, None))[0] beyond_threshold = np.argwhere(ac_coef >= high_ac_threshold).flatten() high_ac_peak_pos = np.intersect1d(high_ac_peak_pos, beyond_threshold) @@ -73,9 +74,9 @@ def is_seasonal_series(x, high_ac_threshold: float = 0.5, min_seasonal_freq=3): # after the first minimum value. 
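+    # For instance, a strictly periodic series with period 4 has ACF peaks at
+    # lags 4, 8, ... and its first minimum near lag 2, so a spurious peak found
+    # before that first minimum is discarded as noise.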
high_ac_peak_pos = high_ac_peak_pos[ (high_ac_peak_pos < len(ac_coef) // 2) & - (high_ac_peak_pos > np.argmin(ac_coef)) + (high_ac_peak_pos > lower_bound) ] - if len(high_ac_peak_pos) - 1 >= min_seasonal_freq: + if len(high_ac_peak_pos) >= min_seasonal_freq: return True, int(high_ac_peak_pos[np.argmax(ac_coef[high_ac_peak_pos])]) return False, None diff --git a/tests/test_arima.py b/tests/test_arima.py index 386236b..aed0b25 100644 --- a/tests/test_arima.py +++ b/tests/test_arima.py @@ -19,6 +19,7 @@ from dbmind.common.algorithm.forecasting.forcasting_algorithm import quickly_for sequence_interpolate from dbmind.common.algorithm.stat_utils import trim_head_and_tail_nan from dbmind.common.types.sequence import Sequence +from dbmind.common.algorithm.seasonal import is_seasonal_series DATA = [13313.158424272488, 13325.379505621688, 13334.55192625661, 13340.650475363756, 13343.772205687826, 13344.39494047619, 13344.166964285712, 13344.142559523809, 13343.943303571428, 13343.560714285712, @@ -5167,3 +5168,20 @@ def test_trim_head_and_tail_nan(): trim_head_and_tail_nan(list_data) trim_head_and_tail_nan(array_data) assert list_data == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 10] + + +def test_is_seasonal_series(): + x = [10, 20, 30, 40] * 20 + [1, 1, 1, 1] * 5 + [10, 20, 30, 40] * 20 + is_seasonal, period = is_seasonal_series( + x, + high_ac_threshold=0.1, + min_seasonal_freq=2 + ) + assert (is_seasonal and period == 4) + x = 10 + 10 * np.sin(np.linspace(0, 10 * np.pi, 1000)) + np.random.random(1000) * 2 + is_seasonal, period = is_seasonal_series( + x, + high_ac_threshold=0.1, + min_seasonal_freq=2 + ) + assert (is_seasonal and period == 200) -- Gitee From e838e096b20aa5ccd1edbc36101c0982b7ff31c6 Mon Sep 17 00:00:00 2001 From: wangtq Date: Sat, 29 Oct 2022 16:20:06 +0800 Subject: [PATCH 12/14] fix(test): polish some test cases --- .../forecasting/arima_model/arima_alg.py | 17 +++++++++++---- dbmind/common/algorithm/stat_utils.py | 4 +--- dbmind/service/dai.py | 6 +++--- tests/test_dai.py | 21 +++++++++++++++++-- tests/test_metadatabase.py | 5 ++--- 5 files changed, 38 insertions(+), 15 deletions(-) diff --git a/dbmind/common/algorithm/forecasting/arima_model/arima_alg.py b/dbmind/common/algorithm/forecasting/arima_model/arima_alg.py index 9dc6914..1726f66 100644 --- a/dbmind/common/algorithm/forecasting/arima_model/arima_alg.py +++ b/dbmind/common/algorithm/forecasting/arima_model/arima_alg.py @@ -308,7 +308,7 @@ class ARIMA(ForecastingAlgorithm): continue _, p0, q0 = sorted(orders)[0] - for p, q in [(p0-1, q0), (p0, q0-1), (p0+1, q0), (p0, q0+1)]: + for p, q in [(p0 - 1, q0), (p0, q0 - 1), (p0 + 1, q0), (p0, q0 + 1)]: if p < 0 or q < 0: continue @@ -434,10 +434,19 @@ class ARIMA(ForecastingAlgorithm): :param params: type->np.array :return llf: type->float """ - resid = self.get_resid(params) - sigma2 = np.sum(resid ** 2) / float(self.nobs) - llf = -self.nobs * (np.log(2 * np.pi * sigma2) + 1) / 2.0 + """Traditional calculation approach is always: + ``` + sigma2 = np.sum(resid ** 2) / self.nobs + llf = -self.nobs * (np.log(2 * np.pi * sigma2) + 1) / 2.0 + ``` + But, we use the formula derivation result below considering the efficiency. 
+ """ + l2 = np.linalg.norm(resid) + llf = -self.nobs * ( + np.log(2 * np.pi) + 2 * np.log(l2) - np.log(self.nobs) + 1 + ) / 2.0 + return llf @property diff --git a/dbmind/common/algorithm/stat_utils.py b/dbmind/common/algorithm/stat_utils.py index 450cb54..a2807db 100644 --- a/dbmind/common/algorithm/stat_utils.py +++ b/dbmind/common/algorithm/stat_utils.py @@ -10,9 +10,6 @@ # EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # See the Mulan PSL v2 for more details. - -from types import SimpleNamespace - import numpy as np from scipy.interpolate import interp1d @@ -72,6 +69,7 @@ def np_double_rolling(values, window=(1, 1), diff_mode="diff", agg='median', tri window2 = 1 if values_length < window[1] else window[1] left_rolling = np_rolling(np_shift(values, 1), window=window1, agg=agg) + # Polish later: this `values[::-1]` can be replaced by reverse scan. right_rolling = np_rolling(values[::-1], window=window2, agg=agg)[::-1] r_data = right_rolling - left_rolling diff --git a/dbmind/service/dai.py b/dbmind/service/dai.py index 605b320..a7c6cdf 100644 --- a/dbmind/service/dai.py +++ b/dbmind/service/dai.py @@ -189,11 +189,11 @@ def estimate_appropriate_step_ms(start_time, end_time): return None ONE_HOUR = 3600 # unit: second - fetch_seconds = (end_time - start_time).seconds - if fetch_seconds <= ONE_HOUR: + total_seconds = (end_time - start_time).total_seconds() + if total_seconds <= ONE_HOUR: return None # return unit: microsecond - return fetch_seconds * interval_second // ONE_HOUR * 1000 or None + return int(total_seconds * interval_second // ONE_HOUR * 1000) or None def get_metric_sequence(metric_name, start_time, end_time, step=None): diff --git a/tests/test_dai.py b/tests/test_dai.py index f34ff5c..e86902a 100644 --- a/tests/test_dai.py +++ b/tests/test_dai.py @@ -10,6 +10,8 @@ # EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # See the Mulan PSL v2 for more details. +import datetime + from dbmind.common.tsdb import TsdbClientFactory from dbmind.common.types import Alarm, ALARM_TYPES, ALARM_LEVEL, SlowQuery from dbmind.common.types import RootCause @@ -102,5 +104,20 @@ def test_save_xxx(): dai.save_slow_queries([slow_query, slow_query, slow_query]) -def test_estimate_appropriate_step(): - dai.estimate_appropriate_step_ms() +def test_estimate_appropriate_step(monkeypatch): + end = datetime.datetime.now() + start = end - datetime.timedelta(days=1) + total_seconds = (end - start).total_seconds() + + def validate(): + max_length = total_seconds // default_scrape_interval + + tsdb_client = TsdbClientFactory.get_tsdb_client() + monkeypatch.setattr(tsdb_client, 'scrape_interval', default_scrape_interval) + + step = dai.estimate_appropriate_step_ms(start, end) // 1000 + actual_length = total_seconds // step + assert 0 < actual_length <= max_length + + for default_scrape_interval in range(1, 60, 5): + validate() diff --git a/tests/test_metadatabase.py b/tests/test_metadatabase.py index eb685de..3d315e0 100644 --- a/tests/test_metadatabase.py +++ b/tests/test_metadatabase.py @@ -11,11 +11,10 @@ # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # See the Mulan PSL v2 for more details. 
import os -import time import pytest from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm import sessionmaker, session from dbmind.constants import DYNAMIC_CONFIG from dbmind.metadatabase import business_db, Base @@ -46,7 +45,7 @@ def initialize_metadb(): yield # Clean up - session_maker.close_all() + session.close_all_sessions() os.path.exists(dbname) and os.remove(dbname) os.path.exists(DYNAMIC_CONFIG) and os.remove(DYNAMIC_CONFIG) -- Gitee From 4248e3c73842bacc8dd5be3d0b479762a3f7f917 Mon Sep 17 00:00:00 2001 From: guowei Date: Mon, 31 Oct 2022 20:42:29 +0800 Subject: [PATCH 13/14] fix(index_advisor): dbmind service, index name parsing Fixed the bug of paring index name from explain results, and index name match. Adapting index advisor for dbmind service. Display other related indexes for the advised indexes. --- .../_index_recommend_client_driver.py | 32 +++---- .../app/optimization/index_recommendation.py | 88 ++++++++++++++----- .../fetch_statement/fetch_statement.py | 24 ++--- .../executors/driver_executor.py | 6 +- .../index_advisor/index_advisor_workload.py | 88 +++++++++++-------- .../components/index_advisor/process_bar.py | 4 +- .../index_advisor/sql_output_parser.py | 26 +++--- dbmind/components/index_advisor/utils.py | 46 ++++++++-- .../core/opengauss_driver.py | 7 +- tests/test_index_advisor_workload.py | 15 ++++ 10 files changed, 228 insertions(+), 108 deletions(-) diff --git a/dbmind/app/optimization/_index_recommend_client_driver.py b/dbmind/app/optimization/_index_recommend_client_driver.py index 662ff90..132f5cd 100644 --- a/dbmind/app/optimization/_index_recommend_client_driver.py +++ b/dbmind/app/optimization/_index_recommend_client_driver.py @@ -12,9 +12,7 @@ # See the Mulan PSL v2 for more details. from contextlib import contextmanager -from typing import List - -import sqlparse +from typing import List, Tuple, Any from dbmind import global_vars from dbmind.components.index_advisor.executors.common import BaseExecutor @@ -22,22 +20,24 @@ from dbmind.components.index_advisor.executors.common import BaseExecutor class RpcExecutor(BaseExecutor): - def execute_sqls(self, sqls) -> List[str]: + def execute_sqls(self, sqls) -> List[Tuple[Any]]: results = [] sqls = ['set current_schema = %s' % self.get_schema()] + sqls - set_sqls = [] - for sql in sqls: - if sql.strip().upper().startswith('SET'): - set_sqls.append(sql) - else: - session_sql = ';'.join(set_sqls + [sql]) - # such as Select or With or Explain - sql_type = sqlparse.parse(sql)[0].tokens[0].value.upper() - res = global_vars.agent_rpc_client.call('query_in_database', - session_sql, + sqls = [sql.strip().strip(';') for sql in sqls] + sql_results = global_vars.agent_rpc_client.call('query_in_database', + ';'.join(sqls), self.dbname, - return_tuples=True) - results.extend([(sql_type,)] + res if res else [('ERROR',)]) + return_tuples=True, + fetch_all=True) + for sql, sql_res in zip(sqls[1:], sql_results[1:]): + sql_type = sql.upper().strip().split()[0] + if sql_type == 'EXPLAIN': + if sql_res: + results.append((sql_type,)) + else: + results.append(('ERROR',)) + if sql_res: + results.extend(sql_res) return results @contextmanager diff --git a/dbmind/app/optimization/index_recommendation.py b/dbmind/app/optimization/index_recommendation.py index b8b11aa..2b6b220 100644 --- a/dbmind/app/optimization/index_recommendation.py +++ b/dbmind/app/optimization/index_recommendation.py @@ -11,6 +11,7 @@ # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. 
# See the Mulan PSL v2 for more details. +import os import logging from collections import defaultdict from datetime import datetime, timedelta @@ -19,9 +20,14 @@ from dbmind import global_vars from dbmind.app.optimization._index_recommend_client_driver import RpcExecutor from dbmind.common.parser.sql_parsing import fill_value, standardize_sql from dbmind.components.extract_log import get_workload_template -from dbmind.components.index_advisor import index_advisor_workload +from dbmind.components.index_advisor import index_advisor_workload, process_bar, utils +from dbmind.components.fetch_statement import fetch_statement from dbmind.service import dai +index_advisor_workload.MAX_INDEX_NUM = global_vars.configs.getint('SELF-OPTIMIZATION', 'max_index_num') +index_advisor_workload.MAX_INDEX_STORAGE = global_vars.configs.getint('SELF-OPTIMIZATION', 'max_index_storage') +process_bar.print = lambda *args, **kwargs: None + class TemplateArgs: def __init__(self, max_reserved_period, max_template_num): @@ -35,35 +41,77 @@ def need_recommend_index(): def get_database_schemas(): database_schemas = defaultdict(list) - results = dai.get_latest_metric_value('pg_class_relsize').fetchall() + results = dai.get_latest_metric_value('pg_database_size_bytes').fetchall() for res in results: - db_name, schema_name = res.labels['datname'], res.labels['nspname'] - if schema_name not in database_schemas[db_name]: - database_schemas[db_name].append(schema_name) + if not res.labels: + continue + db_name = res.labels['datname'] + executor = RpcExecutor(db_name, None, None, None, None, 'public') + schemas = executor.execute_sqls(["select distinct(nspname) FROM pg_namespace nsp JOIN pg_class rel ON " + "nsp.oid = rel.relnamespace WHERE nspname NOT IN " + "('pg_catalog', 'information_schema','snapshot', " + "'dbe_pldeveloper', 'db4ai', 'dbe_perf') AND rel.relkind = 'r';"]) + for schema_tuple in schemas: + database_schemas[db_name].append(schema_tuple[0]) return database_schemas +def is_rpc_available(db_name): + try: + global_vars.agent_rpc_client.call('query_in_database', + 'select 1', + db_name, + return_tuples=True) + return True + except Exception as e: + logging.warning(e) + global_vars.agent_rpc_client = None + return False + + +def rpc_index_advise(executor, templates): + if os.path.exists(utils.logfile): + os.remove(utils.logfile) + utils.logger = logging.getLogger() + # only single threads can be used + index_advisor_workload.get_workload_costs = index_advisor_workload.get_plan_cost + detail_info = index_advisor_workload.index_advisor_workload({'historyIndexes': {}}, executor, templates, + multi_iter_mode=False, show_detail=True, n_distinct=1, + reltuples=10, use_all_columns=True, improved_rate=0.1) + return detail_info + + def do_index_recomm(templatization_args, db_name, schemas, database_templates, optimization_interval): - index_advisor_workload.MAX_INDEX_STORAGE = global_vars.configs.getint('SELF-OPTIMIZATION', 'max_index_storage') + if not is_rpc_available(db_name): + return executor = RpcExecutor(db_name, None, None, None, None, schemas) + start_time = datetime.now() - timedelta(seconds=optimization_interval) + end_time = datetime.now() queries = ( standardize_sql(fill_value(pg_sql_statement_full_count.labels['query'])) for pg_sql_statement_full_count in ( - dai.get_metric_sequence('pg_sql_statement_full_count', - datetime.now() - timedelta(seconds=optimization_interval), - datetime.now()).fetchall() - ) if pg_sql_statement_full_count.labels['datname'] == db_name + 
dai.get_metric_sequence('pg_sql_statement_full_count', + start_time, + end_time).fetchall() + ) if pg_sql_statement_full_count.labels and pg_sql_statement_full_count.labels['datname'] == db_name ) - get_workload_template(database_templates, queries, templatization_args) - index_advisor_workload.MAX_INDEX_NUM = global_vars.configs.getint('SELF-OPTIMIZATION', 'max_index_num') - index_advisor_workload.MAX_INDEX_STORAGE = global_vars.configs.getint('SELF-OPTIMIZATION', 'max_index_storage') - index_advisor_workload.print = lambda *args, **kwargs: None - index_advisor_workload.logger = logging.getLogger() + queries = list(queries) + if not queries: + _executor = RpcExecutor('postgres', None, None, None, None, 'public') + queries = [] + for schema in schemas.split(','): + queries.extend(fetch_statement.fetch_statements(_executor, 'history', db_name, schema)) + database_templates = {} + get_workload_template(database_templates, queries, templatization_args) + else: + get_workload_template(database_templates, queries, templatization_args) - detail_info = index_advisor_workload.index_advisor_workload({'historyIndexes': {}}, executor, database_templates, - multi_iter_mode=False, show_detail=True, n_distinct=1, - reltuples=10, - use_all_columns=True, improved_rate=0.1) + detail_info = rpc_index_advise(executor, database_templates) detail_info['db_name'] = db_name - detail_info['host'] = dai.get_latest_metric_value('pg_class_relsize').fetchone().labels['from_instance'] + detail_info['host'] = '' + for database_info in dai.get_latest_metric_value('pg_database_all_size').fetchall(): + if database_info.labels: + detail_info['host'] = database_info.labels['from_instance'] + break return detail_info, {db_name: database_templates} + diff --git a/dbmind/components/fetch_statement/fetch_statement.py b/dbmind/components/fetch_statement/fetch_statement.py index 2232474..ad4c1de 100644 --- a/dbmind/components/fetch_statement/fetch_statement.py +++ b/dbmind/components/fetch_statement/fetch_statement.py @@ -33,9 +33,8 @@ def get_fetch_queries(statement_type, database, schema, **kwargs) -> List[str]: "G.sample_time > '{start_time}' and G.sample_time < '{end_time}';" fetch_history_query = "select regexp_replace((CASE WHEN t1.query like '%;' THEN t1.query ELSE " \ "t1.query || ';' END), " \ - "E'[\\n\\r]+', ' ', 'g') as q from dbe_perf.statement " \ - "t1 left join dbe_perf.statement_history t2 ON " \ - "t1.unique_sql_id = t2.unique_query_id where db_name='{database}' and schema_name='{schema}';" + "E'[\\n\\r]+', ' ', 'g') as q from statement_history t1 " \ + " where db_name='{database}' and schema_name='{schema}';" fetch_activity_query = "SELECT regexp_replace((CASE WHEN query like '%;' THEN query ELSE query || ';' END), " \ "E'[\\n\\r]+', ' ', 'g') as q FROM pg_stat_activity WHERE state != 'idle' and " \ "datname='{database}';" @@ -75,7 +74,7 @@ def check_parameter(args): raise ValueError('Please set the start_time and the end_time if you specify asp') if args.start_time: - # compatible with '2022-1-4 1:2:3' + # with the format like '2022-01-04 11:22:33' args.start_time = time.strftime('%Y-%m-%d %H:%M:%S', time.strptime(args.start_time, '%Y-%m-%d %H:%M:%S') @@ -91,10 +90,10 @@ def fetch_statements(conn, statement_type, database, schema, **kwargs): fetch_queries = get_fetch_queries(statement_type, database, schema, **kwargs) statements = [] for _tuple in conn.execute_sqls(fetch_queries): - # filtering non-statement results + # Remove information other than the query results introduced by the executor, e.g., 'total 
time 10ms' statement = _tuple[0] if statement.startswith(('SET;', 'q;', ';', 'total time')) or statement.endswith(' rows);') or \ - re.match('SELECT \d+;', statement): + re.match(r'SELECT \d+;', statement): continue statement = add_semicolon(statement) statement = replace_comma_with_dollar(statement) @@ -117,8 +116,10 @@ def main(argv): default='public', action=CheckWordValid) arg_parser.add_argument('--statement-type', help='The type of statements you want to fetch', choices=['asp', 'slow', 'history', 'activity'], default='asp') - arg_parser.add_argument('--start-time', help='Start time of statements, format: 2022-10-01 00:00:00') - arg_parser.add_argument('--end-time', help='End time of statements, format: 2022-10-01 00:10:00') + arg_parser.add_argument('--start-time', help="Start time of fetching statements, " + "the format is 'YYYY-MM-DD hh:mm:ss, e.g., 2022-10-01 00:00:00'") + arg_parser.add_argument('--end-time', help="End time of fetching statements, " + "the format is 'YYYY-MM-DD hh:mm:ss, e.g., 2022-10-01 00:00:00'") arg_parser.add_argument('--verify', help='Whether to validate statements', action='store_true') arg_parser.add_argument('--driver', help='Whether to use python driver, default use gsql', @@ -134,15 +135,15 @@ def main(argv): from dbmind.components.index_advisor.executors import driver_executor executor = driver_executor.DriverExecutor except ImportError: - logging.warning('Python driver import failed, ' + logging.warning('Failed to import the Python driver, ' 'the gsql mode will be selected to connect to the database.') - # Get queries via gsql or driver. + # pg_asp data can only be retrieved from the postgres database. conn = executor('postgres', args.db_user, args.W, args.db_host, args.db_port, args.schema) statements = fetch_statements(conn, **vars(args)) # Save the fetched query in a file. - with open(args.output, 'w') as fp: + with open(args.output, 'w+') as fp: for statement in statements: if not args.verify or (args.verify and is_valid_statement(conn, statement)): fp.write(statement + '\n') @@ -150,4 +151,3 @@ def main(argv): if __name__ == '__main__': main(sys.argv[1:]) - diff --git a/dbmind/components/index_advisor/executors/driver_executor.py b/dbmind/components/index_advisor/executors/driver_executor.py index e37f52c..f988033 100644 --- a/dbmind/components/index_advisor/executors/driver_executor.py +++ b/dbmind/components/index_advisor/executors/driver_executor.py @@ -10,13 +10,15 @@ # EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # See the Mulan PSL v2 for more details. 
+import sys +sys.path.append('..') from typing import List from contextlib import contextmanager -import logging import psycopg2 from .common import BaseExecutor +from utils import logger class DriverExecutor(BaseExecutor): @@ -48,7 +50,7 @@ class DriverExecutor(BaseExecutor): except psycopg2.ProgrammingError: return [('ERROR',)] except Exception as e: - logging.warning('Found %s while executing SQL statement.', e) + logger.warning('Found %s while executing SQL statement.', e) return [('ERROR ' + str(e),)] finally: self.conn.rollback() diff --git a/dbmind/components/index_advisor/index_advisor_workload.py b/dbmind/components/index_advisor/index_advisor_workload.py index bc54b79..66e1cf5 100644 --- a/dbmind/components/index_advisor/index_advisor_workload.py +++ b/dbmind/components/index_advisor/index_advisor_workload.py @@ -15,14 +15,13 @@ import argparse import copy import getpass import json -import logging import os import random import re import sys import select +from collections import defaultdict from functools import lru_cache -from logging.handlers import RotatingFileHandler from itertools import groupby, chain, combinations from typing import Tuple, List import heapq @@ -44,7 +43,7 @@ try: AdvisedIndex, ExistingIndex, QueryItem, WorkLoad, QueryType, IndexType, COLUMN_DELIMITER, \ lookfor_subsets_configs, has_dollar_placeholder, generate_placeholder_indexes, \ match_columns, infer_workload_benefit, UniqueList, is_multi_node, hypo_index_ctx, split_iter, \ - replace_comma_with_dollar, flatten + replace_comma_with_dollar, flatten, ERROR_KEYWORD, logger from .process_bar import bar_print, ProcessBar except ImportError: from sql_output_parser import parse_single_advisor_results, parse_explain_plan, \ @@ -59,11 +58,12 @@ except ImportError: AdvisedIndex, ExistingIndex, QueryItem, WorkLoad, QueryType, IndexType, COLUMN_DELIMITER, \ lookfor_subsets_configs, has_dollar_placeholder, generate_placeholder_indexes, \ match_columns, infer_workload_benefit, UniqueList, is_multi_node, hypo_index_ctx, split_iter, \ - replace_comma_with_dollar, flatten + replace_comma_with_dollar, flatten, ERROR_KEYWORD, logger from process_bar import bar_print, ProcessBar SAMPLE_NUM = 5 MAX_INDEX_COLUMN_NUM = 5 +MAX_CANDIDATE_COLUMNS = 40 MAX_INDEX_NUM = None MAX_INDEX_STORAGE = None FULL_ARRANGEMENT_THRESHOLD = 20 @@ -81,16 +81,6 @@ SQL_DISPLAY_PATTERN = [r'\'((\')|(.*?\'))', # match all content in single quote NUMBER_SET_PATTERN, # match integer set in the IN collection r'([^\_\d])\d+(\.\d+)?'] # match single integer -handler = RotatingFileHandler( - filename='index_advisor.log', - maxBytes=100 * 1024 * 1024, - backupCount=5, -) -handler.setLevel(logging.INFO) -handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(funcName)s - %(levelname)s - %(message)s')) -logger = logging.getLogger('index advisor') -logger.addHandler(handler) -logger.setLevel(logging.INFO) os.umask(0o0077) @@ -138,6 +128,19 @@ def get_password(): return password +def is_valid_statement(conn, statement): + """Determine if the query is correct by whether the executor throws an exception.""" + queries = get_prepare_sqls(statement) + res = conn.execute_sqls(queries) + # Rpc executor return [] if the statement is not executed successfully. 
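+    # A result row containing the error keyword also marks the statement as invalid.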
+ if not res: + return False + for _tuple in res: + if ERROR_KEYWORD in _tuple[0].upper(): + return False + return True + + def get_positive_sql_count(candidate_indexes: List[AdvisedIndex], workload: WorkLoad): positive_sql_count = 0 for query in workload.get_queries(): @@ -189,7 +192,7 @@ class IndexAdvisor: candidate_indexes) self.filter_redundant_indexes_with_diff_types(opt_config) self.filter_same_columns_indexes(opt_config, self.workload) - self.display_detail_info['positive_stmt_count'] = get_positive_sql_count(candidate_indexes, + self.display_detail_info['positive_stmt_count'] = get_positive_sql_count(opt_config, self.workload) if len(opt_config) == 0: bar_print("No optimal indexes generated!") @@ -225,7 +228,7 @@ class IndexAdvisor: def simple_index_advisor(self, candidate_indexes: List[AdvisedIndex]): estimate_workload_cost_file(self.executor, self.workload) - for index in ProcessBar(candidate_indexes, 'Optimal indexes'): + for index in GLOBAL_PROCESS_BAR.process_bar(candidate_indexes, 'Optimal indexes'): estimate_workload_cost_file(self.executor, self.workload, (index,)) self.workload.set_index_benefit() self.filter_redundant_indexes_with_diff_types(candidate_indexes) @@ -244,9 +247,9 @@ class IndexAdvisor: sql_optimized = 0 negative_sql_ratio = 0 insert_queries, delete_queries, \ - update_queries, select_queries, \ - positive_queries, ineffective_queries, \ - negative_queries = self.workload.get_index_related_queries(index) + update_queries, select_queries, \ + positive_queries, ineffective_queries, \ + negative_queries = self.workload.get_index_related_queries(index) sql_num = self.workload.get_index_sql_num(index) # Calculate the average benefit of each positive SQL. for query in positive_queries: @@ -282,8 +285,11 @@ class IndexAdvisor: cnt += 1 self.determine_indexes.append(index) - def print_benefits(self): + def print_benefits(self, created_indexes: List[ExistingIndex]): print_header_boundary('Index benefits') + table_indexes = defaultdict(UniqueList) + for index in created_indexes: + table_indexes[index.get_schema_table()].append(index) total_origin_cost = self.workload.get_total_origin_cost() for i, index in enumerate(self.determine_indexes): statement = index.get_index_statement() @@ -292,14 +298,22 @@ class IndexAdvisor: bar_print('\tCost benefit for workload: %.2f' % benefit) bar_print('\tCost improved rate for workload: %.2f%%' % (benefit / total_origin_cost * 100)) + + # invalid indexes caused by recommended indexes source_index = index.get_source_index() if source_index and (not source_index.is_primary_key()) and (not source_index.get_is_unique()): bar_print(f'\tCurrently existing useless indexes:') - bar_print(f'\t\tIndex name: {source_index.get_indexname()}') - bar_print(f'\t\tSchema: {source_index.get_schema()}') - bar_print(f'\t\tTable: {source_index.get_table()}') - bar_print(f'\t\tColumns: {source_index.get_columns()}') + bar_print(f'\t\t{source_index.get_indexdef()}') + + # information about existing indexes + created_indexes = table_indexes.get(index.get_table(), []) + if created_indexes: + bar_print(f'\tExisting indexes of this relation:') + for created_index in created_indexes: + bar_print(f'\t\t{created_index.get_indexdef()}') + bar_print(f'\tImproved query:') + # get benefit rate for subsequent sorting and display query_benefit_rate = [] for query in sorted(index.get_positive_queries(), key=lambda query: -query.get_benefit()): query_origin_cost = self.workload.get_origin_cost_of_query(query) @@ -308,6 +322,7 @@ class IndexAdvisor: if not benefit_rate > 
10: continue query_benefit_rate.append((query, benefit_rate)) + # sort query by benefit rate for i, (query, benefit_rate) in enumerate(sorted(query_benefit_rate, key=lambda x: -x[1])): bar_print(f'\t\tQuery {i}: {query.get_statement()}') query_origin_cost = self.workload.get_origin_cost_of_query(query) @@ -380,7 +395,7 @@ class IndexAdvisor: sql_info['sqlDetails'].append(sql_detail) self.record_info(index, sql_info, table_name, statement) - def display_advise_indexes_info(self, show_detail: bool, **kwargs): + def display_advise_indexes_info(self, show_detail: bool): self.display_detail_info['workloadCount'] = int( sum(query.get_frequency() for query in self.workload.get_queries())) self.display_detail_info['recommendIndexes'] = [] @@ -726,7 +741,7 @@ def get_valid_indexes(advised_indexes, original_base_indexes, statement, executo single_column_indexes = generate_single_column_indexes(advised_indexes) valid_indexes, cost = query_index_check(executor, statement, single_column_indexes) valid_indexes = filter_candidate_columns_by_cost(valid_indexes, statement, executor, - kwargs.get('max_candidate_columns')) + kwargs.get('max_candidate_columns', MAX_CANDIDATE_COLUMNS)) valid_indexes, cost = query_index_check(executor, statement, valid_indexes) pre_indexes = valid_indexes[:] @@ -985,7 +1000,7 @@ def powerset(iterable): def generate_sorted_atomic_config(queries: List[QueryItem], - candidate_indexes: List[AdvisedIndex]) -> List[Tuple[AdvisedIndex]]: + candidate_indexes: List[AdvisedIndex]) -> List[Tuple[AdvisedIndex, ...]]: atomic_config_total = [] for query in queries: @@ -1010,7 +1025,8 @@ def generate_sorted_atomic_config(queries: List[QueryItem], return atomic_config_total -def generate_atomic_config_containing_same_columns(candidate_indexes: List[AdvisedIndex]) -> List[Tuple[AdvisedIndex]]: +def generate_atomic_config_containing_same_columns(candidate_indexes: List[AdvisedIndex]) \ + -> List[Tuple[AdvisedIndex, AdvisedIndex]]: atomic_configs = [] for _, _indexes in groupby(sorted(candidate_indexes, key=lambda index: (index.get_table(), index.get_index_type())), key=lambda index: (index.get_table(), index.get_index_type())): @@ -1074,10 +1090,11 @@ def display_useless_redundant_indexes(created_indexes, workload_indexnames, deta def greedy_determine_opt_config(workload: WorkLoad, atomic_config_total: List[Tuple[AdvisedIndex]], candidate_indexes: List[AdvisedIndex]): opt_config = [] - for i in range(len(candidate_indexes)): + candidate_indexes_copy = candidate_indexes[:] + for i in range(len(candidate_indexes_copy)): cur_max_benefit = 0 cur_index = None - for index in candidate_indexes: + for index in candidate_indexes_copy: cur_config = copy.copy(opt_config) cur_config.append(index) cur_estimated_benefit = infer_workload_benefit(workload, cur_config, atomic_config_total) @@ -1088,7 +1105,7 @@ def greedy_determine_opt_config(workload: WorkLoad, atomic_config_total: List[Tu if len(opt_config) == MAX_INDEX_NUM: break opt_config.append(cur_index) - candidate_indexes.remove(cur_index) + candidate_indexes_copy.remove(cur_index) else: break @@ -1136,6 +1153,7 @@ def index_advisor_workload(history_advise_indexes, executor: BaseExecutor, workl multi_iter_mode: bool, show_detail: bool, n_distinct: float, reltuples: int, use_all_columns: bool, **kwargs): queries = compress_workload(workload_file_path) + queries = [query for query in queries if is_valid_statement(executor, query.get_statement())] workload = WorkLoad(queries) candidate_indexes = generate_candidate_indexes(workload, executor, n_distinct, 
reltuples, use_all_columns, **kwargs) print_candidate_indexes(candidate_indexes) @@ -1158,10 +1176,10 @@ def index_advisor_workload(history_advise_indexes, executor: BaseExecutor, workl for query in index.get_positive_queries())) workload.replace_indexes(tuple(determine_indexes), tuple(index_advisor.determine_indexes)) - index_advisor.display_advise_indexes_info(show_detail, **kwargs) - + index_advisor.display_advise_indexes_info(show_detail) + created_indexes = fetch_created_indexes(executor) if kwargs.get('show_benefits'): - index_advisor.print_benefits() + index_advisor.print_benefits(created_indexes) index_advisor.generate_incremental_index(history_advise_indexes) history_invalid_indexes = {} with executor.session(): @@ -1237,7 +1255,7 @@ def main(argv): default=0.1) arg_parser.add_argument("--max-candidate-columns", type=int, help='Maximum number of columns for candidate indexes', - default=40) + default=MAX_CANDIDATE_COLUMNS) arg_parser.add_argument('--max-index-columns', type=int, help='Maximum number of columns in a joint index', default=4) diff --git a/dbmind/components/index_advisor/process_bar.py b/dbmind/components/index_advisor/process_bar.py index 97a0e89..16bd730 100644 --- a/dbmind/components/index_advisor/process_bar.py +++ b/dbmind/components/index_advisor/process_bar.py @@ -12,6 +12,7 @@ # See the Mulan PSL v2 for more details. import os +import re import time @@ -52,7 +53,6 @@ class ProcessBar: @staticmethod def match(content): - import re p = re.compile('[*>]+') res = p.search(str(content)) if res: @@ -63,9 +63,9 @@ class ProcessBar: def __next__(self): self.process_num += 1 - self.percent = self.process_num / len(self.iterable) if self.process_num > len(self.iterable): raise StopIteration + self.percent = self.process_num / len(self.iterable) self.__processbar() return self.iterable[self.process_num - 1] diff --git a/dbmind/components/index_advisor/sql_output_parser.py b/dbmind/components/index_advisor/sql_output_parser.py index 22c78e4..3c10bc9 100644 --- a/dbmind/components/index_advisor/sql_output_parser.py +++ b/dbmind/components/index_advisor/sql_output_parser.py @@ -18,14 +18,11 @@ import logging from sqlparse.tokens import Punctuation, Keyword, Name try: - from utils import match_table_name, IndexItemFactory, ExistingIndex, AdvisedIndex, get_tokens, UniqueList + from utils import match_table_name, IndexItemFactory, ExistingIndex, AdvisedIndex, get_tokens, UniqueList, \ + QUERY_PLAN_SUFFIX, EXPLAIN_SUFFIX, ERROR_KEYWORD, PREPARE_KEYWORD except ImportError: - from .utils import match_table_name, IndexItemFactory, ExistingIndex, AdvisedIndex, get_tokens, UniqueList - - -QUERY_PLAN_SUFFIX = 'QUERY PLAN' -EXPLAIN_SUFFIX = 'EXPLAIN' -ERROR_KEYWORD = 'ERROR' + from .utils import match_table_name, IndexItemFactory, ExistingIndex, AdvisedIndex, get_tokens, UniqueList, \ + QUERY_PLAN_SUFFIX, EXPLAIN_SUFFIX, ERROR_KEYWORD, PREPARE_KEYWORD def __get_columns_from_indexdef(indexdef): @@ -117,17 +114,20 @@ def parse_explain_plan(results, query_num): index_names = UniqueList() for cur_tuple in results: text = cur_tuple[0] + # Save the results of the last index_names according to the EXPLAIN keyword. if QUERY_PLAN_SUFFIX in text or text == EXPLAIN_SUFFIX: - found_plan = True index_names_list.append(index_names) - index_names = [] + index_names = UniqueList() + found_plan = True + continue + # Consider execution errors and ensure that the cost value of an explain is counted only once. 
if ERROR_KEYWORD in text and 'prepared statement' not in text: if i >= query_num: logging.info(f'Cannot correct parse the explain results: {results}') raise ValueError("The size of queries is not correct!") costs.append(0) - index_names_list.append([]) - index_names = [] + index_names_list.append(index_names) + index_names = UniqueList() i += 1 if found_plan and '(cost=' in text: if i >= query_num: @@ -147,6 +147,10 @@ def parse_explain_plan(results, query_num): index_names.append(ind2) index_names_list.append(index_names) index_names_list = index_names_list[1:] + + # when a syntax error causes multiple explain queries to be run as one query + while len(index_names_list) < query_num: + index_names_list.append([]) while i < query_num: costs.append(0) i += 1 diff --git a/dbmind/components/index_advisor/utils.py b/dbmind/components/index_advisor/utils.py index 8722b77..53b0a63 100644 --- a/dbmind/components/index_advisor/utils.py +++ b/dbmind/components/index_advisor/utils.py @@ -12,6 +12,8 @@ # See the Mulan PSL v2 for more details. import re +import logging +from logging.handlers import RotatingFileHandler from collections import defaultdict from enum import Enum from functools import lru_cache @@ -22,7 +24,22 @@ import sqlparse from sqlparse.tokens import Name COLUMN_DELIMITER = ', ' - +QUERY_PLAN_SUFFIX = 'QUERY PLAN' +EXPLAIN_SUFFIX = 'EXPLAIN' +ERROR_KEYWORD = 'ERROR' +PREPARE_KEYWORD = 'PREPARE' + +logfile = 'index_advisor.log' +handler = RotatingFileHandler( + filename='index_advisor.log', + maxBytes=100 * 1024 * 1024, + backupCount=5, +) +handler.setLevel(logging.INFO) +handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(funcName)s - %(levelname)s - %(message)s')) +logger = logging.getLogger('index advisor') +logger.addHandler(handler) +logger.setLevel(logging.INFO) class QueryType(Enum): INEFFECTIVE = 0 @@ -135,6 +152,9 @@ class AdvisedIndex: def get_table(self): return self.__table + def get_schema(self): + return self.__table.split('.')[0] + def get_columns(self): return self.__columns @@ -160,9 +180,15 @@ class AdvisedIndex: self.association_indexes[association_indexes_name].append(association_benefit) def match_index_name(self, index_name): - return index_name.endswith(f'btree_{self.get_index_type() + "_" if self.get_index_type() else ""}' - f'{self.get_table().split(".")[-1]}_' - f'{"_".join(self.get_columns().split(COLUMN_DELIMITER))}') + schema = self.get_schema() + if schema == 'public': + return index_name.endswith(f'btree_{self.get_index_type() + "_" if self.get_index_type() else ""}' + f'{self.get_table().split(".")[-1]}_' + f'{"_".join(self.get_columns().split(COLUMN_DELIMITER))}') + else: + return index_name.endswith(f'btree_{self.get_index_type() + "_" if self.get_index_type() else ""}' + f'{self.get_table().replace(".", "_")}_' + f'{"_".join(self.get_columns().split(COLUMN_DELIMITER))}') def __str__(self): return f'table: {self.__table} columns: {self.__columns} index_type: ' \ @@ -377,12 +403,14 @@ class WorkLoad: ineffective_queries.append(query) positive_queries = [query for query in insert_queries + delete_queries + update_queries + select_queries if query not in negative_queries + ineffective_queries] - return insert_queries, delete_queries, update_queries, select_queries, positive_queries, ineffective_queries, negative_queries + return insert_queries, delete_queries, update_queries, select_queries, \ + positive_queries, ineffective_queries, negative_queries @lru_cache(maxsize=None) def get_index_sql_num(self, index: AdvisedIndex): - 
insert_queries, delete_queries, update_queries, select_queries, positive_queries, ineffective_queries, negative_queries = \ - self.get_index_related_queries(index) + insert_queries, delete_queries, update_queries, \ + select_queries, positive_queries, ineffective_queries, \ + negative_queries = self.get_index_related_queries(index) insert_sql_num = sum(query.get_frequency() for query in insert_queries) delete_sql_num = sum(query.get_frequency() for query in delete_queries) update_sql_num = sum(query.get_frequency() for query in update_queries) @@ -564,7 +592,7 @@ def split_iter(iterable, n): index = 0 res = [] for size in size_list: - res.append(iterable[index:index+size]) + res.append(iterable[index:index + size]) index += size return res @@ -576,3 +604,5 @@ def flatten(iterable): yield item else: yield _iter + + diff --git a/dbmind/components/opengauss_exporter/core/opengauss_driver.py b/dbmind/components/opengauss_exporter/core/opengauss_driver.py index 3c1855c..9fadd79 100644 --- a/dbmind/components/opengauss_exporter/core/opengauss_driver.py +++ b/dbmind/components/opengauss_exporter/core/opengauss_driver.py @@ -15,6 +15,7 @@ import time from concurrent.futures import ThreadPoolExecutor, as_completed +import sqlparse import psycopg2 import psycopg2.errors import psycopg2.extensions @@ -134,10 +135,12 @@ class Driver: result = cursor.fetchall() else: result = [] - for sql in stmt.split(';'): + for sql in sqlparse.split(stmt): cursor.execute(sql) if cursor.pgresult_ptr is not None: - result.extend(cursor.fetchall()) + result.append(cursor.fetchall()) + else: + result.append(None) conn.commit() except psycopg2.extensions.QueryCanceledError as e: logging.error('%s: %s.' % (e.pgerror, stmt)) diff --git a/tests/test_index_advisor_workload.py b/tests/test_index_advisor_workload.py index 237f1da..306ec16 100644 --- a/tests/test_index_advisor_workload.py +++ b/tests/test_index_advisor_workload.py @@ -142,6 +142,9 @@ class IndexTester(unittest.TestCase): index = IndexItemFactory().get_index('public.date_dim', 'd_year', '') index_name = '<123>btree_date_dim_d_year' self.assertEqual(index.match_index_name(index_name), True) + index = IndexItemFactory().get_index('other.temptable', 'int2', '') + index_name = '<625283>btree_other_temptable_int2' + self.assertEqual(index.match_index_name(index_name), True) class QueryItemTester(unittest.TestCase): @@ -411,6 +414,18 @@ class SqlOutPutParserTester(unittest.TestCase): expected_index_ids = ['99920'] self.assertEqual((expected_costs, [[]] * 6), parse_explain_plan(explain_results, query_number)) + # test parsing index name for gsql output + results = [('SET',), ('SET',), ('SET',), ('PREPARE',), + (' QUERY PLAN ',), + ('Index Scan using temptable_int1_idx on temptable (cost=0.00..8.27 rows=1 width=8)',), + ('Index Cond: (int1 = 10)',), + ('(2 rows)',), ('',), ('DEALLOCATE',), ('total time: 1 ms',)] + self.assertEqual(([8.27], [['temptable_int1_idx']]), parse_explain_plan(results, 1)) + results = [('SET',), ('SET',), ('SET',), ('PREPARE',), + (' QUERY PLAN ',), + ('Index Scan using <625283>btree_other_temptable_int2 on temptable (cost=0.00..8.27 rows=1 width=8)',), + ('Index Cond: (int2 = 10)',), ('(2 rows)',), ('',), ('DEALLOCATE',), ('total time: 1 ms',)] + self.assertEqual(([8.27], [['<625283>btree_other_temptable_int2']]), parse_explain_plan(results, 1)) def test_parse_single_advisor_result(self): ori_inputs = [' (public,date_dim,d_year,global)', ' (public,store_sales,"ss_sold_date_sk,ss_item_sk","")'] -- Gitee From 
46aba5a50cac9780f2a919e2cc83a6e634ac1585 Mon Sep 17 00:00:00 2001 From: yuchen Date: Tue, 1 Nov 2022 14:35:33 +0800 Subject: [PATCH 14/14] fix(anomaly analysis) fix to fit the new trait --- dbmind/components/anomaly_analysis.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/dbmind/components/anomaly_analysis.py b/dbmind/components/anomaly_analysis.py index af50a47..7090cd7 100644 --- a/dbmind/components/anomaly_analysis.py +++ b/dbmind/components/anomaly_analysis.py @@ -49,7 +49,7 @@ http.client.HTTPConnection._http_vsn_str = "HTTP/1.0" def get_sequences(arg): - metric, host, start_datetime, end_datetime, length = arg + metric, host, start_datetime, end_datetime = arg result = [] if global_vars.configs.get('TSDB', 'name') == "prometheus": if ":" in host and not check_ip_valid(host.split(":")[0]) and check_port_valid(host.split(":")[1]): @@ -62,6 +62,10 @@ def get_sequences(arg): host_like = {"dbname": host} seqs = dai.get_metric_sequence(metric, start_datetime, end_datetime).filter_like(**host_like).fetchall() + step = dai.get_metric_sequence(metric, start_datetime, end_datetime).step / 1000 + start_time = datetime.timestamp(start_datetime) + end_time = datetime.timestamp(end_datetime) + length = (end_time - start_time) // step for seq in seqs: if DISTINGUISHING_INSTANCE_LABEL not in seq.labels or len(seq) < 0.9 * length: continue @@ -100,8 +104,8 @@ def multi_process_correlation_calculation(metric, sequence_args): with mp.Pool() as pool: sequence_result = pool.map(get_sequences, iterable=sequence_args) - _, host, start_datetime, end_datetime, length = sequence_args[0] - these_sequences = get_sequences((metric, host, start_datetime, end_datetime, length)) + _, host, start_datetime, end_datetime = sequence_args[0] + these_sequences = get_sequences((metric, host, start_datetime, end_datetime)) if not these_sequences: write_to_terminal('The metric was not found.') @@ -181,18 +185,13 @@ def main(argv): global_vars.configs.get('TSDB', 'ssl_ca_file') ) client = TsdbClientFactory.get_tsdb_client() - interval = client.scrape_interval - if not interval: - raise ValueError(f"Invalid scrape interval {interval}.") - interval *= 1000 all_metrics = client.all_metrics actual_start_time = min(start_time, end_time - LEAST_WINDOW) start_datetime = datetime.fromtimestamp(actual_start_time / 1000) end_datetime = datetime.fromtimestamp(end_time / 1000) - length = (end_time - start_time) // interval - sequence_args = [(metric_name, host, start_datetime, end_datetime, length) for metric_name in all_metrics] + sequence_args = [(metric_name, host, start_datetime, end_datetime) for metric_name in all_metrics] if platform.system() != 'Windows': correlation_result = multi_process_correlation_calculation(metric, sequence_args) @@ -212,7 +211,7 @@ def main(argv): csv_path = os.path.join(args.csv_dump_path, new_name + ".csv") with open(csv_path, 'w+', newline='') as f: writer = csv.writer(f) - for _, name, corr, delay, values in sorted(result[this_name].values(), key=lambda t: t[3]): + for _, name, corr, delay, values in sorted(result[this_name].values(), key=lambda t: (t[3], -t[0])): writer.writerow((name, corr, delay) + values) # Discard the first element abs(corr) after sorting. -- Gitee
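Editor's illustration (not part of the patch series): the anomaly_analysis fix above derives the expected sample count from the TSDB step and the query window instead of a separately configured scrape interval, then discards sequences that cover less than 90% of the window. The sketch below shows that completeness check in isolation. The Sequence namedtuple and the 'from_instance' default are assumed stand-ins; the real code works with dai.get_metric_sequence results and DISTINGUISHING_INSTANCE_LABEL.

# Illustrative sketch only -- not taken from the patches above.
from collections import namedtuple
from datetime import datetime

Sequence = namedtuple('Sequence', ['labels', 'values'])  # simplified stand-in


def expected_sample_count(start_datetime, end_datetime, step_seconds):
    """How many samples a gap-free sequence should hold for this window."""
    start_ts = datetime.timestamp(start_datetime)
    end_ts = datetime.timestamp(end_datetime)
    return (end_ts - start_ts) // step_seconds


def filter_complete_sequences(seqs, start_datetime, end_datetime, step_seconds,
                              instance_label='from_instance', completeness=0.9):
    """Drop sequences lacking the instance label or missing more than 10% of samples."""
    length = expected_sample_count(start_datetime, end_datetime, step_seconds)
    kept = []
    for seq in seqs:
        if instance_label not in seq.labels or len(seq.values) < completeness * length:
            continue
        kept.append(seq)
    return kept


# Example with hypothetical values: a 10-minute window at a 15-second step
# expects 40 samples, so a 20-point sequence is discarded.
start = datetime(2022, 10, 1, 0, 0, 0)
end = datetime(2022, 10, 1, 0, 10, 0)
seqs = [Sequence({'from_instance': '127.0.0.1:9187'}, list(range(40))),
        Sequence({'from_instance': '127.0.0.1:9187'}, list(range(20)))]
print(len(filter_complete_sequences(seqs, start, end, 15)))  # 1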
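Editor's note on the opengauss_exporter driver change earlier in this series: iterating over sqlparse.split(stmt) instead of stmt.split(';') avoids cutting a script apart at semicolons that sit inside string literals or dollar-quoted bodies, and appending one entry per statement (None when there is no result set) keeps the result list aligned with the executed statements. The sample script below is hypothetical and only illustrates the splitting difference.

# Illustrative sketch only -- not taken from the patches above.
import sqlparse

script = "INSERT INTO t VALUES ('a;b'); SELECT count(*) FROM t;"

# Naive splitting breaks the quoted 'a;b' literal into two fragments.
naive = [s for s in script.split(';') if s.strip()]

# sqlparse understands SQL quoting, so the literal stays intact.
proper = [s for s in sqlparse.split(script) if s.strip()]

print(len(naive))   # 3
print(len(proper))  # 2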