diff --git a/build/lib/oras/__init__.py b/build/lib/oras/__init__.py new file mode 100644 index 0000000..3003622 --- /dev/null +++ b/build/lib/oras/__init__.py @@ -0,0 +1 @@ +from oras.version import __version__ diff --git a/build/lib/oras/auth.py b/build/lib/oras/auth.py new file mode 100644 index 0000000..6e3ba5d --- /dev/null +++ b/build/lib/oras/auth.py @@ -0,0 +1,83 @@ +__author__ = "Vanessa Sochat" +__copyright__ = "Copyright The ORAS Authors." +__license__ = "Apache-2.0" + +import base64 +import os +import re +from typing import List, Optional + +import oras.utils +from oras.logger import logger + + +def load_configs(configs: Optional[List[str]] = None): + """ + Load one or more configs with credentials from the filesystem. + + :param configs: list of configuration paths to load, defaults to None + :type configs: optional list + """ + configs = configs or [] + default_config = oras.utils.find_docker_config() + + # Add the default docker config + if default_config: + configs.append(default_config) + configs = set(configs) # type: ignore + + # Load configs until we find our registry hostname + auths = {} + for config in configs: + if not os.path.exists(config): + logger.warning(f"{config} does not exist.") + continue + cfg = oras.utils.read_json(config) + auths.update(cfg.get("auths", {})) + return auths + + +def get_basic_auth(username: str, password: str): + """ + Prepare basic auth from a username and password. 
+
+    :param username: the user account name
+    :type username: str
+    :param password: the user account password
+    :type password: str
+    """
+    auth_str = "%s:%s" % (username, password)
+    return base64.b64encode(auth_str.encode("utf-8")).decode("utf-8")
+
+
+class authHeader:
+    def __init__(self, lookup: dict):
+        """
+        Given a dictionary of values, match them to class attributes
+
+        :param lookup : dictionary of key,value pairs to parse into auth header
+        :type lookup: dict
+        """
+        self.service: Optional[str] = None
+        self.realm: Optional[str] = None
+        self.scope: Optional[str] = None
+        for key in lookup:
+            if key in ["realm", "service", "scope"]:
+                setattr(self, key, lookup[key])
+
+
+def parse_auth_header(authHeaderRaw: str) -> authHeader:
+    """
+    Parse a www-authenticate header value into its named pieces.
+
+    :param authHeaderRaw: the raw www-authenticate header value to parse
+    :type authHeaderRaw: str
+    """
+    # NOTE: upper-case Z here — [a-zA-z] would also match the ASCII
+    # punctuation characters between "Z" and "a" ([ \ ] ^ _ `).
+    regex = re.compile('([a-zA-Z]+)="(.+?)"')
+    matches = regex.findall(authHeaderRaw)
+    lookup = dict()
+    for match in matches:
+        lookup[match[0]] = match[1]
+    return authHeader(lookup)
diff --git a/build/lib/oras/client.py b/build/lib/oras/client.py
new file mode 100644
index 0000000..b788ef2
--- /dev/null
+++ b/build/lib/oras/client.py
@@ -0,0 +1,246 @@
+__author__ = "Vanessa Sochat"
+__copyright__ = "Copyright The ORAS Authors."
+__license__ = "Apache-2.0"
+
+
+import sys
+from typing import List, Optional, Union
+
+import oras.auth
+import oras.container
+import oras.main.login as login
+import oras.provider
+import oras.utils
+import oras.version
+
+
+class OrasClient:
+    """
+    Create an OCI Registry as Storage (ORAS) Client.
+
+    This is intended for controlled interactions. The user of oras-py can use
+    this client, the terminal command line wrappers, or the functions in main
+    in isolation as an internal Python API. The user can provide a custom
+    registry as a parameter, if desired. If not provided we default to standard
+    oras.
+ """ + + def __init__( + self, + hostname: Optional[str] = None, + registry: Optional[oras.provider.Registry] = None, + insecure: bool = False, + tls_verify: bool = True, + ): + """ + Create an ORAS client. + + The hostname is the remote registry to ping. + + :param hostname: the hostname of the registry to ping + :type hostname: str + :param registry: if provided, use this custom provider instead of default + :type registry: oras.provider.Registry or None + :param insecure: use http instead of https + :type insecure: bool + """ + self.remote = registry or oras.provider.Registry(hostname, insecure, tls_verify) + + def __repr__(self) -> str: + return str(self) + + def __str__(self) -> str: + return "[oras-client]" + + def set_token_auth(self, token: str): + """ + Set token authentication. + + :param token: the bearer token + :type token: str + """ + self.remote.set_token_auth(token) + + def set_basic_auth(self, username: str, password: str): + """ + Add basic authentication to the request. + + :param username: the user account name + :type username: str + :param password: the user account password + :type password: str + """ + self.remote.set_basic_auth(username, password) + + def version(self, return_items: bool = False) -> Union[dict, str]: + """ + Get the version of the client. + + :param return_items : return the dict of version info instead of string + :type return_items: bool + """ + version = oras.version.__version__ + + python_version = "%s.%s.%s" % ( + sys.version_info.major, + sys.version_info.minor, + sys.version_info.micro, + ) + versions = {"Version": version, "Python version": python_version} + + # If the user wants the dictionary of items returned + if return_items: + return versions + + # Otherwise return a string that can be printed + return "\n".join(["%s: %s" % (k, v) for k, v in versions.items()]) + + def get_tags(self, name: str, N=None) -> List[str]: + """ + Retrieve tags for a package. 
+
+        :param name: container URI to parse
+        :type name: str
+        :param N: number of tags (None to get all tags)
+        :type N: int
+        """
+        return self.remote.get_tags(name, N=N)
+
+    def delete_tags(self, name: str, tags: Union[str, list]) -> List[str]:
+        """
+        Delete one or more tags for a unique resource identifier.
+
+        Returns those successfully deleted.
+
+        :param name: container URI to parse
+        :type name: str
+        :param tags: single or multiple tags name to delete
+        :type tags: string or list
+        """
+        # Normalize a single tag into a list so we can iterate uniformly.
+        if isinstance(tags, str):
+            tags = [tags]
+        deleted = []
+        for tag in tags:
+            if self.remote.delete_tag(name, tag):
+                deleted.append(tag)
+        return deleted
+
+    def push(self, *args, **kwargs):
+        """
+        Push a container to the remote.
+        """
+        return self.remote.push(*args, **kwargs)
+
+    def pull(self, *args, **kwargs):
+        """
+        Pull a container from the remote.
+        """
+        return self.remote.pull(*args, **kwargs)
+
+    def login(
+        self,
+        username: str,
+        password: str,
+        password_stdin: bool = False,
+        insecure: bool = False,
+        tls_verify: bool = True,
+        hostname: Optional[str] = None,
+        config_path: Optional[List[str]] = None,
+    ) -> dict:
+        """
+        Login to a registry.
+ + :param registry: if provided, use this custom provider instead of default + :type registry: oras.provider.Registry or None + :param username: the user account name + :type username: str + :param password: the user account password + :type password: str + :param password_stdin: get the password from standard input + :type password_stdin: bool + :param insecure: use http instead of https + :type insecure: bool + :param tls_verify: verify tls + :type tls_verify: bool + :param hostname: the hostname to login to + :type hostname: str + :param config_path: list of config paths to add + :type config_path: list + """ + login_func = self._login + if hasattr(self.remote, "login"): + login_func = self.remote.login # type: ignore + return login_func( + username=username, + password=password, + password_stdin=password_stdin, + tls_verify=tls_verify, + hostname=hostname, + config_path=config_path, # type: ignore + ) + + def logout(self, hostname: str): + """ + Logout from a registry, meaning removing any auth (if loaded) + + :param hostname: the hostname to login to + :type hostname: str + """ + self.remote.logout(hostname) + + def _login( + self, + username: Optional[str] = None, + password: Optional[str] = None, + password_stdin: bool = False, + tls_verify: bool = True, + hostname: Optional[str] = None, + config_path: Optional[str] = None, + ) -> dict: + """ + Login to an OCI registry. + + The username and password can come from stdin. Most people use username + password to get a token, so we are disabling providing just a token for + now. A tool that wants to provide a token should use set_token_auth. 
+ """ + # Read password from stdin + if password_stdin: + password = oras.utils.readline() + + # No username, try to get from stdin + if not username: + username = input("Username: ") + + # No password provided + if not password: + password = input("Password: ") + if not password: + raise ValueError("password required") + + # Cut out early if we didn't get what we need + if not password or not username: + return {"Login": "Not successful"} + + # Set basic auth for the client + self.set_basic_auth(username, password) + + # Login + # https://docker-py.readthedocs.io/en/stable/client.html?highlight=login#docker.client.DockerClient.login + try: + client = oras.utils.get_docker_client(tls_verify=tls_verify) + return client.login( + username=username, + password=password, + registry=hostname, + dockercfg_path=config_path, + ) + + # Fallback to manual login + except Exception: + return login.DockerClient().login( + username=username, # type: ignore + password=password, # type: ignore + registry=hostname, # type: ignore + dockercfg_path=config_path, + ) diff --git a/build/lib/oras/container.py b/build/lib/oras/container.py new file mode 100644 index 0000000..81f117d --- /dev/null +++ b/build/lib/oras/container.py @@ -0,0 +1,124 @@ +__author__ = "Vanessa Sochat" +__copyright__ = "Copyright The ORAS Authors." +__license__ = "Apache-2.0" + + +import re +from typing import Optional + +import oras.defaults + +docker_regex = re.compile( + "(?:(?P[^/@]+[.:][^/@]*)/)?" + "(?P(?:[^:@/]+/)+)?" + "(?P[^:@/]+)" + "(?::(?P[^:@]+))?" + "(?:@(?P.+))?" + "$" +) + + +class Container: + def __init__(self, name: str, registry: Optional[str] = None): + """ + Parse a container name and easily get urls for registry interactions. 
+ + :param name: the full name of the container to parse (with any components) + :type name: str + :param registry: a custom registry name, if not provided with URI + :type registry: str + """ + self.registry = registry or oras.defaults.registry.index_name + + # Registry is the name takes precendence + self.parse(name) + + @property + def api_prefix(self): + """ + Return the repository prefix for the v2 API endpoints. + """ + if self.namespace: + return f"{self.namespace}/{self.repository}" + return self.repository + + def get_blob_url(self, digest: str) -> str: + """ + Get the URL to download a blob + + :param digest: the digest to download + :type digest: str + """ + return f"{self.registry}/v2/{self.api_prefix}/blobs/{digest}" + + def upload_blob_url(self) -> str: + return f"{self.registry}/v2/{self.api_prefix}/blobs/uploads/" + + def tags_url(self, N=None) -> str: + if N is None: + return f"{self.registry}/v2/{self.api_prefix}/tags/list" + return f"{self.registry}/v2/{self.api_prefix}/tags/list?n={N}" + + def manifest_url(self, tag: Optional[str] = None) -> str: + """ + Get the manifest url for a specific tag, or the one for this container. + + The tag provided can also correspond to a digest. + + :param tag: an optional tag to provide (if not provided defaults to container) + :type tag: None or str + """ + + # an explicitly defined tag has precedence over everything, + # but from the already defined ones, prefer the digest for consistency. 
+ tag = tag or (self.digest or self.tag) + return f"{self.registry}/v2/{self.api_prefix}/manifests/{tag}" + + def __str__(self) -> str: + return self.uri + + @property + def uri(self) -> str: + """ + Assemble the complete unique resource identifier + """ + if self.namespace: + uri = f"{self.namespace}/{self.repository}" + else: + uri = f"{self.repository}" + if self.registry: + uri = f"{self.registry}/{uri}" + + # Digest takes preference because more specific + if self.digest: + uri = f"{uri}@{self.digest}" + elif self.tag: + uri = f"{uri}:{self.tag}" + return uri + + def parse(self, name: str): + """ + Parse the container name into registry, repository, and tag. + + :param name: the full name of the container to parse (with any components) + :type name: str + """ + match = re.search(docker_regex, name) + if not match: + raise ValueError( + f"{name} does not match a recognized registry unique resource identifier. Try //:" + ) + items = match.groupdict() # type: ignore + self.repository = items["repository"] + self.registry = items["registry"] or self.registry + self.namespace = items["namespace"] + self.tag = items["tag"] or oras.defaults.default_tag + self.digest = items["digest"] + + # Repository is required + if not self.repository: + raise ValueError( + "You are minimally required to include a /" + ) + if self.namespace: + self.namespace = self.namespace.strip("/") diff --git a/build/lib/oras/decorator.py b/build/lib/oras/decorator.py new file mode 100644 index 0000000..885ec52 --- /dev/null +++ b/build/lib/oras/decorator.py @@ -0,0 +1,83 @@ +__author__ = "Vanessa Sochat" +__copyright__ = "Copyright The ORAS Authors." 
+__license__ = "Apache-2.0" + +import time +from functools import partial, update_wrapper + +from oras.logger import logger + + +class Decorator: + """ + Shared parent decorator class + """ + + def __init__(self, func): + update_wrapper(self, func) + self.func = func + + def __get__(self, obj, objtype): + return partial(self.__call__, obj) + + +class ensure_container(Decorator): + """ + Ensure the first argument is a container, and not a string. + """ + + def __call__(self, cls, *args, **kwargs): + if "container" in kwargs: + kwargs["container"] = cls.get_container(kwargs["container"]) + elif args: + container = cls.get_container(args[0]) + args = (container, *args[1:]) + return self.func(cls, *args, **kwargs) + + +class classretry(Decorator): + """ + Retry a function that is part of a class + """ + + def __init__(self, func, attempts=5, timeout=2): + super().__init__(func) + self.attempts = attempts + self.timeout = timeout + + def __call__(self, cls, *args, **kwargs): + attempt = 0 + attempts = self.attempts + timeout = self.timeout + while attempt < attempts: + try: + return self.func(cls, *args, **kwargs) + except Exception as e: + sleep = timeout + 3**attempt + logger.info(f"Retrying in {sleep} seconds - error: {e}") + time.sleep(sleep) + attempt += 1 + return self.func(cls, *args, **kwargs) + + +def retry(attempts, timeout=2): + """ + A simple retry decorator + """ + + def decorator(func): + def inner(*args, **kwargs): + attempt = 0 + while attempt < attempts: + try: + return func(*args, **kwargs) + except Exception as e: + sleep = timeout + 3**attempt + logger.info(f"Retrying in {sleep} seconds - error: {e}") + time.sleep(sleep) + attempt += 1 + return func(*args, **kwargs) + + return inner + + return decorator diff --git a/build/lib/oras/defaults.py b/build/lib/oras/defaults.py new file mode 100644 index 0000000..2992290 --- /dev/null +++ b/build/lib/oras/defaults.py @@ -0,0 +1,48 @@ +__author__ = "Vanessa Sochat" +__copyright__ = "Copyright The ORAS 
Authors" +__license__ = "Apache-2.0" + + +# Default tag to use +default_tag = "latest" + + +# https://github.com/moby/moby/blob/master/registry/config.go#L29 +class registry: + index_hostname = "index.docker.io" + index_server = "https://index.docker.io/v1/" + index_name = "docker.io" + default_v2_registry = {"scheme": "https", "host": "registry-1.docker.io"} + + +# DefaultBlobDirMediaType specifies the default blob directory media type +default_blob_dir_media_type = "application/vnd.oci.image.layer.v1.tar+gzip" + +# MediaTypeImageLayer is the media type used for layers referenced by the manifest. +default_blob_media_type = "application/vnd.oci.image.layer.v1.tar" +unknown_config_media_type = "application/vnd.unknown.config.v1+json" +default_manifest_media_type = "application/vnd.oci.image.manifest.v1+json" + +# AnnotationDigest is the annotation key for the digest of the uncompressed content +annotation_digest = "io.deis.oras.content.digest" + +# AnnotationTitle is the annotation key for the human-readable title of the image. +annotation_title = "org.opencontainers.image.title" + +# AnnotationUnpack is the annotation key for indication of unpacking +annotation_unpack = "io.deis.oras.content.unpack" + +# OCIImageIndexFile is the file name of the index from the OCI Image Layout Specification +# Reference: https://github.com/opencontainers/image-spec/blob/master/image-layout.md#indexjson-file +oci_image_index_file = "index.json" + +# DefaultBlocksize default size of each slice of bytes read in each write through in gunzipand untar. 
+default_blocksize = 32768 + +# what you get for a blank digest, so we don't need to save and recalculate +blank_hash = "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + +# what you get for a blank config digest, so we don't need to save and recalculate +blank_config_hash = ( + "sha256:44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a" +) diff --git a/build/lib/oras/logger.py b/build/lib/oras/logger.py new file mode 100644 index 0000000..2b3c992 --- /dev/null +++ b/build/lib/oras/logger.py @@ -0,0 +1,306 @@ +__author__ = "Vanessa Sochat" +__copyright__ = "Copyright The ORAS Authors." +__license__ = "Apache-2.0" + +import inspect +import logging as _logging +import os +import platform +import sys +import threading +from pathlib import Path +from typing import Optional, Text, TextIO, Union + + +class ColorizingStreamHandler(_logging.StreamHandler): + BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8) + RESET_SEQ = "\033[0m" + COLOR_SEQ = "\033[%dm" + BOLD_SEQ = "\033[1m" + + colors = { + "WARNING": YELLOW, + "INFO": GREEN, + "DEBUG": BLUE, + "CRITICAL": RED, + "ERROR": RED, + } + + def __init__( + self, + nocolor: bool = False, + stream: Union[Text, Path, TextIO] = sys.stderr, + use_threads: bool = False, + ): + """ + Create a new ColorizingStreamHandler + + :param nocolor: do not use color + :type nocolor: bool + :param stream: stream list to this output + :type stream: bool + :param use_threads: use threads! 
lol + :type use_threads: bool + """ + super().__init__(stream=stream) + self._output_lock = threading.Lock() + self.nocolor = nocolor or not self.can_color_tty() + + def can_color_tty(self) -> bool: + """ + Determine if the tty supports color + """ + if "TERM" in os.environ and os.environ["TERM"] == "dumb": + return False + return self.is_tty and not platform.system() == "Windows" + + @property + def is_tty(self) -> bool: + """ + Determine if we have a tty environment + """ + isatty = getattr(self.stream, "isatty", None) + return isatty and isatty() # type: ignore + + def emit(self, record: _logging.LogRecord): + """ + Emit a log record + + :param record: the record to emit + :type record: logging.LogRecord + """ + with self._output_lock: + try: + self.format(record) # add the message to the record + self.stream.write(self.decorate(record)) + self.stream.write(getattr(self, "terminator", "\n")) + self.flush() + except BrokenPipeError as e: + raise e + except (KeyboardInterrupt, SystemExit): + # ignore any exceptions in these cases as any relevant messages have been printed before + pass + except Exception: + self.handleError(record) + + def decorate(self, record) -> str: + """ + Decorate a log record + + :param record: the record to emit + """ + message = record.message + message = [message] + if not self.nocolor and record.levelname in self.colors: + message.insert(0, self.COLOR_SEQ % (30 + self.colors[record.levelname])) + message.append(self.RESET_SEQ) + return "".join(message) + + +class Logger: + def __init__(self): + """ + Create a new logger + """ + self.logger = _logging.getLogger(__name__) + self.log_handler = [self.text_handler] + self.stream_handler = None + self.printshellcmds = False + self.quiet = False + self.logfile = None + self.last_msg_was_job_info = False + self.logfile_handler = None + + def cleanup(self): + """ + Close open files, etc. 
for the logger + """ + if self.logfile_handler is not None: + self.logger.removeHandler(self.logfile_handler) + self.logfile_handler.close() + self.log_handler = [self.text_handler] + + def handler(self, msg: dict): + """ + Handle a log message. + + :param msg: the message to handle + :type msg: dict + """ + for handler in self.log_handler: + handler(msg) + + def set_stream_handler(self, stream_handler: _logging.Handler): + """ + Set a stream handler. + + :param stream_handler : the stream handler + :type stream_handler: logging.Handler + """ + if self.stream_handler is not None: + self.logger.removeHandler(self.stream_handler) + self.stream_handler = stream_handler + self.logger.addHandler(stream_handler) + + def set_level(self, level: int): + """ + Set the logging level. + + :param level: the logging level to set + :type level: int + """ + self.logger.setLevel(level) + + def location(self, msg: str): + """ + Debug level message with location info. + + :param msg: the logging message + :type msg: dict + """ + callerframerecord = inspect.stack()[1] + frame = callerframerecord[0] + info = inspect.getframeinfo(frame) + self.debug( + "{}: {info.filename}, {info.function}, {info.lineno}".format(msg, info=info) + ) + + def info(self, msg: str): + """ + Info level message + + :param msg: the informational message + :type msg: str + """ + self.handler({"level": "info", "msg": msg}) + + def warning(self, msg: str): + """ + Warning level message + + :param msg: the warning message + :type msg: str + """ + self.handler({"level": "warning", "msg": msg}) + + def debug(self, msg: str): + """ + Debug level message + + :param msg: the debug message + :type msg: str + """ + self.handler({"level": "debug", "msg": msg}) + + def error(self, msg: str): + """ + Error level message + + :param msg: the error message + :type msg: str + """ + self.handler({"level": "error", "msg": msg}) + + def exit(self, msg: str, return_code: int = 1): + """ + Error level message and exit with error code 
+ + :param msg: the exiting (error) message + :type msg: str + :param return_code: return code to exit on + :type return_code: int + """ + self.handler({"level": "error", "msg": msg}) + sys.exit(return_code) + + def progress(self, done: int, total: int): + """ + Show piece of a progress bar + + :param done: count of total that is complete + :type done: int + :param total: count of total + :type total: int + """ + self.handler({"level": "progress", "done": done, "total": total}) + + def shellcmd(self, msg: Optional[str]): + """ + Shellcmd message + + :param msg: the message + :type msg: str + """ + if msg is not None: + self.handler({"level": "shellcmd", "msg": msg}) + + def text_handler(self, msg: dict): + """ + The default log handler that prints to the console. + + :param msg: the log message dict + :type msg: dict + """ + level = msg["level"] + if level == "info" and not self.quiet: + self.logger.info(msg["msg"]) + if level == "warning": + self.logger.warning(msg["msg"]) + elif level == "error": + self.logger.error(msg["msg"]) + elif level == "debug": + self.logger.debug(msg["msg"]) + elif level == "progress" and not self.quiet: + done = msg["done"] + total = msg["total"] + p = done / total + percent_fmt = ("{:.2%}" if p < 0.01 else "{:.0%}").format(p) + self.logger.info( + "{} of {} steps ({}) done".format(done, total, percent_fmt) + ) + elif level == "shellcmd": + if self.printshellcmds: + self.logger.warning(msg["msg"]) + + +logger = Logger() + + +def setup_logger( + quiet: bool = False, + printshellcmds: bool = False, + nocolor: bool = False, + stdout: bool = False, + debug: bool = False, + use_threads: bool = False, +): + """ + Setup the logger. This should be called from an init or client. 
+ + :param quiet: set logging level to quiet + :type quiet: bool + :param printshellcmds: a special level to print shell commands + :type printshellcmds: bool + :param nocolor: do not use color + :type nocolor: bool + :param stdout: print to standard output for the logger + :type stdout: bool + :param debug: debug level logging + :type debug: bool + :param use_threads: use threads! + :type use_threads: bool + """ + # console output only if no custom logger was specified + stream_handler = ColorizingStreamHandler( + nocolor=nocolor, + stream=sys.stdout if stdout else sys.stderr, + use_threads=use_threads, + ) + level = _logging.INFO + if debug: + level = _logging.DEBUG + + logger.set_stream_handler(stream_handler) + logger.set_level(level) + logger.quiet = quiet + logger.printshellcmds = printshellcmds diff --git a/build/lib/oras/main/__init__.py b/build/lib/oras/main/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/build/lib/oras/main/login.py b/build/lib/oras/main/login.py new file mode 100644 index 0000000..9b5e493 --- /dev/null +++ b/build/lib/oras/main/login.py @@ -0,0 +1,52 @@ +__author__ = "Vanessa Sochat" +__copyright__ = "Copyright The ORAS Authors." 
+__license__ = "Apache-2.0" + +import os +from typing import Optional + +import oras.auth +import oras.utils + + +class DockerClient: + """ + If running inside a container (or similar without docker) do a manual login + """ + + def login( + self, + username: str, + password: str, + registry: str, + dockercfg_path: Optional[str] = None, + ) -> dict: + """ + Manual login means loading and checking the config file + + :param registry: if provided, use this custom provider instead of default + :type registry: oras.provider.Registry or None + :param username: the user account name + :type username: str + :param password: the user account password + :type password: str + :param dockercfg_str: docker config path + :type dockercfg_str: list + """ + if not dockercfg_path: + dockercfg_path = oras.utils.find_docker_config(exists=False) + if os.path.exists(dockercfg_path): # type: ignore + cfg = oras.utils.read_json(dockercfg_path) # type: ignore + else: + oras.utils.mkdir_p(os.path.dirname(dockercfg_path)) # type: ignore + cfg = {"auths": {}} + if registry in cfg["auths"]: + cfg["auths"][registry]["auth"] = oras.auth.get_basic_auth( + username, password + ) + else: + cfg["auths"][registry] = { + "auth": oras.auth.get_basic_auth(username, password) + } + oras.utils.write_json(cfg, dockercfg_path) # type: ignore + return {"Status": "Login Succeeded"} diff --git a/build/lib/oras/oci.py b/build/lib/oras/oci.py new file mode 100644 index 0000000..4265f7d --- /dev/null +++ b/build/lib/oras/oci.py @@ -0,0 +1,153 @@ +__author__ = "Vanessa Sochat" +__copyright__ = "Copyright The ORAS Authors." 
+__license__ = "Apache-2.0"
+
+import copy
+import os
+from typing import Dict, Optional, Tuple
+
+import jsonschema
+
+import oras.defaults
+import oras.schemas
+import oras.utils
+
+EmptyManifest = {
+    "schemaVersion": 2,
+    "mediaType": oras.defaults.default_manifest_media_type,
+    "config": {},
+    "layers": [],
+    "annotations": {},
+}
+
+
+class Annotations:
+    """
+    Create a new set of annotations
+    """
+
+    def __init__(self, filename=None):
+        self.lookup = {}
+        self.load(filename)
+
+    def add(self, section, key, value):
+        """
+        Add key/value pairs to a named section.
+        """
+        if section not in self.lookup:
+            self.lookup[section] = {}
+        self.lookup[section][key] = value
+
+    def load(self, filename: Optional[str]):
+        """
+        Load annotations from a json file, if provided.
+
+        :param filename: path of the annotations file (may be None)
+        :type filename: str or None
+        """
+        if filename and os.path.exists(filename):
+            self.lookup = oras.utils.read_json(filename)
+        if filename and not os.path.exists(filename):
+            # Include the offending path so the user can fix it.
+            raise FileNotFoundError(f"Annotation file {filename} does not exist.")
+
+    def get_annotations(self, section: str) -> dict:
+        """
+        Given the name (a relative path or named section) get annotations
+        """
+        for name in section, os.path.abspath(section):
+            if name in self.lookup:
+                return self.lookup[name]
+        return {}
+
+
+class Layer:
+    def __init__(
+        self, blob_path: str, media_type: Optional[str] = None, is_dir: bool = False
+    ):
+        """
+        Create a new Layer
+
+        :param blob_path: the path of the blob for the layer
+        :type blob_path: str
+        :param media_type: media type for the blob (optional)
+        :type media_type: str
+        :param is_dir: is the blob a directory?
+        :type is_dir: bool
+        """
+        self.blob_path = blob_path
+        self.set_media_type(media_type, is_dir)
+
+    def set_media_type(self, media_type: Optional[str] = None, is_dir: bool = False):
+        """
+        Vary the media type to be directory or default layer
+
+        :param media_type: media type for the blob (optional)
+        :type media_type: str
+        :param is_dir: is the blob a directory?
+ :type is_dir: bool + """ + self.media_type = media_type + if is_dir and not media_type: + self.media_type = oras.defaults.default_blob_dir_media_type + elif not is_dir and not media_type: + self.media_type = oras.defaults.default_blob_media_type + + def to_dict(self): + """ + Return a dictionary representation of the layer + """ + layer = { + "mediaType": self.media_type, + "size": oras.utils.get_size(self.blob_path), + "digest": "sha256:" + oras.utils.get_file_hash(self.blob_path), + } + jsonschema.validate(layer, schema=oras.schemas.layer) + return layer + + +def NewLayer( + blob_path: str, media_type: Optional[str] = None, is_dir: bool = False +) -> dict: + """ + Courtesy function to create and retrieve a layer as dict + + :param blob_path: the path of the blob for the layer + :type blob_path: str + :param media_type: media type for the blob (optional) + :type media_type: str + :param is_dir: is the blob a directory? + :type is_dir: bool + """ + return Layer(blob_path=blob_path, media_type=media_type, is_dir=is_dir).to_dict() + + +def ManifestConfig( + path: Optional[str] = None, media_type: Optional[str] = None +) -> Tuple[Dict[str, object], Optional[str]]: + """ + Write an empty config, if one is not provided + + :param path: the path of the manifest config, if exists. + :type path: str + :param media_type: media type for the manifest config (optional) + :type media_type: str + """ + # Create an empty config if we don't have one + if not path or not os.path.exists(path): + path = None + conf = { + "mediaType": media_type or oras.defaults.unknown_config_media_type, + "size": 2, + "digest": oras.defaults.blank_config_hash, + } + + else: + conf = { + "mediaType": media_type or oras.defaults.unknown_config_media_type, + "size": oras.utils.get_size(path), + "digest": "sha256:" + oras.utils.get_file_hash(path), + } + + jsonschema.validate(conf, schema=oras.schemas.layer) + return conf, path + + +def NewManifest() -> dict: + """ + Get an empty manifest config. 
+ """ + return copy.deepcopy(EmptyManifest) diff --git a/build/lib/oras/provider.py b/build/lib/oras/provider.py new file mode 100644 index 0000000..1416fd7 --- /dev/null +++ b/build/lib/oras/provider.py @@ -0,0 +1,1074 @@ +__author__ = "Vanessa Sochat" +__copyright__ = "Copyright The ORAS Authors." +__license__ = "Apache-2.0" + +import copy +import os +import urllib +from contextlib import contextmanager, nullcontext +from dataclasses import asdict, dataclass +from http.cookiejar import DefaultCookiePolicy +from tempfile import TemporaryDirectory +from typing import Callable, Generator, List, Optional, Tuple, Union + +import jsonschema +import requests + +import oras.auth +import oras.container +import oras.decorator as decorator +import oras.oci +import oras.schemas +import oras.utils +from oras.logger import logger +from oras.utils.fileio import PathAndOptionalContent + +# container type can be string or container +container_type = Union[str, oras.container.Container] + + +@contextmanager +def temporary_empty_config() -> Generator[str, None, None]: + with TemporaryDirectory() as tmpdir: + config_file = oras.utils.get_tmpfile(tmpdir=tmpdir, suffix=".json") + oras.utils.write_file(config_file, "{}") + yield config_file + + +@dataclass +class Subject: + mediaType: str + digest: str + size: int + + +class Registry: + """ + Direct interactions with an OCI registry. + + This could also be called a "provider" when we add in the "copy" logic + and the registry isn't necessarily the "remote" endpoint. + """ + + def __init__( + self, + hostname: Optional[str] = None, + insecure: bool = False, + tls_verify: bool = True, + ): + """ + Create a new registry provider. 
+ + :param hostname: the registry hostname (optional) + :type hostname: str + :param insecure: use http instead of https + :type insecure: bool + :param tls_verify: verify TLS certificates + :type tls_verify: bool + """ + self.hostname: Optional[str] = hostname + self.headers: dict = {} + self.session: requests.Session = requests.Session() + self.prefix: str = "http" if insecure else "https" + self.token: Optional[str] = None + self._auths: dict = {} + self._basic_auth = None + self._tls_verify = tls_verify + + if not tls_verify: + requests.packages.urllib3.disable_warnings() # type: ignore + + # Ignore all cookies: some registries try to set one + # and take it as a sign they are talking to a browser, + # trying to set further CSRF cookies (Harbor is such a case) + self.session.cookies.set_policy(DefaultCookiePolicy(allowed_domains=[])) + + def logout(self, hostname: str): + """ + If auths are loaded, remove a hostname. + + :param hostname: the registry hostname to remove + :type hostname: str + """ + # Remove any basic auth or token + self._basic_auth = None + self.token = None + + if not self._auths: + logger.info(f"You are not logged in to {hostname}") + return + + for host in oras.utils.iter_localhosts(hostname): + if host in self._auths: + del self._auths[host] + logger.info(f"You have successfully logged out of {hostname}") + return + logger.info(f"You are not logged in to {hostname}") + + @decorator.ensure_container + def load_configs(self, container: container_type, configs: Optional[list] = None): + """ + Load configs to discover credentials for a specific container. + + This is typically just called once. 
We always add the default Docker
        config to the set.

        :param container: the parsed container URI with components
        :type container: oras.container.Container
        :param configs: list of configs to read (optional)
        :type configs: list
        """
        if not self._auths:
            self._auths = oras.auth.load_configs(configs)
        # Try the registry host under all of its localhost aliases until
        # one of them yields a stored credential.
        for registry in oras.utils.iter_localhosts(container.registry):  # type: ignore
            if self._load_auth(registry):
                return

    def _load_auth(self, hostname: str) -> bool:
        """
        Look for and load a named authentication token.

        Returns True if a basic-auth credential was found and stored on
        ``self._basic_auth``, False otherwise.

        :param hostname: the registry hostname to look for
        :type hostname: str
        """
        # Note that the hostname can be defined without a token
        if hostname in self._auths:
            auth = self._auths[hostname].get("auth")

            # Case 1: they use a credsStore we don't know how to read
            if not auth and "credsStore" in self._auths[hostname]:
                logger.warning(
                    '"credsStore" found in your ~/.docker/config.json, which is not supported by oras-py. Remove it, docker login, and try again.'
                )
                return False

            # Case 2: no auth there (wonky file)
            elif not auth:
                return False
            self._basic_auth = auth
            return True
        return False

    def set_basic_auth(self, username: str, password: str):
        """
        Set basic authentication.

        :param username: the user account name
        :type username: str
        :param password: the user account password
        :type password: str
        """
        self._basic_auth = oras.auth.get_basic_auth(username, password)
        self.set_header("Authorization", "Basic %s" % self._basic_auth)

    def set_token_auth(self, token: str):
        """
        Set token authentication.

        :param token: the bearer token
        :type token: str
        """
        self.token = token
        self.set_header("Authorization", "Bearer %s" % token)

    def reset_basic_auth(self):
        """
        Given we have basic auth, reset it.
