var $evtype = ''; // any notification...
// this script should only handle EMAIL notifications..
-
+
+ var $server_id;
+
+
var $opts;
var $force = false;
function getAuth()
var $queue = array();
var $domain_queue = array(); // false to use nextquee
var $next_queue = array();
+
function get($r,$opts=array())
{
}
$w->server_id = array_search(gethostname(),array_keys($ff->Core_Notify['servers']));
}
+ if (!empty($this->evtype)) {
+ $w->evtype = $this->evtype;
+ }
+
+
if (!empty($opts['old'])) {
// show old and new...
$w->limit($opts['limit']); // we can run 1000 ...
}
- if (!empty($this->evtype)) {
- $w->evtype = $this->evtype;
- }
$w->autoJoin();
- $w->find();
+ $total = $w->find();
+ if (empty($total)) {
+ $this->logecho("Nothing In Queue - DONE");
+ exit;
+ }
if (!empty($opts['list'])) {
$this->queue[] = clone($w);
$total--;
}
-
+
$this->logecho("BATCH SIZE: Queue=". count($this->queue) . " TOTAL = " . $total );
if (empty($this->queue)) {
sleep(3);
continue;
}
- $email = $p->person() ? $p->person()->email : $p->to_email;
+ // not sure what happesn if person email and to_email is empty!!?
+ $email = empty($p->to_email) ? ($p->person() ? $p->person()->email : $p->to_email) : $p->to_email;
+
+ $black = $this->isBlacklisted($email);
+ if ($black !== false) {
+ $this->logecho("DOMAIN blacklisted - {$email} - moving to another pool");
+ $this->updateServer($p, $black);
+ continue;
+ }
+
if ($this->poolHasDomain($email) > $this->max_to_domain) {
if (!empty($this->next_queue)) {
foreach($this->next_queue as $p) {
- $pp = clone($p);
- $p->act_when = $p->sqlValue('NOW + INTERVAL 1 MINUTE');
$this->updateServer($p);
- $p->update($pp);
-
}
}
exit;
}
- // this sequentially distributes requeued emails.. - to other servers.
- function updateServer($w)
+
+ function isBlacklisted($email)
{
+ // return current server id..
+ $ff = HTML_FlexyFramework::get();
+ //$this->logecho("CHECK BLACKLISTED - {$email}");
+ if (empty($ff->Core_Notify['servers'])) {
+ return false;
+ }
+
+ if (!isset($ff->Core_Notify['servers'][gethostname()]['blacklisted'])) {
+ return false;
+ }
+
+ // get the domain..
+ $ea = explode('@',$email);
+ $dom = strtolower(array_pop($ea));
+
+ //$this->logecho("CHECK BLACKLISTED DOM - {$dom}");
+ if (!in_array($dom, $ff->Core_Notify['servers'][gethostname()]['blacklisted'] )) {
+ return false;
+ }
+ //$this->logecho("RETURN BLACKLISTED TRUE");
+ return array_search(gethostname(),array_keys($ff->Core_Notify['servers']));
+ }
+
+ // this sequentially distributes requeued emails.. - to other servers. (can exclude current one if we have that flagged.)
+ function updateServer($ww, $exclude = -1)
+ {
+ $w = DB_DataObject::factory($ww->tableName());
+ $w->get($ww->id);
+
$ff = HTML_FlexyFramework::get();
static $num = 0;
if (empty($ff->Core_Notify['servers'])) {
return;
}
- $num++;
+ $num = ($num+1) % count(array_keys($ff->Core_Notify['servers']));
+ if ($exclude == $num ) {
+ $num = ($num+1) % count(array_keys($ff->Core_Notify['servers']));
+ }
// next server..
- $w->server_id = $num % count(array_keys($ff->Core_Notify['servers']));
+ $pp = clone($w);
+ $w->server_id = $num;
+
+ $w->act_when = $w->sqlValue('NOW() + INTERVAL 1 MINUTE');
+ $w->update($pp);
+
}
return;
}
+ if (!isset($ff->Core_Notify['servers'][gethostname()])) {
+ $this->jerr("Core_Notify['servers']['" . gethostname() ."'] is not set");
+ }
+ // only run this on the main server...
+ if (array_search(gethostname(),array_keys($ff->Core_Notify['servers'])) > 0) {
+ return;
+ }
+
$num_servers = count(array_keys($ff->Core_Notify['servers']));
$p = DB_DataObject::factory($this->table);
+ $p->whereAdd("
+ sent < '2000-01-01'
+ and
+ event_id = 0
+ and
+ act_start < NOW() + INTERVAL 3 HOUR
+ and
+ server_id < 0"
+
+ );
+ if ($p->count() < 1) {
+ return;
+ }
+ $p = DB_DataObject::factory($this->table);
// 6 seconds on this machne...
$p->query("
UPDATE
and
event_id = 0
and
- act_start < NOW()
+ act_start < NOW() + INTERVAL 3 HOUR
and
server_id < 0
ORDER BY
id ASC
LIMIT
- 20000
+ 10000
");
if ($this->max_pool_size === 1) {
- this->logecho("call passthru [{$email}] $cmd");
+ $this->logecho("call passthru [{$email}] $cmd");
passthru($cmd);
return;
}
//fclose($p['pipes'][1]);
fclose($p['pipes'][0]);
fclose($p['pipes'][2]);
- $this->logecho("TERMINATING: ({$p['pid']}) " . $p['cmd'] . " : " . file_get_contents($p['out']));
+ $this->logecho("TERMINATING: ({$p['pid']}) {$p['email']} " . $p['cmd'] . " : " . file_get_contents($p['out']));
@unlink($p['out']);
// schedule again
// $pool[] = $p;
// continue;
//}
- $this->logecho("ENDED: ({$p['pid']}) " . $p['cmd'] . " : " . file_get_contents($p['out']) );
+ $this->logecho("ENDED: ({$p['pid']}) {$p['email']} " . $p['cmd'] . " : " . file_get_contents($p['out']) );
@unlink($p['out']);
// at this point we could pop onto the queue the
$this->popQueueDomain($p['email']);
{
echo date("Y-m-d H:i:s - ") . $str . "\n";
}
-}
\ No newline at end of file
+}