Merge branch 'master' into wip_leon_T7841_replace_gmt_lists_with_timezone
authorAlan <alan@roojs.com>
Mon, 16 Oct 2023 04:15:23 +0000 (12:15 +0800)
committerAlan <alan@roojs.com>
Mon, 16 Oct 2023 04:15:23 +0000 (12:15 +0800)
DataObjects/Core_notify.php
DataObjects/Core_notify_blacklist.php
DataObjects/Core_notify_server.php
Notify.php
NotifySend.php
sql/core_notify.sql

index 0998632..0997d37 100644 (file)
@@ -352,7 +352,7 @@ class Pman_Core_DataObjects_Core_notify extends DB_DataObject
             $this->act_when = $this->sqlValue('NOW()');
         }
         $this->sent = $this->sent == '0000-00-00 00:00:00' ? $this->sqlValue('NOW()') :$this->sent; // do not update if sent.....
-        $this->msgid = '';
+        $this->msgid = $msgid;
         $this->event_id = $event->id;
         $this->update($ww);
     }
index 408f542..89d686b 100644 (file)
@@ -20,7 +20,8 @@ class Pman_Core_DataObjects_Core_notify_blacklist extends DB_DataObject
     function messageIsBlacklisted($err)
     {
         $match = array(
-            '5.7.0 DT:SPM'. // 163.com
+            '5.7.0 DT:SPM', // 163.com
+            '5.7.1 H:DYNB', // some other black list
             'on our block list ',  // live.com
             'spameatingmonkey.net', // spameatingmonkey.net (users)
             'sender is listed on the block', // korian?
index 7692e34..ed1aec7 100644 (file)
@@ -17,9 +17,65 @@ class Pman_Core_DataObjects_Core_notify_server extends DB_DataObject
     public $is_active;
     public $last_send;
     
+    
+    
+    function  applyFilters($q, $au, $roo)
+    {
+        if (isset($q['_with_queue_size'])) {
+            $this->addQueueSize();
+        }
+    }
+    
+    
+    function addQueueSize()
+    {
+        // look for database links for server_id (which should find core_notify + others..)
+        $cn = get_class(DB_DataObject::factory('core_notify'));
+        $tables = array();
+        foreach($this->databaseLinks() as $tbl => $kv) {
+            foreach($kv as $k=>$v) {
+                if ($v != 'core_notify_server:id') {
+                    continue;
+                }
+                
+                $test = DB_DAtaObject::factory($tbl);
+                if (!is_a($test, $cn)) {
+                    break;
+                }
+                $tables[] = $tbl;
+            }
+        }
+        if (empty($tables)) {
+            die("OOPS - no tables for notify_server references");
+        }
+        $totals = array();
+        foreach($tables as $t) {
+            $totals[] = "
+                COALESCE((SELECT
+                    count(id)
+                FROM
+                    $t
+                WHERE
+                    server_id = core_notify_server.id
+                AND
+                    sent < '1970-01-01' OR sent IS NULL
+                 and
+                        event_id = 0
+                ),0)
+            ";
+        }
+        $this->selectAdd("(" . implode(" + ", $totals) . ") as in_queue ");
+        
+        
+        
+        
+        
+    }
+    
+    
     // most services should call this first..
     
-    function getCurrent($notify)
+    function getCurrent($notify, $force = false)
     {
         static $current = false;;
         
@@ -28,13 +84,26 @@ class Pman_Core_DataObjects_Core_notify_server extends DB_DataObject
         }
         
         $ns = DB_DataObject::factory('core_notify_server');
+        
         $ns->poolname = $notify->poolname;
         $ns->is_active = 1;
         $ns->hostname = gethostname();
-        if (!$ns->count()) {
+        $ns->limit(1);
+        if ($ns->find(true)) {
+            $current = $ns;
+            return $ns;
+        }
+        if (!$force) {
+            $notify->jerr("Server not found for this server " .  gethostname() . " in core_notify_server" );
+        }
+        // fallback to any server - if we are using force. (this is so helo will work...)
+        
+        $ns = DB_DataObject::factory('core_notify_server');
+        $ns->is_active = 1;
+        $ns->hostname = gethostname();
+        if (!$ns->find(true)) {
             $notify->jerr("Server not found for this server " .  gethostname() . " in core_notify_server" );
         }
-        $ns->find(true);
         $current = $ns;
         return $ns;
     }
@@ -97,8 +166,6 @@ class Pman_Core_DataObjects_Core_notify_server extends DB_DataObject
             return;
         }
         
-        // ((@row_number := CASE WHEN @row_number IS NULL THEN 0 ELSE @row_number END  +1) % {$num_servers})
-        
         
         
         $p = DB_DataObject::factory($notify->table);
@@ -111,16 +178,57 @@ class Pman_Core_DataObjects_Core_notify_server extends DB_DataObject
                 and
                 server_id NOT IN (" . implode(",", $ids) . ")
         ");
-        if ($p->count() < 1) {
+        $p->orderBy('act_when asc'); //?
+        $total_add = $p->count();
+        if ($total_add < 1) {
             return;
         }
         
+        $to_add = $p->fetchAll('id');
+        
+        $p = DB_DataObject::factory($notify->table);
+        $p->whereAdd("
+                sent < '2000-01-01'
+                and
+                event_id = 0
+        
+                and
+                server_id IN (" . implode(",", $ids) . ")
+        ");
         $p->selectAdd();
-        $p->selectAdd("id, ((@row_number := CASE WHEN @row_number IS NULL THEN 0 ELSE @row_number END  +1) % {$num_servers})  as rn");
-        $kv = $p->fetchAll('id','rn');
-        foreach($kv as $id => $r) {
-            $up[ $ids[$r] ][] = $id;
+        $p->selectAdd('server_id, count(id) as  n');
+        $p->groupBy('server_id');
+        $in_q = $p->fetchAll('server_id', 'n');
+        
+        // if queue is empty it will not get allocated anything.
+        foreach($ids as $sid) {
+            if (!isset($in_q[$sid])) {
+                $in_q[$sid] = 0;
+            }
+        }
+        $totalq = 0;
+        foreach($in_q as $sid => $n) {
+            $totalq += $n;
+        }
+        
+        
+        // new average queue
+        $target_len = floor(  ($totalq + $total_add) / $num_servers );
+        
+        foreach($in_q as $sid => $cq) {
+            if ( $target_len - $cq < 1) {
+                continue;
+            }
+            $up[ $sid ] = array_slice($to_add, 0, $target_len - $cq);
+        }
+        foreach($to_add as $i) {
+            $up[  $ids[0] ][] = $i;
         }
+        
+        // distribution needs to go to ones that have the shortest queues. - so to balance out the queues
+        
+         
+        
         foreach($up as $sid => $nids) {
             if (empty($nids)) {
                 continue;
@@ -175,6 +283,8 @@ class Pman_Core_DataObjects_Core_notify_server extends DB_DataObject
                 $good = $s;
                 break;
             }
+            $offset = ($offset + 1)  % count($servers);
+            var_dump($offset);
         }
         if ($good == false && $allow_same) {
             $good = $this;
@@ -198,12 +308,11 @@ class Pman_Core_DataObjects_Core_notify_server extends DB_DataObject
     {
         // return current server id..
         static $cache = array();
-        
-        // get the domain..
+         // get the domain..
         $ea = explode('@',$email);
         $dom = strtolower(array_pop($ea));
-        if (isset($cache[$dom])) {
-            return $cache[$dom];
+        if (isset( $cache[$this->id . '-'. $dom])) {
+            return  $cache[$this->id . '-'. $dom];
         }
         
         $cd = DB_DataObject::factory('core_domain')->loadOrCreate($dom);
@@ -212,7 +321,7 @@ class Pman_Core_DataObjects_Core_notify_server extends DB_DataObject
         $bl->server_id = $this->id;
         $bl->domain_id = $cd->id;
         if ($bl->count()) {
-            $cache[$dom] = true;
+            $cache[$this->id . '-'. $dom] = true;
             return true;
         }
         
@@ -230,15 +339,16 @@ class Pman_Core_DataObjects_Core_notify_server extends DB_DataObject
         $bl->server_id = $this->id;
         $bl->domain_id = $core_domain->id;
         if ($bl->count()) {
-            return;
+            return true;
         }
         // is it a blacklist message
         if (!$bl->messageIsBlacklisted($errmsg)) {
-            return;
+            return false;
         }
+        $bl->error_str = $errmsg;
         $bl->added_dt = $bl->sqlValue("NOW()");
         $bl->insert();
-        
+        return true;
         
     }
     
index 2e5dceb..5ed6a7b 100644 (file)
@@ -206,7 +206,8 @@ class Pman_Core_Notify extends Pman
             $w->evtype = $this->evtype;
         }
         
-        
+        $w->server_id = $this->server->id;
+
         
         if (!empty($opts['old'])) {
             // show old and new...
@@ -232,7 +233,6 @@ class Pman_Core_Notify extends Pman
             $w->limit($opts['limit']); // we can run 1000 ...
         }
         
-        $w->server_id = $this->server->id;
         
     
         
@@ -293,9 +293,14 @@ class Pman_Core_Notify extends Pman
             
             $black = $this->server->isBlacklisted($email);
             if ($black !== false) {
-                
+                $this->logecho("Blacklisted - try giving it to next server");
                 if (false === $this->server->updateNotifyToNextServer($p)) {
-                    $p->updateState("????");
+                    $ev = $this->addEvent('NOTIFY', $p, 'BLACKLISTED  FROM our DB');
+                    // we dont have an althenative server to update it with.
+                    $this->logecho("Blacklisted - next server did not work - try again in 30 mins");
+                    $this->server->updateNotifyToNextServer($w,  date("Y-m-d H:i:s",  strtotime('NOW +  30 MINUTES')),true);
+                   // $this->errorHandler( $ev->remarks);
+                   
                 }
                 
                 continue;
@@ -399,10 +404,12 @@ class Pman_Core_Notify extends Pman
         
         
         $tn =  $this->tempName('stdout', true);
+        $tne =  $this->tempName('stderr', true);
         $descriptorspec = array(
             0 => array("pipe", 'r'),  // stdin is a pipe that the child will read from
             1 => array("file", $tn, 'w'),  // stdout is a pipe that the child will write to
-            2 => array("pipe", "w") // stderr is a file to write to
+            2 => array("file", $tne, 'w'),   // stderr is a file to write to
+          //  2 => array("pipe", "w") // stderr is a file to write to
          );
         
         static $php = false;
@@ -451,6 +458,7 @@ class Pman_Core_Notify extends Pman
                 'proc' => $p,
                 'pid' => $info['pid'],
                 'out' => $tn,
+                'oute' => $tne,
                 'cmd' => $cmd,
                 'email' => $email,
                 'pipes' => $pipes,
@@ -470,6 +478,8 @@ class Pman_Core_Notify extends Pman
         foreach($this->pool as $p) {
              
             //echo "CHECK PID: " . $p['pid'] . "\n";
+            
+            
             $info =  proc_get_status($p['proc']);
             //var_dump($info);
             
@@ -491,9 +501,10 @@ class Pman_Core_Notify extends Pman
                     proc_terminate($p['proc'], 9);
                     //fclose($p['pipes'][1]);
                     fclose($p['pipes'][0]);
-                    fclose($p['pipes'][2]);
-                    $this->logecho("TERMINATING: ({$p['pid']}) {$p['email']} " . $p['cmd'] . " : " . file_get_contents($p['out']));
+                    
+                    $this->logecho("TERMINATING: ({$p['pid']}) {$p['email']} " . $p['cmd'] . " : " . file_get_contents($p['out']) . " : " . file_get_contents($p['oute']));
                     @unlink($p['out']);
+                    @unlink($p['oute']);
                     
                     // schedule again
                     $w = DB_DataObject::factory($this->table);
@@ -512,20 +523,25 @@ class Pman_Core_Notify extends Pman
                 continue;
             }
             fclose($p['pipes'][0]);
-            fclose($p['pipes'][2]);
             //echo "CLOSING: ({$p['pid']}) " . $p['cmd'] . " : " . file_get_contents($p['out']) . "\n";
             //fclose($p['pipes'][1]);
             
             proc_close($p['proc']);
-            
-            
+             sleep(1);
+            clearstatcache();
+            if (file_exists('/proc/'. $p['pid'])) {
+                $this->logecho("proc PID={$p['pid']} still here - trying to wait");
+                pcntl_waitpid($p['pid'], $status, WNOHANG);
+            }
+
             //clearstatcache();
             //if (file_exists('/proc/'.$p['pid'])) {
             //    $pool[] = $p;
             //    continue;
             //}
-            $this->logecho("ENDED: ({$p['pid']}) {$p['email']} " .  $p['cmd'] . " : " . file_get_contents($p['out']) );
+            $this->logecho("ENDED: ({$p['pid']}) {$p['email']} " .  $p['cmd'] . " : " . file_get_contents($p['out']) . " : " . file_get_contents($p['oute']));
             @unlink($p['out']);
+            @unlink($p['oute']);
             // at this point we could pop onto the queue the 
             $this->popQueueDomain($p['email']);
             
@@ -596,6 +612,7 @@ class Pman_Core_Notify extends Pman
     function clearOld()
      {
           if ($this->server->isFirstServer()) {
+            
             $p = DB_DataObject::factory($this->table);
             $p->whereAdd("
                 sent < '2000-01-01'
index 8e4c0c6..e9c37a0 100644 (file)
@@ -109,11 +109,12 @@ class Pman_Core_NotifySend extends Pman
             $this->errorHandler("already sent - repeat to early\n");
         }
         
-        $this->server = DB_DataObject::Factory('core_notify_server')->getCurrent($this);
-        if (!$force && $w->server_id != $this->server->id) {
+        $this->server = DB_DataObject::Factory('core_notify_server')->getCurrent($this, $force);
+        if (!$force &&  $w->server_id != $this->server->id) {
             $this->errorHandler("Server id does not match - use force to try again\n");
         }
         
+        
         if (!empty($opts['debug'])) {
             print_r($w);
             $ff = HTML_FlexyFramework::get();
@@ -123,17 +124,30 @@ class Pman_Core_NotifySend extends Pman
             HTML_FlexyFramework::get()->Core_Mailer['debug'] = true;
         }
         
-        $sent = (empty($w->sent) || preg_match('/^0000/', $w->sent)) ? false : true;
+        $sent = (empty($w->sent) || strtotime( $w->sent) < 100 ) ? false : true;
         
         if (!$force && (!empty($w->msgid) || $sent)) {
             $ww = clone($w);
             if (!$sent) {   // fix sent.
-                $w->sent = $w->sent == '0000-00-00 00:00:00' ? $w->sqlValue('NOW()') :$w->sent; // do not update if sent.....
+                $w->sent = strtotime( $w->sent) < 100 ? $w->sqlValue('NOW()') :$w->sent; // do not update if sent.....
                 $w->update($ww);
             }    
             $this->errorHandler("message has been sent already.\n");
         }
         
+        // we have a bug with msgid not getting filled.
+        $cev = DB_DataObject::Factory('Events');
+        $cev->on_table =  $this->table;
+        $cev->on_id =  $w->id;
+        $cev->whereAdd("action IN ('NOTIFYSENT', 'NOTIFYFAIL')");
+        $cev->limit(1);
+        if ($cev->count()) {
+            $cev->find(true);
+            $w->flagDone($cev, $cev->action == 'NOTIFYSENT' ? 'alreadysent' : '');
+            $this->errorHandler( $cev->action . " (fix old) ".  $cev->remarks);
+        }
+        
+        
         $o = $w->object();
         
         if ($o === false)  {
@@ -332,11 +346,12 @@ class Pman_Core_NotifySend extends Pman
             $this->errorHandler(  $ev->remarks);
         }
         
+        $retry_when = date('Y-m-d H:i:s', strtotime('NOW + ' . $retry . ' MINUTES'));
         
         //$this->addEvent('NOTIFY', $w, 'GREYLISTED ' . $p->email . ' ' . $res->toString());
         // we can only update act_when if it has not been sent already (only happens when running in force mode..)
         // set act when if it's empty...
-        $w->act_when =  (!$w->act_when || $w->act_when == '0000-00-00 00:00:00') ? date('Y-m-d H:i:s', strtotime('NOW + ' . $retry . ' MINUTES')) : $w->act_when;
+        $w->act_when =  (!$w->act_when || $w->act_when == '0000-00-00 00:00:00') ? $retry_when : $w->act_when;
         
         $w->update($ww);
         
@@ -444,7 +459,7 @@ class Pman_Core_NotifySend extends Pman
                 
                 $ev->writeEventLog($this->debug_str);
                  
-                $w->flagDone($ev,$email['headers']['Message-Id']);
+                $w->flagDone($ev, $email['headers']['Message-Id']);
                 
                  
                 // enable cc in notify..
@@ -472,7 +487,7 @@ class Pman_Core_NotifySend extends Pman
             $code = empty($res->userinfo['smtpcode']) ? -1 : $res->userinfo['smtpcode'];
             if (!empty($res->code) && $res->code == 10001) {
                 // fake greylist if timed out.
-                $code = 421;
+                $code = -1; 
             }
             
             if ($code < 0) {
@@ -491,7 +506,7 @@ class Pman_Core_NotifySend extends Pman
                 //print_r($res);
                 $ev = $this->addEvent('NOTIFY', $w, 'GREYLISTED - ' . $errmsg);
                 
-                $this->server->updateNotifyToNextServer($w,  strtotime('NOW + ' . $retry . ' MINUTES'),true);
+                $this->server->updateNotifyToNextServer($w,  $retry_when,true);
                 
                 $this->errorHandler(  $ev->remarks);
             }
@@ -520,14 +535,17 @@ class Pman_Core_NotifySend extends Pman
                 $errmsg=  $res->userinfo['smtpcode'] . ':' . $res->userinfo['smtptext'];
             }
             
-            $ev = $this->addEvent('NOTIFYFAIL', $w, ($fail ? "FAILED - " : "RETRY TIME EXCEEDED - ") .  $errmsg);
-            $w->flagDone($ev, '');
-            
             if ($res->userinfo['smtpcode'] == 550) {
-                $this->server->checkSmtpResponse($errmsg, $core_domain);
+                if ($this->server->checkSmtpResponse($errmsg, $core_domain)) {
+                    $ev = $this->addEvent('NOTIFY', $w, 'BLACKLISTED  - ' . $errmsg);
+                    $this->server->updateNotifyToNextServer($w,  $retry_when,true);
+                    $this->errorHandler( $ev->remarks);
+                }
             }
-            
-
+             
+            $ev = $this->addEvent('NOTIFYFAIL', $w, ($fail ? "FAILED - " : "RETRY TIME EXCEEDED - ") .  $errmsg);
+            $w->flagDone($ev, '');
+             
             $this->errorHandler( $ev->remarks);
         }
         
@@ -536,9 +554,9 @@ class Pman_Core_NotifySend extends Pman
         
         // try again.
         
-        $ev = $this->addEvent('NOTIFY', $w, 'NO HOST CAN BE CONTACTED:' . $p->email);
+        $ev = $this->addEvent('NOTIFY', $w, 'GREYLIST - NO HOST CAN BE CONTACTED:' . $p->email);
         
-        $this->server->updateNotifyToNextServer($w,  strtotime('NOW + ' . $retry . ' MINUTES'),true);
+        $this->server->updateNotifyToNextServer($w,  $retry_when ,true);
 
         
          
index 6f79c56..9751443 100644 (file)
@@ -28,6 +28,8 @@ ALTER TABLE core_notify ADD COLUMN person_table VARCHAR(256) NOT NULL DEFAULT ''
 
 -- ?? why added???  - probably need to document this..
 ALTER TABLE core_notify ADD COLUMN domain_id INT(11)  NOT NULL  DEFAULT 0;
+ALTER TABLE core_notify ADD COLUMN server_id INT(11) NOT NULL DEFAULT -1;
+
 
 ALTER TABLE core_notify ADD   INDEX lookup(act_when, msgid);
 
@@ -41,6 +43,8 @@ ALTER TABLE core_notify add   index lookup_d (person_id, msgid, ontable);
 ALTER TABLE core_notify ADD   INDEX lookup_e (onid, ontable, person_id, act_when);
 ALTER TABLE core_notify ADD   INDEX lookup_f (to_email);
 alter table core_notify add index lookup_g(sent, act_start, act_when);
+alter table core_notify add   INDEX lookup_h (sent, event_id, server_id, msgid, ontable);
+
 
 ALTER TABLE core_notify ADD INDEX lookup_person_id (person_id);
 ALTER TABLE core_notify ADD INDEX lookup_trigger_person_id (trigger_person_id);