diff --git a/awx/main/dispatch/periodic.py b/awx/main/dispatch/periodic.py index 52e84524297c..d58282368157 100644 --- a/awx/main/dispatch/periodic.py +++ b/awx/main/dispatch/periodic.py @@ -88,8 +88,10 @@ def __init__(self, schedule): # internally times are all referenced relative to startup time, add grace period self.global_start = time.time() + 2.0 - def get_and_mark_pending(self): - relative_time = time.time() - self.global_start + def get_and_mark_pending(self, reftime=None): + if reftime is None: + reftime = time.time() # mostly for tests + relative_time = reftime - self.global_start to_run = [] for job in self.jobs: if job.due_to_run(relative_time): @@ -98,8 +100,10 @@ def get_and_mark_pending(self): job.mark_run(relative_time) return to_run - def time_until_next_run(self): - relative_time = time.time() - self.global_start + def time_until_next_run(self, reftime=None): + if reftime is None: + reftime = time.time() # mostly for tests + relative_time = reftime - self.global_start next_job = min(self.jobs, key=lambda j: j.next_run) delta = next_job.next_run - relative_time if delta <= 0.1: @@ -115,10 +119,11 @@ def time_until_next_run(self): def debug(self, *args, **kwargs): data = dict() data['title'] = 'Scheduler status' + reftime = time.time() - now = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S UTC') + now = datetime.fromtimestamp(reftime).strftime('%Y-%m-%d %H:%M:%S UTC') start_time = datetime.fromtimestamp(self.global_start).strftime('%Y-%m-%d %H:%M:%S UTC') - relative_time = time.time() - self.global_start + relative_time = reftime - self.global_start data['started_time'] = start_time data['current_time'] = now data['current_time_relative'] = round(relative_time, 3) diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index 264205a8ed6d..b5ede6e267fe 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -203,7 +203,11 @@ def run_periodic_tasks(self): except Exception as exc: logger.warning(f'Failed to save dispatcher statistics {exc}') - for job in self.scheduler.get_and_mark_pending(): + # Everything benchmarks to the same original time, so that skews due to + # runtime of the actions, themselves, do not mess up scheduling expectations + reftime = time.time() + + for job in self.scheduler.get_and_mark_pending(reftime=reftime): if 'control' in job.data: try: job.data['control']() @@ -220,7 +224,7 @@ def run_periodic_tasks(self): self.listen_start = time.time() - return self.scheduler.time_until_next_run() + return self.scheduler.time_until_next_run(reftime=reftime) def run(self, *args, **kwargs): super(AWXConsumerPG, self).run(*args, **kwargs)