From 75dfa27d3c2886ea46e4d0c6bfcc0cc322af554b Mon Sep 17 00:00:00 2001 From: Philippe Modard Date: Mon, 17 Jul 2023 16:59:29 +0000 Subject: [PATCH] 1.5.16 --- CHANGELOG.md | 10 + kaggle/api/kaggle_api_extended.py | 545 +++++++++++++++++++----- kaggle/cli.py | 41 +- kaggle/models/kaggle_models_extended.py | 31 ++ setup.py | 2 +- 5 files changed, 517 insertions(+), 112 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a58f9cd..48821ef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,16 @@ Changelog ==== +#### 1.5.16 +Release date: 7/17/23 +* Fix dataset download bug with locale + +#### 1.6.0a4 +Release date: 7/07/23 +* Resumable uploads +* Retry some failed requests +* Flag `-y` to delete model/instance/version without confirmation + #### 1.6.0a3 Release date: 7/06/23 * Confirmation for deleting a model, instance or version diff --git a/kaggle/api/kaggle_api_extended.py b/kaggle/api/kaggle_api_extended.py index 45fdae0..c4e4891 100644 --- a/kaggle/api/kaggle_api_extended.py +++ b/kaggle/api/kaggle_api_extended.py @@ -38,9 +38,11 @@ import json import os from os.path import expanduser +from random import random import sys import shutil import tarfile +import time import zipfile import tempfile from ..api_client import ApiClient @@ -66,6 +68,7 @@ from ..models.kaggle_models_extended import Model from ..models.kaggle_models_extended import ModelNewResponse from ..models.kaggle_models_extended import ModelDeleteResponse +from ..models.kaggle_models_extended import ResumableUploadResult from ..models.kaggle_models_extended import Submission from ..models.kaggle_models_extended import SubmitResult from ..models.kernel_push_request import KernelPushRequest @@ -76,9 +79,11 @@ from ..models.model_update_request import ModelUpdateRequest from ..models.model_instance_update_request import ModelInstanceUpdateRequest from ..models.start_blob_upload_request import StartBlobUploadRequest +from ..models.start_blob_upload_response import StartBlobUploadResponse from ..models.upload_file import UploadFile import requests from requests.adapters import HTTPAdapter +import requests.packages.urllib3.exceptions as urllib3_exceptions from requests.packages.urllib3.util.retry import Retry from ..rest import ApiException import six @@ -107,13 +112,166 @@ def __enter__(self): os.path.join(self._temp_dir, dir_name), self._format, self._fullpath) _, self.name = os.path.split(self.path) + return self def __exit__(self, *args): shutil.rmtree(self._temp_dir) +class ResumableUploadContext(object): + def __init__(self, no_resume=False): + self.no_resume = no_resume + self._temp_dir = os.path.join(tempfile.gettempdir(), '.kaggle/uploads') + self._file_uploads = [] + + def __enter__(self): + if self.no_resume: + return + self._create_temp_dir() + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + if self.no_resume: + return + if exc_type is not None: + # Don't delete the upload file info when there is an error + # to give it a chance to retry/resume on the next invocation. + return + for file_upload in self._file_uploads: + file_upload.cleanup() + + def get_upload_info_file_path(self, path): + return os.path.join( + self._temp_dir, + '%s.json' % path.replace(os.path.sep, '_').replace(':', '_')) + + def new_resumable_file_upload(self, path, start_blob_upload_request): + file_upload = ResumableFileUpload(path, start_blob_upload_request, + self) + self._file_uploads.append(file_upload) + file_upload.load() + return file_upload + + def _create_temp_dir(self): + try: + os.makedirs(self._temp_dir) + except FileExistsError: + pass + + +class ResumableFileUpload(object): + # Reference: https://cloud.google.com/storage/docs/resumable-uploads + # A resumable upload must be completed within a week of being initiated + RESUMABLE_UPLOAD_EXPIRY_SECONDS = 6 * 24 * 3600 + + def __init__(self, path, start_blob_upload_request, context): + self.path = path + self.start_blob_upload_request = start_blob_upload_request + self.context = context + self.timestamp = int(time.time()) + self.start_blob_upload_response = None + self.can_resume = False + self.upload_complete = False + if self.context.no_resume: + return + self._upload_info_file_path = self.context.get_upload_info_file_path( + path) + + def get_token(self): + if self.upload_complete: + return self.start_blob_upload_response.token + return None + + def load(self): + if self.context.no_resume: + return + self._load_previous_if_any() + + def _load_previous_if_any(self): + if not os.path.exists(self._upload_info_file_path): + return False + + try: + with io.open(self._upload_info_file_path, 'r') as f: + previous = ResumableFileUpload.from_dict( + json.load(f), self.context) + if self._is_previous_valid(previous): + self.start_blob_upload_response = previous.start_blob_upload_response + self.timestamp = previous.timestamp + self.can_resume = True + except Exception as e: + print('Error while trying to load upload info:', e) + + def _is_previous_valid(self, previous): + return previous.path == self.path and \ + previous.start_blob_upload_request == self.start_blob_upload_request and \ + previous.timestamp > time.time() - ResumableFileUpload.RESUMABLE_UPLOAD_EXPIRY_SECONDS + + def upload_initiated(self, start_blob_upload_response): + if self.context.no_resume: + return + + self.start_blob_upload_response = start_blob_upload_response + with io.open(self._upload_info_file_path, 'w') as f: + json.dump(self.to_dict(), f, indent=True) + + def upload_completed(self): + if self.context.no_resume: + return + + self.upload_complete = True + self._save() + + def _save(self): + with io.open(self._upload_info_file_path, 'w') as f: + json.dump(self.to_dict(), f, indent=True) + + def cleanup(self): + if self.context.no_resume: + return + + try: + os.remove(self._upload_info_file_path) + except OSError: + pass + + def to_dict(self): + return { + 'path': + self.path, + 'start_blob_upload_request': + self.start_blob_upload_request.to_dict(), + 'timestamp': + self.timestamp, + 'start_blob_upload_response': + self.start_blob_upload_response.to_dict() + if self.start_blob_upload_response is not None else None, + 'upload_complete': + self.upload_complete, + } + + def from_dict(other, context): + new = ResumableFileUpload( + other['path'], + StartBlobUploadRequest(**other['start_blob_upload_request']), + context) + new.timestamp = other.get('timestamp') + start_blob_upload_response = other.get('start_blob_upload_response') + if start_blob_upload_response is not None: + new.start_blob_upload_response = StartBlobUploadResponse( + **start_blob_upload_response) + new.upload_complete = other.get('upload_complete') or False + return new + + def to_str(self): + return str(self.to_dict()) + + def __repr__(self): + return self.to_str() + + class KaggleApi(KaggleApi): - __version__ = '1.6.0a3' + __version__ = '1.5.16' CONFIG_NAME_PROXY = 'proxy' CONFIG_NAME_COMPETITION = 'competition' @@ -129,6 +287,7 @@ class KaggleApi(KaggleApi): MODEL_METADATA_FILE = 'model-metadata.json' MODEL_INSTANCE_METADATA_FILE = 'model-instance-metadata.json' MAX_NUM_INBOX_FILES_TO_UPLOAD = 1000 + MAX_UPLOAD_RESUME_ATTEMPTS = 10 config_dir = os.environ.get('KAGGLE_CONFIG_DIR') or os.path.join( expanduser('~'), '.kaggle') @@ -180,6 +339,44 @@ class KaggleApi(KaggleApi): reload(sys) sys.setdefaultencoding('latin1') + def _is_retriable(self, e): + return issubclass(type(e), ConnectionError) or \ + issubclass(type(e), urllib3_exceptions.ConnectionError) or \ + issubclass(type(e), urllib3_exceptions.ConnectTimeoutError) or \ + issubclass(type(e), urllib3_exceptions.ProtocolError) or \ + issubclass(type(e), requests.exceptions.ConnectionError) or \ + issubclass(type(e), requests.exceptions.ConnectTimeout) + + def _calculate_backoff_delay(self, attempt, initial_delay_millis, + retry_multiplier, randomness_factor): + delay_ms = initial_delay_millis * (retry_multiplier**attempt) + random_wait_ms = int(random() - 0.5) * 2 * delay_ms * randomness_factor + total_delay = (delay_ms + random_wait_ms) / 1000.0 + return total_delay + + def with_retry(self, + func, + max_retries=10, + initial_delay_millis=500, + retry_multiplier=1.7, + randomness_factor=0.5): + def retriable_func(*args): + for i in range(1, max_retries + 1): + try: + return func(*args) + except Exception as e: + if self._is_retriable(e) and i < max_retries: + total_delay = self._calculate_backoff_delay( + i, initial_delay_millis, retry_multiplier, + randomness_factor) + print('Request failed: %s. Will retry in %2.1f seconds' + % (e, total_delay)) + time.sleep(total_delay) + continue + raise + + return retriable_func + ## Authentication def authenticate(self): @@ -569,10 +766,10 @@ def competition_submit(self, file_name, message, competition, quiet=False): upload_result_token = upload_result['token'] else: # New submissions path! - success = self.upload_complete(file_name, - url_result['createUrl'], quiet) - if not success: - # Actual error is printed during upload_complete. Not + upload_status = self.upload_complete( + file_name, url_result['createUrl'], quiet) + if upload_status != ResumableUploadResult.COMPLETE: + # Actual error is printed during upload_complete. Not # ideal but changing would not be backwards compatible return "Could not submit to competition" @@ -1297,7 +1494,7 @@ def dataset_download_cli(self, self.dataset_download_file( dataset, file_name, path=path, force=force, quiet=quiet) - def _upload_blob(self, path, quiet, blob_type): + def _upload_blob(self, path, quiet, blob_type, upload_context): """ upload a file Parameters @@ -1305,23 +1502,45 @@ def _upload_blob(self, path, quiet, blob_type): path: the complete path to upload quiet: suppress verbose output (default is False) blob_type (ApiBlobType): To which entity the file/blob refers + upload_context (ResumableUploadContext): Context for resumable uploads """ file_name = os.path.basename(path) content_length = os.path.getsize(path) last_modified_epoch_seconds = int(os.path.getmtime(path)) - request = StartBlobUploadRequest( + start_blob_upload_request = StartBlobUploadRequest( blob_type, file_name, content_length, last_modified_epoch_seconds=last_modified_epoch_seconds) - response = self.process_response( - self.upload_file_with_http_info(request)) - success = self.upload_complete(path, response.create_url, quiet) - if success: - return response.token - return None + file_upload = upload_context.new_resumable_file_upload( + path, start_blob_upload_request) + + for i in range(0, self.MAX_UPLOAD_RESUME_ATTEMPTS): + if file_upload.upload_complete: + return file_upload + + if not file_upload.can_resume: + # Initiate upload on Kaggle backend to get the url and token. + start_blob_upload_response = self.process_response( + self.with_retry(self.upload_file_with_http_info)( + file_upload.start_blob_upload_request)) + file_upload.upload_initiated(start_blob_upload_response) + + upload_result = self.upload_complete( + path, + file_upload.start_blob_upload_response.create_url, + quiet, + resume=file_upload.can_resume) + if upload_result == ResumableUploadResult.INCOMPLETE: + continue # Continue (i.e., retry/resume) only if the upload is incomplete. + + if upload_result == ResumableUploadResult.COMPLETE: + file_upload.upload_completed() + break + + return file_upload.get_token() def dataset_create_version(self, folder, @@ -1373,30 +1592,33 @@ def dataset_create_version(self, convert_to_csv=convert_to_csv, category_ids=keywords, delete_old_versions=delete_old_versions) - self.upload_files(request, resources, folder, ApiBlobType.DATASET, - quiet, dir_mode) - if id_no: - result = DatasetNewVersionResponse( - self.process_response( - self.datasets_create_version_by_id_with_http_info( - id_no, request))) - else: - if ref == self.config_values[ - self.CONFIG_NAME_USER] + '/INSERT_SLUG_HERE': - raise ValueError( - 'Default slug detected, please change values before ' - 'uploading') - self.validate_dataset_string(ref) - ref_list = ref.split('/') - owner_slug = ref_list[0] - dataset_slug = ref_list[1] - result = DatasetNewVersionResponse( - self.process_response( - self.datasets_create_version_with_http_info( - owner_slug, dataset_slug, request))) + with ResumableUploadContext() as upload_context: + self.upload_files(request, resources, folder, ApiBlobType.DATASET, + upload_context, quiet, dir_mode) - return result + if id_no: + result = DatasetNewVersionResponse( + self.process_response( + self.with_retry( + self.datasets_create_version_by_id_with_http_info)( + id_no, request))) + else: + if ref == self.config_values[ + self.CONFIG_NAME_USER] + '/INSERT_SLUG_HERE': + raise ValueError( + 'Default slug detected, please change values before ' + 'uploading') + self.validate_dataset_string(ref) + ref_list = ref.split('/') + owner_slug = ref_list[0] + dataset_slug = ref_list[1] + result = DatasetNewVersionResponse( + self.process_response( + self.datasets_create_version_with_http_info( + owner_slug, dataset_slug, request))) + + return result def dataset_create_version_cli(self, folder, @@ -1539,11 +1761,14 @@ def dataset_create_new(self, is_private=not public, convert_to_csv=convert_to_csv, category_ids=keywords) - self.upload_files(request, resources, folder, ApiBlobType.DATASET, - quiet, dir_mode) - result = DatasetNewResponse( - self.process_response( - self.datasets_create_new_with_http_info(request))) + + with ResumableUploadContext() as upload_context: + self.upload_files(request, resources, folder, ApiBlobType.DATASET, + upload_context, quiet, dir_mode) + result = DatasetNewResponse( + self.process_response( + self.with_retry( + self.datasets_create_new_with_http_info)(request))) return result @@ -1602,7 +1827,7 @@ def download_file(self, size_read = 0 open_mode = 'wb' remote_date = datetime.strptime(response.headers['Last-Modified'], - '%a, %d %b %Y %X %Z') + '%a, %d %b %Y %H:%M:%S %Z') remote_date_timestamp = time.mktime(remote_date.timetuple()) if not quiet: @@ -2491,32 +2716,35 @@ def model_create_new_cli(self, folder=None): else: print('Model creation error: ' + result.error) - def model_delete(self, model): + def model_delete(self, model, yes): """ call to delete a model from the API Parameters ========== model: the string identified of the model should be in format [owner]/[model-name] + yes: automatic confirmation """ owner_slug, model_slug = self.split_model_string(model) - if not self.confirmation(): - print('Deletion cancelled') - exit(0) + if not yes: + if not self.confirmation(): + print('Deletion cancelled') + exit(0) res = ModelDeleteResponse( self.process_response( self.delete_model_with_http_info(owner_slug, model_slug))) return res - def model_delete_cli(self, model): + def model_delete_cli(self, model, yes): """ wrapper for client for model_delete Parameters ========== model: the string identified of the model should be in format [owner]/[model-name] + yes: automatic confirmation """ - result = self.model_delete(model) + result = self.model_delete(model, yes) if result.hasError: print('Model deletion error: ' + result.error) @@ -2770,14 +2998,17 @@ def model_instance_create(self, folder, quiet=False, dir_mode='skip'): fine_tunable=fine_tunable, training_data=training_data, files=[]) - self.upload_files(request, None, folder, ApiBlobType.MODEL, quiet, - dir_mode) - result = ModelNewResponse( - self.process_response( - self.models_create_instance_with_http_info( - owner_slug, model_slug, request))) - return result + with ResumableUploadContext() as upload_context: + self.upload_files(request, None, folder, ApiBlobType.MODEL, + upload_context, quiet, dir_mode) + result = ModelNewResponse( + self.process_response( + self.with_retry( + self.models_create_instance_with_http_info)( + owner_slug, model_slug, request))) + + return result def model_instance_create_cli(self, folder, quiet=False, dir_mode='skip'): """ client wrapper for creating a new model instance @@ -2796,21 +3027,23 @@ def model_instance_create_cli(self, folder, quiet=False, dir_mode='skip'): else: print('Model instance creation error: ' + result.error) - def model_instance_delete(self, model_instance): + def model_instance_delete(self, model_instance, yes): """ call to delete a model instance from the API Parameters ========== model_instance: the string identified of the model instance should be in format [owner]/[model-name]/[framework]/[instance-slug] + yes: automatic confirmation """ if model_instance is None: raise ValueError('A model instance must be specified') owner_slug, model_slug, framework, instance_slug = self.split_model_instance_string( model_instance) - if not self.confirmation(): - print('Deletion cancelled') - exit(0) + if not yes: + if not self.confirmation(): + print('Deletion cancelled') + exit(0) res = ModelDeleteResponse( self.process_response( @@ -2818,14 +3051,15 @@ def model_instance_delete(self, model_instance): owner_slug, model_slug, framework, instance_slug))) return res - def model_instance_delete_cli(self, model_instance): + def model_instance_delete_cli(self, model_instance, yes): """ wrapper for client for model_instance_delete Parameters ========== model_instance: the string identified of the model instance should be in format [owner]/[model-name]/[framework]/[instance-slug] + yes: automatic confirmation """ - result = self.model_instance_delete(model_instance) + result = self.model_instance_delete(model_instance, yes) if result.hasError: print('Model instance deletion error: ' + result.error) @@ -2946,15 +3180,18 @@ def model_instance_version_create(self, request = ModelInstanceNewVersionRequest( version_notes=version_notes, files=[]) - self.upload_files(request, None, folder, ApiBlobType.MODEL, quiet, - dir_mode) - result = ModelNewResponse( - self.process_response( - self.models_create_instance_version_with_http_info( - owner_slug, model_slug, framework, instance_slug, - request))) - return result + with ResumableUploadContext() as upload_context: + self.upload_files(request, None, folder, ApiBlobType.MODEL, + upload_context, quiet, dir_mode) + result = ModelNewResponse( + self.process_response( + self.with_retry( + self.models_create_instance_version_with_http_info)( + owner_slug, model_slug, framework, instance_slug, + request))) + + return result def model_instance_version_create_cli(self, model_instance, @@ -3071,12 +3308,13 @@ def model_instance_version_download_cli(self, force=force, quiet=quiet) - def model_instance_version_delete(self, model_instance_version): + def model_instance_version_delete(self, model_instance_version, yes): """ call to delete a model instance version from the API Parameters ========== model_instance_version: the string identified of the model instance version should be in format [owner]/[model-name]/[framework]/[instance-slug]/[version-number] + yes: automatic confirmation """ if model_instance_version is None: raise ValueError('A model instance version must be specified') @@ -3089,9 +3327,10 @@ def model_instance_version_delete(self, model_instance_version): instance_slug = urls[3] version_number = urls[4] - if not self.confirmation(): - print('Deletion cancelled') - exit(0) + if not yes: + if not self.confirmation(): + print('Deletion cancelled') + exit(0) res = ModelDeleteResponse( self.process_response( @@ -3100,45 +3339,59 @@ def model_instance_version_delete(self, model_instance_version): version_number))) return res - def model_instance_version_delete_cli(self, model_instance_version): + def model_instance_version_delete_cli(self, model_instance_version, yes): """ wrapper for client for model_instance_version_delete Parameters ========== model_instance_version: the string identified of the model instance version should be in format [owner]/[model-name]/[framework]/[instance-slug]/[version-number] + yes: automatic confirmation """ - result = self.model_instance_version_delete(model_instance_version) + result = self.model_instance_version_delete(model_instance_version, + yes) if result.hasError: print('Model instance version deletion error: ' + result.error) else: print('The model instance version was deleted.') - def files_upload_cli(self, local_paths, inbox_path=None): + def files_upload_cli(self, local_paths, inbox_path, no_resume, + no_compress): if len(local_paths) > self.MAX_NUM_INBOX_FILES_TO_UPLOAD: - print('Cannot upload more than', - self.MAX_NUM_INBOX_FILES_TO_UPLOAD, 'files!') + print('Cannot upload more than %d files!' % + self.MAX_NUM_INBOX_FILES_TO_UPLOAD) return - for local_path in local_paths: - self.file_upload_cli(local_path, inbox_path) + files_to_create = [] + with ResumableUploadContext(no_resume) as upload_context: + for local_path in local_paths: + (upload_file, file_name) = self.file_upload_cli( + local_path, inbox_path, no_compress, upload_context) + if upload_file is None: + continue + + create_inbox_file_request = CreateInboxFileRequest( + virtual_directory=inbox_path, + blob_file_token=upload_file.token) + files_to_create.append((create_inbox_file_request, file_name)) + + for (create_inbox_file_request, file_name) in files_to_create: + self.process_response( + self.with_retry( + self.create_inbox_file)(create_inbox_file_request)) + print('Inbox file created:', file_name) - def file_upload_cli(self, local_path, inbox_path=None): + def file_upload_cli(self, local_path, inbox_path, no_compress, + upload_context): full_path = os.path.abspath(local_path) parent_path = os.path.dirname(full_path) file_or_folder_name = os.path.basename(full_path) + dir_mode = 'tar' if no_compress else 'zip' upload_file = self._upload_file_or_folder( - parent_path, file_or_folder_name, ApiBlobType.INBOX, 'zip') - if upload_file is None: - return - - inbox_path = inbox_path or '' - create_inbox_file_request = CreateInboxFileRequest( - virtual_directory=inbox_path, blob_file_token=upload_file.token) - self.process_response( - self.create_inbox_file(create_inbox_file_request)) - print('Upload complete:', file_or_folder_name) + parent_path, file_or_folder_name, ApiBlobType.INBOX, + upload_context, dir_mode) + return (upload_file, file_or_folder_name) def print_obj(self, obj, indent=2): pretty = json.dumps(obj, indent=indent) @@ -3155,7 +3408,7 @@ def download_needed(self, response, outfile, quiet=True): """ try: remote_date = datetime.strptime(response.headers['Last-Modified'], - '%a, %d %b %Y %X %Z') + '%a, %d %b %Y %H:%M:%S %Z') file_exists = os.path.isfile(outfile) if file_exists: local_date = datetime.fromtimestamp(os.path.getmtime(outfile)) @@ -3315,6 +3568,7 @@ def upload_files(self, resources, folder, blob_type, + upload_context, quiet=False, dir_mode='skip'): """ upload files in a folder @@ -3324,6 +3578,7 @@ def upload_files(self, resources: the files to upload folder: the folder to upload from blob_type (ApiBlobType): To which entity the file/blob refers + upload_context (ResumableUploadContext): Context for resumable uploads quiet: suppress verbose output (default is False) """ for file_name in os.listdir(folder): @@ -3334,7 +3589,8 @@ def upload_files(self, ]): continue upload_file = self._upload_file_or_folder( - folder, file_name, blob_type, dir_mode, quiet, resources) + folder, file_name, blob_type, upload_context, dir_mode, quiet, + resources) if upload_file is not None: request.files.append(upload_file) @@ -3342,20 +3598,21 @@ def _upload_file_or_folder(self, parent_path, file_or_folder_name, blob_type, + upload_context, dir_mode, quiet=False, resources=None): full_path = os.path.join(parent_path, file_or_folder_name) if os.path.isfile(full_path): return self._upload_file(file_or_folder_name, full_path, blob_type, - quiet, resources) + upload_context, quiet, resources) elif os.path.isdir(full_path): if dir_mode in ['zip', 'tar']: - archive = DirectoryArchive(full_path, dir_mode) - with archive: + with DirectoryArchive(full_path, dir_mode) as archive: return self._upload_file(archive.name, archive.path, - blob_type, quiet, resources) + blob_type, upload_context, quiet, + resources) elif not quiet: print("Skipping folder: " + file_or_folder_name + "; use '--dir-mode' to upload folders") @@ -3364,13 +3621,15 @@ def _upload_file_or_folder(self, print('Skipping: ' + file_or_folder_name) return None - def _upload_file(self, file_name, full_path, blob_type, quiet, resources): + def _upload_file(self, file_name, full_path, blob_type, upload_context, + quiet, resources): """ Helper function to upload a single file Parameters ========== file_name: name of the file to upload full_path: path to the file to upload blob_type (ApiBlobType): To which entity the file/blob refers + upload_context (ResumableUploadContext): Context for resumable uploads quiet: suppress verbose output resources: optional file metadata :return: None - upload unsuccessful; instance of UploadFile - upload successful @@ -3380,7 +3639,7 @@ def _upload_file(self, file_name, full_path, blob_type, quiet, resources): print('Starting upload for file ' + file_name) content_length = os.path.getsize(full_path) - token = self._upload_blob(full_path, quiet, blob_type) + token = self._upload_blob(full_path, quiet, blob_type, upload_context) if token is None: if not quiet: print('Upload unsuccessful: ' + file_name) @@ -3438,7 +3697,7 @@ def process_column(self, column): processed_column.type = original_type return processed_column - def upload_complete(self, path, url, quiet): + def upload_complete(self, path, url, quiet, resume=False): """ function to complete an upload to retrieve a path from a url Parameters ========== @@ -3447,26 +3706,108 @@ def upload_complete(self, path, url, quiet): quiet: suppress verbose output (default is False) """ file_size = os.path.getsize(path) + resumable_upload_result = ResumableUploadResult.Incomplete() + try: + if resume: + resumable_upload_result = self._resume_upload( + url, file_size, quiet) + if resumable_upload_result.result != ResumableUploadResult.INCOMPLETE: + return resumable_upload_result.result + + start_at = resumable_upload_result.start_at + upload_size = file_size - start_at + with tqdm( - total=file_size, + total=upload_size, unit='B', unit_scale=True, unit_divisor=1024, disable=quiet) as progress_bar: with io.open(path, 'rb', buffering=0) as fp: - reader = TqdmBufferedReader(fp, progress_bar) session = requests.Session() + if start_at > 0: + fp.seek(start_at) + session.headers.update({ + 'Content-Length': + '%d' % upload_size, + 'Content-Range': + 'bytes %d-%d/%d' % (start_at, file_size - 1, + file_size) + }) + reader = TqdmBufferedReader(fp, progress_bar) retries = Retry(total=10, backoff_factor=0.5) adapter = HTTPAdapter(max_retries=retries) session.mount('http://', adapter) session.mount('https://', adapter) response = session.put(url, data=reader) + if self._is_upload_successful(response): + return ResumableUploadResult.COMPLETE + if response.status_code == 503: + return ResumableUploadResult.INCOMPLETE + # Server returned a non-resumable error so give up. + return ResumableUploadResult.FAILED except Exception as error: print(error) - return False + # There is probably some weird bug in our code so try to resume the upload + # in case it works on the next try. + return ResumableUploadResult.INCOMPLETE + + def _resume_upload(self, url, content_length, quiet): + # Documentation: https://developers.google.com/drive/api/guides/manage-uploads#resume-upload + session = requests.Session() + session.headers.update({ + 'Content-Length': '0', + 'Content-Range': 'bytes */%d' % content_length, + }) + + response = session.put(url) + + if self._is_upload_successful(response): + return ResumableUploadResult.Complete() + if response.status_code == 404: + # Upload expired so need to start from scratch. + if not query: + print('Upload of %s expired. Please try again.' % path) + return ResumableUploadResult.Failed() + if response.status_code == 308: # Resume Incomplete + bytes_uploaded = self._get_bytes_already_uploaded(response, quiet) + if bytes_uploaded is None: + # There is an error with the Range header so need to start from scratch. + return ResumableUploadResult.Failed() + result = ResumableUploadResult.Incomplete(bytes_uploaded) + if not quiet: + print('Already uploaded %d bytes. Will resume upload at %d.' % + (result.bytes_uploaded, result.start_at)) + return result + else: + if not quiet: + print('Server returned %d. Please try again.' % + response.status_code) + return ResumableUploadResult.Failed() + + def _is_upload_successful(self, response): return response.status_code == 200 or response.status_code == 201 + def _get_bytes_already_uploaded(self, response, quiet): + range_val = response.headers.get('Range') + if range_val is None: + return 0 # This means server hasn't received anything before. + items = range_val.split('-') # Example: bytes=0-1000 => ['0', '1000'] + if len(items) != 2: + if not quiet: + print('Invalid Range header format: %s. Will try again.' % + range_val) + return None # Shouldn't happen, something's wrong with Range header format. + bytes_uploaded_str = items[-1] # Example: ['0', '1000'] => '1000' + try: + return int(bytes_uploaded_str) # Example: '1000' => 1000 + except ValueError: + if not quiet: + print('Invalid Range header format: %s. Will try again.' % + range_val) + return None # Shouldn't happen, something's wrong with Range header format. + def validate_dataset_string(self, dataset): """ determine if a dataset string is valid, meaning it is in the format of {username}/{dataset-slug} or {username}/{dataset-slug}/{version-number}. diff --git a/kaggle/cli.py b/kaggle/cli.py index f4a15f4..1bc864c 100644 --- a/kaggle/cli.py +++ b/kaggle/cli.py @@ -57,7 +57,7 @@ def main(): parse_competitions(subparsers) parse_datasets(subparsers) parse_kernels(subparsers) - parse_models(subparsers) + # parse_models(subparsers) parse_files(subparsers) parse_config(subparsers) args = parser.parse_args() @@ -978,6 +978,8 @@ def parse_models(subparsers): help=Help.command_models_delete) parser_models_delete_optional = parser_models_delete._action_groups.pop() parser_models_delete_optional.add_argument('model', help=Help.param_model) + parser_models_delete_optional.add_argument( + '-y', '--yes', dest='yes', action='store_true', help=Help.param_yes) parser_models_delete._action_groups.append(parser_models_delete_optional) parser_models_delete.set_defaults(func=api.model_delete_cli) @@ -1089,6 +1091,8 @@ def parse_model_instances(subparsers): ) parser_model_instances_delete_optional.add_argument( 'model_instance', help=Help.param_model_instance) + parser_model_instances_delete_optional.add_argument( + '-y', '--yes', dest='yes', action='store_true', help=Help.param_yes) parser_model_instances_delete._action_groups.append( parser_model_instances_delete_optional) parser_model_instances_delete.set_defaults( @@ -1207,6 +1211,8 @@ def parse_model_instance_versions(subparsers): ) parser_model_instance_versions_delete_optional.add_argument( 'model_instance_version', help=Help.param_model_instance_version) + parser_model_instance_versions_delete_optional.add_argument( + '-y', '--yes', dest='yes', action='store_true', help=Help.param_yes) parser_model_instance_versions_delete._action_groups.append( parser_model_instance_versions_delete_optional) parser_model_instance_versions_delete.set_defaults( @@ -1237,12 +1243,27 @@ def parse_files(subparsers): '--inbox-path', dest='inbox_path', required=False, + default='', help=Help.param_files_upload_inbox_path) parser_files_upload_optional.add_argument( 'local_paths', metavar='local-path', nargs='+', help=Help.param_files_upload_local_paths) + parser_files_upload_optional.add_argument( + '--no-resume', + dest='no_resume', + action='store_true', + required=False, + default=False, + help=Help.param_files_upload_no_resume) + parser_files_upload_optional.add_argument( + '--no-compress', + dest='no_compress', + action='store_true', + required=False, + default=False, + help=Help.param_files_upload_no_compress) parser_files_upload._action_groups.append(parser_files_upload_optional) parser_files_upload.set_defaults(func=api.files_upload_cli) @@ -1325,12 +1346,9 @@ class Help(object): kaggle = 'Use one of:\ncompetitions {' + ', '.join( competitions_choices) + '}\ndatasets {' + ', '.join( - datasets_choices) + '}\nmodels {' + ', '.join( - models_choices) + '}\nmodels instances {' + ', '.join( - model_instances_choices - ) + '}\nmodels instances versions {' + ', '.join( - model_instance_versions_choices - ) + '}\nconfig {' + ', '.join(config_choices) + '}' + datasets_choices) + '}\nkernels {' + ', '.join( + kernels_choices) + '}\nconfig {' + ', '.join( + config_choices) + '}' group_competitions = 'Commands related to Kaggle competitions' group_datasets = 'Commands related to Kaggle datasets' @@ -1421,6 +1439,9 @@ class Help(object): 'Unzip the downloaded file. Will delete the zip file when completed.') param_untar = ( 'Untar the downloaded file. Will delete the tar file when completed.') + param_yes = ( + 'Sets any confirmation values to "yes" automatically. Users will not be asked to confirm.' + ) # Competitions params param_competition = ( @@ -1580,8 +1601,10 @@ class Help(object): param_files_upload_inbox_path = 'Virtual path on the server where the uploaded files will be stored' param_files_upload_local_paths = ( 'List of local filesystem paths. Each path creates a separate file on the server. ' - 'Directories are uploaded as zip archives (e.g., a directory called "data" will be uploaded as "data.zip")' - ) + 'Directories are uploaded as zip archives by default (e.g., a directory called ' + '"data" will be uploaded as "data.zip")') + param_files_upload_no_compress = 'Whether to compress directories (zip) or not (tar)' + param_files_upload_no_resume = 'Whether to skip resumable uploads.' # Config params param_config_name = ('Name of the configuration parameter\n(one of ' diff --git a/kaggle/models/kaggle_models_extended.py b/kaggle/models/kaggle_models_extended.py index 8b97c47..fc222a0 100644 --- a/kaggle/models/kaggle_models_extended.py +++ b/kaggle/models/kaggle_models_extended.py @@ -32,6 +32,7 @@ # coding=utf-8 import os +import time from datetime import datetime @@ -226,3 +227,33 @@ def parse(string): except: pass return string + + +class ResumableUploadResult(object): + # Upload was complete, i.e., all bytes were received by the server. + COMPLETE = 1 + + # There was a non-transient error during the upload or the upload expired. + # The upload cannot be resumed so it should be restarted from scratch + # (i.e., call /api/v1/files/upload to initiate the upload and get the + # create/upload url and token). + FAILED = 2 + + # Upload was interrupted due to some (transient) failure but it can be + # safely resumed. + INCOMPLETE = 3 + + def __init__(self, result, bytes_uploaded=None): + self.result = result + self.bytes_uploaded = bytes_uploaded + self.start_at = 0 if bytes_uploaded is None else bytes_uploaded + 1 + + def Complete(): + return ResumableUploadResult(ResumableUploadResult.COMPLETE) + + def Failed(): + return ResumableUploadResult(ResumableUploadResult.FAILED) + + def Incomplete(bytes_uploaded=None): + return ResumableUploadResult(ResumableUploadResult.INCOMPLETE, + bytes_uploaded) diff --git a/setup.py b/setup.py index 24109aa..dff324e 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,7 @@ setup( name='kaggle', - version='1.6.0a3', + version='1.5.16', description='Kaggle API', long_description= ('Official API for https://www.kaggle.com, accessible using a command line '