From bb72f0689da01dd64ea26e1e757e5728bef1568a Mon Sep 17 00:00:00 2001 From: Etienne Delclaux Date: Thu, 2 Jan 2025 16:34:48 +0100 Subject: [PATCH] wip 2 --- .../core/imports/checks/dataframe/core.py | 9 +- .../tests/imports/test_imports_synthese.py | 84 +++++++++++-------- backend/geonature/tests/test_gn_meta.py | 4 + 3 files changed, 59 insertions(+), 38 deletions(-) diff --git a/backend/geonature/core/imports/checks/dataframe/core.py b/backend/geonature/core/imports/checks/dataframe/core.py index 2769226215..5497170cb4 100644 --- a/backend/geonature/core/imports/checks/dataframe/core.py +++ b/backend/geonature/core/imports/checks/dataframe/core.py @@ -262,14 +262,13 @@ def check_datasets( authorized_datasets = { str(ds.unique_dataset_id): ds for ds in db.session.execute( - # sa.select(TDatasets) TDatasets.filter_by_creatable( user=imprt.authors[0], module_code=module_code, object_code=object_code ) - .where(TDatasets.unique_dataset_id.in_(uuid)) - .options(sa.orm.raiseload("*")) - ).scalars() - # .unique() + # .where(TDatasets.unique_dataset_id.in_(uuid)) + # .options(sa.orm.raiseload("*")) + ) + .scalars() .all() } authorized_ds_mask = valid_ds_mask & df[uuid_col].isin(authorized_datasets.keys()) diff --git a/backend/geonature/tests/imports/test_imports_synthese.py b/backend/geonature/tests/imports/test_imports_synthese.py index 9f1927f2bc..66745e7cd4 100644 --- a/backend/geonature/tests/imports/test_imports_synthese.py +++ b/backend/geonature/tests/imports/test_imports_synthese.py @@ -137,41 +137,59 @@ def import_dataset(datasets, import_file_name): ds.nomenclature_data_origin = previous_data_origin -@pytest.fixture() -def new_import(synthese_destination, users): - with db.session.begin_nested(): - imprt = TImports( - destination=synthese_destination, - authors=[users["user"]], - ) - db.session.add(imprt) - return imprt +# @pytest.fixture() +# def new_import(synthese_destination, users): +# # admin_user = User.query.filter(User.identifiant == "admin").one() +# with db.session.begin_nested(): +# imprt = TImports( +# destination=synthese_destination, +# authors=[users["user"]], +# ) +# db.session.add(imprt) +# return imprt + + +# @pytest.fixture() +# def uploaded_import(new_import, datasets, import_file_name): +# with db.session.begin_nested(): +# with open(tests_path / "files" / "synthese" / import_file_name, "rb") as f: +# f.seek(0) +# content = f.read() +# if import_file_name == "jdd_to_import_file.csv": +# content = content.replace( +# b"VALID_DATASET_UUID", +# datasets["own_dataset"].unique_dataset_id.hex.encode("ascii"), +# ) +# content = content.replace( +# b"FORBIDDEN_DATASET_UUID", +# datasets["orphan_dataset"].unique_dataset_id.hex.encode("ascii"), +# ) +# content = content.replace( +# b"PRIVATE_DATASET_UUID", +# datasets["private"].unique_dataset_id.hex.encode("ascii"), +# ) +# new_import.full_file_name = "jdd_to_import_file.csv" +# else: +# new_import.full_file_name = "valid_file.csv" +# new_import.source_file = content +# return new_import @pytest.fixture() -def uploaded_import(new_import, datasets, import_file_name): - with db.session.begin_nested(): - with open(tests_path / "files" / "synthese" / import_file_name, "rb") as f: - f.seek(0) - content = f.read() - if import_file_name == "jdd_to_import_file.csv": - content = content.replace( - b"VALID_DATASET_UUID", - datasets["own_dataset"].unique_dataset_id.hex.encode("ascii"), - ) - content = content.replace( - b"FORBIDDEN_DATASET_UUID", - datasets["orphan_dataset"].unique_dataset_id.hex.encode("ascii"), - ) - content = content.replace( - b"PRIVATE_DATASET_UUID", - datasets["private"].unique_dataset_id.hex.encode("ascii"), - ) - new_import.full_file_name = "jdd_to_import_file.csv" - else: - new_import.full_file_name = "valid_file.csv" - new_import.source_file = content - return new_import +def uploaded_import(client, users): + set_logged_user(client, users["user"]) + + filename = "valid_file.csv" + file = (open(tests_path / "files" / "synthese" / filename, "rb"), filename) + + r = client.post( + url_for("import.upload_file", destination="synthese"), + data={"file": file}, + ) + assert r.status_code == 200, r.data + unset_logged_user(client) + # db.session.refresh(uploaded_import) + # return uploaded_import @pytest.fixture() @@ -489,7 +507,7 @@ def get(import_name): assert r.status_code == 200, r.data assert r.json["id_import"] == imports["own_import"].id_import - def test_delete_import(self, users, imported_import): + def test_delete_import(self, g_permissions, users, imported_import): imprt = imported_import transient_table = imprt.destination.get_transient_table() r = self.client.delete(url_for("import.delete_import", import_id=imprt.id_import)) diff --git a/backend/geonature/tests/test_gn_meta.py b/backend/geonature/tests/test_gn_meta.py index ff91775576..466be14096 100644 --- a/backend/geonature/tests/test_gn_meta.py +++ b/backend/geonature/tests/test_gn_meta.py @@ -598,6 +598,10 @@ def test_datasets_permissions(self, app, datasets, users): ) assert set(sc(dsc.filter_by_scope(2, query=qs)).unique().all()) == set( [ + # The code is attempting to access a dataset named "own_dataset" from a dictionary or list + # named "datasets" in Python. However, the code snippet provided is incomplete and lacks + # context, so it is difficult to determine the exact functionality or purpose of this code + # without additional information. datasets["own_dataset"], datasets["own_dataset_not_activated"], datasets["associate_dataset"],