var $evtype = ''; // any notification...
// this script should only handle EMAIL notifications..
-
+ var $opts;
var $force = false;
function getAuth()
{
if (!$ff->cli) {
die("access denied");
}
- HTML_FlexyFramework::ensureSingle($_SERVER["SCRIPT_NAME"] .'|'. __FILE__, $this);
+ HTML_FlexyFramework::ensureSingle($_SERVER["SCRIPT_NAME"] .'|'. __FILE__ .'|'. (empty($_SERVER['argv'][1]) ? '': $_SERVER['argv'][1]), $this);
return true;
}
}
}
-
+ var $queue = array();
+ var $domain_queue = array(); // false to use nextquee
+ var $next_queue = array();
+
function get($r,$opts=array())
{
$this->parseArgs($opts);
$this->generateNotifications();
-
+ $this->assignQueues();
+
//DB_DataObject::debugLevel(1);
$w = DB_DataObject::factory($this->table);
$total = 0;
$w->evtype = $this->evtype;
}
+ $ff = HTML_FlexyFramework::get();
+ if (!empty($ff->Core_Notify['servers'])) {
+ if (!isset($ff->Core_Notify['servers'][gethostname()])) {
+ $this->jerr("Core_Notify['servers']['" . gethostname() ."'] is not set");
+ }
+ $w->server_id = array_search(gethostname(),array_keys($ff->Core_Notify['servers']));
+ }
+
+
+
+
$w->autoJoin();
$w->find();
- $ar = array(); // $w->fetchAll();
+
if (!empty($opts['list'])) {
}
//echo "BATCH SIZE: ". count($ar) . "\n";
- $pushed = array();
- $requeue = array();
+
+
while (true) {
// only add if we don't have any queued up..
- if (empty($ar) && $w->fetch()) {
- $ar[] = clone($w);
+ if (empty($this->queue) && $w->fetch()) {
+ $this->queue[] = clone($w);
$total--;
}
- $this->logecho("BATCH SIZE: ". (count($ar) + $total) );
+ $this->logecho("BATCH SIZE: ". (count($this->queue) + $total) );
- if (empty($ar)) {
- $this->logecho("COMPLETED MAIN QUEUE - running deleted");
-
- if (empty($pushed)) {
- break;
+ if (empty($this->queue)) {
+ $this->logecho("COMPLETED MAIN QUEUE - running maxed out domains");
+ if ($this->domain_queue !== false) {
+ $this->queue = $this->remainingDomainQueue();
+ $this->domain_queue = false;
}
- $ar = $pushed;
- $pushed = false;
continue;
}
- $p = array_shift($ar);
+ $p = array_shift($this->queue);
if (!$this->poolfree()) {
- array_unshift($ar,$p); /// put it back on..
+ array_unshift($this->queue,$p); /// put it back on..
sleep(3);
continue;
}
if ($this->poolHasDomain($email) > $this->max_to_domain) {
- if ($pushed === false) {
- // we only try once to requeue..
- $requeue[] = $p;
- continue;
- }
- $pushed[] = $p;
-
+ // push it to a 'domain specific queue'
+ $this->logecho("REQUEING - maxed out that domain - {$email}");
+ $this->pushQueueDomain($p, $email);
+
//sleep(3);
continue;
// we should have a time limit here...
while(count($this->pool)) {
$this->poolfree();
- sleep(3);
+ sleep(3);
}
- foreach($requeue as $p) {
+ foreach($this->next_queue as $p) {
$pp = clone($p);
$p->act_when = $p->sqlValue('NOW + INTERVAL 1 MINUTE');
+ $this->updateServer($p);
$p->update($pp);
}
exit;
}
+ // this sequentially distributes requeued emails.. - to other servers.
+ function updateServer($w)
+ {
+ $ff = HTML_FlexyFramework::get();
+ static $num = 0;
+ if (empty($ff->Core_Notify['servers'])) {
+ return;
+ }
+ $num++;
+ // next server..
+ $w->server_id = $num % count(array_keys($ff->Core_Notify['servers']));
+
+ }
+
+
function generateNotifications()
{
// this should check each module for 'GenerateNotifications.php' class..
}
-
+ function assignQueues()
+ {
+ $ff = HTML_FlexyFramework::get();
+
+ if (empty($ff->Core_Notify['servers'])) {
+ return;
+ }
+
+ $num_servers = count(array_keys($ff->Core_Notify['servers']));
+ $p = DB_DataObject::factory($this->table);
+ // 6 seconds on this machne...
+ $p->query("
+ UPDATE
+ {$this->table}
+ SET
+ server_id = ((@row_number := CASE WHEN @row_number IS NULL THEN 0 ELSE @row_number END +1) % {$num_servers})
+ WHERE
+ sent < '2000-01-01'
+ and
+ event_id = 0
+ and
+ act_start < NOW()
+ and
+ server_id < 0
+ ORDER BY
+ id ASC
+ LIMIT
+ 20000
+ ");
+
+
+ }
function run($id, $email='', $cmdOpts="")
{
//}
$this->logecho("ENDED: ({$p['pid']}) " . $p['cmd'] . " : " . file_get_contents($p['out']) );
@unlink($p['out']);
+ // at this point we could pop onto the queue the
+ $this->popQueueDomain($p['email']);
+
//unlink($p['out']);
}
$this->logecho("POOL SIZE: ". count($pool) );
return $ret;
}
+ function popQueueDomain($email)
+ {
+ $ea = explode('@',$email);
+ $dom = strtolower(array_pop($ea));
+ if (empty($this->domain_queue[$dom])) {
+ return;
+ }
+ array_unshift($this->queue, array_shift($this->domain_queue[$dom]));
+
+ }
+
+ function pushQueueDomain($e, $email)
+ {
+ if ($this->domain_queue === false) {
+ $this->next_queue[] = $e;
+ return;
+ }
+
+ $ea = explode('@',$email);
+ $dom = strtolower(array_pop($ea));
+ if (!isset($this->domain_queue[$dom])) {
+ $this->domain_queue[$dom] = array();
+ }
+ $this->domain_queue[$dom][] = $e;
+ }
+ function remainingDomainQueue()
+ {
+ $ret = array();
+ foreach($this->domain_queue as $dom => $ar) {
+ $ret = array_merge($ret, $ar);
+ }
+ return $ret;
+ }
+
+
function output()
{