dont add if queue len to large already
[Pman.Core] / DataObjects / Core_notify_server.php
index 9e2924f..10db21f 100644 (file)
@@ -17,9 +17,65 @@ class Pman_Core_DataObjects_Core_notify_server extends DB_DataObject
     public $is_active;
     public $last_send;
     
+    
+    
+    function  applyFilters($q, $au, $roo)
+    {
+        if (isset($q['_with_queue_size'])) {
+            $this->addQueueSize();
+        }
+    }
+    
+    
+    function addQueueSize()
+    {
+        // look for database links for server_id (which should find core_notify + others..)
+        $cn = get_class(DB_DataObject::factory('core_notify'));
+        $tables = array();
+        foreach($this->databaseLinks() as $tbl => $kv) {
+            foreach($kv as $k=>$v) {
+                if ($v != 'core_notify_server:id') {
+                    continue;
+                }
+                
+                $test = DB_DAtaObject::factory($tbl);
+                if (!is_a($test, $cn)) {
+                    break;
+                }
+                $tables[] = $tbl;
+            }
+        }
+        if (empty($tables)) {
+            die("OOPS - no tables for notify_server references");
+        }
+        $totals = array();
+        foreach($tables as $t) {
+            $totals[] = "
+                COALESCE((SELECT
+                    count(id)
+                FROM
+                    $t
+                WHERE
+                    server_id = core_notify_server.id
+                AND
+                    sent < '1970-01-01' OR sent IS NULL
+                 and
+                        event_id = 0
+                ),0)
+            ";
+        }
+        $this->selectAdd("(" . implode(" + ", $totals) . ") as in_queue ");
+        
+        
+        
+        
+        
+    }
+    
+    
     // most services should call this first..
     
-    function getCurrent($notify)
+    function getCurrent($notify, $force = false)
     {
         static $current = false;;
         
@@ -28,13 +84,26 @@ class Pman_Core_DataObjects_Core_notify_server extends DB_DataObject
         }
         
         $ns = DB_DataObject::factory('core_notify_server');
+        
         $ns->poolname = $notify->poolname;
         $ns->is_active = 1;
         $ns->hostname = gethostname();
-        if (!$ns->count()) {
+        $ns->limit(1);
+        if ($ns->find(true)) {
+            $current = $ns;
+            return $ns;
+        }
+        if (!$force) {
+            $notify->jerr("Server not found for this server " .  gethostname() . " in core_notify_server" );
+        }
+        // fallback to any server - if we are using force. (this is so helo will work...)
+        
+        $ns = DB_DataObject::factory('core_notify_server');
+        $ns->is_active = 1;
+        $ns->hostname = gethostname();
+        if (!$ns->find(true)) {
             $notify->jerr("Server not found for this server " .  gethostname() . " in core_notify_server" );
         }
-        $ns->find(true);
         $current = $ns;
         return $ns;
     }
@@ -97,8 +166,6 @@ class Pman_Core_DataObjects_Core_notify_server extends DB_DataObject
             return;
         }
         
-        // ((@row_number := CASE WHEN @row_number IS NULL THEN 0 ELSE @row_number END  +1) % {$num_servers})
-        
         
         
         $p = DB_DataObject::factory($notify->table);
@@ -111,25 +178,61 @@ class Pman_Core_DataObjects_Core_notify_server extends DB_DataObject
                 and
                 server_id NOT IN (" . implode(",", $ids) . ")
         ");
-        if ($p->count() < 1) {
+        $p->orderBy('act_when asc'); //?
+        $total_add = $p->count();
+        if ($total_add < 1) {
             return;
         }
         
+        $to_add = $p->fetchAll('id');
+        
+        $p = DB_DataObject::factory($notify->table);
+        $p->whereAdd("
+                sent < '2000-01-01'
+                and
+                event_id = 0
+        
+                and
+                server_id IN (" . implode(",", $ids) . ")
+        ");
         $p->selectAdd();
-        $p->selectAdd("id, ((@row_number := CASE WHEN @row_number IS NULL THEN 0 ELSE @row_number END  +1) % {$num_servers})  as rn");
-        $kv = $p->fetchAll('id,rn');
-        foreach($kv as $id => $r) {
-            $up[ $ids[$r] ][] = $id;
+        $p->selectAdd('server_id, count(id) as $n');
+        $p->groupBy('server_id');
+        $in_q = $p->fetchAll('server_id', 'n');
+        $totalq = 0;
+        foreach($in_q as $sid => $n) {
+            $totalq += $n;
+        }
+        
+        // new average queue
+        $target_len = floor(  ($totalq + $total_add) / $num_servers );
+        
+        foreach($in_q as $sid => $cq) {
+            if ( $target_len - $cq < 1) {
+                continue;
+            }
+            $up[ $sid ] = array_slice($to_add, 0, $target_len - $cq);
+        }
+        foreach($to_add as $i) {
+            $up[  $ids[0] ][] = $i;
         }
+        
+        // distribution needs to go to ones that have the shortest queues. - so to balance out the queues
+        
+         
+        
         foreach($up as $sid => $nids) {
+            if (empty($nids)) {
+                continue;
+            }
             $p = DB_DataObject::factory($notify->table);
             $p->query("
                 UPDATE
-                    {$this->table}
+                    {$notify->table}
                 SET
                     server_id = $sid
                 WHERE
-                    id IN (". implode(",', $nids"). ')'
+                    id IN (". implode(',', $nids). ')'
             );
         }
          
@@ -156,7 +259,7 @@ class Pman_Core_DataObjects_Core_notify_server extends DB_DataObject
         $w = DB_DataObject::factory($cn->tableName());
         $w->get($cn->id);
         
-        $servers = $this->availableServerIds();
+        $servers = $this->availableServers();
         $start = 0;
         foreach($servers as $i => $s) {
             if ($s->id == $this->id) {
@@ -164,7 +267,7 @@ class Pman_Core_DataObjects_Core_notify_server extends DB_DataObject
             }
         }
         
-        $offset = ($start + 1)  % count($ids);
+        $offset = ($start + 1)  % count($servers);
         $good = false;
         while ($offset  != $start) {
             $s = $servers[$offset];
@@ -217,7 +320,7 @@ class Pman_Core_DataObjects_Core_notify_server extends DB_DataObject
     }
     function initHelo()
     {
-        
+        $ff = HTML_FlexyFramework::get();
         $ff->Mail['helo'] = $this->helo;
         
     }
@@ -227,15 +330,16 @@ class Pman_Core_DataObjects_Core_notify_server extends DB_DataObject
         $bl->server_id = $this->id;
         $bl->domain_id = $core_domain->id;
         if ($bl->count()) {
-            return;
+            return true;
         }
         // is it a blacklist message
         if (!$bl->messageIsBlacklisted($errmsg)) {
-            return;
+            return false;
         }
+        $bl->error_str = $errmsg;
         $bl->added_dt = $bl->sqlValue("NOW()");
         $bl->insert();
-        
+        return true;
         
     }