From 4a8823414b405a060526126a7b45e1fe5d2d2338 Mon Sep 17 00:00:00 2001 From: Alan Date: Wed, 9 Aug 2023 17:01:43 +0800 Subject: [PATCH] Fix #7767 - active requeue domains --- Notify.php | 88 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 61 insertions(+), 27 deletions(-) diff --git a/Notify.php b/Notify.php index 69dd1cd6..5dd8a0b5 100644 --- a/Notify.php +++ b/Notify.php @@ -164,7 +164,10 @@ class Pman_Core_Notify extends Pman } } - + var $queue = array(); + var $domain_queue = array(); // false to use nextquee + var $next_queue = array(); + function get($r,$opts=array()) { $this->parseArgs($opts); @@ -222,7 +225,7 @@ class Pman_Core_Notify extends Pman $w->autoJoin(); $w->find(); - $ar = array(); // $w->fetchAll(); + if (!empty($opts['list'])) { @@ -238,32 +241,30 @@ class Pman_Core_Notify extends Pman } //echo "BATCH SIZE: ". count($ar) . "\n"; - $pushed = array(); - $requeue = array(); + + while (true) { // only add if we don't have any queued up.. - if (empty($ar) && $w->fetch()) { - $ar[] = clone($w); + if (empty($this->queue) && $w->fetch()) { + $this->queue[] = clone($w); $total--; } - $this->logecho("BATCH SIZE: ". (count($ar) + $total) ); + $this->logecho("BATCH SIZE: ". (count($this->queue) + $total) ); - if (empty($ar)) { - $this->logecho("COMPLETED MAIN QUEUE - running deleted"); - - if (empty($pushed)) { - break; + if (empty($this->queue)) { + $this->logecho("COMPLETED MAIN QUEUE - running maxed out domains"); + if ($this->domain_queue !== false) { + $this->queue = $this->remainingDomainQueue(); + $this->domain_queue = false; } - $ar = $pushed; - $pushed = false; continue; } - $p = array_shift($ar); + $p = array_shift($this->queue); if (!$this->poolfree()) { - array_unshift($ar,$p); /// put it back on.. + array_unshift($this->queue,$p); /// put it back on.. sleep(3); continue; } @@ -271,15 +272,10 @@ class Pman_Core_Notify extends Pman if ($this->poolHasDomain($email) > $this->max_to_domain) { - if ($pushed === false) { - // we only try once to requeue.. - $this->logecho("REQUEING - maxed out that domain - {$email}"); - $requeue[] = $p; - continue; - } - $this->logecho("PUSHING - maxed out that domain - {$email}"); - $pushed[] = $p; - + // push it to a 'domain specific queue' + $this->logecho("REQUEING - maxed out that domain - {$email}"); + $this->pushQueueDomain($p, $email); + //sleep(3); continue; @@ -295,10 +291,10 @@ class Pman_Core_Notify extends Pman // we should have a time limit here... while(count($this->pool)) { $this->poolfree(); - sleep(3); + sleep(3); } - foreach($requeue as $p) { + foreach($this->next_queue as $p) { $pp = clone($p); $p->act_when = $p->sqlValue('NOW + INTERVAL 1 MINUTE'); $this->updateServer($p); @@ -532,6 +528,9 @@ class Pman_Core_Notify extends Pman //} $this->logecho("ENDED: ({$p['pid']}) " . $p['cmd'] . " : " . file_get_contents($p['out']) ); @unlink($p['out']); + // at this point we could pop onto the queue the + $this->popQueueDomain($p['email']); + //unlink($p['out']); } $this->logecho("POOL SIZE: ". count($pool) ); @@ -562,6 +561,41 @@ class Pman_Core_Notify extends Pman return $ret; } + function popQueueDomain($email) + { + $ea = explode('@',$email); + $dom = strtolower(array_pop($ea)); + if (empty($this->domain_queue[$dom])) { + return; + } + array_unshift($this->queue, array_shift($this->domain_queue[$dom])); + + } + + function pushQueueDomain($e, $email) + { + if ($this->domain_queue === false) { + $this->next_queue[] = $e; + return; + } + + $ea = explode('@',$email); + $dom = strtolower(array_pop($ea)); + if (!isset($this->domain_queue[$dom])) { + $this->domain_queue[$dom] = array(); + } + $this->domain_queue[$dom][] = $e; + } + function remainingDomainQueue() + { + $ret = array(); + foreach($this->domain_queue as $dom => $ar) { + $ret = array_merge($ret, $ar); + } + return $ret; + } + + function output() { -- 2.39.2