From bcd6156768d598a7ad2f9637713145acd7701559 Mon Sep 17 00:00:00 2001 From: Gouderg Date: Tue, 14 Jan 2025 14:24:08 +0400 Subject: [PATCH] Fix code after mypy verification --- .../manager/ssm_base_manager.py | 103 ++++++++++++++---- .../manager/ssm_default_session.py | 60 +--------- .../manager/ssm_uav_session.py | 4 +- src/seatizen_session/ss_metadata.py | 2 +- src/seatizen_session/ss_zipper.py | 2 +- src/utils/lib_tools.py | 33 +++--- src/zenodo_api/za_error.py | 9 +- 7 files changed, 109 insertions(+), 104 deletions(-) diff --git a/src/seatizen_session/manager/ssm_base_manager.py b/src/seatizen_session/manager/ssm_base_manager.py index e58ea21..d7e0676 100644 --- a/src/seatizen_session/manager/ssm_base_manager.py +++ b/src/seatizen_session/manager/ssm_base_manager.py @@ -14,7 +14,7 @@ from scipy.spatial import ConvexHull from shapely.geometry import LineString, Polygon from ..ss_zipper import SessionZipper -from ...utils.constants import MAXIMAL_DEPOSIT_FILE_SIZE, IMG_EXTENSION, BYTE_TO_GIGA_BYTE, MULTILABEL_MODEL_NAME +from ...utils.constants import MAXIMAL_DEPOSIT_FILE_SIZE, IMG_EXTENSION, BYTE_TO_GIGA_BYTE, MULTILABEL_MODEL_NAME, JACQUES_MODEL_NAME class BaseType(Enum): RGP = "RGP Station from IGN" @@ -38,7 +38,7 @@ def __init__(self, session_path: str, temp_folder: str) -> None: self.temp_folder.mkdir(parents=True, exist_ok=True) # Compute informations. - self.place, self.date, self.country, self.platform = None, None, None, None + self.place, self.date, self.country, self.platform = "", "", "", "" self.compute_basics_info() @abstractmethod @@ -136,7 +136,7 @@ def _zip_folder(self, folder_to_zip: str)-> None: t_start = datetime.now() print(f"Preparing {folder_to_zip} folder") - shutil.make_archive(zip_folder, "zip", raw_folder) + shutil.make_archive(str(zip_folder), "zip", raw_folder) # Add photog report in preview files. if "PHOTOGRAMMETRY" in folder_to_zip: @@ -230,9 +230,9 @@ def _zip_gps_raw(self) -> None: def __set_place(self) -> None: """ Set country and place as variable from session_name. """ place = self.session_name.split("_")[1].split("-") - self.country = pycountry.countries.get(alpha_3=place[0]) - if self.country != None: - self.country = self.country.name.lower().title() + country = pycountry.countries.get(alpha_3=place[0]) + if country != None: + self.country = country.name.lower().title() else: print("[WARNING] Error in country code") self.place = "-".join([a.lower().title() for a in place[1:]]) @@ -283,7 +283,7 @@ def move_into_subfolder_if_needed(self) -> list[Path]: return [self.temp_folder] # We need to move file to subdir - cum_size, nb_ses = 0, 1 + cum_size, nb_ses = 0.0, 1.0 f_to_move = Path(self.temp_folder, "RAW_DATA") f_to_move.mkdir(exist_ok=True) folders_to_upload = [f_to_move] @@ -302,6 +302,63 @@ def move_into_subfolder_if_needed(self) -> list[Path]: print(f"We will have {nb_ses} versions for RAW DATA") return folders_to_upload + def get_jacques_csv(self) -> pd.DataFrame: + """ Return jacques model data from csv.""" + IA_path = Path(self.session_path, "PROCESSED_DATA", "IA") + if not IA_path.exists() or not IA_path.is_dir(): return pd.DataFrame() + + jacques_name = Path() + for file in IA_path.iterdir(): + if JACQUES_MODEL_NAME in file.name: + jacques_name = file + break + + if jacques_name == "": + print("[WARNING] Cannot find jacques predictions file.") + return pd.DataFrame() + + jacques_csv = pd.read_csv(jacques_name) + if len(jacques_csv) == 0: return pd.DataFrame() + + return jacques_csv + + + def get_jacques_stat(self) -> tuple[float, float]: + """ Return proportion of useful/useless. """ + + jacques_csv = self.get_jacques_csv() + if len(jacques_csv) == 0: return 0, 0 + + useful = round(len(jacques_csv[jacques_csv["Useless"] == 0]) * 100 / len(jacques_csv), 2) + useless = round(len(jacques_csv[jacques_csv["Useless"] == 1]) * 100 / len(jacques_csv), 2) + + return useful, useless + + + def get_useful_frames_name(self) -> list[str]: + """ Return a list of frames path predicted useful by jacques. """ + useful_frames = [] + try_ia = False + # Get frame predictions. + df_predictions_gps = self.get_predictions_gps() + if len(df_predictions_gps) == 0: + print(f"Predictions GPS empty for session {self.session_name}\n") + try_ia = True + else: + useful_frames = df_predictions_gps["FileName"].to_list() # CSV without useless images + + if not try_ia: return useful_frames + + print("We didn't find predictions gps, so we try with jacques csv annotations to select useful frames.") + # Cannot find predictions_gps, try with jacques annotation_files + + df_jacques = self.get_jacques_csv() + if len(df_jacques) == 0: return useful_frames + + useful_frames = df_jacques[df_jacques["Useless"] == 0]["FileName"].to_list() + + return useful_frames + def is_video_or_images(self) -> tuple[DCIMType, float]: """ Return media type of raw data. """ @@ -319,12 +376,12 @@ def is_video_or_images(self) -> tuple[DCIMType, float]: return isVideoOrImagesOrNothing, 0 if isVideoOrImagesOrNothing == DCIMType.NONE else self.get_file_dcim_size([".mp4"] if isVideoOrImagesOrNothing == DCIMType.VIDEO else [".jpg", ".jpeg"]) - def get_file_dcim_size(self, extension: str) -> float: + def get_file_dcim_size(self, extension: list[str]) -> float: """ Return Sum of filesize in Gb""" dcim_path = Path(self.session_path, "DCIM") if not Path.exists(dcim_path) or not dcim_path.is_dir(): return 0 - size = 0 + size = 0.0 for file in dcim_path.iterdir(): if file.suffix.lower() in extension: size += os.path.getsize(str(file)) / BYTE_TO_GIGA_BYTE @@ -336,7 +393,7 @@ def get_multilabel_csv(self, isScore: bool = False, indexingByFilename: bool = F IA_path = Path(self.session_path, "PROCESSED_DATA", "IA") if not Path.exists(IA_path) or not IA_path.is_dir(): - return {} + return pd.DataFrame() multilabel_model_filename = None for file in IA_path.iterdir(): @@ -345,7 +402,7 @@ def get_multilabel_csv(self, isScore: bool = False, indexingByFilename: bool = F multilabel_model_filename = file - if multilabel_model_filename == None: return {} + if multilabel_model_filename == None: return pd.DataFrame() index_col = None if indexingByFilename: @@ -357,17 +414,17 @@ def get_multilabel_csv(self, isScore: bool = False, indexingByFilename: bool = F multilabel_model_csv = pd.read_csv(multilabel_model_filename, index_col=index_col) - if len(multilabel_model_csv) == 0: return {} + if len(multilabel_model_csv) == 0: return pd.DataFrame() return multilabel_model_csv def get_predictions_gps(self) -> pd.DataFrame: """ Return predictions_gps content else {} if not found. """ predictions_gps_path = Path(self.session_path, "METADATA", "predictions_gps.csv") - if not Path.exists(predictions_gps_path): return {} + if not Path.exists(predictions_gps_path): return pd.DataFrame() predictions_gps = pd.read_csv(predictions_gps_path) - return predictions_gps if len(predictions_gps) != 0 else {} # Avoid dataframe with just header and no data. + return predictions_gps if len(predictions_gps) != 0 else pd.DataFrame() # Avoid dataframe with just header and no data. def get_predictions_gps_with_filtering(self) -> pd.DataFrame: @@ -375,15 +432,15 @@ def get_predictions_gps_with_filtering(self) -> pd.DataFrame: predictions_gps = self.get_predictions_gps() - if "GPSLongitude" not in predictions_gps or "GPSLatitude" not in predictions_gps: return {} # No GPS coordinate - if round(predictions_gps["GPSLatitude"].std(), 10) == 0.0 or round(predictions_gps["GPSLongitude"].std(), 10) == 0.0: return {} # All frames have the same gps coordinate + if "GPSLongitude" not in predictions_gps or "GPSLatitude" not in predictions_gps: return pd.DataFrame() # No GPS coordinate + if round(predictions_gps["GPSLatitude"].std(), 10) == 0.0 or round(predictions_gps["GPSLongitude"].std(), 10) == 0.0: return pd.DataFrame() # All frames have the same gps coordinate return predictions_gps - def get_frames_list(self) -> list: + def get_frames_list(self) -> list[Path]: """ Return list of frames from relative path in metadata csv. """ - frames_path = [] + frames_path: list[Path] = [] # Get frame relative path. metadata_df = self.get_metadata_csv() @@ -404,7 +461,7 @@ def get_frames_list(self) -> list: return frames_path - def get_frame_parent_folder(self, list_frames: str) -> str: + def get_frame_parent_folder(self, list_frames: list) -> str: """ Extract common parent name from all relative path. """ if len(list_frames) == 0: return "" @@ -429,9 +486,9 @@ def get_metadata_csv(self, indexingByFilename: bool = False) -> pd.DataFrame: metadata_path = Path(self.session_path, "METADATA/metadata.csv") if not Path.exists(metadata_path): print(f"No metadata_csv found for session {self.session_name}\n") - return {} + return pd.DataFrame() - index_col = False + index_col: bool | int = False if indexingByFilename: with open(metadata_path, "r") as f: try: @@ -446,7 +503,7 @@ def get_waypoints_file(self) -> pd.DataFrame: sensors_path = Path(self.session_path, "SENSORS") if not Path.exists(sensors_path) or not sensors_path.is_dir(): print(f"No SENSORS folder for session {self.session_name}") - return {} + return pd.DataFrame() for file in sensors_path.iterdir(): @@ -468,7 +525,7 @@ def get_waypoints_file(self) -> pd.DataFrame: return pd.DataFrame(waypoints, columns=["GPSLatitude", "GPSLongitude"]) print(f"No waypoints file found for session {self.session_name}") - return {} + return pd.DataFrame() def get_bit_size_zip_folder(self) -> dict: diff --git a/src/seatizen_session/manager/ssm_default_session.py b/src/seatizen_session/manager/ssm_default_session.py index 0ee86f0..8c895b9 100644 --- a/src/seatizen_session/manager/ssm_default_session.py +++ b/src/seatizen_session/manager/ssm_default_session.py @@ -222,39 +222,6 @@ def check_frames(self) -> tuple[int, bool]: metadata_df.dropna(how='all', axis=1, inplace=True) isGeoreferenced = "GPSLongitude" in metadata_df and "GPSLatitude" in metadata_df return nb_frames, isGeoreferenced - - - def get_jacques_csv(self) -> pd.DataFrame: - " Return jacques model data from csv." - IA_path = Path(self.session_path, "PROCESSED_DATA", "IA") - if not Path.exists(IA_path) or not IA_path.is_dir(): return {} - - jacques_name = "" - for file in IA_path.iterdir(): - if JACQUES_MODEL_NAME in file.name: - jacques_name = file - break - - if jacques_name == "": - print("[WARNING] Cannot find jacques predictions file.") - return {} - - jacques_csv = pd.read_csv(jacques_name) - if len(jacques_csv) == 0: return {} - - return jacques_csv - - - def get_jacques_stat(self) -> tuple[float, float]: - """ Return proportion of useful/useless. """ - - jacques_csv = self.get_jacques_csv() - if len(jacques_csv) == 0: return 0, 0 - - useful = round(len(jacques_csv[jacques_csv["Useless"] == 0]) * 100 / len(jacques_csv), 2) - useless = round(len(jacques_csv[jacques_csv["Useless"] == 1]) * 100 / len(jacques_csv), 2) - - return useful, useless def get_echo_sounder_name(self) -> str: @@ -275,29 +242,4 @@ def get_prog_json(self) -> dict: with open(prog_path, "r") as f: prog_config = json.load(f) - return prog_config - - - def get_useful_frames_name(self) -> list[str]: - """ Return a list of frames path predicted useful by jacques. """ - useful_frames = [] - try_ia = False - # Get frame predictions. - df_predictions_gps = self.get_predictions_gps() - if len(df_predictions_gps) == 0: - print(f"Predictions GPS empty for session {self.session_name}\n") - try_ia = True - else: - useful_frames = df_predictions_gps["FileName"].to_list() # CSV without useless images - - if not try_ia: return useful_frames - - print("We didn't find predictions gps, so we try with jacques csv annotations to select useful frames.") - # Cannot find predictions_gps, try with jacques annotation_files - - df_jacques = self.get_jacques_csv() - if len(df_jacques) == 0: return useful_frames - - useful_frames = df_jacques[df_jacques["Useless"] == 0]["FileName"].to_list() - - return useful_frames \ No newline at end of file + return prog_config \ No newline at end of file diff --git a/src/seatizen_session/manager/ssm_uav_session.py b/src/seatizen_session/manager/ssm_uav_session.py index 5e3f2e9..6ff0ad6 100644 --- a/src/seatizen_session/manager/ssm_uav_session.py +++ b/src/seatizen_session/manager/ssm_uav_session.py @@ -48,8 +48,8 @@ def __get_survey_information(self) -> str: # Check for video metadata_csv = self.get_metadata_csv() first_image = metadata_csv.iloc[0] - extension = Path(first_image["relative_file_path"]).suffix.lower() - size_images = self.get_file_dcim_size(extension) + extensions = [Path(first_image["relative_file_path"]).suffix.lower()] + size_images = self.get_file_dcim_size(extensions) number_images = len(metadata_csv) camera = first_image["Make"] + " " + first_image["Model"] diff --git a/src/seatizen_session/ss_metadata.py b/src/seatizen_session/ss_metadata.py index 1c2fe64..111d268 100644 --- a/src/seatizen_session/ss_metadata.py +++ b/src/seatizen_session/ss_metadata.py @@ -270,7 +270,7 @@ def build_colaborator_information(data: dict, type_work: str = "Creators") -> di return creators, contributors - def __build_communities(self) -> dict | None: + def __build_communities(self) -> list | None: communities = [{'identifier': name} for name in self.metadata_json[COMMUNITIES_KEY]] return None if len(communities) == 0 else communities diff --git a/src/seatizen_session/ss_zipper.py b/src/seatizen_session/ss_zipper.py index 6b789ac..6dfff9b 100644 --- a/src/seatizen_session/ss_zipper.py +++ b/src/seatizen_session/ss_zipper.py @@ -11,7 +11,7 @@ def __init__(self, base_zip_path: Path) -> None: self.zip_name = Path(base_zip_path).stem self.zip = ZipFile(self.zip_path, "w") - self.tot_zip_size, self.nb_zip_file = 0, 1 + self.tot_zip_size, self.nb_zip_file = 0.0, 1.0 def add_file(self, file: Path, output_struc_file: str | Path) -> None: diff --git a/src/utils/lib_tools.py b/src/utils/lib_tools.py index c2cd8c4..e9e293e 100644 --- a/src/utils/lib_tools.py +++ b/src/utils/lib_tools.py @@ -9,7 +9,7 @@ class Sources(enum.Enum): FOLDER = 1 SESSION = 2 -def get_mode_from_opt(opt) -> Sources: +def get_mode_from_opt(opt) -> Sources | None: """ Retrieve mode from input option """ mode = None @@ -22,36 +22,37 @@ def get_mode_from_opt(opt) -> Sources: return mode -def get_src_from_mode(mode: Sources, opt) -> str: +def get_src_from_mode(mode: Sources, opt) -> Path: """ Retrieve src path from mode """ - src = "" + src = Path() if mode == Sources.CSV_SESSION: - src = opt.path_csv_file + src = Path(opt.path_csv_file) elif mode == Sources.FOLDER: - src = opt.path_folder + src = Path(opt.path_folder) elif mode == Sources.SESSION: - src = opt.path_session + src = Path(opt.path_session) return src def get_list_sessions(opt) -> list[Path]: """ Retrieve list of sessions from input """ - list_sessions = [] + list_sessions: list[Path] = [] mode = get_mode_from_opt(opt) + if mode == None: return list_sessions + src = get_src_from_mode(mode, opt) if mode == Sources.SESSION: - list_sessions = [Path(src)] + list_sessions = [src] elif mode == Sources.FOLDER: - list_sessions = sorted(list(Path(src).iterdir())) + list_sessions = sorted(list(src.iterdir())) elif mode == Sources.CSV_SESSION: - src = Path(src) - if Path.exists(src): + if src.exists(): df_ses = pd.read_csv(src) list_sessions = [Path(row.root_folder, row.session_name) for row in df_ses.itertuples(index=False)] @@ -88,7 +89,7 @@ def get_custom_folders_to_upload(opt) -> list: return folder_to_upload -def clean_doi(doi) -> int | None: +def clean_doi(doi) -> str | None: # check for doi is not float nan if doi != doi or doi in ["", None, np.nan]: return None @@ -96,9 +97,9 @@ def clean_doi(doi) -> int | None: # In case user take the whole url if "zenodo." in doi: doi = doi.split("zenodo.")[1] - return int(doi) + return doi -def get_session_name_doi_from_opt(opt) -> list[tuple[str | None, int | None]]: +def get_session_name_doi_from_opt(opt) -> list[tuple[str | None, str | None]]: """ Return a list who contains tuple (name, doi)""" def clean_name(name): @@ -130,12 +131,12 @@ def get_doi_from_custom_frames_csv(opt) -> dict[str, list[str]]: """ Extract doi from custom_frames_csv """ csv_path = Path(opt.path_custom_frames_csv) - if not Path.exists(csv_path) or not csv_path.is_file(): return [] + if not Path.exists(csv_path) or not csv_path.is_file(): return {} df = pd.read_csv(csv_path) if "version_doi" not in df or "relative_file_path" not in df: print("If you want to download specific frames, you will need to have version_doi column and relative_file_path column.") - return [] + return {} data = {} for doi_unformatted in list(set(df["version_doi"].to_list())): diff --git a/src/zenodo_api/za_error.py b/src/zenodo_api/za_error.py index 0822518..aea4d1a 100644 --- a/src/zenodo_api/za_error.py +++ b/src/zenodo_api/za_error.py @@ -1,4 +1,5 @@ import enum +from requests import Response class ParsingReturnType(enum.Enum): LINK = 1 @@ -9,21 +10,25 @@ class ParsingReturnType(enum.Enum): class ZenodoErrorHandler: + @staticmethod def parse_errors(response) -> None: - print(response.json(), response) + print(response) raise NameError("Something Failed with Zenodo") + @staticmethod def parse_links(response) -> None: [print(key, response["metadata"]["links"][key]) for key in response["metadata"]["links"]] + @staticmethod def parse_files(response) -> None: [print(f) for f in response["files"]] + @staticmethod def parse_metadata(response) -> None: [print(key, response["metadata"][key]) for key in response["metadata"]] @staticmethod - def parse(r, parsing_type=ParsingReturnType.NONE) -> int | None: + def parse(r: Response, parsing_type=ParsingReturnType.NONE) -> int | None: if r.status_code >= 400: ZenodoErrorHandler.parse_errors(r)