Commit 95fde080 authored by Gaël Berthaud-Müller's avatar Gaël Berthaud-Müller
Browse files

add redis queue support

parent 0cdcb53b
......@@ -9,7 +9,10 @@ RUN apt-get update && \
RUN apt-get install -y jq libclass-method-modifiers-perl libconfig-inifiles-perl libdbd-sqlite3-perl libdaemon-control-perl libdbi-perl libfile-sharedir-perl libfile-slurp-perl libhtml-parser-perl libmojolicious-perl libio-stringy-perl libjson-pp-perl libjson-rpc-perl libjson-validator-perl liblog-any-adapter-dispatch-perl liblog-any-perl liblog-dispatch-perl libmoose-perl libparallel-forkmanager-perl libplack-perl libplack-middleware-debug-perl libplack-middleware-reverseproxy-perl librole-tiny-perl librouter-simple-perl libstring-shellquote-perl libtest-nowarnings-perl libtry-tiny-perl libintl-perl perl-doc starman libdbd-pg-perl libnet-statsd-perl && \
cpanm -n JSON::Validator
RUN apt-get install -y libredis-perl libdata-messagepack-perl libnet-ip-xs-perl
COPY ./Zonemaster-Backend.tar.gz ./Zonemaster-Backend.tar.gz
COPY ./Zonemaster-Engine.tar.gz ./Zonemaster-Engine.tar.gz
RUN cpanm -n ./Zonemaster-Engine.tar.gz ./Zonemaster-Backend.tar.gz
RUN cpanm -n ./Zonemaster-Engine.tar.gz
RUN cpanm -n ./Zonemaster-Backend.tar.gz
......@@ -34,6 +34,7 @@ lib/Zonemaster/Backend/DB/PostgreSQL.pm
lib/Zonemaster/Backend/DB/SQLite.pm
lib/Zonemaster/Backend/Errors.pm
lib/Zonemaster/Backend/Metrics.pm
lib/Zonemaster/Backend/RedisQueue.pm
lib/Zonemaster/Backend/RPCAPI.pm
lib/Zonemaster/Backend/TestAgent.pm
lib/Zonemaster/Backend/Translator.pm
......@@ -44,6 +45,7 @@ MANIFEST This list of files
META.yml
README.md
script/add-batch-job.pl
script/queue_batch.pl
script/zmb
script/zmtest
script/zonemaster_backend_rpcapi.psgi
......
......@@ -39,6 +39,7 @@ requires
'Zonemaster::LDNS' => 2.002,
'Plack::Middleware::ReverseProxy' => 0,
'Locale::TextDomain' => 1.20,
'Redis' => 0,
;
test_requires 'DBD::SQLite';
......
package Zonemaster::Backend::RedisQueue;
use 5.14.2;
use Moose;
use Redis;
use JSON::PP;
has 'redis' => (
is => 'ro',
isa => 'Redis',
default => sub { Redis->new },
);
has 'dbh' => (
is => 'ro',
isa => 'DBI::db',
required => 1,
);
sub get_test_request {
my ( $self, $timeout ) = @_;
my $params;
my ( $queue, $encoded_params ) = $self->redis->blpop("zm-test-requests", $timeout);
if ( $encoded_params ) {
$params = decode_json($encoded_params);
}
return $params;
}
sub queue_batch_job {
my ( $self, $batch_id ) = @_;
my $query = q[
SELECT hash_id, params FROM test_results WHERE batch_id = ? AND progress = 0
];
my $sth = $self->dbh->prepare($query);
$sth->execute($batch_id);
my $count = 0;
while (my $h = $sth->fetchrow_hashref) {
my $params = decode_json($h->{params});
$params->{hash_id} = $h->{hash_id};
$self->redis->rpush("zm-test-requests", encode_json($params));
$count++;
}
return $count;
}
1;
......@@ -61,16 +61,16 @@ sub new {
}
sub run {
my ( $self, $test_id ) = @_;
my ( $self, $params ) = @_;
my @accumulator;
my %counter;
my %counter_for_progress_indicator;
my $params;
#my $params;
my $progress = $self->{_db}->test_progress( $test_id, 1 );
#my $progress = $self->{_db}->test_progress( $test_id, 1 );
$params = $self->{_db}->get_test_params( $test_id );
#$params = $self->{_db}->get_test_params( $test_id );
my %methods = Zonemaster::Engine->all_methods;
......@@ -210,14 +210,14 @@ sub run {
my @entries = grep { $_->numeric_level >= $numeric{INFO} } @{ Zonemaster::Engine->logger->entries };
Zonemaster::Backend::Metrics::timing("zonemaster.testagent.log_callback_add_result_entry_grep_duration", tv_interval($start_time_2) * 1000);
$self->{_db}->add_result_entries( $test_id, \@entries);
$self->{_db}->add_result_entries( $params->{hash_id}, \@entries);
my $callback_add_result_entry_duration = tv_interval($start_time_2);
Zonemaster::Backend::Metrics::timing("zonemaster.testagent.log_callback_add_result_entry_duration", $callback_add_result_entry_duration * 1000);
#$log->debug("Callback timing for $test_id: $callback_duration / $callback_add_result_entry_duration ");
$progress = $self->{_db}->test_progress( $test_id, 100 );
$self->{_db}->test_progress( $params->{hash_id}, 100 );
return;
} ## end sub run
......
#!/usr/bin/env perl
use 5.14.2;
use warnings;
use strict;
use Zonemaster::Backend::DB;
use Zonemaster::Backend::Config;
use Zonemaster::Backend::RedisQueue;
my $config = Zonemaster::Backend::Config->load_config();
my $dbtype = $config->DB_engine;
my $dbclass = Zonemaster::Backend::DB->get_db_class( $dbtype );
my $db = $dbclass->from_config( $config );
my $queue = Zonemaster::Backend::RedisQueue->new( dbh => $db->dbh );
my $batch_id = $db->dbh->selectrow_arrayref(
'SELECT max( id ) FROM batch_jobs'
);
$batch_id = $$batch_id[0];
my $queued_jobs_count = $queue->queue_batch_job($batch_id);
print "Queued $queued_jobs_count jobs";
......@@ -6,6 +6,7 @@ use warnings;
use Zonemaster::Backend::TestAgent;
use Zonemaster::Backend::Config;
use Zonemaster::Backend::Metrics;
use Zonemaster::Backend::RedisQueue;
use Parallel::ForkManager;
use Daemon::Control;
......@@ -134,6 +135,7 @@ sub main {
local $SIG{TERM} = $catch_sigterm;
local $SIG{INT} = $catch_sigterm;
my $queue = Zonemaster::Backend::RedisQueue->new( dbh => $self->db->dbh );
my $agent = Zonemaster::Backend::TestAgent->new( { config => $self->config } );
# Disconnect from database in parent to avoid sharing the connection between children
$agent->{_db}->dbh->disconnect;
......@@ -151,21 +153,23 @@ sub main {
my $fetch_test_timer = [ gettimeofday ];
my $id = $self->db->get_test_request( $self->config->ZONEMASTER_lock_on_queue );
$self->db->process_unfinished_tests(
$self->config->ZONEMASTER_lock_on_queue,
$self->config->ZONEMASTER_max_zonemaster_execution_time,
);
my $params = $queue->get_test_request($self->config->DB_polling_interval);
#$self->db->process_unfinished_tests(
# $self->config->ZONEMASTER_lock_on_queue,
# $self->config->ZONEMASTER_max_zonemaster_execution_time,
#);
Zonemaster::Backend::Metrics::timing("zonemaster.testagent.fetchtests_duration_seconds", tv_interval($fetch_test_timer) * 1000);
if ( $id ) {
if ( $params ) {
my $id = $params->{hash_id};
$log->info( "Test found: $id" );
if ( $self->pm->start( $id ) == 0 ) { # Forks off child process
$log->info( "Test starting: $id" );
Zonemaster::Backend::Metrics::increment("zonemaster.testagent.tests_started");
my $start_time = [ gettimeofday ];
eval { $agent->run( $id ) };
eval { $agent->run( $params ) };
if ( $@ ) {
chomp $@;
Zonemaster::Backend::Metrics::increment("zonemaster.testagent.tests_died");
......@@ -181,9 +185,6 @@ sub main {
$self->pm->finish; # Terminates child process
}
}
else {
sleep $self->config->DB_polling_interval;
}
}
$log->notice( "Daemon entered graceful shutdown" );
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment