X-Git-Url: http://git.roojs.org/?p=Pman.Core;a=blobdiff_plain;f=Notify.php;h=fd582092af5a14fa15dae313ff2f70dba6e54320;hp=46f7e8faf33104153897149631ee62ac6680c94b;hb=HEAD;hpb=7b3d204e6292c18b9dfccb645edb2c4693c7733a diff --git a/Notify.php b/Notify.php index 46f7e8fa..3fd4c5b4 100644 --- a/Notify.php +++ b/Notify.php @@ -123,16 +123,23 @@ class Pman_Core_Notify extends Pman var $evtype = ''; // any notification... // this script should only handle EMAIL notifications.. - - + + var $server; // core_notify_server + + var $poolname = 'core'; + + var $opts; var $force = false; + + var $clear_interval = '1 WEEK'; // how long to clear the old queue of items. + function getAuth() { $ff = HTML_FlexyFramework::get(); if (!$ff->cli) { die("access denied"); } - HTML_FlexyFramework::ensureSingle($_SERVER["SCRIPT_NAME"] .'|'. __FILE__, $this); + HTML_FlexyFramework::ensureSingle($_SERVER["SCRIPT_NAME"] .'|'. __FILE__ .'|'. (empty($_SERVER['argv'][1]) ? '': $_SERVER['argv'][1]), $this); return true; } @@ -164,9 +171,19 @@ class Pman_Core_Notify extends Pman } } - + var $queue = array(); + var $domain_queue = array(); // false to use nextquee + var $next_queue = array(); + + function get($r,$opts=array()) { + + // if ($this->database_is_locked()) { + // die("LATER - DATABASE IS LOCKED"); + //} + + $this->parseArgs($opts); //date_default_timezone_set('UTC'); @@ -174,11 +191,30 @@ class Pman_Core_Notify extends Pman $this->generateNotifications(); - - //DB_DataObject::debugLevel(1); + //DB_DataObject::debugLevel(1); $w = DB_DataObject::factory($this->table); $total = 0; + + + $ff = HTML_FlexyFramework::get(); + + + $this->server = DB_DataObject::Factory('core_notify_server')->getCurrent($this); + + $this->server->assignQueues($this); + + + $this->clearOld(); + + + if (!empty($this->evtype)) { + $w->evtype = $this->evtype; + } + + $w->server_id = $this->server->id; + + if (!empty($opts['old'])) { // show old and new... @@ -195,6 +231,7 @@ class Pman_Core_Notify extends Pman if (!$this->force) { $w->whereAdd('act_when < NOW()'); // eg.. not if future.. } + $w->orderBy('act_when ASC'); // oldest first. $total = min($w->count(), $opts['limit']); @@ -203,14 +240,18 @@ class Pman_Core_Notify extends Pman $w->limit($opts['limit']); // we can run 1000 ... } - if (!empty($this->evtype)) { - $w->evtype = $this->evtype; - } + + + $w->autoJoin(); - $w->find(); + $total = $w->find(); + + if (empty($total)) { + $this->logecho("Nothing In Queue - DONE"); + exit; + } - $ar = array(); // $w->fetchAll(); if (!empty($opts['list'])) { @@ -226,47 +267,59 @@ class Pman_Core_Notify extends Pman } //echo "BATCH SIZE: ". count($ar) . "\n"; - $pushed = array(); - $requeue = array(); + + while (true) { // only add if we don't have any queued up.. - if (empty($ar) && $w->fetch()) { - $ar[] = clone($w); + if (empty($this->queue) && $w->fetch()) { + $this->queue[] = clone($w); $total--; } + + $this->logecho("BATCH SIZE: Queue=". count($this->queue) . " TOTAL = " . $total ); - $this->logecho("BATCH SIZE: ". (count($ar) + $total) ); - - if (empty($ar)) { - $this->logecho("COMPLETED MAIN QUEUE - running deleted"); - - if (empty($pushed)) { - break; + if (empty($this->queue)) { + $this->logecho("COMPLETED MAIN QUEUE - running maxed out domains"); + if ($this->domain_queue !== false) { + $this->queue = $this->remainingDomainQueue(); + + continue; } - $ar = $pushed; - $pushed = false; - continue; + break; // nothing more in queue.. and no remaining one } - $p = array_shift($ar); + $p = array_shift($this->queue); if (!$this->poolfree()) { - array_unshift($ar,$p); /// put it back on.. + array_unshift($this->queue,$p); /// put it back on.. sleep(3); continue; } - $email = $p->person() ? $p->person()->email : $p->to_email; + // not sure what happesn if person email and to_email is empty!!? + $email = empty($p->to_email) ? ($p->person() ? $p->person()->email : $p->to_email) : $p->to_email; - if ($this->poolHasDomain($email) > $this->max_to_domain) { - - if ($pushed === false) { - // we only try once to requeue.. - $requeue[] = $p; - continue; + $black = $this->server->isBlacklisted($email); + if ($black !== false) { + $this->logecho("Blacklisted - try giving it to next server"); + if (false === $this->server->updateNotifyToNextServer($p)) { + $ev = $this->addEvent('NOTIFY', $p, 'BLACKLISTED FROM our DB'); + // we dont have an althenative server to update it with. + $this->logecho("Blacklisted - next server did not work - try again in 30 mins"); + $this->server->updateNotifyToNextServer($w, date("Y-m-d H:i:s", strtotime('NOW + 30 MINUTES')),true); + // $this->errorHandler( $ev->remarks); + } - $pushed[] = $p; + continue; + } + + + if ($this->poolHasDomain($email) > $this->max_to_domain) { + // push it to a 'domain specific queue' + $this->logecho("REQUEING - maxed out that domain - {$email}"); + $this->pushQueueDomain($p, $email); + //sleep(3); continue; } @@ -276,26 +329,39 @@ class Pman_Core_Notify extends Pman + } + $this->logecho("REQUEUING all emails that maxed out:" . count($this->next_queue)); + if (!empty($this->next_queue)) { + + foreach($this->next_queue as $p) { + if (false === $this->server->updateNotifyToNextServer($p)) { + $p->updateState("????"); + } + } } + + $this->logecho("QUEUE COMPLETE - waiting for pool to end"); // we should have a time limit here... while(count($this->pool)) { $this->poolfree(); - sleep(3); + sleep(3); } - foreach($requeue as $p) { - $pp = clone($p); - $p->act_when = $p->sqlValue('NOW + INTERVAL 1 MINUTE'); - $p->update($pp); - - } + $this->logecho("DONE"); exit; } + + + + // this sequentially distributes requeued emails.. - to other servers. (can exclude current one if we have that flagged.) + + + function generateNotifications() { // this should check each module for 'GenerateNotifications.php' class.. @@ -330,7 +396,7 @@ class Pman_Core_Notify extends Pman } - + function run($id, $email='', $cmdOpts="") { @@ -345,10 +411,12 @@ class Pman_Core_Notify extends Pman $tn = $this->tempName('stdout', true); + $tne = $this->tempName('stderr', true); $descriptorspec = array( 0 => array("pipe", 'r'), // stdin is a pipe that the child will read from 1 => array("file", $tn, 'w'), // stdout is a pipe that the child will write to - 2 => array("pipe", "w") // stderr is a file to write to + 2 => array("file", $tne, 'w'), // stderr is a file to write to + // 2 => array("pipe", "w") // stderr is a file to write to ); static $php = false; @@ -371,10 +439,11 @@ class Pman_Core_Notify extends Pman $pipe = array(); - $this->logecho("call proc_open $cmd"); + //$this->logecho("call proc_open $cmd"); if ($this->max_pool_size === 1) { + $this->logecho("call passthru [{$email}] $cmd"); passthru($cmd); return; } @@ -396,6 +465,7 @@ class Pman_Core_Notify extends Pman 'proc' => $p, 'pid' => $info['pid'], 'out' => $tn, + 'oute' => $tne, 'cmd' => $cmd, 'email' => $email, 'pipes' => $pipes, @@ -404,7 +474,7 @@ class Pman_Core_Notify extends Pman ); - $this->logecho("RUN ({$info['pid']}) $cmd "); + $this->logecho("RUN [{$email}] ({$info['pid']}) $cmd "); } function poolfree() @@ -415,6 +485,8 @@ class Pman_Core_Notify extends Pman foreach($this->pool as $p) { //echo "CHECK PID: " . $p['pid'] . "\n"; + + $info = proc_get_status($p['proc']); //var_dump($info); @@ -436,9 +508,10 @@ class Pman_Core_Notify extends Pman proc_terminate($p['proc'], 9); //fclose($p['pipes'][1]); fclose($p['pipes'][0]); - fclose($p['pipes'][2]); - $this->logecho("TERMINATING: ({$p['pid']}) " . $p['cmd'] . " : " . file_get_contents($p['out'])); + + $this->logecho("TERMINATING: ({$p['pid']}) {$p['email']} " . $p['cmd'] . " : " . file_get_contents($p['out']) . " : " . file_get_contents($p['oute'])); @unlink($p['out']); + @unlink($p['oute']); // schedule again $w = DB_DataObject::factory($this->table); @@ -457,20 +530,28 @@ class Pman_Core_Notify extends Pman continue; } fclose($p['pipes'][0]); - fclose($p['pipes'][2]); //echo "CLOSING: ({$p['pid']}) " . $p['cmd'] . " : " . file_get_contents($p['out']) . "\n"; //fclose($p['pipes'][1]); proc_close($p['proc']); - - + sleep(1); + clearstatcache(); + if (file_exists('/proc/'. $p['pid'])) { + $this->logecho("proc PID={$p['pid']} still here - trying to wait"); + pcntl_waitpid($p['pid'], $status, WNOHANG); + } + //clearstatcache(); //if (file_exists('/proc/'.$p['pid'])) { // $pool[] = $p; // continue; //} - $this->logecho("ENDED: ({$p['pid']}) " . $p['cmd'] . " : " . file_get_contents($p['out']) ); + $this->logecho("ENDED: ({$p['pid']}) {$p['email']} " . $p['cmd'] . " : " . file_get_contents($p['out']) . " : " . file_get_contents($p['oute'])); @unlink($p['out']); + @unlink($p['oute']); + // at this point we could pop onto the queue the + $this->popQueueDomain($p['email']); + //unlink($p['out']); } $this->logecho("POOL SIZE: ". count($pool) ); @@ -501,6 +582,77 @@ class Pman_Core_Notify extends Pman return $ret; } + function popQueueDomain($email) + { + $ea = explode('@',$email); + $dom = strtolower(array_pop($ea)); + if (empty($this->domain_queue[$dom])) { + return; + } + array_unshift($this->queue, array_shift($this->domain_queue[$dom])); + + } + + function pushQueueDomain($e, $email) + { + if ($this->domain_queue === false) { + $this->next_queue[] = $e; + return; + } + + $ea = explode('@',$email); + $dom = strtolower(array_pop($ea)); + if (!isset($this->domain_queue[$dom])) { + $this->domain_queue[$dom] = array(); + } + $this->domain_queue[$dom][] = $e; + } + function remainingDomainQueue() + { + $ret = array(); + foreach($this->domain_queue as $dom => $ar) { + $ret = array_merge($ret, $ar); + } + $this->domain_queue = false; + return $ret; + } + function clearOld() + { + if ($this->server->isFirstServer()) { + + $p = DB_DataObject::factory($this->table); + $p->whereAdd(" + sent < '2000-01-01' + and + event_id = 0 + and + act_start < NOW() - INTERVAL {$this->clear_interval} + "); + // $p->limit(1000); + if ($p->count()) { + $ev = $this->addEvent('NOTIFY', false, "RETRY TIME EXCEEDED"); + $p = DB_DataObject::factory($this->table); + $p->query(" + UPDATE + {$this->table} + SET + sent = NOW(), + msgid = '', + event_id = {$ev->id} + WHERE + sent < '2000-01-01' + and + event_id = 0 + and + act_start < NOW() - INTERVAL {$this->clear_interval} + LIMIT + 1000 + "); + + } + } + } + function output() { @@ -511,4 +663,4 @@ class Pman_Core_Notify extends Pman { echo date("Y-m-d H:i:s - ") . $str . "\n"; } -} \ No newline at end of file +}