X-Git-Url: http://git.roojs.org/?p=Pman.Core;a=blobdiff_plain;f=Notify.php;h=fd582092af5a14fa15dae313ff2f70dba6e54320;hp=0e2d6a247c9c31334622396238b142e5b33b82aa;hb=HEAD;hpb=33de47d95583bfd9521509023bd3365174efba6a diff --git a/Notify.php b/Notify.php index 0e2d6a24..3fd4c5b4 100644 --- a/Notify.php +++ b/Notify.php @@ -124,11 +124,15 @@ class Pman_Core_Notify extends Pman var $evtype = ''; // any notification... // this script should only handle EMAIL notifications.. - var $server_id; + var $server; // core_notify_server + var $poolname = 'core'; var $opts; var $force = false; + + var $clear_interval = '1 WEEK'; // how long to clear the old queue of items. + function getAuth() { $ff = HTML_FlexyFramework::get(); @@ -174,6 +178,12 @@ class Pman_Core_Notify extends Pman function get($r,$opts=array()) { + + // if ($this->database_is_locked()) { + // die("LATER - DATABASE IS LOCKED"); + //} + + $this->parseArgs($opts); //date_default_timezone_set('UTC'); @@ -181,27 +191,29 @@ class Pman_Core_Notify extends Pman $this->generateNotifications(); - $this->assignQueues(); - - //DB_DataObject::debugLevel(1); + //DB_DataObject::debugLevel(1); $w = DB_DataObject::factory($this->table); $total = 0; $ff = HTML_FlexyFramework::get(); - if (!empty($ff->Core_Notify['servers']) && empty($ff->Core_Notify['servers-non-pool'][gethostname()])) { - - if (!isset($ff->Core_Notify['servers'][gethostname()])) { - $this->jerr("Core_Notify['servers']['" . gethostname() ."'] is not set"); - } - $w->server_id = array_search(gethostname(),array_keys($ff->Core_Notify['servers'])); - } + + + $this->server = DB_DataObject::Factory('core_notify_server')->getCurrent($this); + + $this->server->assignQueues($this); + + + $this->clearOld(); + + if (!empty($this->evtype)) { $w->evtype = $this->evtype; } - + $w->server_id = $this->server->id; + if (!empty($opts['old'])) { // show old and new... @@ -219,6 +231,7 @@ class Pman_Core_Notify extends Pman if (!$this->force) { $w->whereAdd('act_when < NOW()'); // eg.. not if future.. } + $w->orderBy('act_when ASC'); // oldest first. $total = min($w->count(), $opts['limit']); @@ -228,7 +241,6 @@ class Pman_Core_Notify extends Pman } - @@ -286,10 +298,18 @@ class Pman_Core_Notify extends Pman // not sure what happesn if person email and to_email is empty!!? $email = empty($p->to_email) ? ($p->person() ? $p->person()->email : $p->to_email) : $p->to_email; - $black = $this->isBlacklisted($email); + $black = $this->server->isBlacklisted($email); if ($black !== false) { - $this->logecho("DOMAIN blacklisted - {$email} - moving to another pool"); - $this->updateServer($p, $black); + $this->logecho("Blacklisted - try giving it to next server"); + if (false === $this->server->updateNotifyToNextServer($p)) { + $ev = $this->addEvent('NOTIFY', $p, 'BLACKLISTED FROM our DB'); + // we dont have an althenative server to update it with. + $this->logecho("Blacklisted - next server did not work - try again in 30 mins"); + $this->server->updateNotifyToNextServer($w, date("Y-m-d H:i:s", strtotime('NOW + 30 MINUTES')),true); + // $this->errorHandler( $ev->remarks); + + } + continue; } @@ -299,8 +319,7 @@ class Pman_Core_Notify extends Pman // push it to a 'domain specific queue' $this->logecho("REQUEING - maxed out that domain - {$email}"); $this->pushQueueDomain($p, $email); - - + //sleep(3); continue; } @@ -311,11 +330,13 @@ class Pman_Core_Notify extends Pman } - $this->logecho("REQUEUING all emails that maxed out:" . count($this->next_queue)); + $this->logecho("REQUEUING all emails that maxed out:" . count($this->next_queue)); if (!empty($this->next_queue)) { foreach($this->next_queue as $p) { - $this->updateServer($p); + if (false === $this->server->updateNotifyToNextServer($p)) { + $p->updateState("????"); + } } } @@ -335,55 +356,10 @@ class Pman_Core_Notify extends Pman } - function isBlacklisted($email) - { - // return current server id.. - $ff = HTML_FlexyFramework::get(); - //$this->logecho("CHECK BLACKLISTED - {$email}"); - if (empty($ff->Core_Notify['servers'])) { - return false; - } - - if (!isset($ff->Core_Notify['servers'][gethostname()]['blacklisted'])) { - return false; - } - - // get the domain.. - $ea = explode('@',$email); - $dom = strtolower(array_pop($ea)); - - //$this->logecho("CHECK BLACKLISTED DOM - {$dom}"); - if (!in_array($dom, $ff->Core_Notify['servers'][gethostname()]['blacklisted'] )) { - return false; - } - //$this->logecho("RETURN BLACKLISTED TRUE"); - return array_search(gethostname(),array_keys($ff->Core_Notify['servers'])); - } + // this sequentially distributes requeued emails.. - to other servers. (can exclude current one if we have that flagged.) - function updateServer($ww, $exclude = -1) - { - $w = DB_DataObject::factory($ww->tableName()); - $w->get($ww->id); - - $ff = HTML_FlexyFramework::get(); - static $num = 0; - if (empty($ff->Core_Notify['servers'])) { - return; - } - $num = ($num+1) % count(array_keys($ff->Core_Notify['servers'])); - if ($exclude == $num ) { - $num = ($num+1) % count(array_keys($ff->Core_Notify['servers'])); - } - // next server.. - $pp = clone($w); - $w->server_id = $num; - - $w->act_when = $w->sqlValue('NOW() + INTERVAL 1 MINUTE'); - $w->update($pp); - - - } + function generateNotifications() @@ -420,65 +396,7 @@ class Pman_Core_Notify extends Pman } - function assignQueues() - { - $ff = HTML_FlexyFramework::get(); - - if (empty($ff->Core_Notify['servers'])) { - return; - } - - if (isset($ff->Core_Notify['servers-non-pool'][gethostname()])) { - return; - } - - if (!isset($ff->Core_Notify['servers'][gethostname()])) { - - $this->jerr("Core_Notify['servers']['" . gethostname() ."'] is not set"); - } - // only run this on the main server... - if (array_search(gethostname(),array_keys($ff->Core_Notify['servers'])) > 0) { - return; - } - - $num_servers = count(array_keys($ff->Core_Notify['servers'])); - $p = DB_DataObject::factory($this->table); - $p->whereAdd(" - sent < '2000-01-01' - and - event_id = 0 - and - act_start < NOW() + INTERVAL 3 HOUR - and - server_id < 0" - - ); - if ($p->count() < 1) { - return; - } - $p = DB_DataObject::factory($this->table); - // 6 seconds on this machne... - $p->query(" - UPDATE - {$this->table} - SET - server_id = ((@row_number := CASE WHEN @row_number IS NULL THEN 0 ELSE @row_number END +1) % {$num_servers}) - WHERE - sent < '2000-01-01' - and - event_id = 0 - and - act_start < NOW() + INTERVAL 3 HOUR - and - server_id < 0 - ORDER BY - id ASC - LIMIT - 10000 - "); - - - } + function run($id, $email='', $cmdOpts="") { @@ -493,10 +411,12 @@ class Pman_Core_Notify extends Pman $tn = $this->tempName('stdout', true); + $tne = $this->tempName('stderr', true); $descriptorspec = array( 0 => array("pipe", 'r'), // stdin is a pipe that the child will read from 1 => array("file", $tn, 'w'), // stdout is a pipe that the child will write to - 2 => array("pipe", "w") // stderr is a file to write to + 2 => array("file", $tne, 'w'), // stderr is a file to write to + // 2 => array("pipe", "w") // stderr is a file to write to ); static $php = false; @@ -545,6 +465,7 @@ class Pman_Core_Notify extends Pman 'proc' => $p, 'pid' => $info['pid'], 'out' => $tn, + 'oute' => $tne, 'cmd' => $cmd, 'email' => $email, 'pipes' => $pipes, @@ -564,6 +485,8 @@ class Pman_Core_Notify extends Pman foreach($this->pool as $p) { //echo "CHECK PID: " . $p['pid'] . "\n"; + + $info = proc_get_status($p['proc']); //var_dump($info); @@ -585,9 +508,10 @@ class Pman_Core_Notify extends Pman proc_terminate($p['proc'], 9); //fclose($p['pipes'][1]); fclose($p['pipes'][0]); - fclose($p['pipes'][2]); - $this->logecho("TERMINATING: ({$p['pid']}) {$p['email']} " . $p['cmd'] . " : " . file_get_contents($p['out'])); + + $this->logecho("TERMINATING: ({$p['pid']}) {$p['email']} " . $p['cmd'] . " : " . file_get_contents($p['out']) . " : " . file_get_contents($p['oute'])); @unlink($p['out']); + @unlink($p['oute']); // schedule again $w = DB_DataObject::factory($this->table); @@ -606,20 +530,25 @@ class Pman_Core_Notify extends Pman continue; } fclose($p['pipes'][0]); - fclose($p['pipes'][2]); //echo "CLOSING: ({$p['pid']}) " . $p['cmd'] . " : " . file_get_contents($p['out']) . "\n"; //fclose($p['pipes'][1]); proc_close($p['proc']); - - + sleep(1); + clearstatcache(); + if (file_exists('/proc/'. $p['pid'])) { + $this->logecho("proc PID={$p['pid']} still here - trying to wait"); + pcntl_waitpid($p['pid'], $status, WNOHANG); + } + //clearstatcache(); //if (file_exists('/proc/'.$p['pid'])) { // $pool[] = $p; // continue; //} - $this->logecho("ENDED: ({$p['pid']}) {$p['email']} " . $p['cmd'] . " : " . file_get_contents($p['out']) ); + $this->logecho("ENDED: ({$p['pid']}) {$p['email']} " . $p['cmd'] . " : " . file_get_contents($p['out']) . " : " . file_get_contents($p['oute'])); @unlink($p['out']); + @unlink($p['oute']); // at this point we could pop onto the queue the $this->popQueueDomain($p['email']); @@ -687,7 +616,42 @@ class Pman_Core_Notify extends Pman $this->domain_queue = false; return $ret; } - + function clearOld() + { + if ($this->server->isFirstServer()) { + + $p = DB_DataObject::factory($this->table); + $p->whereAdd(" + sent < '2000-01-01' + and + event_id = 0 + and + act_start < NOW() - INTERVAL {$this->clear_interval} + "); + // $p->limit(1000); + if ($p->count()) { + $ev = $this->addEvent('NOTIFY', false, "RETRY TIME EXCEEDED"); + $p = DB_DataObject::factory($this->table); + $p->query(" + UPDATE + {$this->table} + SET + sent = NOW(), + msgid = '', + event_id = {$ev->id} + WHERE + sent < '2000-01-01' + and + event_id = 0 + and + act_start < NOW() - INTERVAL {$this->clear_interval} + LIMIT + 1000 + "); + + } + } + } function output()