+ """ + if "Authorization" in self.headers: + del self.headers["Authorization"] + if self._basic_auth: + self.set_header("Authorization", "Basic %s" % self._basic_auth) + + def set_header(self, name: str, value: str): + """ + Courtesy function to set a header + + :param name: header name to set + :type name: str + :param value: header value to set + :type value: str + """ + self.headers.update({name: value}) + + def _validate_path(self, path: str) -> bool: + """ + Ensure a blob path is in the present working directory or below. + + :param path: the path to validate + :type path: str + """ + return os.getcwd() in os.path.abspath(path) + + def _parse_manifest_ref(self, ref: str) -> Tuple[str, str]: + """ + Parse an optional manifest config. + + Examples + -------- + : + path/to/config:application/vnd.oci.image.config.v1+json + /dev/null:application/vnd.oci.image.config.v1+json + + :param ref: the manifest reference to parse (examples above) + :type ref: str + :return - A Tuple of the path and the content-type, using the default unknown + config media type if none found in the reference + """ + path_content: PathAndOptionalContent = oras.utils.split_path_and_content(ref) + if not path_content.content: + path_content.content = oras.defaults.unknown_config_media_type + return path_content.path, path_content.content + + def upload_blob( + self, + blob: str, + container: container_type, + layer: dict, + do_chunked: bool = False, + refresh_headers: bool = True + ) -> requests.Response: + """ + Prepare and upload a blob. + + Sizes > 1024 are uploaded via a chunked approach (post, patch+, put) + and <= 1024 is a single post then put. 
+ + :param blob: path to blob to upload + :type blob: str + :param container: parsed container URI + :type container: oras.container.Container or str + :param layer: dict from oras.oci.NewLayer + :type layer: dict + :param refresh_headers: if true, headers are refreshed + :type refresh_headers: bool + """ + blob = os.path.abspath(blob) + container = self.get_container(container) + + # Always reset headers between uploads + self.reset_basic_auth() + + # Chunked for large, otherwise POST and PUT + # This is currently disabled unless the user asks for it, as + # it doesn't seem to work for all registries + if not do_chunked: + response = self.put_upload(blob, container, layer, refresh_headers=refresh_headers) + else: + response = self.chunked_upload(blob, container, layer, refresh_headers=refresh_headers) + + # If we have an empty layer digest and the registry didn't accept, just return dummy successful response + if ( + response.status_code not in [200, 201, 202] + and layer["digest"] == oras.defaults.blank_hash + ): + response = requests.Response() + response.status_code = 200 + return response + + @decorator.ensure_container + def delete_tag(self, container: container_type, tag: str) -> bool: + """ + Delete a tag for a container. 
+ + :param container: parsed container URI + :type container: oras.container.Container or str + :param tag: name of tag to delete + :type tag: str + """ + logger.debug(f"Deleting tag {tag} for {container}") + + head_url = f"{self.prefix}://{container.manifest_url(tag)}" # type: ignore + + # get digest of manifest to delete + response = self.do_request( + head_url, + "HEAD", + headers={"Accept": "application/vnd.oci.image.manifest.v1+json"}, + ) + if response.status_code == 404: + logger.error(f"Cannot find tag {container}:{tag}") + return False + + digest = response.headers.get("Docker-Content-Digest") + if not digest: + raise RuntimeError("Expected to find Docker-Content-Digest header.") + + delete_url = f"{self.prefix}://{container.manifest_url(digest)}" # type: ignore + response = self.do_request(delete_url, "DELETE") + if response.status_code != 202: + raise RuntimeError("Delete was not successful: {response.json()}") + return True + + @decorator.ensure_container + def get_tags(self, container: container_type, N=None) -> List[str]: + """ + Retrieve tags for a package. + + :param container: parsed container URI + :type container: oras.container.Container or str + :param N: limit number of tags, None for all (default) + :type N: Optional[int] + """ + retrieve_all = N is None + tags_url = f"{self.prefix}://{container.tags_url(N=N)}" # type: ignore + tags: List[str] = [] + + def extract_tags(response: requests.Response): + """ + Determine if we should continue based on new tags and under limit. + """ + json = response.json() + new_tags = json.get("tags") or [] + tags.extend(new_tags) + return len(new_tags) and (retrieve_all or len(tags) < N) + + self._do_paginated_request(tags_url, callable=extract_tags) + + # If we got a longer set than was asked for + if N is not None and len(tags) > N: + tags = tags[:N] + return tags + + def _do_paginated_request( + self, url: str, callable: Callable[[requests.Response], bool] + ): + """ + Paginate a request for a URL. 
+ + We look for the "Link" header to get the next URL to ping. If + the callable returns True, we continue to the next page, otherwise + we stop. + """ + # Save the base url to add parameters to, assuming only the params change + parts = urllib.parse.urlparse(url) + base_url = f"{parts.scheme}://{parts.netloc}" + + # get all results using the pagination + while True: + response = self.do_request(url, "GET", headers=self.headers) + + # Check 200 response, show errors if any + self._check_200_response(response) + + want_more = callable(response) + if not want_more: + break + + link = response.links.get("next", {}).get("url") + + # Get the next link + if not link: + break + + # use link + base url to continue with next page + url = f"{base_url}{link}" + + @decorator.ensure_container + def get_blob( + self, + container: container_type, + digest: str, + stream: bool = False, + head: bool = False, + ) -> requests.Response: + """ + Retrieve a blob for a package. + + :param container: parsed container URI + :type container: oras.container.Container or str + :param digest: sha256 digest of the blob to retrieve + :type digest: str + :param stream: stream the response (or not) + :type stream: bool + :param head: use head to determine if blob exists + :type head: bool + """ + method = "GET" if not head else "HEAD" + blob_url = f"{self.prefix}://{container.get_blob_url(digest)}" # type: ignore + return self.do_request(blob_url, method, headers=self.headers, stream=stream) + + def get_container(self, name: container_type) -> oras.container.Container: + """ + Courtesy function to get a container from a URI. 
+ + :param name: unique resource identifier to parse + :type name: oras.container.Container or str + """ + if isinstance(name, oras.container.Container): + return name + return oras.container.Container(name, registry=self.hostname) + + # Functions to be deprecated in favor of exposed ones + @decorator.ensure_container + def _download_blob( + self, container: container_type, digest: str, outfile: str + ) -> str: + logger.warning( + "This function is deprecated in favor of download_blob and will be removed by 0.1.2" + ) + return self.download_blob(container, digest, outfile) + + def _put_upload( + self, blob: str, container: oras.container.Container, layer: dict + ) -> requests.Response: + logger.warning( + "This function is deprecated in favor of put_upload and will be removed by 0.1.2" + ) + return self.put_upload(blob, container, layer) + + def _chunked_upload( + self, blob: str, container: oras.container.Container, layer: dict + ) -> requests.Response: + logger.warning( + "This function is deprecated in favor of chunked_upload and will be removed by 0.1.2" + ) + return self.chunked_upload(blob, container, layer) + + def _upload_manifest( + self, manifest: dict, container: oras.container.Container + ) -> requests.Response: + logger.warning( + "This function is deprecated in favor of upload_manifest and will be removed by 0.1.2" + ) + return self.upload_manifest(manifest, container) + + def _upload_blob( + self, + blob: str, + container: container_type, + layer: dict, + do_chunked: bool = False, + ) -> requests.Response: + logger.warning( + "This function is deprecated in favor of upload_blob and will be removed by 0.1.2" + ) + return self.upload_blob(blob, container, layer, do_chunked) + + @decorator.ensure_container + def download_blob( + self, container: container_type, digest: str, outfile: str + ) -> str: + """ + Stream download a blob into an output file. + + This function is a wrapper around get_blob. 
+ + :param container: parsed container URI + :type container: oras.container.Container or str + """ + try: + # Ensure output directory exists first + outdir = os.path.dirname(outfile) + if outdir and not os.path.exists(outdir): + oras.utils.mkdir_p(outdir) + with self.get_blob(container, digest, stream=True) as r: + r.raise_for_status() + with open(outfile, "wb") as f: + for chunk in r.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + + # Allow an empty layer to fail and return /dev/null + except Exception as e: + if digest == oras.defaults.blank_hash: + return os.devnull + raise e + return outfile + + def put_upload( + self, blob: str, container: oras.container.Container, layer: dict, refresh_headers: bool = True + ) -> requests.Response: + """ + Upload to a registry via put. + + :param blob: path to blob to upload + :type blob: str + :param container: parsed container URI + :type container: oras.container.Container or str + :param layer: dict from oras.oci.NewLayer + :type layer: dict + :param refresh_headers: if true, headers are refreshed + :type refresh_headers: bool + """ + # Start an upload session + headers = {"Content-Type": "application/octet-stream"} + + if not refresh_headers: + headers.update(self.headers) + + upload_url = f"{self.prefix}://{container.upload_blob_url()}" + r = self.do_request(upload_url, "POST", headers=headers) + + # Location should be in the header + session_url = self._get_location(r, container) + if not session_url: + raise ValueError(f"Issue retrieving session url: {r.json()}") + + # PUT to upload blob url + headers = { + "Content-Length": str(layer["size"]), + "Content-Type": "application/octet-stream", + } + headers.update(self.headers) + blob_url = oras.utils.append_url_params( + session_url, {"digest": layer["digest"]} + ) + with open(blob, "rb") as fd: + response = self.do_request( + blob_url, + method="PUT", + data=fd.read(), + headers=headers, + ) + return response + + def _get_location( + self, r: 
requests.Response, container: oras.container.Container + ) -> str: + """ + Parse the location header and ensure it includes a hostname. + This currently assumes if there isn't a hostname, we are pushing to + the same registry hostname of the original request. + + :param r: requests response with headers + :type r: requests.Response + :param container: parsed container URI + :type container: oras.container.Container or str + """ + session_url = r.headers.get("location", "") + if not session_url: + return session_url + + # Some registries do not return the full registry hostname. Check that + # the url starts with a protocol scheme, change tracked with: + # https://github.com/oras-project/oras-py/issues/78 + prefix = f"{self.prefix}://{container.registry}" + + if not session_url.startswith("http"): + session_url = f"{prefix}{session_url}" + return session_url + + def chunked_upload( + self, blob: str, container: oras.container.Container, layer: dict, refresh_headers: bool = True + ) -> requests.Response: + """ + Upload via a chunked upload. 
+ + :param blob: path to blob to upload + :type blob: str + :param container: parsed container URI + :type container: oras.container.Container or str + :param layer: dict from oras.oci.NewLayer + :type layer: dict + :param refresh_headers: if true, headers are refreshed + :type refresh_headers: bool + """ + # Start an upload session + headers = {"Content-Type": "application/octet-stream", "Content-Length": "0"} + if not refresh_headers: + headers.update(self.headers) + + upload_url = f"{self.prefix}://{container.upload_blob_url()}" + r = self.do_request(upload_url, "POST", headers=headers) + + # Location should be in the header + session_url = self._get_location(r, container) + if not session_url: + raise ValueError(f"Issue retrieving session url: {r.json()}") + + # Read the blob in chunks, for each do a patch + start = 0 + with open(blob, "rb") as fd: + for chunk in oras.utils.read_in_chunks(fd): + if not chunk: + break + + end = start + len(chunk) - 1 + content_range = "%s-%s" % (start, end) + headers = { + "Content-Range": content_range, + "Content-Length": str(len(chunk)), + "Content-Type": "application/octet-stream", + } + + # Important to update with auth token if acquired + headers.update(self.headers) + start = end + 1 + self._check_200_response( + self.do_request(session_url, "PATCH", data=chunk, headers=headers) + ) + + # Finally, issue a PUT request to close blob + session_url = oras.utils.append_url_params( + session_url, {"digest": layer["digest"]} + ) + return self.do_request(session_url, "PUT", headers=self.headers) + + def _check_200_response(self, response: requests.Response): + """ + Helper function to ensure some flavor of 200 + + :param response: request response to inspect + :type response: requests.Response + """ + if response.status_code not in [200, 201, 202]: + self._parse_response_errors(response) + raise ValueError(f"Issue with {response.request.url}: {response.reason}") + + def _parse_response_errors(self, response: requests.Response): + 
""" + Given a failed request, look for OCI formatted error messages. + + :param response: request response to inspect + :type response: requests.Response + """ + try: + msg = response.json() + for error in msg.get("errors", []): + if isinstance(error, dict) and "message" in error: + logger.error(error["message"]) + except Exception: + pass + + def upload_manifest( + self, manifest: dict, container: oras.container.Container, refresh_headers: bool = True + ) -> requests.Response: + """ + Read a manifest file and upload it. + + :param manifest: manifest to upload + :type manifest: dict + :param container: parsed container URI + :type container: oras.container.Container or str + :param refresh_headers: if true, headers are refreshed + :type refresh_headers: bool + """ + self.reset_basic_auth() + jsonschema.validate(manifest, schema=oras.schemas.manifest) + headers = { + "Content-Type": oras.defaults.default_manifest_media_type, + "Content-Length": str(len(manifest)), + } + + if not refresh_headers: + headers.update(self.headers) + + return self.do_request( + f"{self.prefix}://{container.manifest_url()}", # noqa + "PUT", + headers=headers, + json=manifest, + ) + + def push(self, *args, **kwargs) -> requests.Response: + """ + Push a set of files to a target + + :param config_path: path to a config file + :type config_path: str + :param disable_path_validation: ensure paths are relative to the running directory. 
+ :type disable_path_validation: bool + :param files: list of files to push + :type files: list + :param insecure: allow registry to use http + :type insecure: bool + :param annotation_file: manifest annotations file + :type annotation_file: str + :param manifest_annotations: manifest annotations + :type manifest_annotations: dict + :param target: target location to push to + :type target: str + :param refresh_headers: if true or None, headers are refreshed + :type refresh_headers: bool + :param subject: optional subject reference + :type subject: Subject + """ + container = self.get_container(kwargs["target"]) + self.load_configs(container, configs=kwargs.get("config_path")) + + # Hold state of request for http/https + validate_path = not kwargs.get("disable_path_validation", False) + + # Prepare a new manifest + manifest = oras.oci.NewManifest() + + # A lookup of annotations we can add (to blobs or manifest) + annotset = oras.oci.Annotations(kwargs.get("annotation_file")) + media_type = None + + refresh_headers = kwargs.get("refresh_headers") + if refresh_headers is None: + refresh_headers = True + + # Upload files as blobs + for blob in kwargs.get("files", []): + # You can provide a blob + content type + path_content: PathAndOptionalContent = oras.utils.split_path_and_content( + str(blob) + ) + blob = path_content.path + media_type = path_content.content + + # Must exist + if not os.path.exists(blob): + raise FileNotFoundError(f"{blob} does not exist.") + + # Path validation means blob must be relative to PWD. + if validate_path: + if not self._validate_path(blob): + raise ValueError( + f"Blob {blob} is not in the present working directory context." 
+ ) + + # Save directory or blob name before compressing + blob_name = os.path.basename(blob) + + # If it's a directory, we need to compress + cleanup_blob = False + if os.path.isdir(blob): + blob = oras.utils.make_targz(blob) + cleanup_blob = True + + # Create a new layer from the blob + layer = oras.oci.NewLayer(blob, is_dir=cleanup_blob, media_type=media_type) + annotations = annotset.get_annotations(blob) + + # Always strip blob_name of path separator + layer["annotations"] = { + oras.defaults.annotation_title: blob_name.strip(os.sep) + } + if annotations: + layer["annotations"].update(annotations) + + # update the manifest with the new layer + manifest["layers"].append(layer) + logger.debug(f"Preparing layer {layer}") + + # Upload the blob layer + response = self.upload_blob(blob, container, layer, refresh_headers=refresh_headers) + self._check_200_response(response) + + # Do we need to cleanup a temporary targz? + if cleanup_blob and os.path.exists(blob): + os.remove(blob) + + # Add annotations to the manifest, if provided + manifest_annots = annotset.get_annotations("$manifest") or {} + + # Custom manifest annotations from client key=value pairs + # These over-ride any potentially provided from file + custom_annots = kwargs.get("manifest_annotations") + if custom_annots: + manifest_annots.update(custom_annots) + if manifest_annots: + manifest["annotations"] = manifest_annots + + subject = kwargs.get("subject") + if subject: + manifest["subject"] = asdict(subject) + + # Prepare the manifest config (temporary or one provided) + manifest_config = kwargs.get("manifest_config") + config_annots = annotset.get_annotations("$config") + if manifest_config: + ref, media_type = self._parse_manifest_ref(manifest_config) + conf, config_file = oras.oci.ManifestConfig(ref, media_type) + else: + conf, config_file = oras.oci.ManifestConfig() + + # Config annotations? + if config_annots: + conf["annotations"] = config_annots + + # Config is just another layer blob! 
+ logger.debug(f"Preparing config {conf}") + with ( + temporary_empty_config() + if config_file is None + else nullcontext(config_file) + ) as config_file: + response = self.upload_blob(config_file, container, conf, refresh_headers=refresh_headers) + + self._check_200_response(response) + + # Final upload of the manifest + manifest["config"] = conf + self._check_200_response(self.upload_manifest(manifest, container, refresh_headers=refresh_headers)) + print(f"Successfully pushed {container}") + return response + + def pull(self, *args, **kwargs) -> List[str]: + """ + Pull an artifact from a target + + :param config_path: path to a config file + :type config_path: str + :param allowed_media_type: list of allowed media types + :type allowed_media_type: list or None + :param overwrite: if output file exists, overwrite + :type overwrite: bool + :param refresh_headers: if true, headers are refreshed when fetching manifests + :type refresh_headers: bool + :param manifest_config_ref: save manifest config to this file + :type manifest_config_ref: str + :param outdir: output directory path + :type outdir: str + :param target: target location to pull from + :type target: str + """ + allowed_media_type = kwargs.get("allowed_media_type") + refresh_headers = kwargs.get("refresh_headers") + if refresh_headers is None: + refresh_headers = True + container = self.get_container(kwargs["target"]) + self.load_configs(container, configs=kwargs.get("config_path")) + manifest = self.get_manifest(container, allowed_media_type, refresh_headers) + outdir = kwargs.get("outdir") or oras.utils.get_tmpdir() + overwrite = kwargs.get("overwrite", True) + + files = [] + for layer in manifest.get("layers", []): + filename = (layer.get("annotations") or {}).get( + oras.defaults.annotation_title + ) + + # If we don't have a filename, default to digest. 
Hopefully does not happen + if not filename: + filename = layer["digest"] + + # This raises an error if there is a malicious path + outfile = oras.utils.sanitize_path(outdir, os.path.join(outdir, filename)) + + if not overwrite and os.path.exists(outfile): + logger.warning( + f"{outfile} already exists and --keep-old-files set, will not overwrite." + ) + continue + + # A directory will need to be uncompressed and moved + if layer["mediaType"] == oras.defaults.default_blob_dir_media_type: + targz = oras.utils.get_tmpfile(suffix=".tar.gz") + self.download_blob(container, layer["digest"], targz) + + # The artifact will be extracted to the correct name + oras.utils.extract_targz(targz, os.path.dirname(outfile)) + + # Anything else just extracted directly + else: + self.download_blob(container, layer["digest"], outfile) + logger.info(f"Successfully pulled {outfile}.") + files.append(outfile) + return files + + @decorator.ensure_container + def get_manifest( + self, + container: container_type, + allowed_media_type: Optional[list] = None, + refresh_headers: bool = True, + ) -> dict: + """ + Retrieve a manifest for a package. 
+ + :param container: parsed container URI + :type container: oras.container.Container or str + :param allowed_media_type: one or more allowed media types + :type allowed_media_type: str + :param refresh_headers: if true, headers are refreshed + :type refresh_headers: bool + """ + if not allowed_media_type: + allowed_media_type = [oras.defaults.default_manifest_media_type] + headers = {"Accept": ";".join(allowed_media_type)} + + if not refresh_headers: + headers.update(self.headers) + + get_manifest = f"{self.prefix}://{container.manifest_url()}" # type: ignore + response = self.do_request(get_manifest, "GET", headers=headers) + self._check_200_response(response) + manifest = response.json() + jsonschema.validate(manifest, schema=oras.schemas.manifest) + return manifest + + @decorator.classretry + def do_request( + self, + url: str, + method: str = "GET", + data: Optional[Union[dict, bytes]] = None, + headers: Optional[dict] = None, + json: Optional[dict] = None, + stream: bool = False, + ): + """ + Do a request. This is a wrapper around requests to handle retry auth. 
+ + :param url: the URL to issue the request to + :type url: str + :param method: the method to use (GET, DELETE, POST, PUT, PATCH) + :type method: str + :param data: data for requests + :type data: dict or bytes + :param headers: headers for the request + :type headers: dict + :param json: json data for requests + :type json: dict + :param stream: stream the responses + :type stream: bool + """ + headers = headers or {} + + # Make the request and return to calling function, unless requires auth + response = self.session.request( + method, + url, + data=data, + json=json, + headers=headers, + stream=stream, + verify=self._tls_verify, + ) + + # A 401 response is a request for authentication + if response.status_code not in [401, 404]: + return response + + # Otherwise, authenticate the request and retry + if self.authenticate_request(response): + headers.update(self.headers) + response = self.session.request( + method, + url, + data=data, + json=json, + headers=headers, + stream=stream, + ) + + # Fallback to using Authorization if already required + # This is a catch for EC2. I don't think this is correct + # A basic token should be used for a bearer one. + if response.status_code in [401, 404] and "Authorization" in self.headers: + logger.debug("Trying with provided Basic Authorization...") + headers.update(self.headers) + response = self.session.request( + method, + url, + data=data, + json=json, + headers=headers, + stream=stream, + ) + + return response + + def authenticate_request(self, originalResponse: requests.Response) -> bool: + """ + Authenticate Request + Given a response, look for a Www-Authenticate header to parse. + + We return True/False to indicate if the request should be retried. 
+ + :param originalResponse: original response to get the Www-Authenticate header + :type originalResponse: requests.Response + """ + authHeaderRaw = originalResponse.headers.get("Www-Authenticate") + if not authHeaderRaw: + logger.debug( + "Www-Authenticate not found in original response, cannot authenticate." + ) + return False + + # If we have a token, set auth header (base64 encoded user/pass) + if self.token: + self.set_header("Authorization", "Bearer %s" % self._basic_auth) + return True + + headers = copy.deepcopy(self.headers) + h = oras.auth.parse_auth_header(authHeaderRaw) + + if "Authorization" not in headers: + # First try to request an anonymous token + logger.debug("No Authorization, requesting anonymous token") + if self.request_anonymous_token(h): + logger.debug("Successfully obtained anonymous token!") + return True + + logger.error( + "This endpoint requires a token. Please set " + "oras.provider.Registry.set_basic_auth(username, password) " + "first or use oras-py login to do the same." + ) + return False + + params = {} + + # Prepare request to retry + if h.service: + logger.debug(f"Service: {h.service}") + params["service"] = h.service + headers.update( + { + "Service": h.service, + "Accept": "application/json", + "User-Agent": "oras-py", + } + ) + + # Ensure the realm starts with http + if not h.realm.startswith("http"): # type: ignore + h.realm = f"{self.prefix}://{h.realm}" + + # If the www-authenticate included a scope, honor it! 
+ if h.scope: + logger.debug(f"Scope: {h.scope}") + params["scope"] = h.scope + + authResponse = self.session.get(h.realm, headers=headers, params=params) # type: ignore + if authResponse.status_code != 200: + logger.debug(f"Auth response was not successful: {authResponse.text}") + return False + + # Request the token + info = authResponse.json() + token = info.get("token") or info.get("access_token") + + # Set the token to the original request and retry + self.headers.update({"Authorization": "Bearer %s" % token}) + return True + + def request_anonymous_token(self, h: oras.auth.authHeader) -> bool: + """ + Given no basic auth, fall back to trying to request an anonymous token. + + Returns: boolean if headers have been updated with token. + """ + if not h.realm: + logger.debug("Request anonymous token: no realm provided, exiting early") + return False + + params = {} + if h.service: + params["service"] = h.service + if h.scope: + params["scope"] = h.scope + + logger.debug(f"Final params are {params}") + response = self.session.request("GET", h.realm, params=params) + if response.status_code != 200: + logger.debug(f"Response for anon token failed: {response.text}") + return False + + # From https://docs.docker.com/registry/spec/auth/token/ section + # We can get token OR access_token OR both (when both they are identical) + data = response.json() + token = data.get("token") or data.get("access_token") + + # Update the headers but not self.token (expects Basic) + if token: + self.headers.update({"Authorization": "Bearer %s" % token}) + return True + logger.debug("Warning: no token or access_token present in response.") + return False diff --git a/build/lib/oras/schemas.py b/build/lib/oras/schemas.py new file mode 100644 index 0000000..3a0eb7a --- /dev/null +++ b/build/lib/oras/schemas.py @@ -0,0 +1,58 @@ +__author__ = "Vanessa Sochat" +__copyright__ = "Copyright The ORAS Authors." 
+__license__ = "Apache-2.0" + +## Manifest and Layer schemas + +schema_url = "http://json-schema.org/draft-07/schema" + +# Layer + +layerProperties = { + "type": "object", + "properties": { + "mediaType": {"type": "string"}, + "size": {"type": "number"}, + "digest": {"type": "string"}, + "annotations": {"type": ["object", "null", "array"]}, + }, +} + +layer = { + "$schema": schema_url, + "title": "Layer Schema", + "required": [ + "mediaType", + "size", + "digest", + ], + "additionalProperties": True, +} + +layer.update(layerProperties) + + +# Manifest + +manifestProperties = { + "schemaVersion": {"type": "number"}, + "subject": {"type": ["null", "object"]}, + "mediaType": {"type": "string"}, + "layers": {"type": "array", "items": layerProperties}, + "config": layerProperties, + "annotations": {"type": ["object", "null", "array"]}, +} + + +manifest = { + "$schema": schema_url, + "title": "Manifest Schema", + "type": "object", + "required": [ + "schemaVersion", + "config", + "layers", + ], + "properties": manifestProperties, + "additionalProperties": True, +} diff --git a/build/lib/oras/tests/__init__.py b/build/lib/oras/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/build/lib/oras/tests/annotations.json b/build/lib/oras/tests/annotations.json new file mode 100644 index 0000000..d0be02f --- /dev/null +++ b/build/lib/oras/tests/annotations.json @@ -0,0 +1,6 @@ +{ + "$manifest": { + "holiday": "Christmas", + "candy": "candy-corn" + } +} diff --git a/build/lib/oras/tests/conftest.py b/build/lib/oras/tests/conftest.py new file mode 100644 index 0000000..7b038dc --- /dev/null +++ b/build/lib/oras/tests/conftest.py @@ -0,0 +1,58 @@ +import os +from dataclasses import dataclass + +import pytest + + +@dataclass +class TestCredentials: + with_auth: bool + user: str + password: str + + +@pytest.fixture +def registry(): + host = os.environ.get("ORAS_HOST") + port = os.environ.get("ORAS_PORT") + + if not host or not port: + pytest.skip( + "You must 
export ORAS_HOST and ORAS_PORT" + " for a running registry before running tests." + ) + + return f"{host}:{port}" + + +@pytest.fixture +def credentials(request): + with_auth = os.environ.get("ORAS_AUTH") == "true" + user = os.environ.get("ORAS_USER", "myuser") + pwd = os.environ.get("ORAS_PASS", "mypass") + + if with_auth and not user or not pwd: + pytest.skip("To test auth you need to export ORAS_USER and ORAS_PASS") + + marks = [m.name for m in request.node.iter_markers()] + if request.node.parent: + marks += [m.name for m in request.node.parent.iter_markers()] + + if request.node.get_closest_marker("with_auth"): + if request.node.get_closest_marker("with_auth").args[0] != with_auth: + if with_auth: + pytest.skip("test requires un-authenticated access to registry") + else: + pytest.skip("test requires authenticated access to registry") + + return TestCredentials(with_auth, user, pwd) + + +@pytest.fixture +def target(registry): + return f"{registry}/dinosaur/artifact:v1" + + +@pytest.fixture +def target_dir(registry): + return f"{registry}/dinosaur/directory:v1" diff --git a/build/lib/oras/tests/run_registry.sh b/build/lib/oras/tests/run_registry.sh new file mode 100755 index 0000000..e86145d --- /dev/null +++ b/build/lib/oras/tests/run_registry.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +# A helper script to easily run a development, local registry +docker run -it -e REGISTRY_STORAGE_DELETE_ENABLED=true --rm -p 5000:5000 ghcr.io/oras-project/registry:latest diff --git a/build/lib/oras/tests/test_oras.py b/build/lib/oras/tests/test_oras.py new file mode 100644 index 0000000..06df44f --- /dev/null +++ b/build/lib/oras/tests/test_oras.py @@ -0,0 +1,138 @@ +__author__ = "Vanessa Sochat" +__copyright__ = "Copyright The ORAS Authors." 
+__license__ = "Apache-2.0" + +import os +import shutil + +import pytest + +import oras.client + +here = os.path.abspath(os.path.dirname(__file__)) + + +def test_basic_oras(registry): + """ + Basic tests for oras (without authentication) + """ + client = oras.client.OrasClient(hostname=registry, insecure=True) + assert "Python version" in client.version() + + +@pytest.mark.with_auth(True) +def test_login_logout(registry, credentials): + """ + Login and logout are all we can test with basic auth! + """ + client = oras.client.OrasClient(hostname=registry, insecure=True) + res = client.login( + hostname=registry, + username=credentials.user, + password=credentials.password, + insecure=True, + ) + assert res["Status"] == "Login Succeeded" + client.logout(registry) + + +@pytest.mark.with_auth(False) +def test_basic_push_pull(tmp_path, registry, credentials, target): + """ + Basic tests for oras (without authentication) + """ + client = oras.client.OrasClient(hostname=registry, insecure=True) + artifact = os.path.join(here, "artifact.txt") + + assert os.path.exists(artifact) + + res = client.push(files=[artifact], target=target) + assert res.status_code in [200, 201] + + # Test pulling elsewhere + files = client.pull(target=target, outdir=tmp_path) + assert len(files) == 1 + assert os.path.basename(files[0]) == "artifact.txt" + assert str(tmp_path) in files[0] + assert os.path.exists(files[0]) + + # Move artifact outside of context (should not work) + moved_artifact = tmp_path / os.path.basename(artifact) + shutil.copyfile(artifact, moved_artifact) + with pytest.raises(ValueError): + client.push(files=[moved_artifact], target=target) + + # This should work because we aren't checking paths + res = client.push(files=[artifact], target=target, disable_path_validation=True) + assert res.status_code == 201 + + +@pytest.mark.with_auth(False) +def test_get_delete_tags(tmp_path, registry, credentials, target): + """ + Test creationg, getting, and deleting tags. 
+ """ + client = oras.client.OrasClient(hostname=registry, insecure=True) + artifact = os.path.join(here, "artifact.txt") + assert os.path.exists(artifact) + + res = client.push(files=[artifact], target=target) + assert res.status_code in [200, 201] + + # Test getting tags + tags = client.get_tags(target) + assert "v1" in tags + + # Test deleting not-existence tag + assert not client.delete_tags(target, "v1-boop-boop") + assert "v1" in client.delete_tags(target, "v1") + tags = client.get_tags(target) + assert not tags + + +def test_get_many_tags(): + """ + Test getting many tags + """ + client = oras.client.OrasClient(hostname="ghcr.io", insecure=False) + + # Test getting tags with a limit set + tags = client.get_tags( + "channel-mirrors/conda-forge/linux-aarch64/arrow-cpp", N=1005 + ) + assert len(tags) == 1005 + + # This should retrieve all tags (defaults to None) + tags = client.get_tags("channel-mirrors/conda-forge/linux-aarch64/arrow-cpp") + assert len(tags) > 1500 + + # Same result if explicitly set + same_tags = client.get_tags( + "channel-mirrors/conda-forge/linux-aarch64/arrow-cpp", N=None + ) + assert not set(tags).difference(set(same_tags)) + + # Small number of tags + tags = client.get_tags("channel-mirrors/conda-forge/linux-aarch64/arrow-cpp", N=10) + assert not set(tags).difference(set(same_tags)) + assert len(tags) == 10 + + +@pytest.mark.with_auth(False) +def test_directory_push_pull(tmp_path, registry, credentials, target_dir): + """ + Test push and pull for directory + """ + client = oras.client.OrasClient(hostname=registry, insecure=True) + + # Test upload of a directory + upload_dir = os.path.join(here, "upload_data") + res = client.push(files=[upload_dir], target=target_dir) + assert res.status_code == 201 + files = client.pull(target=target_dir, outdir=tmp_path) + + assert len(files) == 1 + assert os.path.basename(files[0]) == "upload_data" + assert str(tmp_path) in files[0] + assert os.path.exists(files[0]) + assert "artifact.txt" in 
os.listdir(files[0]) diff --git a/build/lib/oras/tests/test_provider.py b/build/lib/oras/tests/test_provider.py new file mode 100644 index 0000000..d3b014b --- /dev/null +++ b/build/lib/oras/tests/test_provider.py @@ -0,0 +1,134 @@ +__author__ = "Vanessa Sochat" +__copyright__ = "Copyright The ORAS Authors." +__license__ = "Apache-2.0" + +import os +from pathlib import Path + +import pytest + +import oras.client +import oras.defaults +import oras.provider +import oras.utils + +here = os.path.abspath(os.path.dirname(__file__)) + + +@pytest.mark.with_auth(False) +def test_annotated_registry_push(tmp_path, registry, credentials, target): + """ + Basic tests for oras push with annotations + """ + + # Direct access to registry functions + remote = oras.provider.Registry(hostname=registry, insecure=True) + client = oras.client.OrasClient(hostname=registry, insecure=True) + artifact = os.path.join(here, "artifact.txt") + + assert os.path.exists(artifact) + + # Custom manifest annotations + annots = {"holiday": "Halloween", "candy": "chocolate"} + res = client.push(files=[artifact], target=target, manifest_annotations=annots) + assert res.status_code in [200, 201] + + # Get the manifest + manifest = remote.get_manifest(target) + assert "annotations" in manifest + for k, v in annots.items(): + assert k in manifest["annotations"] + assert manifest["annotations"][k] == v + + # Annotations from file with $manifest + annotation_file = os.path.join(here, "annotations.json") + file_annots = oras.utils.read_json(annotation_file) + assert "$manifest" in file_annots + res = client.push(files=[artifact], target=target, annotation_file=annotation_file) + assert res.status_code in [200, 201] + manifest = remote.get_manifest(target) + + assert "annotations" in manifest + for k, v in file_annots["$manifest"].items(): + assert k in manifest["annotations"] + assert manifest["annotations"][k] == v + + # File that doesn't exist + annotation_file = os.path.join(here, "annotations-nope.json") 
+ with pytest.raises(FileNotFoundError): + res = client.push( + files=[artifact], target=target, annotation_file=annotation_file + ) + + +def test_parse_manifest(registry): + """ + Test parse manifest function. + + Parse manifest function has additional logic for Windows - this isn't included in + these tests as they don't usually run on Windows. + """ + testref = "path/to/config:application/vnd.oci.image.config.v1+json" + remote = oras.provider.Registry(hostname=registry, insecure=True) + ref, content_type = remote._parse_manifest_ref(testref) + assert ref == "path/to/config" + assert content_type == "application/vnd.oci.image.config.v1+json" + + testref = "path/to/config:application/vnd.oci.image.config.v1+json:extra" + remote = oras.provider.Registry(hostname=registry, insecure=True) + ref, content_type = remote._parse_manifest_ref(testref) + assert ref == "path/to/config" + assert content_type == "application/vnd.oci.image.config.v1+json:extra" + + testref = "/dev/null:application/vnd.oci.image.manifest.v1+json" + ref, content_type = remote._parse_manifest_ref(testref) + assert ref == "/dev/null" + assert content_type == "application/vnd.oci.image.manifest.v1+json" + + testref = "/dev/null" + ref, content_type = remote._parse_manifest_ref(testref) + assert ref == "/dev/null" + assert content_type == oras.defaults.unknown_config_media_type + + testref = "path/to/config.json" + ref, content_type = remote._parse_manifest_ref(testref) + assert ref == "path/to/config.json" + assert content_type == oras.defaults.unknown_config_media_type + + +def test_sanitize_path(): + HOME_DIR = str(Path.home()) + assert str(oras.utils.sanitize_path(HOME_DIR, HOME_DIR)) == f"{HOME_DIR}" + assert ( + str(oras.utils.sanitize_path(HOME_DIR, os.path.join(HOME_DIR, "username"))) + == f"{HOME_DIR}/username" + ) + assert ( + str(oras.utils.sanitize_path(HOME_DIR, os.path.join(HOME_DIR, ".", "username"))) + == f"{HOME_DIR}/username" + ) + + with pytest.raises(Exception) as e: + assert 
oras.utils.sanitize_path(HOME_DIR, os.path.join(HOME_DIR, "..")) + assert ( + str(e.value) + == f"Filename {Path(os.path.join(HOME_DIR, '..')).resolve()} is not in {HOME_DIR} directory" + ) + + assert oras.utils.sanitize_path("", "") == str(Path(".").resolve()) + assert oras.utils.sanitize_path("/opt", os.path.join("/opt", "image_name")) == str( + Path("/opt/image_name").resolve() + ) + assert oras.utils.sanitize_path("/../../", "/") == str(Path("/").resolve()) + assert oras.utils.sanitize_path( + Path(os.getcwd()).parent.absolute(), os.path.join(os.getcwd(), "..") + ) == str(Path("..").resolve()) + + with pytest.raises(Exception) as e: + assert oras.utils.sanitize_path( + Path(os.getcwd()).parent.absolute(), os.path.join(os.getcwd(), "..", "..") + ) != str(Path("../..").resolve()) + assert ( + str(e.value) + == f"Filename {Path(os.path.join(os.getcwd(), '..', '..')).resolve()} is not in {Path('../').resolve()} directory" + ) diff --git a/build/lib/oras/tests/test_utils.py b/build/lib/oras/tests/test_utils.py new file mode 100644 index 0000000..440d381 --- /dev/null +++ b/build/lib/oras/tests/test_utils.py @@ -0,0 +1,132 @@ +__author__ = "Vanessa Sochat" +__copyright__ = "Copyright The ORAS Authors." +__license__ = "Apache-2.0" + +import json +import os +import pathlib +import shutil + +import pytest + +import oras.utils as utils + + +def test_write_read_files(tmp_path): + print("Testing utils.write_file...") + + tmpfile = str(tmp_path / "written_file.txt") + assert not os.path.exists(tmpfile) + utils.write_file(tmpfile, "hello!") + assert os.path.exists(tmpfile) + + print("Testing utils.read_file...") + + content = utils.read_file(tmpfile) + assert content == "hello!" 
+ + +def test_workdir(tmp_path): + print("Testing utils.workdir") + noodle_base = os.path.join(tmp_path, "noodles") + os.makedirs(noodle_base) + pathlib.Path(os.path.join(noodle_base, "pasta.txt")).touch() + assert "pasta.txt" not in os.listdir() + with utils.workdir(noodle_base): + assert "pasta.txt" in os.listdir() + + +def test_write_bad_json(tmp_path): + bad_json = {"Wakkawakkawakka'}": [{True}, "2", 3]} + tmpfile = str(tmp_path / "json_file.txt") + assert not os.path.exists(tmpfile) + with pytest.raises(TypeError): + utils.write_json(bad_json, tmpfile) + + +def test_write_json(tmp_path): + good_json = {"Wakkawakkawakka": [True, "2", 3]} + tmpfile = str(tmp_path / "good_json_file.txt") + + assert not os.path.exists(tmpfile) + utils.write_json(good_json, tmpfile) + with open(tmpfile, "r") as f: + content = json.loads(f.read()) + assert isinstance(content, dict) + assert "Wakkawakkawakka" in content + content = utils.read_json(tmpfile) + assert "Wakkawakkawakka" in content + + +def test_copyfile(tmp_path): + print("Testing utils.copyfile") + + original = str(tmp_path / "location1.txt") + dest = str(tmp_path / "location2.txt") + print(original) + print(dest) + utils.write_file(original, "CONTENT IN FILE") + utils.copyfile(original, dest) + assert os.path.exists(original) + assert os.path.exists(dest) + + +def test_get_tmpdir_tmpfile(): + print("Testing utils.get_tmpdir, get_tmpfile") + + tmpdir = utils.get_tmpdir() + assert os.path.exists(tmpdir) + assert os.path.basename(tmpdir).startswith("oras") + shutil.rmtree(tmpdir) + tmpdir = utils.get_tmpdir(prefix="name") + assert os.path.basename(tmpdir).startswith("name") + shutil.rmtree(tmpdir) + tmpfile = utils.get_tmpfile() + assert "oras" in tmpfile + os.remove(tmpfile) + tmpfile = utils.get_tmpfile(prefix="pancakes") + assert "pancakes" in tmpfile + os.remove(tmpfile) + + +def test_mkdir_p(tmp_path): + print("Testing utils.mkdir_p") + + dirname = str(tmp_path / "input") + result = os.path.join(dirname, "level1", 
"level2", "level3") + utils.mkdir_p(result) + assert os.path.exists(result) + + +def test_print_json(): + print("Testing utils.print_json") + result = utils.print_json({1: 1}) + assert result == '{\n "1": 1\n}' + + +def test_split_path_and_content(): + """ + Test split path and content function. + + Function has additional logic for Windows - this isn't included in these tests as + they don't usually run on Windows. + """ + testref = "path/to/config:application/vnd.oci.image.config.v1+json" + path_content = utils.split_path_and_content(testref) + assert path_content.path == "path/to/config" + assert path_content.content == "application/vnd.oci.image.config.v1+json" + + testref = "/dev/null:application/vnd.oci.image.config.v1+json" + path_content = utils.split_path_and_content(testref) + assert path_content.path == "/dev/null" + assert path_content.content == "application/vnd.oci.image.config.v1+json" + + testref = "/dev/null" + path_content = utils.split_path_and_content(testref) + assert path_content.path == "/dev/null" + assert not path_content.content + + testref = "path/to/config.json" + path_content = utils.split_path_and_content(testref) + assert path_content.path == "path/to/config.json" + assert not path_content.content diff --git a/build/lib/oras/utils/__init__.py b/build/lib/oras/utils/__init__.py new file mode 100644 index 0000000..a64749c --- /dev/null +++ b/build/lib/oras/utils/__init__.py @@ -0,0 +1,27 @@ +from .fileio import ( + copyfile, + extract_targz, + get_file_hash, + get_size, + get_tmpdir, + get_tmpfile, + make_targz, + mkdir_p, + print_json, + read_file, + read_in_chunks, + read_json, + readline, + recursive_find, + sanitize_path, + split_path_and_content, + workdir, + write_file, + write_json, +) +from .request import ( + append_url_params, + find_docker_config, + get_docker_client, + iter_localhosts, +) diff --git a/build/lib/oras/utils/fileio.py b/build/lib/oras/utils/fileio.py new file mode 100644 index 0000000..db30820 --- /dev/null +++ 
b/build/lib/oras/utils/fileio.py @@ -0,0 +1,374 @@ +__author__ = "Vanessa Sochat" +__copyright__ = "Copyright The ORAS Authors." +__license__ = "Apache-2.0" + +import errno +import hashlib +import io +import json +import os +import pathlib +import re +import shutil +import stat +import sys +import tarfile +import tempfile +from contextlib import contextmanager +from typing import Generator, Optional, TextIO, Union + + +class PathAndOptionalContent: + """Class for holding a path reference and optional content parsed from a string.""" + + def __init__(self, path: str, content: Optional[str] = None): + self.path = path + self.content = content + + +def make_targz(source_dir: str, dest_name: Optional[str] = None) -> str: + """ + Make a targz (compressed) archive from a source directory. + """ + dest_name = dest_name or get_tmpfile(suffix=".tar.gz") + with tarfile.open(dest_name, "w:gz") as tar: + tar.add(source_dir, arcname=os.path.basename(source_dir)) + return dest_name + + +def sanitize_path(expected_dir, path): + """ + Ensure a path resolves to be in the expected parent directory. + + It can be directly there or a child, but not outside it. 
+ We raise an error if it does not - this should not happen + """ + base_dir = pathlib.Path(expected_dir).expanduser().resolve() + path = pathlib.Path(path).expanduser().resolve() # path = base_dir + file_name + if not ((base_dir in path.parents) or (str(base_dir) == str(path))): + raise Exception(f"Filename {path} is not in {base_dir} directory") + return str(path) + + +@contextmanager +def workdir(dirname): + """ + Provide context for a working directory, e.g., + + with workdir(name): + # do stuff + """ + here = os.getcwd() + os.chdir(dirname) + try: + yield + finally: + os.chdir(here) + + +def readline() -> str: + """ + Read lines from stdin + """ + content = sys.stdin.readlines() + return content[0].strip() + + +def extract_targz(targz: str, outdir: str, numeric_owner: bool = False): + """ + Extract a .tar.gz to an output directory. + """ + with tarfile.open(targz, "r:gz") as tar: + for member in tar.getmembers(): + member_path = os.path.join(outdir, member.name) + if not is_within_directory(outdir, member_path): + raise Exception("Attempted Path Traversal in Tar File") + tar.extractall(outdir, members=None, numeric_owner=numeric_owner) + + +def is_within_directory(directory: str, target: str) -> bool: + """ + Determine whether a file is within a directory + """ + abs_directory = os.path.abspath(directory) + abs_target = os.path.abspath(target) + prefix = os.path.commonprefix([abs_directory, abs_target]) + return prefix == abs_directory + + +def get_size(path: str) -> int: + """ + Get the size of a blob + + :param path : the path to get the size for + :type path: str + """ + return pathlib.Path(path).stat().st_size + + +def get_file_hash(path: str, algorithm: str = "sha256") -> str: + """ + Return an sha256 hash of the file based on an algorithm + Raises AttributeError if incorrect algorithm supplied. 
+ + :param path: the path to get the size for + :type path: str + :param algorithm: the algorithm to use + :type algorithm: str + """ + hasher = getattr(hashlib, algorithm)() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + hasher.update(chunk) + return hasher.hexdigest() + + +def mkdir_p(path: str): + """ + Make a directory path if it does not exist, akin to mkdir -p + + :param path : the path to create + :type path: str + """ + try: + os.makedirs(path) + except OSError as e: + if e.errno == errno.EEXIST and os.path.isdir(path): + pass + else: + raise ValueError(f"Error creating path {path}.") + + +def get_tmpfile( + tmpdir: Optional[str] = None, prefix: str = "", suffix: str = "" +) -> str: + """ + Get a temporary file with an optional prefix. + + :param tmpdir : an optional temporary directory + :type tmpdir: str + :param prefix: an optional prefix for the temporary path + :type prefix: str + :param suffix: an optional suffix (extension) + :type suffix: str + """ + # First priority for the base goes to the user requested. + tmpdir = get_tmpdir(tmpdir) + + # If tmpdir is set, add to prefix + if tmpdir: + prefix = os.path.join(tmpdir, os.path.basename(prefix)) + + fd, tmp_file = tempfile.mkstemp(prefix=prefix, suffix=suffix) + os.close(fd) + + return tmp_file + + +def get_tmpdir( + tmpdir: Optional[str] = None, prefix: Optional[str] = "", create: bool = True +) -> str: + """ + Get a temporary directory for an operation. 
+ + :param tmpdir: an optional temporary directory + :type tmpdir: str + :param prefix: an optional prefix for the temporary path + :type prefix: str + :param create: create the directory + :type create: bool + """ + tmpdir = tmpdir or tempfile.gettempdir() + prefix = prefix or "oras-tmp" + prefix = "%s.%s" % (prefix, next(tempfile._get_candidate_names())) # type: ignore + tmpdir = os.path.join(tmpdir, prefix) + + if not os.path.exists(tmpdir) and create is True: + os.mkdir(tmpdir) + + return tmpdir + + +def recursive_find(base: str, pattern: Optional[str] = None) -> Generator: + """ + Find filenames that match a particular pattern, and yield them. + + :param base : the root to search + :type base: str + :param pattern: an optional file pattern to use with fnmatch + :type pattern: str + """ + # We can identify modules by finding module.lua + for root, folders, files in os.walk(base): + for file in files: + fullpath = os.path.abspath(os.path.join(root, file)) + + if pattern and not re.search(pattern, fullpath): + continue + + yield fullpath + + +def copyfile(source: str, destination: str, force: bool = True) -> str: + """ + Copy a file from a source to its destination. 
+ + :param source: the source to copy from + :type source: str + :param destination: the destination to copy to + :type destination: str + :param force: force copy if destination already exists + :type force: bool + """ + # Case 1: It's already there, we aren't replacing it :) + if source == destination and force is False: + return destination + + # Case 2: It's already there, we ARE replacing it :) + if os.path.exists(destination) and force is True: + os.remove(destination) + + shutil.copyfile(source, destination) + return destination + + +def write_file( + filename: str, content: str, mode: str = "w", make_exec: bool = False +) -> str: + """ + Write content to a filename + + :param filename: filname to write + :type filename: str + :param content: content to write + :type content: str + :param mode: mode to write + :type mode: str + :param make_exec: make executable + :type make_exec: bool + """ + with open(filename, mode) as filey: + filey.writelines(content) + if make_exec: + st = os.stat(filename) + + # Execute / search permissions for the user and others + os.chmod(filename, st.st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH) + return filename + + +def read_in_chunks(image: Union[TextIO, io.BufferedReader], chunk_size: int = 1024): + """ + Helper function to read file in chunks, with default size 1k. 
+ + :param image: file descriptor + :type image: TextIO or io.BufferedReader + :param chunk_size: size of the chunk + :type chunk_size: int + """ + while True: + data = image.read(chunk_size) + if not data: + break + yield data + + +def write_json(json_obj: dict, filename: str, mode: str = "w") -> str: + """ + Write json to a filename + + :param json_obj: json object to write + :type json_obj: dict + :param filename: filename to write + :type filename: str + :param mode: mode to write + :type mode: str + """ + with open(filename, mode) as filey: + filey.writelines(print_json(json_obj)) + return filename + + +def print_json(json_obj: dict) -> str: + """ + Pretty print json. + + :param json_obj: json object to print + :type json_obj: dict + """ + return json.dumps(json_obj, indent=4, separators=(",", ": ")) + + +def read_file(filename: str, mode: str = "r") -> str: + """ + Read a file. + + :param filename: filename to read + :type filename: str + :param mode: mode to read + :type mode: str + """ + with open(filename, mode) as filey: + content = filey.read() + return content + + +def read_json(filename: str, mode: str = "r") -> dict: + """ + Read a json file to a dictionary. + + :param filename: filename to read + :type filename: str + :param mode: mode to read + :type mode: str + """ + return json.loads(read_file(filename)) + + +def split_path_and_content(ref: str) -> PathAndOptionalContent: + """ + Parse a string containing a path and an optional content + + Examples + -------- + : + path/to/config:application/vnd.oci.image.config.v1+json + /dev/null:application/vnd.oci.image.config.v1+json + C:\\myconfig:application/vnd.oci.image.config.v1+json + + Or, + + /dev/null + C:\\myconfig + + :param ref: the manifest reference to parse (examples above) + :type ref: str + : return: A Tuple of the path in the reference, and the content-type if one found, + otherwise None. 
+ """ + if ":" not in ref: + return PathAndOptionalContent(ref, None) + + if pathlib.Path(ref).drive: + # Running on Windows and Path has Windows drive letter in it, it definitely has + # one colon and could have two or feasibly more, e.g. + # C:\test.tar + # C:\test.tar:application/vnd.oci.image.layer.v1.tar + # C:\test.tar:application/vnd.oci.image.layer.v1.tar:somethingelse + # + # This regex matches two colons in the string and returns everything before + # the second colon as the "path" group and everything after the second colon + # as the "context" group. + # i.e. + # (C:\test.tar):(application/vnd.oci.image.layer.v1.tar) + # (C:\test.tar):(application/vnd.oci.image.layer.v1.tar:somethingelse) + # But C:\test.tar along will not match and we just return it as is. + path_and_content = re.search(r"(?P.*?:.*?):(?P.*)", ref) + if path_and_content: + return PathAndOptionalContent( + path_and_content.group("path"), path_and_content.group("content") + ) + return PathAndOptionalContent(ref, None) + else: + path_content_list = ref.split(":", 1) + return PathAndOptionalContent(path_content_list[0], path_content_list[1]) diff --git a/build/lib/oras/utils/request.py b/build/lib/oras/utils/request.py new file mode 100644 index 0000000..7fa48ae --- /dev/null +++ b/build/lib/oras/utils/request.py @@ -0,0 +1,63 @@ +__author__ = "Vanessa Sochat" +__copyright__ = "Copyright The ORAS Authors." +__license__ = "Apache-2.0" + +import os +import urllib.parse as urlparse +from urllib.parse import urlencode + + +def iter_localhosts(name: str): + """ + Given a url with localhost, always resolve to 127.0.0.1. 
+ + :param name : the name of the original host string + :type name: str + """ + names = [name] + if "localhost" in name: + names.append(name.replace("localhost", "127.0.0.1")) + elif "127.0.0.1" in name: + names.append(name.replace("127.0.0.1", "localhost")) + for name in names: + yield name + + +def find_docker_config(exists: bool = True): + """ + Return the docker default config path. + """ + path = os.path.expanduser("~/.docker/config.json") + + # Allow the caller to request the path regardless of existing + if os.path.exists(path) or not exists: + return path + + +def append_url_params(url: str, params: dict) -> str: + """ + Given a dictionary of params and a url, parse the url and add extra params. + + :param url: the url string to parse + :type url: str + :param params: parameters to add + :type params: dict + """ + parts = urlparse.urlparse(url) + query = dict(urlparse.parse_qsl(parts.query)) + query.update(params) + updated = list(parts) + updated[4] = urlencode(query) + return urlparse.urlunparse(updated) + + +def get_docker_client(tls_verify: bool = True, **kwargs): + """ + Get a docker client. + + :param tls_verify : enable tls + :type tls_verify: bool + """ + import docker + + return docker.DockerClient(tls=tls_verify, **kwargs) diff --git a/build/lib/oras/version.py b/build/lib/oras/version.py new file mode 100644 index 0000000..5fa3cc4 --- /dev/null +++ b/build/lib/oras/version.py @@ -0,0 +1,26 @@ +__author__ = "Vanessa Sochat" +__copyright__ = "Copyright The ORAS Authors." 
+__license__ = "Apache-2.0" + +__version__ = "0.1.29" +AUTHOR = "Vanessa Sochat" +EMAIL = "vsoch@users.noreply.github.com" +NAME = "oras" +PACKAGE_URL = "https://github.com/oras-project/oras-py" +KEYWORDS = "oci, registry, storage" +DESCRIPTION = "OCI Registry as Storage Python SDK" +LICENSE = "LICENSE" + +################################################################################ +# Global requirements + +INSTALL_REQUIRES = ( + ("jsonschema", {"min_version": None}), + ("requests", {"min_version": None}), +) + +TESTS_REQUIRES = (("pytest", {"min_version": "4.6.2"}),) + +DOCKER_REQUIRES = (("docker", {"exact_version": "5.0.1"}),) + +INSTALL_REQUIRES_ALL = INSTALL_REQUIRES + TESTS_REQUIRES + DOCKER_REQUIRES diff --git a/dist/oras-0.1.29-py3.10.egg b/dist/oras-0.1.29-py3.10.egg new file mode 100644 index 0000000..582f46e Binary files /dev/null and b/dist/oras-0.1.29-py3.10.egg differ diff --git a/oras/provider.py b/oras/provider.py index 1416fd7..b57d02a 100644 --- a/oras/provider.py +++ b/oras/provider.py @@ -221,12 +221,7 @@ def _parse_manifest_ref(self, ref: str) -> Tuple[str, str]: return path_content.path, path_content.content def upload_blob( - self, - blob: str, - container: container_type, - layer: dict, - do_chunked: bool = False, - refresh_headers: bool = True + self, blob: str, container: container_type, layer: dict, do_chunked: bool = False, refresh_headers: bool = True ) -> requests.Response: """ Prepare and upload a blob. 
@@ -258,10 +253,7 @@ def upload_blob( response = self.chunked_upload(blob, container, layer, refresh_headers=refresh_headers) # If we have an empty layer digest and the registry didn't accept, just return dummy successful response - if ( - response.status_code not in [200, 201, 202] - and layer["digest"] == oras.defaults.blank_hash - ): + if response.status_code not in [200, 201, 202] and layer["digest"] == oras.defaults.blank_hash: response = requests.Response() response.status_code = 200 return response @@ -330,9 +322,7 @@ def extract_tags(response: requests.Response): tags = tags[:N] return tags - def _do_paginated_request( - self, url: str, callable: Callable[[requests.Response], bool] - ): + def _do_paginated_request(self, url: str, callable: Callable[[requests.Response], bool]): """ Paginate a request for a URL. @@ -401,36 +391,20 @@ def get_container(self, name: container_type) -> oras.container.Container: # Functions to be deprecated in favor of exposed ones @decorator.ensure_container - def _download_blob( - self, container: container_type, digest: str, outfile: str - ) -> str: - logger.warning( - "This function is deprecated in favor of download_blob and will be removed by 0.1.2" - ) + def _download_blob(self, container: container_type, digest: str, outfile: str) -> str: + logger.warning("This function is deprecated in favor of download_blob and will be removed by 0.1.2") return self.download_blob(container, digest, outfile) - def _put_upload( - self, blob: str, container: oras.container.Container, layer: dict - ) -> requests.Response: - logger.warning( - "This function is deprecated in favor of put_upload and will be removed by 0.1.2" - ) + def _put_upload(self, blob: str, container: oras.container.Container, layer: dict) -> requests.Response: + logger.warning("This function is deprecated in favor of put_upload and will be removed by 0.1.2") return self.put_upload(blob, container, layer) - def _chunked_upload( - self, blob: str, container: 
oras.container.Container, layer: dict - ) -> requests.Response: - logger.warning( - "This function is deprecated in favor of chunked_upload and will be removed by 0.1.2" - ) + def _chunked_upload(self, blob: str, container: oras.container.Container, layer: dict) -> requests.Response: + logger.warning("This function is deprecated in favor of chunked_upload and will be removed by 0.1.2") return self.chunked_upload(blob, container, layer) - def _upload_manifest( - self, manifest: dict, container: oras.container.Container - ) -> requests.Response: - logger.warning( - "This function is deprecated in favor of upload_manifest and will be removed by 0.1.2" - ) + def _upload_manifest(self, manifest: dict, container: oras.container.Container) -> requests.Response: + logger.warning("This function is deprecated in favor of upload_manifest and will be removed by 0.1.2") return self.upload_manifest(manifest, container) def _upload_blob( @@ -440,15 +414,11 @@ def _upload_blob( layer: dict, do_chunked: bool = False, ) -> requests.Response: - logger.warning( - "This function is deprecated in favor of upload_blob and will be removed by 0.1.2" - ) + logger.warning("This function is deprecated in favor of upload_blob and will be removed by 0.1.2") return self.upload_blob(blob, container, layer, do_chunked) @decorator.ensure_container - def download_blob( - self, container: container_type, digest: str, outfile: str - ) -> str: + def download_blob(self, container: container_type, digest: str, outfile: str) -> str: """ Stream download a blob into an output file. 
@@ -511,9 +481,7 @@ def put_upload( "Content-Type": "application/octet-stream", } headers.update(self.headers) - blob_url = oras.utils.append_url_params( - session_url, {"digest": layer["digest"]} - ) + blob_url = oras.utils.append_url_params(session_url, {"digest": layer["digest"]}) with open(blob, "rb") as fd: response = self.do_request( blob_url, @@ -523,9 +491,7 @@ def put_upload( ) return response - def _get_location( - self, r: requests.Response, container: oras.container.Container - ) -> str: + def _get_location(self, r: requests.Response, container: oras.container.Container) -> str: """ Parse the location header and ensure it includes a hostname. This currently assumes if there isn't a hostname, we are pushing to @@ -595,14 +561,10 @@ def chunked_upload( # Important to update with auth token if acquired headers.update(self.headers) start = end + 1 - self._check_200_response( - self.do_request(session_url, "PATCH", data=chunk, headers=headers) - ) + self._check_200_response(self.do_request(session_url, "PATCH", data=chunk, headers=headers)) # Finally, issue a PUT request to close blob - session_url = oras.utils.append_url_params( - session_url, {"digest": layer["digest"]} - ) + session_url = oras.utils.append_url_params(session_url, {"digest": layer["digest"]}) return self.do_request(session_url, "PUT", headers=self.headers) def _check_200_response(self, response: requests.Response): @@ -704,9 +666,7 @@ def push(self, *args, **kwargs) -> requests.Response: # Upload files as blobs for blob in kwargs.get("files", []): # You can provide a blob + content type - path_content: PathAndOptionalContent = oras.utils.split_path_and_content( - str(blob) - ) + path_content: PathAndOptionalContent = oras.utils.split_path_and_content(str(blob)) blob = path_content.path media_type = path_content.content @@ -717,9 +677,7 @@ def push(self, *args, **kwargs) -> requests.Response: # Path validation means blob must be relative to PWD. 
if validate_path: if not self._validate_path(blob): - raise ValueError( - f"Blob {blob} is not in the present working directory context." - ) + raise ValueError(f"Blob {blob} is not in the present working directory context.") # Save directory or blob name before compressing blob_name = os.path.basename(blob) @@ -735,9 +693,7 @@ def push(self, *args, **kwargs) -> requests.Response: annotations = annotset.get_annotations(blob) # Always strip blob_name of path separator - layer["annotations"] = { - oras.defaults.annotation_title: blob_name.strip(os.sep) - } + layer["annotations"] = {oras.defaults.annotation_title: blob_name.strip(os.sep)} if annotations: layer["annotations"].update(annotations) @@ -783,11 +739,7 @@ def push(self, *args, **kwargs) -> requests.Response: # Config is just another layer blob! logger.debug(f"Preparing config {conf}") - with ( - temporary_empty_config() - if config_file is None - else nullcontext(config_file) - ) as config_file: + with temporary_empty_config() if config_file is None else nullcontext(config_file) as config_file: response = self.upload_blob(config_file, container, conf, refresh_headers=refresh_headers) self._check_200_response(response) @@ -829,9 +781,7 @@ def pull(self, *args, **kwargs) -> List[str]: files = [] for layer in manifest.get("layers", []): - filename = (layer.get("annotations") or {}).get( - oras.defaults.annotation_title - ) + filename = (layer.get("annotations") or {}).get(oras.defaults.annotation_title) # If we don't have a filename, default to digest. Hopefully does not happen if not filename: @@ -841,9 +791,7 @@ def pull(self, *args, **kwargs) -> List[str]: outfile = oras.utils.sanitize_path(outdir, os.path.join(outdir, filename)) if not overwrite and os.path.exists(outfile): - logger.warning( - f"{outfile} already exists and --keep-old-files set, will not overwrite." 
- ) + logger.warning(f"{outfile} already exists and --keep-old-files set, will not overwrite.") continue # A directory will need to be uncompressed and moved @@ -976,9 +924,7 @@ def authenticate_request(self, originalResponse: requests.Response) -> bool: """ authHeaderRaw = originalResponse.headers.get("Www-Authenticate") if not authHeaderRaw: - logger.debug( - "Www-Authenticate not found in original response, cannot authenticate." - ) + logger.debug("Www-Authenticate not found in original response, cannot authenticate.") return False # If we have a token, set auth header (base64 encoded user/pass)