$this->act_when = $this->sqlValue('NOW()');
}
$this->sent = $this->sent == '0000-00-00 00:00:00' ? $this->sqlValue('NOW()') :$this->sent; // do not update if sent.....
- $this->msgid = '';
+ $this->msgid = $msgid;
$this->event_id = $event->id;
$this->update($ww);
}
function messageIsBlacklisted($err)
{
$match = array(
- '5.7.0 DT:SPM'. // 163.com
+ '5.7.0 DT:SPM', // 163.com
+ '5.7.1 H:DYNB', // some other black list
'on our block list ', // live.com
'spameatingmonkey.net', // spameatingmonkey.net (users)
'sender is listed on the block', // korian?
public $is_active;
public $last_send;
+
+
+ function applyFilters($q, $au, $roo)
+ {
+ if (isset($q['_with_queue_size'])) {
+ $this->addQueueSize();
+ }
+ }
+
+
+ function addQueueSize()
+ {
+ // look for database links for server_id (which should find core_notify + others..)
+ $cn = get_class(DB_DataObject::factory('core_notify'));
+ $tables = array();
+ foreach($this->databaseLinks() as $tbl => $kv) {
+ foreach($kv as $k=>$v) {
+ if ($v != 'core_notify_server:id') {
+ continue;
+ }
+
+ $test = DB_DAtaObject::factory($tbl);
+ if (!is_a($test, $cn)) {
+ break;
+ }
+ $tables[] = $tbl;
+ }
+ }
+ if (empty($tables)) {
+ die("OOPS - no tables for notify_server references");
+ }
+ $totals = array();
+ foreach($tables as $t) {
+ $totals[] = "
+ COALESCE((SELECT
+ count(id)
+ FROM
+ $t
+ WHERE
+ server_id = core_notify_server.id
+ AND
+ sent < '1970-01-01' OR sent IS NULL
+ and
+ event_id = 0
+ ),0)
+ ";
+ }
+ $this->selectAdd("(" . implode(" + ", $totals) . ") as in_queue ");
+
+
+
+
+
+ }
+
+
// most services should call this first..
- function getCurrent($notify)
+ function getCurrent($notify, $force = false)
{
static $current = false;;
}
$ns = DB_DataObject::factory('core_notify_server');
+
$ns->poolname = $notify->poolname;
$ns->is_active = 1;
$ns->hostname = gethostname();
- if (!$ns->count()) {
+ $ns->limit(1);
+ if ($ns->find(true)) {
+ $current = $ns;
+ return $ns;
+ }
+ if (!$force) {
+ $notify->jerr("Server not found for this server " . gethostname() . " in core_notify_server" );
+ }
+ // fallback to any server - if we are using force. (this is so helo will work...)
+
+ $ns = DB_DataObject::factory('core_notify_server');
+ $ns->is_active = 1;
+ $ns->hostname = gethostname();
+ if (!$ns->find(true)) {
$notify->jerr("Server not found for this server " . gethostname() . " in core_notify_server" );
}
- $ns->find(true);
$current = $ns;
return $ns;
}
return;
}
- // ((@row_number := CASE WHEN @row_number IS NULL THEN 0 ELSE @row_number END +1) % {$num_servers})
-
$p = DB_DataObject::factory($notify->table);
and
server_id NOT IN (" . implode(",", $ids) . ")
");
- if ($p->count() < 1) {
+ $p->orderBy('act_when asc'); //?
+ $total_add = $p->count();
+ if ($total_add < 1) {
return;
}
+ $to_add = $p->fetchAll('id');
+
+ $p = DB_DataObject::factory($notify->table);
+ $p->whereAdd("
+ sent < '2000-01-01'
+ and
+ event_id = 0
+
+ and
+ server_id IN (" . implode(",", $ids) . ")
+ ");
$p->selectAdd();
- $p->selectAdd("id, ((@row_number := CASE WHEN @row_number IS NULL THEN 0 ELSE @row_number END +1) % {$num_servers}) as rn");
- $kv = $p->fetchAll('id','rn');
- foreach($kv as $id => $r) {
- $up[ $ids[$r] ][] = $id;
+ $p->selectAdd('server_id, count(id) as n');
+ $p->groupBy('server_id');
+ $in_q = $p->fetchAll('server_id', 'n');
+
+ // if queue is empty it will not get allocated anything.
+ foreach($ids as $sid) {
+ if (!isset($in_q[$sid])) {
+ $in_q[$sid] = 0;
+ }
+ }
+ $totalq = 0;
+ foreach($in_q as $sid => $n) {
+ $totalq += $n;
+ }
+
+
+ // new average queue
+ $target_len = floor( ($totalq + $total_add) / $num_servers );
+
+ foreach($in_q as $sid => $cq) {
+ if ( $target_len - $cq < 1) {
+ continue;
+ }
+ $up[ $sid ] = array_slice($to_add, 0, $target_len - $cq);
+ }
+ foreach($to_add as $i) {
+ $up[ $ids[0] ][] = $i;
}
+
+ // distribution needs to go to ones that have the shortest queues. - so to balance out the queues
+
+
+
foreach($up as $sid => $nids) {
if (empty($nids)) {
continue;
$good = $s;
break;
}
+ $offset = ($offset + 1) % count($servers);
+ var_dump($offset);
}
if ($good == false && $allow_same) {
$good = $this;
{
// return current server id..
static $cache = array();
-
- // get the domain..
+ // get the domain..
$ea = explode('@',$email);
$dom = strtolower(array_pop($ea));
- if (isset($cache[$dom])) {
- return $cache[$dom];
+ if (isset( $cache[$this->id . '-'. $dom])) {
+ return $cache[$this->id . '-'. $dom];
}
$cd = DB_DataObject::factory('core_domain')->loadOrCreate($dom);
$bl->server_id = $this->id;
$bl->domain_id = $cd->id;
if ($bl->count()) {
- $cache[$dom] = true;
+ $cache[$this->id . '-'. $dom] = true;
return true;
}
$bl->server_id = $this->id;
$bl->domain_id = $core_domain->id;
if ($bl->count()) {
- return;
+ return true;
}
// is it a blacklist message
if (!$bl->messageIsBlacklisted($errmsg)) {
- return;
+ return false;
}
+ $bl->error_str = $errmsg;
$bl->added_dt = $bl->sqlValue("NOW()");
$bl->insert();
-
+ return true;
}
$w->evtype = $this->evtype;
}
-
+ $w->server_id = $this->server->id;
+
if (!empty($opts['old'])) {
// show old and new...
$w->limit($opts['limit']); // we can run 1000 ...
}
- $w->server_id = $this->server->id;
$black = $this->server->isBlacklisted($email);
if ($black !== false) {
-
+ $this->logecho("Blacklisted - try giving it to next server");
if (false === $this->server->updateNotifyToNextServer($p)) {
- $p->updateState("????");
+ $ev = $this->addEvent('NOTIFY', $p, 'BLACKLISTED FROM our DB');
+ // we dont have an althenative server to update it with.
+ $this->logecho("Blacklisted - next server did not work - try again in 30 mins");
+ $this->server->updateNotifyToNextServer($w, date("Y-m-d H:i:s", strtotime('NOW + 30 MINUTES')),true);
+ // $this->errorHandler( $ev->remarks);
+
}
continue;
$tn = $this->tempName('stdout', true);
+ $tne = $this->tempName('stderr', true);
$descriptorspec = array(
0 => array("pipe", 'r'), // stdin is a pipe that the child will read from
1 => array("file", $tn, 'w'), // stdout is a pipe that the child will write to
- 2 => array("pipe", "w") // stderr is a file to write to
+ 2 => array("file", $tne, 'w'), // stderr is a file to write to
+ // 2 => array("pipe", "w") // stderr is a file to write to
);
static $php = false;
'proc' => $p,
'pid' => $info['pid'],
'out' => $tn,
+ 'oute' => $tne,
'cmd' => $cmd,
'email' => $email,
'pipes' => $pipes,
foreach($this->pool as $p) {
//echo "CHECK PID: " . $p['pid'] . "\n";
+
+
$info = proc_get_status($p['proc']);
//var_dump($info);
proc_terminate($p['proc'], 9);
//fclose($p['pipes'][1]);
fclose($p['pipes'][0]);
- fclose($p['pipes'][2]);
- $this->logecho("TERMINATING: ({$p['pid']}) {$p['email']} " . $p['cmd'] . " : " . file_get_contents($p['out']));
+
+ $this->logecho("TERMINATING: ({$p['pid']}) {$p['email']} " . $p['cmd'] . " : " . file_get_contents($p['out']) . " : " . file_get_contents($p['oute']));
@unlink($p['out']);
+ @unlink($p['oute']);
// schedule again
$w = DB_DataObject::factory($this->table);
continue;
}
fclose($p['pipes'][0]);
- fclose($p['pipes'][2]);
//echo "CLOSING: ({$p['pid']}) " . $p['cmd'] . " : " . file_get_contents($p['out']) . "\n";
//fclose($p['pipes'][1]);
proc_close($p['proc']);
-
-
+ sleep(1);
+ clearstatcache();
+ if (file_exists('/proc/'. $p['pid'])) {
+ $this->logecho("proc PID={$p['pid']} still here - trying to wait");
+ pcntl_waitpid($p['pid'], $status, WNOHANG);
+ }
+
//clearstatcache();
//if (file_exists('/proc/'.$p['pid'])) {
// $pool[] = $p;
// continue;
//}
- $this->logecho("ENDED: ({$p['pid']}) {$p['email']} " . $p['cmd'] . " : " . file_get_contents($p['out']) );
+ $this->logecho("ENDED: ({$p['pid']}) {$p['email']} " . $p['cmd'] . " : " . file_get_contents($p['out']) . " : " . file_get_contents($p['oute']));
@unlink($p['out']);
+ @unlink($p['oute']);
// at this point we could pop onto the queue the
$this->popQueueDomain($p['email']);
function clearOld()
{
if ($this->server->isFirstServer()) {
+
$p = DB_DataObject::factory($this->table);
$p->whereAdd("
sent < '2000-01-01'
$this->errorHandler("already sent - repeat to early\n");
}
- $this->server = DB_DataObject::Factory('core_notify_server')->getCurrent($this);
- if (!$force && $w->server_id != $this->server->id) {
+ $this->server = DB_DataObject::Factory('core_notify_server')->getCurrent($this, $force);
+ if (!$force && $w->server_id != $this->server->id) {
$this->errorHandler("Server id does not match - use force to try again\n");
}
+
if (!empty($opts['debug'])) {
print_r($w);
$ff = HTML_FlexyFramework::get();
HTML_FlexyFramework::get()->Core_Mailer['debug'] = true;
}
- $sent = (empty($w->sent) || preg_match('/^0000/', $w->sent)) ? false : true;
+ $sent = (empty($w->sent) || strtotime( $w->sent) < 100 ) ? false : true;
if (!$force && (!empty($w->msgid) || $sent)) {
$ww = clone($w);
if (!$sent) { // fix sent.
- $w->sent = $w->sent == '0000-00-00 00:00:00' ? $w->sqlValue('NOW()') :$w->sent; // do not update if sent.....
+ $w->sent = strtotime( $w->sent) < 100 ? $w->sqlValue('NOW()') :$w->sent; // do not update if sent.....
$w->update($ww);
}
$this->errorHandler("message has been sent already.\n");
}
+ // we have a bug with msgid not getting filled.
+ $cev = DB_DataObject::Factory('Events');
+ $cev->on_table = $this->table;
+ $cev->on_id = $w->id;
+ $cev->whereAdd("action IN ('NOTIFYSENT', 'NOTIFYFAIL')");
+ $cev->limit(1);
+ if ($cev->count()) {
+ $cev->find(true);
+ $w->flagDone($cev, $cev->action == 'NOTIFYSENT' ? 'alreadysent' : '');
+ $this->errorHandler( $cev->action . " (fix old) ". $cev->remarks);
+ }
+
+
$o = $w->object();
if ($o === false) {
$this->errorHandler( $ev->remarks);
}
+ $retry_when = date('Y-m-d H:i:s', strtotime('NOW + ' . $retry . ' MINUTES'));
//$this->addEvent('NOTIFY', $w, 'GREYLISTED ' . $p->email . ' ' . $res->toString());
// we can only update act_when if it has not been sent already (only happens when running in force mode..)
// set act when if it's empty...
- $w->act_when = (!$w->act_when || $w->act_when == '0000-00-00 00:00:00') ? date('Y-m-d H:i:s', strtotime('NOW + ' . $retry . ' MINUTES')) : $w->act_when;
+ $w->act_when = (!$w->act_when || $w->act_when == '0000-00-00 00:00:00') ? $retry_when : $w->act_when;
$w->update($ww);
$ev->writeEventLog($this->debug_str);
- $w->flagDone($ev,$email['headers']['Message-Id']);
+ $w->flagDone($ev, $email['headers']['Message-Id']);
// enable cc in notify..
$code = empty($res->userinfo['smtpcode']) ? -1 : $res->userinfo['smtpcode'];
if (!empty($res->code) && $res->code == 10001) {
// fake greylist if timed out.
- $code = 421;
+ $code = -1;
}
if ($code < 0) {
//print_r($res);
$ev = $this->addEvent('NOTIFY', $w, 'GREYLISTED - ' . $errmsg);
- $this->server->updateNotifyToNextServer($w, strtotime('NOW + ' . $retry . ' MINUTES'),true);
+ $this->server->updateNotifyToNextServer($w, $retry_when,true);
$this->errorHandler( $ev->remarks);
}
$errmsg= $res->userinfo['smtpcode'] . ':' . $res->userinfo['smtptext'];
}
- $ev = $this->addEvent('NOTIFYFAIL', $w, ($fail ? "FAILED - " : "RETRY TIME EXCEEDED - ") . $errmsg);
- $w->flagDone($ev, '');
-
if ($res->userinfo['smtpcode'] == 550) {
- $this->server->checkSmtpResponse($errmsg, $core_domain);
+ if ($this->server->checkSmtpResponse($errmsg, $core_domain)) {
+ $ev = $this->addEvent('NOTIFY', $w, 'BLACKLISTED - ' . $errmsg);
+ $this->server->updateNotifyToNextServer($w, $retry_when,true);
+ $this->errorHandler( $ev->remarks);
+ }
}
-
-
+
+ $ev = $this->addEvent('NOTIFYFAIL', $w, ($fail ? "FAILED - " : "RETRY TIME EXCEEDED - ") . $errmsg);
+ $w->flagDone($ev, '');
+
$this->errorHandler( $ev->remarks);
}
// try again.
- $ev = $this->addEvent('NOTIFY', $w, 'NO HOST CAN BE CONTACTED:' . $p->email);
+ $ev = $this->addEvent('NOTIFY', $w, 'GREYLIST - NO HOST CAN BE CONTACTED:' . $p->email);
- $this->server->updateNotifyToNextServer($w, strtotime('NOW + ' . $retry . ' MINUTES'),true);
+ $this->server->updateNotifyToNextServer($w, $retry_when ,true);
-- ?? why added??? - probably need to document this..
ALTER TABLE core_notify ADD COLUMN domain_id INT(11) NOT NULL DEFAULT 0;
+ALTER TABLE core_notify ADD COLUMN server_id INT(11) NOT NULL DEFAULT -1;
+
ALTER TABLE core_notify ADD INDEX lookup(act_when, msgid);
ALTER TABLE core_notify ADD INDEX lookup_e (onid, ontable, person_id, act_when);
ALTER TABLE core_notify ADD INDEX lookup_f (to_email);
alter table core_notify add index lookup_g(sent, act_start, act_when);
+alter table core_notify add INDEX lookup_h (sent, event_id, server_id, msgid, ontable);
+
ALTER TABLE core_notify ADD INDEX lookup_person_id (person_id);
ALTER TABLE core_notify ADD INDEX lookup_trigger_person_id (trigger_person_id);