X-Git-Url: http://git.roojs.org/?p=Pman.Core;a=blobdiff_plain;f=Notify.php;h=fd582092af5a14fa15dae313ff2f70dba6e54320;hp=a4620980f7350ba9dadcc6e1535bf1d7f3328790;hb=1524f8b36295809a3eedd6da3c6494f8aa0d86f5;hpb=7e34d4b243eae3b32e75582cda0653dd485ff404 diff --git a/Notify.php b/Notify.php index a4620980..fd582092 100644 --- a/Notify.php +++ b/Notify.php @@ -16,7 +16,13 @@ require_once 'Pman.php'; class Pman_Core_Notify extends Pman { - static $cli_desc = "Send out notification emails (usually from cron)"; + static $cli_desc = "Runs the notification queue (usually from cron) + Normally used to sends out emails to anyone in the notification list. + + /etc/cron.d/pman-core-notify + * * * * * www-data /usr/bin/php /home/gitlive/web.mtrack/admin.php Core/Notify > /dev/null + +"; static $cli_opts = array( 'debug' => array( @@ -36,7 +42,7 @@ class Pman_Core_Notify extends Pman ), 'old' => array( - 'desc' => 'Show old messages..', + 'desc' => 'Show old messages.. (and new messages...)', 'default' => 0, 'short' => 'o', 'min' => 0, @@ -50,16 +56,50 @@ class Pman_Core_Notify extends Pman 'min' => 0, 'max' => 0, ), - 'generate' => array( - 'desc' => 'Generate notifications for a table, eg. cash_invoice', - 'default' => '', - 'short' => 'g', + /* removed - use GenerateNotifcations.php hooked classes + 'generate' => 'Generate notifications for a table, eg. cash_invoice', + + ), + */ + 'limit' => array( + 'desc' => 'Limit search for no. to send to ', + 'default' => 1000, + 'short' => 'L', 'min' => 0, - 'max' => 1, + 'max' => 999, + ), + 'dryrun' => array( + 'desc' => 'Dry run - do not send.', + 'default' => 0, + 'short' => 'D', + 'min' => 0, + 'max' => 0, + ), + 'poolsize' => array( + 'desc' => 'Pool size', + 'default' => 10, + 'short' => 'P', + 'min' => 0, + 'max' => 100, ), ); + /** + * @var $nice_level Unix 'nice' level to stop it jamming server up. + */ + var $nice_level = false; + /** + * @var $max_pool_size maximum runners to start at once. + */ + var $max_pool_size = 10; + /** + * @var $max_to_domain maximum connections to make to a single domain + */ + var $max_to_domain = 10; - + /** + * @var $maxruntime - maximum time a child is allowed to run - defaut 2 minutes + */ + var $maxruntime = 120; var $table = 'core_notify'; var $target = 'Core/NotifySend'; @@ -72,22 +112,27 @@ class Pman_Core_Notify extends Pman if (!$ff->cli) { die("access denied"); } - HTML_FlexyFramework::ensureSingle(__FILE__, $this); + HTML_FlexyFramework::ensureSingle($_SERVER["SCRIPT_NAME"] .'|'. __FILE__, $this); return true; - } var $pool = array(); - function get($r,$opts) + function parseArgs(&$opts) { if ($opts['debug']) { DB_DataObject::debugLevel($opts['debug']); print_r($opts); } - //date_default_timezone_set('UTC'); - // phpinfo();exit; - $showold = !empty($opts['old']); + $this->opts = $opts; + if (!empty($opts['poolsize'])) { + $this->max_pool_size = $opts['poolsize']; + } + + if (empty($opts['limit'])) { + $opts['limit'] = '1000'; // not sure why it's not picking up the defautl.. + } + if (!empty($opts['old'])) { $opts['list'] = 1; // force listing.. } @@ -97,66 +142,89 @@ class Pman_Core_Notify extends Pman if (!empty($opts['send-to'])) { $this->send_to = $opts['send-to']; } - + } + + + function get($r,$opts=array()) + { + $this->parseArgs($opts); + + //date_default_timezone_set('UTC'); - $w = DB_DataObject::factory('core_notify_recur'); - if (is_a($w, 'DB_DataObject')) { - $w->generateNotifications(); - } - if (!empty($opts['generate'])) { - $w = DB_DataObject::factory($opts['generate']); - if (is_a($w, 'DB_DataObject')) { - $w->generateNotifications(); - } - exit; - - - } - + + $this->generateNotifications(); + + //DB_DataObject::debugLevel(1); $w = DB_DataObject::factory($this->table); + $total = 0; - if (!$showold) { - $w->whereAdd('act_when > sent'); // eg.. sent is not valid.. + if (!empty($opts['old'])) { + // show old and new... + $w->orderBy('act_when DESC'); // latest first + $w->limit($opts['limit']); // we can run + $total = min($w->count(), $opts['limit']); + } else { + // standard + + //$w->whereAdd('act_when > sent'); // eg.. sent is not valid.. + $w->whereAdd("sent < '1970-01-01' OR sent IS NULL"); // eg.. sent is not valid.. + + $w->whereAdd('act_start > NOW() - INTERVAL 14 DAY'); // ignore the ones stuck in the queue if (!$this->force) { $w->whereAdd('act_when < NOW()'); // eg.. not if future.. } $w->orderBy('act_when ASC'); // oldest first. - $w->limit(1000); // we can run 1000 ... - } else { - $w->orderBy('act_when DESC'); // latest first - $w->limit(50); // we can run 1000 ... + $total = min($w->count(), $opts['limit']); + $this->logecho("QUEUE is {$w->count()} only running " . ((int) $opts['limit'])); + + $w->limit($opts['limit']); // we can run 1000 ... } + if (!empty($this->evtype)) { $w->evtype = $this->evtype; } $w->autoJoin(); + $w->find(); - - $ar = $w->fetchAll(); + $ar = array(); // $w->fetchAll(); if (!empty($opts['list'])) { - if (empty($ar)) { - die("Nothing in Queue\n"); - } - foreach($ar as $w) { + + + while ($w->fetch()) { $o = $w->object(); - echo "$w->id : $w->person_id_email email : ". - $o->toEventString()." ". $w->status() . "\n"; + $this->logecho("{$w->id} : {$w->person()->email} email : ". + $o->toEventString()." ". $w->status() ); } exit; } - + //echo "BATCH SIZE: ". count($ar) . "\n"; $pushed = array(); + $requeue = array(); while (true) { + if ($w->fetch()) { + $ar[] = clone($w); + $total--; + } + + $this->logecho("BATCH SIZE: ". (count($ar) + $total) ); + if (empty($ar)) { - break; + $this->logecho("COMPLETED MAIN QUEUE - running deleted"); + + if (empty($pushed)) { + break; + } + $ar = $pushed; + $pushed = false; + continue; } @@ -166,48 +234,108 @@ class Pman_Core_Notify extends Pman sleep(3); continue; } - if ($this->poolHasDomain($p->person_id_email)) { - if (in_array($p->person_id_email, $pushed)) { - // it's been pushed to the end, and nothing has - // been pushed since. - // give up, let the next run sort it out. - break; - } + $email = $p->person() ? $p->person()->email : $p->to_email; + + if ($this->poolHasDomain($email) > $this->max_to_domain) { - $ar[] = $p; // push it on the end.. + if ($pushed === false) { + // we only try once to requeue.. + $requeue[] = $p; + continue; + } + $pushed[] = $p; - $pushed[] = $p->person_id_email; - echo "domain {$p->person_id_email} already on queue, pushing to end.\n"; - sleep(3); + //sleep(3); continue; } - $this->run($p->id,$p->person_id_email); - $pushed = array(); + $this->run($p->id,$email); + + } // we should have a time limit here... while(count($this->pool)) { $this->poolfree(); - sleep(3); + sleep(3); + } + + foreach($requeue as $p) { + $pp = clone($p); + $p->act_when = $p->sqlValue('NOW + INTERVAL 1 MINUTE'); + $p->update($pp); + } - die("DONE\n"); + + $this->logecho("DONE"); + exit; } + function generateNotifications() + { + // this should check each module for 'GenerateNotifications.php' class.. + //and run it if found.. + $ff = HTML_FlexyFramework::get(); + + $disabled = explode(',', $ff->disable); + + $modules = array_reverse($this->modulesList()); + + // move 'project' one to the end... + + foreach ($modules as $module){ + if(in_array($module, $disabled)){ + continue; + } + $file = $this->rootDir. "/Pman/$module/GenerateNotifications.php"; + if(!file_exists($file)){ + continue; + } + + require_once $file; + $class = "Pman_{$module}_GenerateNotifications"; + $x = new $class; + if(!method_exists($x, 'generate')){ + continue; + }; + //echo "$module\n"; + $x->generate($this); + } + + + } + + + function run($id, $email, $cmdOpts="") { - // phpinfo();exit; - $tn = tempnam(ini_get('session.save_path'),'stdout') . '.stdout'; + + static $renice = false; + if (!$renice) { + require_once 'System.php'; + $renice = System::which('renice'); + } + + // phpinfo();exit; + $tnx = tempnam(ini_get('session.save_path'),'stdout'); + unlink($tnx); + $tn = $tnx . '.stdout'; $descriptorspec = array( 0 => array("pipe", 'r'), // stdin is a pipe that the child will read from 1 => array("file", $tn, 'w'), // stdout is a pipe that the child will write to 2 => array("pipe", "w") // stderr is a file to write to ); - $php = $_SERVER["_"]; + + static $php = false; + if (!$php) { + require_once 'System.php'; + $php = System::which('php'); + } + $sn = $_SERVER["SCRIPT_NAME"]; $cwd = $sn[0] == '/' ? dirname($sn) : dirname(realpath(getcwd() . '/'. $sn)); // same as run on.. (so script should end up being same relatively..) @@ -222,9 +350,27 @@ class Pman_Core_Notify extends Pman $pipe = array(); - echo "call proc_open $cmd\n"; + $this->logecho("call proc_open $cmd"); + + + if ($this->max_pool_size === 1) { + passthru($cmd); + return; + } + + + if (!empty($this->opts['dryrun'])) { + $this->logecho("DRY RUN"); + return; + } + $p = proc_open($cmd, $descriptorspec, $pipes, $cwd ); $info = proc_get_status($p); + + if ($this->nice_level !== false) { + $rcmd = "$renice {$this->nice_level} {$info['pid']}"; + `$rcmd`; + } $this->pool[] = array( 'proc' => $p, 'pid' => $info['pid'], @@ -232,19 +378,19 @@ class Pman_Core_Notify extends Pman 'cmd' => $cmd, 'email' => $email, 'pipes' => $pipes, + 'notify_id' => $id, 'started' => time() ); - echo "RUN ({$info['pid']}) $cmd \n"; + $this->logecho("RUN ({$info['pid']}) $cmd "); } function poolfree() { $pool = array(); clearstatcache(); - $maxruntime = 2 * 60; // 2 minutes.. ?? should be long enoguh - + foreach($this->pool as $p) { //echo "CHECK PID: " . $p['pid'] . "\n"; @@ -252,27 +398,35 @@ class Pman_Core_Notify extends Pman //var_dump($info); // update if necessday. - if ($info['pid']) { - echo "CHANING PID FROM " . $p['pid'] . " TO ". $info['pid']. "\n"; + if ($info['pid'] && $p['pid'] != $info['pid']) { + $this->logecho("CHANING PID FROM " . $p['pid'] . " TO ". $info['pid']); $p['pid'] = $info['pid']; } - echo @file_get_contents('/proc/'. $p['pid'] .'/cmdline') . "\n"; + //echo @file_get_contents('/proc/'. $p['pid'] .'/cmdline') . "\n"; if ($info['running']) { //if (file_exists('/proc/'.$p['pid'])) { $runtime = time() - $p['started']; //echo "RUNTIME ({$p['pid']}): $runtime\n"; - if ($runtime > $maxruntime) { + if ($runtime > $this->maxruntime) { proc_terminate($p['proc'], 9); //fclose($p['pipes'][1]); fclose($p['pipes'][0]); fclose($p['pipes'][2]); - echo "\nTERMINATING: ({$p['pid']}) " . $p['cmd'] . " : " . file_get_contents($p['out']) . "\n"; + $this->logecho("TERMINATING: ({$p['pid']}) " . $p['cmd'] . " : " . file_get_contents($p['out'])); @unlink($p['out']); + $w = DB_DataObject::factory($this->table); + $w->get($p['notify_id']); + $ww = clone($w); + $this->addEvent('NOTIFY', $w, 'TERMINATED - TIMEOUT'); + $w->act_when = date('Y-m-d H:i:s', strtotime('NOW + 30 MINUTES')); + $w->update($ww); + + continue; } @@ -292,13 +446,13 @@ class Pman_Core_Notify extends Pman // $pool[] = $p; // continue; //} - echo "\nENDED: ({$p['pid']}) " . $p['cmd'] . " : " . file_get_contents($p['out']) . "\n"; + $this->logecho("ENDED: ({$p['pid']}) " . $p['cmd'] . " : " . file_get_contents($p['out']) ); @unlink($p['out']); //unlink($p['out']); } - echo "POOL SIZE: ". count($pool) ."\n"; + $this->logecho("POOL SIZE: ". count($pool) ); $this->pool = $pool; - if (count($pool) < 10) { + if (count($pool) < $this->max_pool_size) { return true; } return false; @@ -311,19 +465,25 @@ class Pman_Core_Notify extends Pman */ function poolHasDomain($email) { + $ret = 0; $dom = strtolower(array_pop(explode('@',$email))); foreach($this->pool as $p) { $mdom = strtolower(array_pop(explode('@',$p['email']))); if ($mdom == $dom) { - return true; + $ret++; } } - return false; + return $ret; } function output() { - die("Done\n"); + $this->logecho("DONE"); + exit; + } + function logecho($str) + { + echo date("Y-m-d H:i:s - ") . $str . "\n"; } } \ No newline at end of file