From a3146f7539b403db16fb446a130edf8aa971c88a Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 15 Jan 2025 11:24:53 +0100 Subject: [PATCH] Fix connectivity for `job-statistics collect` --- CHANGES.md | 1 + cratedb_toolkit/io/cli.py | 11 +++---- cratedb_toolkit/model.py | 21 +++++++++++++ cratedb_toolkit/options.py | 11 +++++++ cratedb_toolkit/wtf/cli.py | 41 +++++++++++-------------- cratedb_toolkit/wtf/query_collector.py | 42 ++++++++++++++------------ 6 files changed, 77 insertions(+), 50 deletions(-) create mode 100644 cratedb_toolkit/options.py diff --git a/CHANGES.md b/CHANGES.md index 5fd34485..9dd8cadc 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,7 @@ # Changelog ## Unreleased +- Fixed connectivity for `job-statistics collect` ## 2025/01/13 v0.0.30 - Dependencies: Minimize dependencies of core installation, diff --git a/cratedb_toolkit/io/cli.py b/cratedb_toolkit/io/cli.py index 8ebb7729..7c1eb9e7 100644 --- a/cratedb_toolkit/io/cli.py +++ b/cratedb_toolkit/io/cli.py @@ -8,6 +8,7 @@ from cratedb_toolkit.api.main import ClusterBase, ManagedCluster, StandaloneCluster from cratedb_toolkit.model import DatabaseAddress, InputOutputResource, TableAddress +from cratedb_toolkit.options import cratedb_cluster_id_option, cratedb_http_option, cratedb_sqlalchemy_option from cratedb_toolkit.util.cli import boot_click, make_command logger = logging.getLogger(__name__) @@ -27,13 +28,9 @@ def cli(ctx: click.Context, verbose: bool, debug: bool): @make_command(cli, name="table") @click.argument("url") -@click.option( - "--cluster-id", envvar="CRATEDB_CLOUD_CLUSTER_ID", type=str, required=False, help="CrateDB Cloud cluster identifier" -) -@click.option( - "--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, required=False, help="CrateDB SQLAlchemy URL" -) -@click.option("--cratedb-http-url", envvar="CRATEDB_HTTP_URL", type=str, required=False, help="CrateDB HTTP URL") +@cratedb_cluster_id_option +@cratedb_http_option +@cratedb_sqlalchemy_option @click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema where to import the data") @click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table where to import the data") @click.option("--format", "format_", type=str, required=False, help="File format of the import resource") diff --git a/cratedb_toolkit/model.py b/cratedb_toolkit/model.py index 1fe3a91a..28cf5ba1 100644 --- a/cratedb_toolkit/model.py +++ b/cratedb_toolkit/model.py @@ -77,6 +77,27 @@ def decode(self) -> t.Tuple[URL, "TableAddress"]: uri.path = "" return uri, TableAddress(database, table) + @property + def username(self) -> t.Union[str, None]: + """ + Return the username of the database URI. + """ + return self.uri.username + + @property + def password(self) -> t.Union[str, None]: + """ + Return the password of the database URI. + """ + return self.uri.password + + @property + def schema(self) -> t.Union[str, None]: + """ + Return the `?schema=` query parameter of the database URI. + """ + return self.uri.query_params.get("schema") + @dataclasses.dataclass class TableAddress: diff --git a/cratedb_toolkit/options.py b/cratedb_toolkit/options.py new file mode 100644 index 00000000..ff68dbb7 --- /dev/null +++ b/cratedb_toolkit/options.py @@ -0,0 +1,11 @@ +import click + +cratedb_cluster_id_option = click.option( + "--cluster-id", envvar="CRATEDB_CLOUD_CLUSTER_ID", type=str, required=False, help="CrateDB Cloud cluster identifier" +) +cratedb_sqlalchemy_option = click.option( + "--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, required=False, help="CrateDB SQLAlchemy URL" +) +cratedb_http_option = click.option( + "--cratedb-http-url", envvar="CRATEDB_HTTP_URL", type=str, required=False, help="CrateDB HTTP URL" +) diff --git a/cratedb_toolkit/wtf/cli.py b/cratedb_toolkit/wtf/cli.py index 54806432..6d6e0a68 100644 --- a/cratedb_toolkit/wtf/cli.py +++ b/cratedb_toolkit/wtf/cli.py @@ -1,14 +1,14 @@ # Copyright (c) 2021-2024, Crate.io Inc. # Distributed under the terms of the AGPLv3 license, see LICENSE. import logging -import os -import sys import typing as t -import urllib.parse import click +from click import ClickException from click_aliases import ClickAliasedGroup +from cratedb_toolkit.model import DatabaseAddress +from cratedb_toolkit.options import cratedb_http_option, cratedb_sqlalchemy_option from cratedb_toolkit.util import DatabaseAdapter from cratedb_toolkit.util.cli import ( boot_click, @@ -75,26 +75,25 @@ def help_serve(): """ # noqa: E501 -cratedb_sqlalchemy_option = click.option( - "--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, required=False, help="CrateDB SQLAlchemy URL" -) - - @click.group(cls=ClickAliasedGroup) # type: ignore[arg-type] @cratedb_sqlalchemy_option +@cratedb_http_option @click.option("--verbose", is_flag=True, required=False, help="Turn on logging") @click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level") @click.option("--scrub", envvar="SCRUB", is_flag=True, required=False, help="Blank out identifiable information") @click.version_option() @click.pass_context -def cli(ctx: click.Context, cratedb_sqlalchemy_url: str, verbose: bool, debug: bool, scrub: bool): +def cli( + ctx: click.Context, cratedb_sqlalchemy_url: str, cratedb_http_url: str, verbose: bool, debug: bool, scrub: bool +): """ Diagnostics and informational utilities. """ - if not cratedb_sqlalchemy_url: - logger.error("Unable to operate without database address") - sys.exit(1) - ctx.meta.update({"cratedb_sqlalchemy_url": cratedb_sqlalchemy_url, "scrub": scrub}) + if not cratedb_sqlalchemy_url and not cratedb_http_url: + raise ClickException("Unable to operate without database address") + ctx.meta.update( + {"cratedb_http_url": cratedb_http_url, "cratedb_sqlalchemy_url": cratedb_sqlalchemy_url, "scrub": scrub} + ) return boot_click(ctx, verbose, debug) @@ -149,12 +148,12 @@ def job_statistics(ctx: click.Context): def job_statistics_collect(ctx: click.Context, once: bool): """ Run jobs_log collector. - - # TODO: Forward `cratedb_sqlalchemy_url` properly. """ import cratedb_toolkit.wtf.query_collector - cratedb_toolkit.wtf.query_collector.init() + address = DatabaseAddress.from_string(ctx.meta["cratedb_http_url"] or ctx.meta["cratedb_sqlalchemy_url"]) + + cratedb_toolkit.wtf.query_collector.boot(address=address) if once: cratedb_toolkit.wtf.query_collector.record_once() else: @@ -166,17 +165,11 @@ def job_statistics_collect(ctx: click.Context, once: bool): def job_statistics_view(ctx: click.Context): """ View job statistics about collected queries. - - # TODO: Forward `cratedb_sqlalchemy_url` properly. """ - cratedb_sqlalchemy_url = ctx.meta["cratedb_sqlalchemy_url"] - url = urllib.parse.urlparse(cratedb_sqlalchemy_url) - hostname = f"{url.hostname}:{url.port or 4200}" - os.environ["HOSTNAME"] = hostname - import cratedb_toolkit.wtf.query_collector - cratedb_toolkit.wtf.query_collector.init() + address = DatabaseAddress.from_string(ctx.meta["cratedb_http_url"] or ctx.meta["cratedb_sqlalchemy_url"]) + cratedb_toolkit.wtf.query_collector.boot(address=address) response: t.Dict = {"meta": {}, "data": {}} response["meta"]["remark"] = "WIP! This is a work in progress. The output format will change." diff --git a/cratedb_toolkit/wtf/query_collector.py b/cratedb_toolkit/wtf/query_collector.py index 7d6f55bd..510906a5 100644 --- a/cratedb_toolkit/wtf/query_collector.py +++ b/cratedb_toolkit/wtf/query_collector.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2024, Crate.io Inc. +# Copyright (c) 2021-2025, Crate.io Inc. # Distributed under the terms of the AGPLv3 license, see LICENSE. # ruff: noqa: S608 @@ -15,17 +15,9 @@ logger = logging.getLogger(__name__) -cratedb_sqlalchemy_url = os.getenv("CRATEDB_SQLALCHEMY_URL", "crate://crate@localhost:4200") -address = DatabaseAddress.from_string(cratedb_sqlalchemy_url) -host = f"{address.uri.host}:{address.uri.port}" -username = address.uri.username -password = address.uri.password -_, table_address = address.decode() -schema = table_address.schema or "stats" - -interval = float(os.getenv("INTERVAL", 10)) -stmt_log_table = os.getenv("STMT_TABLE", f"{schema}.statement_log") -last_exec_table = os.getenv("LAST_EXEC_TABLE", f"{schema}.last_execution") +TRACING = False + + last_execution_ts = 0 sys_jobs_log = {} bucket_list = [10, 50, 100, 500, 1000, 2000, 5000, 10000, 15000, 20000] @@ -43,16 +35,28 @@ "INF": 0, } +stmt_log_table, last_exec_table, cursor, last_scrape = None, None, None, None -urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) -conn = client.connect(host, username=username, password=password) -cursor = conn.cursor() -last_scrape = int(time.time() * 1000) - (interval * 60000) -TRACING = False +def boot(address: DatabaseAddress): + # TODO: Refactor to non-global variables. + global stmt_log_table, last_exec_table, cursor, last_scrape, interval + schema = address.schema or "stats" + + interval = float(os.getenv("INTERVAL", 10)) + stmt_log_table = os.getenv("STMT_TABLE", f"{schema}.statement_log") + last_exec_table = os.getenv("LAST_EXEC_TABLE", f"{schema}.last_execution") + + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + logger.info(f"Connecting to {address.dburi}") + conn = client.connect(address.dburi, username=address.username, password=address.password, schema=schema) + cursor = conn.cursor() + last_scrape = int(time.time() * 1000) - (interval * 60000) + + dbinit() -def init(): +def dbinit(): stmt = ( f"CREATE TABLE IF NOT EXISTS {stmt_log_table} " f"(id TEXT, stmt TEXT, calls INT, bucket OBJECT, last_used TIMESTAMP, " @@ -249,7 +253,7 @@ def record_forever(): def main(): - init() + boot() record_forever()