# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. # # This Source Code Form is "Incompatible With Secondary Licenses", as # defined by the Mozilla Public License, v. 2.0. package Bugzilla::JobQueue; use 5.10.1; use strict; use warnings; use Bugzilla::Constants; use Bugzilla::Error; use Bugzilla::Install::Util qw(install_string); use Bugzilla::Util qw(read_text); use File::Basename; use base qw(TheSchwartz); use fields qw(_worker_pidfile); # This maps job names for Bugzilla::JobQueue to the appropriate modules. # If you add new types of jobs, you should add a mapping here. use constant JOB_MAP => {send_mail => 'Bugzilla::Job::Mailer', bug_mail => 'Bugzilla::Job::BugMail',}; # Without a driver cache TheSchwartz opens a new database connection # for each email it sends. This cached connection doesn't persist # across requests. use constant DRIVER_CACHE_TIME => 300; # 5 minutes # To avoid memory leak/fragmentation, a worker process won't process more than # MAX_MESSAGES messages. use constant MAX_MESSAGES => 1000; sub job_map { if (!defined(Bugzilla->request_cache->{job_map})) { my $job_map = JOB_MAP; Bugzilla::Hook::process('job_map', {job_map => $job_map}); Bugzilla->request_cache->{job_map} = $job_map; } return Bugzilla->request_cache->{job_map}; } sub new { my $class = shift; if (!Bugzilla->feature('jobqueue')) { ThrowUserError('feature_disabled', {feature => 'jobqueue'}); } my $lc = Bugzilla->localconfig; # We need to use the main DB as TheSchwartz module is going # to write to it. my $self = $class->SUPER::new( databases => [{ dsn => Bugzilla->dbh_main->{private_bz_dsn}, user => $lc->{db_user}, pass => $lc->{db_pass}, prefix => 'ts_', }], driver_cache_expiration => DRIVER_CACHE_TIME, prioritize => 1, ); return $self; } # A way to get access to the underlying databases directly. sub bz_databases { my $self = shift; my @hashes = keys %{$self->{databases}}; return map { $self->driver_for($_) } @hashes; } # inserts a job into the queue to be processed and returns immediately sub insert { my $self = shift; my $job = shift; if (!ref($job)) { my $mapped_job = Bugzilla::JobQueue->job_map()->{$job}; ThrowCodeError('jobqueue_no_job_mapping', {job => $job}) if !$mapped_job; $job = new TheSchwartz::Job( funcname => $mapped_job, arg => $_[0], priority => $_[1] || 5 ); } my $retval = $self->SUPER::insert($job); # XXX Need to get an error message here if insert fails, but # I don't see any way to do that in TheSchwartz. ThrowCodeError('jobqueue_insert_failed', {job => $job, errmsg => $@}) if !$retval; return $retval; } # To avoid memory leaks/fragmentation which tends to happen for long running # perl processes; check for jobs, and spawn a new process to empty the queue. sub subprocess_worker { my $self = shift; my $command = "$0 -d -p '" . $self->{_worker_pidfile} . "' onepass"; while (1) { my $time = (time); my @jobs = $self->list_jobs({ funcname => $self->{all_abilities}, run_after => $time, grabbed_until => $time, limit => 1, }); if (@jobs) { $self->debug("Spawning queue worker process"); # Run the worker as a daemon system $command; # And poll the PID to detect when the working has finished. # We do this instead of system() to allow for the INT signal to # interrup us and trigger kill_worker(). my $pid = read_text($self->{_worker_pidfile}, err_mode => 'quiet'); if ($pid) { sleep(3) while (kill(0, $pid)); } $self->debug("Queue worker process completed"); } else { $self->debug("No jobs found"); } sleep(5); } } sub kill_worker { my $self = Bugzilla->job_queue(); if ($self->{_worker_pidfile} && -e $self->{_worker_pidfile}) { my $worker_pid = read_text($self->{_worker_pidfile}); if ($worker_pid && kill(0, $worker_pid)) { $self->debug("Stopping worker process"); system "$0 -f -p '" . $self->{_worker_pidfile} . "' stop"; } } } sub set_pidfile { my ($self, $pidfile) = @_; $self->{_worker_pidfile} = bz_locations->{'datadir'} . '/worker-' . basename($pidfile); } # Clear the request cache at the start of each run. sub work_once { my $self = shift; Bugzilla->clear_request_cache(); return $self->SUPER::work_once(@_); } # Never process more than MAX_MESSAGES in one batch, to avoid memory # leak/fragmentation issues. sub work_until_done { my $self = shift; my $count = 0; while ($count++ < MAX_MESSAGES) { $self->work_once or last; } } 1; __END__ =head1 NAME Bugzilla::JobQueue - Interface between Bugzilla and TheSchwartz. =head1 SYNOPSIS use Bugzilla; my $obj = Bugzilla->job_queue(); $obj->insert('send_mail', { msg => $message }); =head1 DESCRIPTION Certain tasks should be done asyncronously. The job queue system allows Bugzilla to use some sort of service to schedule jobs to happen asyncronously. =head2 Inserting a Job See the synopsis above for an easy to follow example on how to insert a job into the queue. Give it a name and some arguments and the job will be sent away to be done later. =head1 B =over =item insert =item bz_databases =item job_map =item set_pidfile =item kill_worker =back