X-Git-Url: http://git.roojs.org/?p=Pman.Core;a=blobdiff_plain;f=Notify.php;h=fd582092af5a14fa15dae313ff2f70dba6e54320;hp=684b4eadeafe1ee41364b042db196adc7f11a374;hb=HEAD;hpb=fa65cb48d392ed8fb760c82216d1f1b92cc22aed diff --git a/Notify.php b/Notify.php index 684b4ead..3fd4c5b4 100644 --- a/Notify.php +++ b/Notify.php @@ -123,9 +123,16 @@ class Pman_Core_Notify extends Pman var $evtype = ''; // any notification... // this script should only handle EMAIL notifications.. - + + var $server; // core_notify_server + + var $poolname = 'core'; + var $opts; var $force = false; + + var $clear_interval = '1 WEEK'; // how long to clear the old queue of items. + function getAuth() { $ff = HTML_FlexyFramework::get(); @@ -167,9 +174,16 @@ class Pman_Core_Notify extends Pman var $queue = array(); var $domain_queue = array(); // false to use nextquee var $next_queue = array(); + function get($r,$opts=array()) { + + // if ($this->database_is_locked()) { + // die("LATER - DATABASE IS LOCKED"); + //} + + $this->parseArgs($opts); //date_default_timezone_set('UTC'); @@ -177,26 +191,29 @@ class Pman_Core_Notify extends Pman $this->generateNotifications(); - $this->assignQueues(); - - //DB_DataObject::debugLevel(1); + //DB_DataObject::debugLevel(1); $w = DB_DataObject::factory($this->table); $total = 0; $ff = HTML_FlexyFramework::get(); - if (!empty($ff->Core_Notify['servers'])) { - if (!isset($ff->Core_Notify['servers'][gethostname()])) { - $this->jerr("Core_Notify['servers']['" . gethostname() ."'] is not set"); - } - $w->server_id = array_search(gethostname(),array_keys($ff->Core_Notify['servers'])); - } + + + $this->server = DB_DataObject::Factory('core_notify_server')->getCurrent($this); + + $this->server->assignQueues($this); + + + $this->clearOld(); + + if (!empty($this->evtype)) { $w->evtype = $this->evtype; } - + $w->server_id = $this->server->id; + if (!empty($opts['old'])) { // show old and new... @@ -214,6 +231,7 @@ class Pman_Core_Notify extends Pman if (!$this->force) { $w->whereAdd('act_when < NOW()'); // eg.. not if future.. } + $w->orderBy('act_when ASC'); // oldest first. $total = min($w->count(), $opts['limit']); @@ -223,13 +241,16 @@ class Pman_Core_Notify extends Pman } - $w->autoJoin(); $total = $w->find(); + if (empty($total)) { + $this->logecho("Nothing In Queue - DONE"); + exit; + } if (!empty($opts['list'])) { @@ -254,7 +275,7 @@ class Pman_Core_Notify extends Pman $this->queue[] = clone($w); $total--; } - + $this->logecho("BATCH SIZE: Queue=". count($this->queue) . " TOTAL = " . $total ); if (empty($this->queue)) { @@ -274,15 +295,31 @@ class Pman_Core_Notify extends Pman sleep(3); continue; } - $email = $p->person() ? $p->person()->email : $p->to_email; + // not sure what happesn if person email and to_email is empty!!? + $email = empty($p->to_email) ? ($p->person() ? $p->person()->email : $p->to_email) : $p->to_email; + + $black = $this->server->isBlacklisted($email); + if ($black !== false) { + $this->logecho("Blacklisted - try giving it to next server"); + if (false === $this->server->updateNotifyToNextServer($p)) { + $ev = $this->addEvent('NOTIFY', $p, 'BLACKLISTED FROM our DB'); + // we dont have an althenative server to update it with. + $this->logecho("Blacklisted - next server did not work - try again in 30 mins"); + $this->server->updateNotifyToNextServer($w, date("Y-m-d H:i:s", strtotime('NOW + 30 MINUTES')),true); + // $this->errorHandler( $ev->remarks); + + } + + continue; + } + if ($this->poolHasDomain($email) > $this->max_to_domain) { // push it to a 'domain specific queue' $this->logecho("REQUEING - maxed out that domain - {$email}"); $this->pushQueueDomain($p, $email); - - + //sleep(3); continue; } @@ -293,15 +330,13 @@ class Pman_Core_Notify extends Pman } - $this->logecho("REQUEUING all emails that maxed out:" . count($this->next_queue)); + $this->logecho("REQUEUING all emails that maxed out:" . count($this->next_queue)); if (!empty($this->next_queue)) { foreach($this->next_queue as $p) { - $pp = clone($p); - $p->act_when = $p->sqlValue('NOW + INTERVAL 1 MINUTE'); - $this->updateServer($p); - $p->update($pp); - + if (false === $this->server->updateNotifyToNextServer($p)) { + $p->updateState("????"); + } } } @@ -320,19 +355,11 @@ class Pman_Core_Notify extends Pman exit; } - // this sequentially distributes requeued emails.. - to other servers. - function updateServer($w) - { - $ff = HTML_FlexyFramework::get(); - static $num = 0; - if (empty($ff->Core_Notify['servers'])) { - return; - } - $num++; - // next server.. - $w->server_id = $num % count(array_keys($ff->Core_Notify['servers'])); - - } + + + + // this sequentially distributes requeued emails.. - to other servers. (can exclude current one if we have that flagged.) + function generateNotifications() @@ -369,52 +396,7 @@ class Pman_Core_Notify extends Pman } - function assignQueues() - { - $ff = HTML_FlexyFramework::get(); - - if (empty($ff->Core_Notify['servers'])) { - return; - } - - $num_servers = count(array_keys($ff->Core_Notify['servers'])); - $p = DB_DataObject::factory($this->table); - $p->whereAdd(" - sent < '2000-01-01' - and - event_id = 0 - and - act_start < NOW() + INTERVAL 3 HOUR - and - server_id < 0" - - ); - if ($p->count() < 1) { - return; - } - - // 6 seconds on this machne... - $p->query(" - UPDATE - {$this->table} - SET - server_id = ((@row_number := CASE WHEN @row_number IS NULL THEN 0 ELSE @row_number END +1) % {$num_servers}) - WHERE - sent < '2000-01-01' - and - event_id = 0 - and - act_start < NOW() + INTERVAL 3 HOUR - and - server_id < 0 - ORDER BY - id ASC - LIMIT - 10000 - "); - - - } + function run($id, $email='', $cmdOpts="") { @@ -429,10 +411,12 @@ class Pman_Core_Notify extends Pman $tn = $this->tempName('stdout', true); + $tne = $this->tempName('stderr', true); $descriptorspec = array( 0 => array("pipe", 'r'), // stdin is a pipe that the child will read from 1 => array("file", $tn, 'w'), // stdout is a pipe that the child will write to - 2 => array("pipe", "w") // stderr is a file to write to + 2 => array("file", $tne, 'w'), // stderr is a file to write to + // 2 => array("pipe", "w") // stderr is a file to write to ); static $php = false; @@ -481,6 +465,7 @@ class Pman_Core_Notify extends Pman 'proc' => $p, 'pid' => $info['pid'], 'out' => $tn, + 'oute' => $tne, 'cmd' => $cmd, 'email' => $email, 'pipes' => $pipes, @@ -500,6 +485,8 @@ class Pman_Core_Notify extends Pman foreach($this->pool as $p) { //echo "CHECK PID: " . $p['pid'] . "\n"; + + $info = proc_get_status($p['proc']); //var_dump($info); @@ -521,9 +508,10 @@ class Pman_Core_Notify extends Pman proc_terminate($p['proc'], 9); //fclose($p['pipes'][1]); fclose($p['pipes'][0]); - fclose($p['pipes'][2]); - $this->logecho("TERMINATING: ({$p['pid']}) " . $p['cmd'] . " : " . file_get_contents($p['out'])); + + $this->logecho("TERMINATING: ({$p['pid']}) {$p['email']} " . $p['cmd'] . " : " . file_get_contents($p['out']) . " : " . file_get_contents($p['oute'])); @unlink($p['out']); + @unlink($p['oute']); // schedule again $w = DB_DataObject::factory($this->table); @@ -542,20 +530,25 @@ class Pman_Core_Notify extends Pman continue; } fclose($p['pipes'][0]); - fclose($p['pipes'][2]); //echo "CLOSING: ({$p['pid']}) " . $p['cmd'] . " : " . file_get_contents($p['out']) . "\n"; //fclose($p['pipes'][1]); proc_close($p['proc']); - - + sleep(1); + clearstatcache(); + if (file_exists('/proc/'. $p['pid'])) { + $this->logecho("proc PID={$p['pid']} still here - trying to wait"); + pcntl_waitpid($p['pid'], $status, WNOHANG); + } + //clearstatcache(); //if (file_exists('/proc/'.$p['pid'])) { // $pool[] = $p; // continue; //} - $this->logecho("ENDED: ({$p['pid']}) " . $p['cmd'] . " : " . file_get_contents($p['out']) ); + $this->logecho("ENDED: ({$p['pid']}) {$p['email']} " . $p['cmd'] . " : " . file_get_contents($p['out']) . " : " . file_get_contents($p['oute'])); @unlink($p['out']); + @unlink($p['oute']); // at this point we could pop onto the queue the $this->popQueueDomain($p['email']); @@ -623,7 +616,42 @@ class Pman_Core_Notify extends Pman $this->domain_queue = false; return $ret; } - + function clearOld() + { + if ($this->server->isFirstServer()) { + + $p = DB_DataObject::factory($this->table); + $p->whereAdd(" + sent < '2000-01-01' + and + event_id = 0 + and + act_start < NOW() - INTERVAL {$this->clear_interval} + "); + // $p->limit(1000); + if ($p->count()) { + $ev = $this->addEvent('NOTIFY', false, "RETRY TIME EXCEEDED"); + $p = DB_DataObject::factory($this->table); + $p->query(" + UPDATE + {$this->table} + SET + sent = NOW(), + msgid = '', + event_id = {$ev->id} + WHERE + sent < '2000-01-01' + and + event_id = 0 + and + act_start < NOW() - INTERVAL {$this->clear_interval} + LIMIT + 1000 + "); + + } + } + } function output() @@ -635,4 +663,4 @@ class Pman_Core_Notify extends Pman { echo date("Y-m-d H:i:s - ") . $str . "\n"; } -} \ No newline at end of file +}