Skip to content
This repository has been archived by the owner on Sep 1, 2023. It is now read-only.

Latest commit

 

History

History
90 lines (67 loc) · 2.43 KB

README.adoc

File metadata and controls

90 lines (67 loc) · 2.43 KB

ProcessBalancer

ProcessBalancer is a background job runner that is targeted toward the specific use-case of long running jobs.

If you need a job runner that runs small background jobs, look to Sidekiq.

ProcessBalancer has built-in functionality to balance your jobs across multiple instances.

Installation

Add this line to your application’s Gemfile:

gem 'process_balancer'

And then execute:

$ bundle

Or install it yourself as:

$ gem install process_balancer

Usage

Build a Job class that works through an iteration of your processing. The iteration does not have to be one record, it can be working through 1000 records. The iteration needs to be designed to lock its work atomically so that multiple concurrent workers could be running. The ProcessBalancer takes care of scaling out to run however many workers you want running across however many nodes you have running. Each instance of the Job will have a unique worker_id to ensure they do not trample on each other.

class ProcessQueue < ProcessBalancer::Base
  # set a worker locking algorithm
  lock_driver :simple_redis

  LOCK_SQL = <<~SQL
    WITH T as (
      SELECT ctid
      FROM queue_table
      WHERE status = #{QueueRecord::QUEUED} AND lock IS NULL
      ORDER BY id
      LIMIT 1000
      FOR UPDATE SKIP LOCKED
    )
    UPDATE queue_records
    SET lock = :lock, updated_at = :now
    WHERE ctid = ANY(ARRAY(SELECT ctid FROM T))
  SQL

  def lock_records
    # grab a # of records and lock them with the worker_id
    sql = ActiveRecord::Base.sanitize_sql([LOCK_SQL, {lock: worker_index, now: Time.now}])
    ActiveRecord::Base.connection.execute(sql)
    # process those records
    QueueRecord.where(lock: worker_index)
  end

  def process_record(entry)
    # do processing
    # mark record as processed and release the lock on that record
    entry.update(lock: nil, status: QueueRecord::PROCESSED)
  end

  def unlock_records
    # if any error occurs unlock any of our unprocessed records
    QueueRecord.where(lock: worker_index).update_all(lock: nil)
  end
end

Configuration file

jobs:
  process_queue:
    class: 'ProcessQueue'

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/NetsoftHoldings/process_balancer.

License

The gem is available as open source under the terms of the LGPLv3 License.