Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vt 5650 sharq #51

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
8 changes: 2 additions & 6 deletions ci/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
FROM python:3.6-slim-buster

ENV CONSUL_TEMPLATE_VERSION 0.19.5


FROM 857556598075.dkr.ecr.us-west-1.amazonaws.com/plivo/app/base/python:3.9-buster
USER root
RUN mkdir -p /opt/sharq-server
WORKDIR /opt/sharq-server
COPY . /opt/sharq-server
RUN mkdir /etc/supervisord && mkdir /etc/supervisord/conf.d && mkdir /var/log/supervisord && pip install supervisor
RUN apt-get update && apt-get install -y nginx g++ git curl && pip install virtualenv envtpl

RUN curl -L https://releases.hashicorp.com/consul-template/${CONSUL_TEMPLATE_VERSION}/consul-template_${CONSUL_TEMPLATE_VERSION}_linux_amd64.tgz | tar -C /usr/sbin -xzf -
RUN virtualenv /opt/sharq-server
RUN . /opt/sharq-server/bin/activate && /opt/sharq-server/bin/pip install --no-cache-dir -r /opt/sharq-server/requirements.txt && /opt/sharq-server/bin/python setup.py install -f

Expand Down
5 changes: 4 additions & 1 deletion ci/config.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
parent: common
serviceName: sharq
hipchatRoom: cicd-tests-dev-notifications
language: python
language: python
dockerOnly: true
build:
platform: "linux/amd64,linux/arm64"
27 changes: 27 additions & 0 deletions helper_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from redis_helper import RedisConnection
MAX_ALLOWED_HOURS = 6


def validate_queue_length(self, max_queued_length, request_data):
try:
print('validate_queue_length :: fetching current queue_length for auth_id : {}'.format(request_data))
queue_type = request_data.get('queue_type', None)
queue_id = request_data.get('queue_id', None)
key_prefix = self._key_prefix

allowed_queue_length = max_queued_length

redis_key = '{}:{}:{}'.format(key_prefix, queue_type, queue_id)
redis_conn = RedisConnection()
redis_conn.create_redis_conn()
key_length = redis_conn.get_key_length(redis_key)

print("key_length :: ", key_length)

if key_length < allowed_queue_length:
return True
else:
return False
except Exception as e:
print('validate_queue_length :: error occurred as {}'.format(e))
return True
40 changes: 40 additions & 0 deletions redis_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import configparser
import argparse
import os
import redis


class RedisConnection:
def __init__(self):
parser = argparse.ArgumentParser(description='SharQ Server.')
parser.add_argument('-c', '--config', action='store', required=True,
help='Absolute path of the SharQ configuration file.',
dest='sharq_config')
args = parser.parse_args()
self.config_file = os.path.abspath(args.sharq_config)
self.redis_conn = None

def create_redis_conn(self):
try:
# Read config file
config = configparser.SafeConfigParser()
config.read(self.config_file)

# Get Redis details from config file
redis_config = config['redis']
redis_port = int(redis_config.get('port', None))
redis_host = redis_config.get('host', None)
redis_pass = redis_config.get('password', None)

# Create Redis connection
self.redis_conn = redis.Redis(host=redis_host, port=redis_port, password=redis_pass)
self.redis_conn.ping()
return self.redis_conn
except Exception as e:
print('create_redis_conn :: error occurred as {}'.format(e))

def get_key_length(self, key):
try:
return self.redis_conn.llen(key)
except Exception as e:
print('get_key_length :: error occurred as {}'.format(e))
6 changes: 3 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ Jinja2==2.7.2
MarkupSafe==0.23
Werkzeug==0.9.4
argparse==1.2.1
gevent==20.5.0
greenlet==0.4.15
gevent==22.10.2
greenlet==2.0.2
gunicorn==19.0.0
itsdangerous==0.24
msgpack==0.5.6
ujson==2.0.0
uWSGI==2.0.19.1
uWSGI==2.0.21
SharQ==1.2.0
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
'Jinja2==2.7.2',
'MarkupSafe==0.23',
'Werkzeug==0.9.4',
'gevent==20.5.0',
'greenlet==0.4.15',
'gevent==22.10.2',
'greenlet==2.0.2',
'itsdangerous==0.24',
'gunicorn==19.0',
'ujson==2.0.0'
Expand Down
26 changes: 17 additions & 9 deletions sharq_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from flask import Flask, request, jsonify
from redis.exceptions import LockError
import traceback

from helper_functions import validate_queue_length
from sharq import SharQ


Expand All @@ -23,7 +23,6 @@ def __init__(self, config_path):
self.config.read(config_path)
# pass the config file to configure the SharQ core.
self.sq = SharQ(config_path)

self.app = Flask(__name__)
# set the routes
self.app.add_url_rule(
Expand Down Expand Up @@ -100,6 +99,7 @@ def _view_index(self):

def _view_enqueue(self, queue_type, queue_id):
"""Enqueues a job into SharQ."""
print('Inside _view_enqueue function')
response = {
'status': 'failure'
}
Expand All @@ -114,14 +114,22 @@ def _view_enqueue(self, queue_type, queue_id):
'queue_id': queue_id
})

try:
response = self.sq.enqueue(**request_data)
except Exception as e:
traceback.print_exc()
response['message'] = e.message
return jsonify(**response), 400
print('request_data :: ', request_data)
max_queued_length = 5

enqueue_allow = validate_queue_length(self, max_queued_length, request_data)
if enqueue_allow:
try:
response = self.sq.enqueue(**request_data)
except Exception as e:
traceback.print_exc()
response['message'] = e.message
return jsonify(**response), 400

return jsonify(**response), 201
return jsonify(**response), 201
else:
response['message'] = 'Max call queue reached'
return jsonify(**response), 429

def _view_dequeue(self, queue_type):
"""Dequeues a job from SharQ."""
Expand Down