Skip to content

Commit

Permalink
commands.extract.extractors.extractor_webarchive - add webarchive extractor
Browse files Browse the repository at this point in the history
  • Loading branch information
MatteoCampinoti94 committed Sep 26, 2024
1 parent 50f13df commit d8dc48a
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 1 deletion.
3 changes: 2 additions & 1 deletion digiarch/commands/extract/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from .extractors.extractor_msg import MsgExtractor
from .extractors.extractor_patool import PatoolExtractor
from .extractors.extractor_tnef import TNEFExtractor
from .extractors.extractor_webarchive import WebarchiveExtractor
from .extractors.extractor_zip import ZipExtractor


Expand All @@ -57,7 +58,7 @@ def find_extractor(file: File) -> tuple[Type[ExtractorBase] | None, str | None]:
if not file.action_data.extract:
return None, None

for extractor in (ZipExtractor, TNEFExtractor, MsgExtractor, PatoolExtractor):
for extractor in (ZipExtractor, TNEFExtractor, MsgExtractor, PatoolExtractor, WebarchiveExtractor):
if file.action_data.extract.tool in extractor.tool_names:
return extractor, file.action_data.extract.tool

Expand Down
61 changes: 61 additions & 0 deletions digiarch/commands/extract/extractors/extractor_webarchive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from pathlib import Path
from plistlib import load as load_plist
from typing import ClassVar
from typing import Generator
from urllib.parse import urlparse

from acacore.utils.functions import find_files
from acacore.utils.functions import rm_tree

from digiarch.commands.extract.extractors.base import ExtractError
from digiarch.commands.extract.extractors.base import ExtractorBase


class WebarchiveExtractor(ExtractorBase):
    """Extract the resources stored in a ``webarchive`` property list.

    The archive is read with :mod:`plistlib`. The main resource is written as
    ``index.html``; sub-frames and sub-resources whose URL shares the main
    resource's scheme and network location are written under a path derived
    from the remainder of their URL. Everything is first written to a sibling
    ``<extract_folder>_tmp`` folder and then moved into ``extract_folder``.
    """

    tool_names: ClassVar[list[str]] = ["webarchive"]

    @staticmethod
    def _resource_path(root: Path, url: str, url_prefix: str) -> Path | None:
        """Map a resource URL to a file path inside ``root``.

        :param root: Resolved folder that every extracted file must stay inside.
        :param url: URL of the resource as stored in the archive.
        :param url_prefix: ``scheme://netloc/`` prefix of the main resource.
        :return: The resolved target path, or ``None`` when the resource must be
            skipped: different origin, no path left after the prefix, or a path
            that would land on or outside ``root``.
        """
        if not url.startswith(url_prefix):
            return None

        relative: str = url.removeprefix(url_prefix)
        if not relative:
            # The URL is the site root itself; there is no file name to write to.
            return None

        # The URL comes from the archive and is untrusted: ".." segments or a
        # leading "/" (from "//" in the URL) would otherwise escape the folder.
        target: Path = root.joinpath(relative).resolve()
        if target == root or not target.is_relative_to(root):
            return None

        return target

    def extract(self) -> Generator[tuple[Path, Path], None, None]:
        """Extract the archive and yield each extracted file.

        :return: A generator of ``(file, file)`` tuples, one per file moved
            into ``self.extract_folder``.
        :raises ExtractError: If a required key is missing from the plist.
        """
        extract_folder: Path = self.extract_folder
        extract_folder_tmp: Path = extract_folder.with_name(extract_folder.name + "_tmp")
        rm_tree(extract_folder_tmp)
        extract_folder_tmp.mkdir(parents=True, exist_ok=True)
        # Resolved once so containment checks compare like with like.
        tmp_root: Path = extract_folder_tmp.resolve()

        try:
            with self.file.get_absolute_path().open("rb") as fh:
                archive: dict = load_plist(fh)

            main_resource: dict = archive["WebMainResource"]
            main_url = urlparse(main_resource["WebResourceURL"])
            # netloc (not hostname) keeps the port and original casing, so the
            # prefix is an actual prefix of same-origin resource URLs.
            url_prefix: str = f"{main_url.scheme}://{main_url.netloc}/"

            # Copy the list so sub-frame resources can be appended without
            # mutating the parsed archive.
            resources: list[dict] = list(archive.get("WebSubresources", []))

            index_file: Path = extract_folder_tmp.joinpath("index.html")
            index_file.write_bytes(main_resource["WebResourceData"])

            for subframe in archive.get("WebSubframeArchives", []):
                subframe_main: dict = subframe["WebMainResource"]
                subframe_index = self._resource_path(tmp_root, subframe_main["WebResourceURL"], url_prefix)
                if subframe_index is None:
                    continue

                subframe_index.parent.mkdir(parents=True, exist_ok=True)
                subframe_index.write_bytes(subframe_main["WebResourceData"])
                # A frame may have no sub-resources; treat the key as optional,
                # the same way it is treated for the top-level archive.
                resources.extend(subframe.get("WebSubresources", []))

            for resource in resources:
                resource_file = self._resource_path(tmp_root, resource["WebResourceURL"], url_prefix)
                if resource_file is None:
                    continue
                resource_file.parent.mkdir(parents=True, exist_ok=True)
                resource_file.write_bytes(resource["WebResourceData"])

            for file in find_files(extract_folder_tmp):
                file_new: Path = extract_folder.joinpath(file.relative_to(extract_folder_tmp))
                file_new.parent.mkdir(parents=True, exist_ok=True)
                file.replace(file_new)
                yield file_new, file_new
        except KeyError as e:
            raise ExtractError(self.file, "Malformed plist, KeyError", *e.args) from e
        finally:
            # Always remove the temporary folder, including on errors or when
            # the consumer stops iterating early.
            rm_tree(extract_folder_tmp)

0 comments on commit d8dc48a

Please sign in to comment.