From 27f497801ca6da49a31b9ecad6eb64c85915d921 Mon Sep 17 00:00:00 2001 From: Markham Lee Date: Fri, 29 Mar 2024 03:19:54 -0700 Subject: [PATCH] General refactoring, updates for adding real time monitornig & data logging via InfluxDB --- api/monitoring.py | 20 +++++++++----------- api/score_service.py | 4 ++-- api/server.py | 37 +++++++++++++++++++++++++++++++------ 3 files changed, 42 insertions(+), 19 deletions(-) diff --git a/api/monitoring.py b/api/monitoring.py index ed1eb73..86085d5 100644 --- a/api/monitoring.py +++ b/api/monitoring.py @@ -15,22 +15,20 @@ def __init__(self): # load variables self.load_variables() - # get MQTT clientID - self.clientID = str(uuid.uuid4()) - - # get MQTT client - self.client = self.mqttClient() - # Load variables def load_variables(self): - self.userName = os.environ['MQTT_USER'] + self.user_name = os.environ['MQTT_USER'] self.pwd = os.environ['MQTT_SECRET'] self.host = os.environ['MQTT_BROKER'] - self.port = os.environ['MQTT_PORT'] + self.port = int(os.environ['MQTT_PORT']) + + def get_client_id(self): + + return str(uuid.uuid4()) # Generate MQTT client - def mqttClient(self): + def mqttClient(self, client_id): def connectionStatus(client, userdata, flags, code): @@ -40,8 +38,8 @@ def connectionStatus(client, userdata, flags, code): else: logger.debug(f'connection error occured, return code: {code}, retrying...') # noqa: E501 - client = mqtt.Client(self.clientID) - client.username_pw_set(username=self.userName, password=self.pwd) + client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, client_id) + client.username_pw_set(username=self.user_name, password=self.pwd) client.on_connect = connectionStatus code = client.connect(self.host, self.port) diff --git a/api/score_service.py b/api/score_service.py index fbfe8b5..7520c15 100644 --- a/api/score_service.py +++ b/api/score_service.py @@ -17,7 +17,7 @@ def cosine_score(reference: float, sample: float) -> float: cosd = F.cosine_similarity(reference, sample) score = (1 - cosd.item()) - logger.debug(f'Cosine distance calculated: {score}') + logger.info(f'Cosine distance calculated: {score}') return score @@ -35,7 +35,7 @@ def euclidean_distance(reference: float, sample: float) -> float: # pull the float value out of the tensor object dist = dist.item() - logger.debug(f'Euclidean distance is: {dist}') + logger.info(f'Euclidean distance is: {dist}') return dist diff --git a/api/server.py b/api/server.py index c542edf..e90b01f 100644 --- a/api/server.py +++ b/api/server.py @@ -29,10 +29,15 @@ com_utilities = ReportingCommunication() # get MQTT Client -mqttClient = com_utilities.mqttClient() + +# get client ID +client_id = com_utilities.get_client_id() + +# mqtt client +mqttClient, code = com_utilities.mqttClient(client_id) logger.info('Communications class instantiated') -MONITORING_TOPIC = os.environ['TOPIC'] +MONITORING_TOPIC = os.environ['MONITORING_TOPIC'] # endpoint for API health check @@ -93,7 +98,7 @@ def embeddings(): score_type, threshold) # send data to MQTT topic for data logging/real time monitoring - mqttClient.publish(MONITORING_TOPIC, resultjson) + send_monitoring_message(resultjson) logger.info('response sent back to client') return flask.Response(response=resultjson, status=200, @@ -141,6 +146,10 @@ def cached(): # and builds response payload resultjson = build_response(latency, cached_tensor, sample_tensor, score_type, threshold) + logger.info(resultjson) + + # send data to MQTT topic for data logging/real time monitoring + send_monitoring_message(resultjson) return flask.Response(response=resultjson, status=200, mimetype='application/json') @@ -148,6 +157,8 @@ def cached(): # method that aggregates data, prepares json response and sends the data # back to the client +# TODO: move this and the methods below to a separate class, add field +# for the endpoint the data was received on. def build_response(latency: float, tensor1: object, tensor2: object, score_type: float, threshold: float) -> dict: @@ -169,16 +180,18 @@ def build_response(latency: float, tensor1: object, tensor2: object, # return data results = {"match_status": status, "score": score, - "score_type": 'cosine distance', + "score_type": score_type, "score_threshold": threshold, - "Inferencing_latency": latency_message} + "inferencing_latency": latency_message} resultjson = json.dumps(results) return resultjson -def load_images(image): +# loading images +# TODO: may need to add transformations in the future +def load_images(image: object) -> object: with Image.open(image) as photo: photo.load() @@ -186,3 +199,15 @@ def load_images(image): app.logger.info('photo loaded') return photo + + +# TODO: add QOS parameters and message re-send logic +def send_monitoring_message(message: dict): + + try: + result = mqttClient.publish(MONITORING_TOPIC, message) + status = result[0] + logger.info(f'Monitoring message sent successfully with status {status}') # noqa: E501 + + except Exception as e: + logger.error(f'MQTT publishing failed with error: {e}')