X-Git-Url: http://git.roojs.org/?p=Pman.Core;a=blobdiff_plain;f=Notify.php;h=fd582092af5a14fa15dae313ff2f70dba6e54320;hp=b24098115bbc7660d3157ade5935ff517906699f;hb=HEAD;hpb=dfe60f191482835b3eab1b5c2e0015b9721ffc11 diff --git a/Notify.php b/Notify.php index b2409811..3fd4c5b4 100644 --- a/Notify.php +++ b/Notify.php @@ -56,13 +56,11 @@ class Pman_Core_Notify extends Pman 'min' => 0, 'max' => 0, ), - 'generate' => array( - 'desc' => 'Generate notifications for a table, eg. cash_invoice', - 'default' => '', - 'short' => 'g', - 'min' => 0, - 'max' => 1, + /* removed - use GenerateNotifcations.php hooked classes + 'generate' => 'Generate notifications for a table, eg. cash_invoice', + ), + */ 'limit' => array( 'desc' => 'Limit search for no. to send to ', 'default' => 1000, @@ -99,22 +97,49 @@ class Pman_Core_Notify extends Pman var $max_to_domain = 10; /** - * @var $maxruntime - maximum time a child is allowed to run - defaut 2 minutes + * @var $maxruntime - maximum seconds a child is allowed to run - defaut 2 minutes */ var $maxruntime = 120; + /** + * @var {Boolean} log_events - default true if events should be logged. + */ + var $log_events = true; + /** + * @var {Number} try_again_minutes how long after failing to try again default = 30 if max runtime fails + */ + var $try_again_minutes = 30; + + /** + * @var {String} table - the table that the class will query for notification events + */ var $table = 'core_notify'; + /** + * @var {String} target - the application that will run for each Row in the table (eg. Pman/Core/NotifySend) + */ var $target = 'Core/NotifySend'; + + + var $evtype = ''; // any notification... // this script should only handle EMAIL notifications.. + + var $server; // core_notify_server + + var $poolname = 'core'; + + var $opts; var $force = false; + + var $clear_interval = '1 WEEK'; // how long to clear the old queue of items. + function getAuth() { $ff = HTML_FlexyFramework::get(); if (!$ff->cli) { die("access denied"); } - HTML_FlexyFramework::ensureSingle($_SERVER["SCRIPT_NAME"] .'|'. __FILE__, $this); + HTML_FlexyFramework::ensureSingle($_SERVER["SCRIPT_NAME"] .'|'. __FILE__ .'|'. (empty($_SERVER['argv'][1]) ? '': $_SERVER['argv'][1]), $this); return true; } @@ -146,39 +171,56 @@ class Pman_Core_Notify extends Pman } } - - function get($r,$opts) + var $queue = array(); + var $domain_queue = array(); // false to use nextquee + var $next_queue = array(); + + + function get($r,$opts=array()) { + + // if ($this->database_is_locked()) { + // die("LATER - DATABASE IS LOCKED"); + //} + + $this->parseArgs($opts); //date_default_timezone_set('UTC'); - // phpinfo();exit; - - $w = DB_DataObject::factory('core_notify_recur'); - if (is_a($w, 'DB_DataObject')) { - $w->generateNotifications(); - } - if (!empty($opts['generate'])) { - $w = DB_DataObject::factory($opts['generate']); - if (is_a($w, 'DB_DataObject')) { - $w->generateNotifications(); - } - exit; - - - } - - //DB_DataObject::debugLevel(1); + $this->generateNotifications(); + + //DB_DataObject::debugLevel(1); $w = DB_DataObject::factory($this->table); + $total = 0; + + + + $ff = HTML_FlexyFramework::get(); + + + $this->server = DB_DataObject::Factory('core_notify_server')->getCurrent($this); + + $this->server->assignQueues($this); + + + $this->clearOld(); + + + if (!empty($this->evtype)) { + $w->evtype = $this->evtype; + } + + $w->server_id = $this->server->id; + if (!empty($opts['old'])) { // show old and new... $w->orderBy('act_when DESC'); // latest first $w->limit($opts['limit']); // we can run - + $total = min($w->count(), $opts['limit']); } else { // standard @@ -189,103 +231,174 @@ class Pman_Core_Notify extends Pman if (!$this->force) { $w->whereAdd('act_when < NOW()'); // eg.. not if future.. } + $w->orderBy('act_when ASC'); // oldest first. - - $this->logecho("QUEUE is {$w->count()}"); + $total = min($w->count(), $opts['limit']); + $this->logecho("QUEUE is {$w->count()} only running " . ((int) $opts['limit'])); $w->limit($opts['limit']); // we can run 1000 ... } - if (!empty($this->evtype)) { - $w->evtype = $this->evtype; - } + + + $w->autoJoin(); + $total = $w->find(); + if (empty($total)) { + $this->logecho("Nothing In Queue - DONE"); + exit; + } - $ar = $w->fetchAll(); if (!empty($opts['list'])) { - if (empty($ar)) { - die("Nothing in Queue\n"); - } - foreach($ar as $w) { + + + while ($w->fetch()) { $o = $w->object(); - $this->logecho("$w->id : $w->person_id_email email : ". + $this->logecho("{$w->id} : {$w->person()->email} email : ". $o->toEventString()." ". $w->status() ); } exit; } //echo "BATCH SIZE: ". count($ar) . "\n"; - $pushed = array(); - $requeue = array(); + + while (true) { + // only add if we don't have any queued up.. + if (empty($this->queue) && $w->fetch()) { + $this->queue[] = clone($w); + $total--; + } + + $this->logecho("BATCH SIZE: Queue=". count($this->queue) . " TOTAL = " . $total ); - - $this->logecho("BATCH SIZE: ". count($ar) ); - - if (empty($ar)) { - $this->logecho("COMPLETED MAIN QUEUE - running delated"); - - if (empty($pushed)) { - break; + if (empty($this->queue)) { + $this->logecho("COMPLETED MAIN QUEUE - running maxed out domains"); + if ($this->domain_queue !== false) { + $this->queue = $this->remainingDomainQueue(); + + continue; } - $ar = $pushed; - $pushed = false; - continue; + break; // nothing more in queue.. and no remaining one } - $p = array_shift($ar); + $p = array_shift($this->queue); if (!$this->poolfree()) { - array_unshift($ar,$p); /// put it back on.. + array_unshift($this->queue,$p); /// put it back on.. sleep(3); continue; } - if ($this->poolHasDomain($p->person_id_email) > $this->max_to_domain) { - - if ($pushed === false) { - // we only try once to requeue.. - $requeue[] = $p; - continue; + // not sure what happesn if person email and to_email is empty!!? + $email = empty($p->to_email) ? ($p->person() ? $p->person()->email : $p->to_email) : $p->to_email; + + $black = $this->server->isBlacklisted($email); + if ($black !== false) { + $this->logecho("Blacklisted - try giving it to next server"); + if (false === $this->server->updateNotifyToNextServer($p)) { + $ev = $this->addEvent('NOTIFY', $p, 'BLACKLISTED FROM our DB'); + // we dont have an althenative server to update it with. + $this->logecho("Blacklisted - next server did not work - try again in 30 mins"); + $this->server->updateNotifyToNextServer($w, date("Y-m-d H:i:s", strtotime('NOW + 30 MINUTES')),true); + // $this->errorHandler( $ev->remarks); + } - $pushed[] = $p; + continue; + } + + + if ($this->poolHasDomain($email) > $this->max_to_domain) { + // push it to a 'domain specific queue' + $this->logecho("REQUEING - maxed out that domain - {$email}"); + $this->pushQueueDomain($p, $email); + //sleep(3); continue; } - $this->run($p->id,$p->person_id_email); + $this->run($p->id,$email); } + $this->logecho("REQUEUING all emails that maxed out:" . count($this->next_queue)); + if (!empty($this->next_queue)) { + + foreach($this->next_queue as $p) { + if (false === $this->server->updateNotifyToNextServer($p)) { + $p->updateState("????"); + } + } + } + + $this->logecho("QUEUE COMPLETE - waiting for pool to end"); // we should have a time limit here... while(count($this->pool)) { $this->poolfree(); - sleep(3); + sleep(3); } - foreach($requeue as $p) { - $pp = clone($p); - $p->act_when = $p->sqlValue('NOW + INTERVAL 1 MINUTE'); - $p->update($pp); - - } + $this->logecho("DONE"); exit; } - function run($id, $email, $cmdOpts="") + + + + // this sequentially distributes requeued emails.. - to other servers. (can exclude current one if we have that flagged.) + + + + function generateNotifications() + { + // this should check each module for 'GenerateNotifications.php' class.. + //and run it if found.. + $ff = HTML_FlexyFramework::get(); + + $disabled = explode(',', $ff->disable); + + $modules = array_reverse($this->modulesList()); + + // move 'project' one to the end... + + foreach ($modules as $module){ + if(in_array($module, $disabled)){ + continue; + } + $file = $this->rootDir. "/Pman/$module/GenerateNotifications.php"; + if(!file_exists($file)){ + continue; + } + + require_once $file; + $class = "Pman_{$module}_GenerateNotifications"; + $x = new $class; + if(!method_exists($x, 'generate')){ + continue; + }; + //echo "$module\n"; + $x->generate($this); + } + + + } + + + + function run($id, $email='', $cmdOpts="") { static $renice = false; @@ -295,13 +408,15 @@ class Pman_Core_Notify extends Pman } // phpinfo();exit; - $tnx = tempnam(ini_get('session.save_path'),'stdout'); - unlink($tnx); - $tn = $tnx . '.stdout'; + + + $tn = $this->tempName('stdout', true); + $tne = $this->tempName('stderr', true); $descriptorspec = array( 0 => array("pipe", 'r'), // stdin is a pipe that the child will read from 1 => array("file", $tn, 'w'), // stdout is a pipe that the child will write to - 2 => array("pipe", "w") // stderr is a file to write to + 2 => array("file", $tne, 'w'), // stderr is a file to write to + // 2 => array("pipe", "w") // stderr is a file to write to ); static $php = false; @@ -324,10 +439,11 @@ class Pman_Core_Notify extends Pman $pipe = array(); - $this->logecho("call proc_open $cmd"); + //$this->logecho("call proc_open $cmd"); if ($this->max_pool_size === 1) { + $this->logecho("call passthru [{$email}] $cmd"); passthru($cmd); return; } @@ -349,14 +465,16 @@ class Pman_Core_Notify extends Pman 'proc' => $p, 'pid' => $info['pid'], 'out' => $tn, + 'oute' => $tne, 'cmd' => $cmd, 'email' => $email, 'pipes' => $pipes, + 'notify_id' => $id, 'started' => time() ); - $this->logecho("RUN ({$info['pid']}) $cmd "); + $this->logecho("RUN [{$email}] ({$info['pid']}) $cmd "); } function poolfree() @@ -367,6 +485,8 @@ class Pman_Core_Notify extends Pman foreach($this->pool as $p) { //echo "CHECK PID: " . $p['pid'] . "\n"; + + $info = proc_get_status($p['proc']); //var_dump($info); @@ -388,9 +508,20 @@ class Pman_Core_Notify extends Pman proc_terminate($p['proc'], 9); //fclose($p['pipes'][1]); fclose($p['pipes'][0]); - fclose($p['pipes'][2]); - $this->logecho("TERMINATING: ({$p['pid']}) " . $p['cmd'] . " : " . file_get_contents($p['out'])); + + $this->logecho("TERMINATING: ({$p['pid']}) {$p['email']} " . $p['cmd'] . " : " . file_get_contents($p['out']) . " : " . file_get_contents($p['oute'])); @unlink($p['out']); + @unlink($p['oute']); + + // schedule again + $w = DB_DataObject::factory($this->table); + $w->get($p['notify_id']); + $ww = clone($w); + if ($this->log_events) { + $this->addEvent('NOTIFY', $w, 'TERMINATED - TIMEOUT'); + } + $w->act_when = date('Y-m-d H:i:s', strtotime("NOW + {$this->try_again_minutes} MINUTES")); + $w->update($ww); continue; } @@ -399,20 +530,28 @@ class Pman_Core_Notify extends Pman continue; } fclose($p['pipes'][0]); - fclose($p['pipes'][2]); //echo "CLOSING: ({$p['pid']}) " . $p['cmd'] . " : " . file_get_contents($p['out']) . "\n"; //fclose($p['pipes'][1]); proc_close($p['proc']); - - + sleep(1); + clearstatcache(); + if (file_exists('/proc/'. $p['pid'])) { + $this->logecho("proc PID={$p['pid']} still here - trying to wait"); + pcntl_waitpid($p['pid'], $status, WNOHANG); + } + //clearstatcache(); //if (file_exists('/proc/'.$p['pid'])) { // $pool[] = $p; // continue; //} - $this->logecho("ENDED: ({$p['pid']}) " . $p['cmd'] . " : " . file_get_contents($p['out']) ); + $this->logecho("ENDED: ({$p['pid']}) {$p['email']} " . $p['cmd'] . " : " . file_get_contents($p['out']) . " : " . file_get_contents($p['oute'])); @unlink($p['out']); + @unlink($p['oute']); + // at this point we could pop onto the queue the + $this->popQueueDomain($p['email']); + //unlink($p['out']); } $this->logecho("POOL SIZE: ". count($pool) ); @@ -431,9 +570,11 @@ class Pman_Core_Notify extends Pman function poolHasDomain($email) { $ret = 0; - $dom = strtolower(array_pop(explode('@',$email))); + $ea = explode('@',$email); + $dom = strtolower(array_pop($ea)); foreach($this->pool as $p) { - $mdom = strtolower(array_pop(explode('@',$p['email']))); + $ea = explode('@',$p['email']); + $mdom = strtolower(array_pop($ea)); if ($mdom == $dom) { $ret++; } @@ -441,6 +582,77 @@ class Pman_Core_Notify extends Pman return $ret; } + function popQueueDomain($email) + { + $ea = explode('@',$email); + $dom = strtolower(array_pop($ea)); + if (empty($this->domain_queue[$dom])) { + return; + } + array_unshift($this->queue, array_shift($this->domain_queue[$dom])); + + } + + function pushQueueDomain($e, $email) + { + if ($this->domain_queue === false) { + $this->next_queue[] = $e; + return; + } + + $ea = explode('@',$email); + $dom = strtolower(array_pop($ea)); + if (!isset($this->domain_queue[$dom])) { + $this->domain_queue[$dom] = array(); + } + $this->domain_queue[$dom][] = $e; + } + function remainingDomainQueue() + { + $ret = array(); + foreach($this->domain_queue as $dom => $ar) { + $ret = array_merge($ret, $ar); + } + $this->domain_queue = false; + return $ret; + } + function clearOld() + { + if ($this->server->isFirstServer()) { + + $p = DB_DataObject::factory($this->table); + $p->whereAdd(" + sent < '2000-01-01' + and + event_id = 0 + and + act_start < NOW() - INTERVAL {$this->clear_interval} + "); + // $p->limit(1000); + if ($p->count()) { + $ev = $this->addEvent('NOTIFY', false, "RETRY TIME EXCEEDED"); + $p = DB_DataObject::factory($this->table); + $p->query(" + UPDATE + {$this->table} + SET + sent = NOW(), + msgid = '', + event_id = {$ev->id} + WHERE + sent < '2000-01-01' + and + event_id = 0 + and + act_start < NOW() - INTERVAL {$this->clear_interval} + LIMIT + 1000 + "); + + } + } + } + function output() { @@ -451,4 +663,4 @@ class Pman_Core_Notify extends Pman { echo date("Y-m-d H:i:s - ") . $str . "\n"; } -} \ No newline at end of file +}