}
$ns = DB_DataObject::factory('core_notify_server');
+ if (!$ns->count()) {
+ $ns->id = 0;
+ return $ns;
+ }
+
$ns->poolname = $notify->poolname;
$ns->is_active = 1;
function isFirstServer()
{
+ if (!$this->id) {
+ return true;
+ }
+
$servers = $this->availableServers();
if (empty($servers)) {
return false;
// called on current server.
function assignQueues($notify)
{
-
+ if (!$this->id) {
+ return true;
+ }
$servers = $this->availableServers();
$ids = array();
+ $up = array();
foreach($servers as $s) {
$ids[] = $s->id;
}
return;
}
- // ((@row_number := CASE WHEN @row_number IS NULL THEN 0 ELSE @row_number END +1) % {$num_servers})
-
$p = DB_DataObject::factory($notify->table);
and
server_id NOT IN (" . implode(",", $ids) . ")
");
- if ($p->count() < 1) {
+ $p->orderBy('act_when asc'); //?
+ $total_add = $p->count();
+ if ($total_add < 1) {
return;
}
+ $to_add = $p->fetchAll('id');
+
+ $p = DB_DataObject::factory($notify->table);
+ $p->whereAdd("
+ sent < '2000-01-01'
+ and
+ event_id = 0
+
+ and
+ server_id IN (" . implode(",", $ids) . ")
+ ");
$p->selectAdd();
- $p->selectAdd("id, ((@row_number := CASE WHEN @row_number IS NULL THEN 0 ELSE @row_number END +1) % {$num_servers}) as rn");
- $kv = $p->fetchAll('id','rn');
- foreach($kv as $id => $r) {
- $up[ $ids[$r] ][] = $id;
+ $p->selectAdd('server_id, count(id) as n');
+ $p->groupBy('server_id');
+ $in_q = $p->fetchAll('server_id', 'n');
+
+ // if queue is empty it will not get allocated anything.
+ foreach($ids as $sid) {
+ if (!isset($in_q[$sid])) {
+ $in_q[$sid] = 0;
+ }
+ }
+ $totalq = 0;
+ foreach($in_q as $sid => $n) {
+ $totalq += $n;
}
+
+
+ // new average queue
+ $target_len = floor( ($totalq + $total_add) / $num_servers );
+
+ foreach($in_q as $sid => $cq) {
+ if ( $cq > $target_len) {
+ continue;
+ }
+ $up[ $sid ] = array_slice($to_add, 0, $target_len - $cq);
+ }
+
+ // add the reminder evently
+ foreach($to_add as $n=>$i) {
+
+ $up[ $ids[$n % $num_servers] ][] = $i;
+ }
+
+ // distribution needs to go to ones that have the shortest queues. - so to balance out the queues
+
+
+
foreach($up as $sid => $nids) {
if (empty($nids)) {
continue;
function updateNotifyToNextServer( $cn , $when = false, $allow_same = false)
{
+ if (!$this->id) {
+ return;
+ }
+
// fixme - this should take into account blacklisted - and return false if no more servers are available
$email = empty($cn->to_email) ? ($cn->person() ? $cn->person()->email : $cn->to_email) : $cn->to_email;
$good = $s;
break;
}
+ $offset = ($offset + 1) % count($servers);
+ var_dump($offset);
}
if ($good == false && $allow_same) {
$good = $this;
function isBlacklisted($email)
{
+ if (!$this->id) {
+ return false;
+ }
+
// return current server id..
static $cache = array();
-
- // get the domain..
+ // get the domain..
$ea = explode('@',$email);
$dom = strtolower(array_pop($ea));
- if (isset($cache[$dom])) {
- return $cache[$dom];
+ if (isset( $cache[$this->id . '-'. $dom])) {
+ return $cache[$this->id . '-'. $dom];
}
$cd = DB_DataObject::factory('core_domain')->loadOrCreate($dom);
$bl->server_id = $this->id;
$bl->domain_id = $cd->id;
if ($bl->count()) {
- $cache[$dom] = true;
+ $cache[$this->id . '-'. $dom] = true;
return true;
}
}
function initHelo()
{
+ if (!$this->id) {
+ return;
+ }
$ff = HTML_FlexyFramework::get();
$ff->Mail['helo'] = $this->helo;
}
function checkSmtpResponse($errmsg, $core_domain)
{
+ if (!$this->id) {
+ return false;
+ }
$bl = DB_DataObject::factory('core_notify_blacklist');
$bl->server_id = $this->id;
$bl->domain_id = $core_domain->id;