Skip to content

Commit

Permalink
Check status (#44)
Browse files Browse the repository at this point in the history
Refactored some code, changed views to ModelViewSets and added /check-status/ endpoint to the upload task detail endpoint
  • Loading branch information
JJFlorian authored Apr 9, 2024
1 parent 1d7c6aa commit 78730f9
Show file tree
Hide file tree
Showing 12 changed files with 339 additions and 202 deletions.
59 changes: 37 additions & 22 deletions api/bro_import/bulk_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,14 @@ class BulkImporter:
Finally, it saves the data in the corresponding datamodel in the database.
"""

def __init__(self, import_task_instance: models.ImportTask) -> None:
self.import_task_instance = import_task_instance
def __init__(self, import_task_instance_uuid: str) -> None:
# Lookup and update the import task instance
self.import_task_instance = models.ImportTask.objects.get(
uuid=import_task_instance_uuid
)
self.import_task_instance.status = "PROCESSING"
self.import_task_instance.save()

self.bro_domain = self.import_task_instance.bro_domain
self.kvk_number = self.import_task_instance.kvk_number
self.data_owner = self.import_task_instance.data_owner
Expand All @@ -35,28 +41,37 @@ def __init__(self, import_task_instance: models.ImportTask) -> None:
self.object_importer_class = config.object_importer_mapping[self.bro_domain]

def run(self) -> None:
url = self._create_bro_ids_import_url()
bro_ids = self._fetch_bro_ids(url)

total_bro_ids = len(bro_ids)
counter = 0

for bro_id in bro_ids:
counter += 1
progress = (counter / total_bro_ids) * 100
self.import_task_instance.progress = round(progress, 2)
try:
url = self._create_bro_ids_import_url()
bro_ids = self._fetch_bro_ids(url)

total_bro_ids = len(bro_ids)
counter = 0

for bro_id in bro_ids:
counter += 1
progress = (counter / total_bro_ids) * 100
self.import_task_instance.progress = round(progress, 2)
self.import_task_instance.save()

try:
data_importer = self.object_importer_class(
self.bro_domain, bro_id, self.data_owner
)
data_importer.run()
except requests.RequestException as e:
logger.exception(e)
raise DataImportError(
f"Error while importing data for bro id: {bro_id}: {e}"
) from e

self.import_task_instance.status = "COMPLETED"
self.import_task_instance.save()

try:
data_importer = self.object_importer_class(
self.bro_domain, bro_id, self.data_owner
)
data_importer.run()
except requests.RequestException as e:
logger.exception(e)
raise DataImportError(
f"Error while importing data for bro id: {bro_id}: {e}"
) from e
except Exception as e:
self.import_task_instance.log = e
self.import_task_instance.status = "FAILED"
self.import_task_instance.save()

def _create_bro_ids_import_url(self) -> str:
"""Creates the import url for a given bro object type and kvk combination."""
Expand Down
5 changes: 1 addition & 4 deletions api/bro_import/object_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import xmltodict
from django.conf import settings

from api import models
from api.bro_import import config
from gmn.models import GMN, Measuringpoint
from gmw.models import GMW, MonitoringTube
Expand All @@ -32,9 +31,7 @@ class ObjectImporter(ABC):
4) Save actions into the database
"""

def __init__(
self, bro_domain: str, bro_id: str, data_owner: models.Organisation
) -> None:
def __init__(self, bro_domain: str, bro_id: str, data_owner: str) -> None:
self.bro_domain = bro_domain
self.bro_id = bro_id
self.data_owner = data_owner
Expand Down
84 changes: 74 additions & 10 deletions api/bro_upload/delivery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
import time
from typing import Any

import requests
from django.template.exceptions import TemplateDoesNotExist
from django.template.loader import render_to_string

from api.bro_import import bulk_import, config

from .. import models as api_models
from . import utils

Expand Down Expand Up @@ -46,36 +49,96 @@ class BRODelivery:

def __init__(
self,
upload_task_instance: api_models.UploadTask,
upload_task_instance_uuid: str,
bro_username: str,
bro_password: str,
) -> None:
self.upload_task_instance = upload_task_instance
# Lookup and update upload task instance
self.upload_task_instance = api_models.UploadTask.objects.get(
uuid=upload_task_instance_uuid
)
self.upload_task_instance.status = "PROCESSING"
self.upload_task_instance.save()

self.bro_username = bro_username
self.bro_password = bro_password
self.bro_id = None

def process(self) -> None:
# Generate the XML file.
xml_file = self._generate_xml_file()
try:
xml_file = self._generate_xml_file()
self.upload_task_instance.progress = 25.00
self.upload_task_instance.progress.save()
except Exception as e:
self.upload_task_instance.log = e
self.upload_task_instance.status = "FAILED"
self.upload_task_instance.save()
return

# Validate with the BRO API
self._validate_xml_file(xml_file)
try:
self._validate_xml_file(xml_file)
self.upload_task_instance.progress = 50.00
self.upload_task_instance.progress.save()
except Exception as e:
self.upload_task_instance.log = e
self.upload_task_instance.status = "FAILED"
self.upload_task_instance.save()
return

# Deliver the XML file. The deliver_url is returned to use for the check.
deliver_url = self._deliver_xml_file(xml_file)
try:
deliver_url = self._deliver_xml_file(xml_file)
self.upload_task_instance.bro_delivery_url = deliver_url
self.upload_task_instance.progress = 75.00
self.upload_task_instance.progress.save()
except Exception as e:
self.upload_task_instance.log = e
self.upload_task_instance.status = "FAILED"
self.upload_task_instance.save()
return

# Check of the status of the delivery. Retries 3 times before failing
retries_count = 0

while retries_count < 4:
if self._check_delivery(deliver_url):
return self.bro_id
# Update upload task instance
self.upload_task_instance.bro_id = self.bro_id
self.upload_task_instance.progress = 100.0
self.upload_task_instance.status = "COMPLETED"
self.upload_task_instance.log = "The upload was done successfully"
self.upload_task_instance.save()

# After the upload was done succesfully, the data should be imported into the API
try:
object_importer_class = config.object_importer_mapping[
self.upload_task_instance.bro_domain
]
importer = object_importer_class(
bro_domain=self.upload_task_instance.bro_domain,
bro_id=self.upload_task_instance.bro_id,
data_owner=self.upload_task_instance.data_owner,
)
importer.run()

return

except requests.RequestException as e:
logger.exception(e)
raise bulk_import.DataImportError(
f"Error while importing data for bro id: {self.bro_id}: {e}"
) from e
else:
time.sleep(10)
retries_count += 1

raise DeliveryError("Delivery was unsuccesfull")
self.upload_task_instance.status = "UNFINISHED"
self.upload_task_instance.log = "After 4 times checking, the delivery status in the BRO was still not 'DOORGELEVERD'. Please checks its status manually."
self.upload_task_instance.save()

return

def _generate_xml_file(self) -> str:
try:
Expand All @@ -85,6 +148,7 @@ def _generate_xml_file(self) -> str:
self.upload_task_instance.metadata,
self.upload_task_instance.sourcedocument_data,
)

