}
}
-
+ var $queue = array();
+ var $domain_queue = array(); // false to use nextquee
+ var $next_queue = array();
+
function get($r,$opts=array())
{
$this->parseArgs($opts);
$w->autoJoin();
$w->find();
- $ar = array(); // $w->fetchAll();
+
if (!empty($opts['list'])) {
}
//echo "BATCH SIZE: ". count($ar) . "\n";
- $pushed = array();
- $requeue = array();
+
+
while (true) {
// only add if we don't have any queued up..
- if (empty($ar) && $w->fetch()) {
- $ar[] = clone($w);
+ if (empty($this->queue) && $w->fetch()) {
+ $this->queue[] = clone($w);
$total--;
}
- $this->logecho("BATCH SIZE: ". (count($ar) + $total) );
+ $this->logecho("BATCH SIZE: ". (count($this->queue) + $total) );
- if (empty($ar)) {
- $this->logecho("COMPLETED MAIN QUEUE - running deleted");
-
- if (empty($pushed)) {
- break;
+ if (empty($this->queue)) {
+ $this->logecho("COMPLETED MAIN QUEUE - running maxed out domains");
+ if ($this->domain_queue !== false) {
+ $this->queue = $this->remainingDomainQueue();
+ $this->domain_queue = false;
}
- $ar = $pushed;
- $pushed = false;
continue;
}
- $p = array_shift($ar);
+ $p = array_shift($this->queue);
if (!$this->poolfree()) {
- array_unshift($ar,$p); /// put it back on..
+ array_unshift($this->queue,$p); /// put it back on..
sleep(3);
continue;
}
if ($this->poolHasDomain($email) > $this->max_to_domain) {
- if ($pushed === false) {
- // we only try once to requeue..
- $this->logecho("REQUEING - maxed out that domain - {$email}");
- $requeue[] = $p;
- continue;
- }
- $this->logecho("PUSHING - maxed out that domain - {$email}");
- $pushed[] = $p;
-
+ // push it to a 'domain specific queue'
+ $this->logecho("REQUEING - maxed out that domain - {$email}");
+ $this->pushQueueDomain($p, $email);
+
//sleep(3);
continue;
// we should have a time limit here...
while(count($this->pool)) {
$this->poolfree();
- sleep(3);
+ sleep(3);
}
- foreach($requeue as $p) {
+ foreach($this->next_queue as $p) {
$pp = clone($p);
$p->act_when = $p->sqlValue('NOW + INTERVAL 1 MINUTE');
$this->updateServer($p);
//}
$this->logecho("ENDED: ({$p['pid']}) " . $p['cmd'] . " : " . file_get_contents($p['out']) );
@unlink($p['out']);
+ // at this point we could pop onto the queue the
+ $this->popQueueDomain($p['email']);
+
//unlink($p['out']);
}
$this->logecho("POOL SIZE: ". count($pool) );
return $ret;
}
+ function popQueueDomain($email)
+ {
+ $ea = explode('@',$email);
+ $dom = strtolower(array_pop($ea));
+ if (empty($this->domain_queue[$dom])) {
+ return;
+ }
+ array_unshift($this->queue, array_shift($this->domain_queue[$dom]));
+
+ }
+
+ function pushQueueDomain($e, $email)
+ {
+ if ($this->domain_queue === false) {
+ $this->next_queue[] = $e;
+ return;
+ }
+
+ $ea = explode('@',$email);
+ $dom = strtolower(array_pop($ea));
+ if (!isset($this->domain_queue[$dom])) {
+ $this->domain_queue[$dom] = array();
+ }
+ $this->domain_queue[$dom][] = $e;
+ }
+ function remainingDomainQueue()
+ {
+ $ret = array();
+ foreach($this->domain_queue as $dom => $ar) {
+ $ret = array_merge($ret, $ar);
+ }
+ return $ret;
+ }
+
+
function output()
{