return generator.create_xml_file()

except Exception as e:
Expand All @@ -100,9 +164,9 @@ def _validate_xml_file(self, xml_file: str) -> None:
)

if validation_response["status"] != "VALIDE":
raise XMLValidationError(
f"Errors while validating the XML file: {validation_response['errors']}"
)
self.upload_task_instance.bro_errors = validation_response["errors"]
self.upload_task_instance.save()
raise XMLValidationError("Errors while validating the XML file")
else:
return

Expand Down
1 change: 1 addition & 0 deletions api/choices.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
("PROCESSING", "Processing"),
("COMPLETED", "Completed"),
("FAILED", "Failed"),
("UNFINISHED", "Unfinished"),
]

BRO_DOMAIN_CHOICES = [
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Generated by Django 5.0.1 on 2024-04-09 09:47

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("api", "0028_rename_last_updated_uploadtask_updated_and_more"),
]

operations = [
migrations.AddField(
model_name="uploadtask",
name="bro_delivery_url",
field=models.CharField(blank=True, max_length=500, null=True),
),
migrations.AlterField(
model_name="importtask",
name="status",
field=models.CharField(
blank=True,
choices=[
("PENDING", "Pending"),
("PROCESSING", "Processing"),
("COMPLETED", "Completed"),
("FAILED", "Failed"),
("UNFINISHED", "Unfinished"),
],
default="PENDING",
max_length=20,
),
),
migrations.AlterField(
model_name="uploadtask",
name="status",
field=models.CharField(
blank=True,
choices=[
("PENDING", "Pending"),
("PROCESSING", "Processing"),
("COMPLETED", "Completed"),
("FAILED", "Failed"),
("UNFINISHED", "Unfinished"),
],
default="PENDING",
max_length=20,
),
),
]
41 changes: 40 additions & 1 deletion api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from django.db.models import JSONField
from encrypted_model_fields.fields import EncryptedCharField

from . import choices
from . import choices, tasks


class Organisation(models.Model):
Expand Down Expand Up @@ -51,6 +51,23 @@ class ImportTask(models.Model):
log = models.TextField(blank=True)
progress = models.FloatField(blank=True, null=True)

def save(self, request=None, *args, **kwargs):
"""
Initialize an upload task by posting the bro_domain, registration_type, request_type, and the sourcedocument_data
"""
if not self.status:
super().save(*args, **kwargs)
# Update the status of the new task
self.status = "PENDING"
self.save()

# Start the celery task
tasks.import_bro_data_task.delay(self.uuid)
else:
# This is an existing object being edited: no upload celery task required
super().save(*args, **kwargs)
pass

def __str__(self):
return f"{self.bro_domain} import - {self.data_owner}"

Expand Down Expand Up @@ -81,6 +98,28 @@ class UploadTask(models.Model):
bro_errors = models.TextField(blank=True)
progress = models.FloatField(blank=True, null=True)
bro_id = models.CharField(max_length=500, blank=True, null=True)
bro_delivery_url = models.CharField(max_length=500, blank=True, null=True)

def save(self, request=None, *args, **kwargs):
"""
Initialize an upload task by posting the bro_domain, registration_type, request_type, and the sourcedocument_data
"""
if not self.status or self.status == "PENDING":
super().save(*args, **kwargs)
# Accessing the authenticated user's username and token
username = self.data_owner.bro_user_token
password = self.data_owner.bro_user_password

# Update the status of the new task
self.status = "PENDING"
self.save()

# Start the celery task
tasks.upload_bro_data_task.delay(self.uuid, username, password)
else:
# This is an existing object being edited: no upload celery task required
super().save(*args, **kwargs)
pass

def __str__(self) -> str:
return f"{self.data_owner}: {self.registration_type} ({self.request_type})"
3 changes: 2 additions & 1 deletion api/serializers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from django.contrib.auth.models import User
from rest_framework import serializers

from . import models as api_models
from api import models as api_models

from .mixins import UrlFieldMixin


Expand Down
Loading

0 comments on commit 78730f9

Please sign in to comment.