DataObjects/Core_event_audit.php
[Pman.Core] / Notify.php
index 24b188d..dc3558c 100644 (file)
@@ -16,6 +16,49 @@ require_once 'Pman.php';
 class Pman_Core_Notify extends Pman
 {
     
+    static $cli_desc = "Send out notification emails (usually from cron)";
+    
+    static $cli_opts = array(
+        'debug' => array(
+            'desc' => 'Turn on debugging (see DataObjects debugLevel )',
+            'default' => 0,
+            'short' => 'v',
+            'min' => 1,
+            'max' => 1,
+            
+        ),
+        'list' => array(
+            'desc' => 'List message to send, do not send them..',
+            'default' => 0,
+            'short' => 'l',
+            'min' => 0,
+            'max' => 0,
+            
+        ),
+        'old' => array(
+            'desc' => 'Show old messages..',
+            'default' => 0,
+            'short' => 'o',
+            'min' => 0,
+            'max' => 0,
+            
+        ),
+         'force' => array(
+            'desc' => 'Force redelivery, even if it has been sent before or not queued...',
+            'default' => 0,
+            'short' => 'f',
+            'min' => 0,
+            'max' => 0,
+        ),
+    );
+    
+    
+    
+    var $table = 'core_notify';
+    var $target = 'Core/NotifySend';
+    var $evtype = ''; // any notification...
+                    // this script should only handle EMAIL notifications..
+    
     function getAuth()
     {
         $ff = HTML_FlexyFramework::get();
@@ -29,51 +72,141 @@ class Pman_Core_Notify extends Pman
     
     var $pool = array();
     
-    function get()    
+    function get($r,$opts)    
     {
-        //DB_DataObject::debugLevel(1);
+        if ($opts['debug']) {
+            DB_DataObject::debugLevel($opts['debug']);
+            print_r($opts);
+        }
         //date_default_timezone_set('UTC');
        // phpinfo();exit;
+        $showold = !empty($opts['old']);
+        if (!empty($opts['old'])) {
+            $opts['list'] = 1; // force listing..
+        }
+        
+        $this->force = empty($opts['force']) ? 0 : 1;
+     
+        if (!empty($opts['send-to'])) {
+            $this->send_to = $opts['send-to'];
+        }
+     
+        
+        $w = DB_DataObject::factory($this->table);
+        
+        if (!$showold) {
+            $w->whereAdd('act_when > sent'); // eg.. sent is not valid..
+            
+            if (!$this->force) {
+                $w->whereAdd('act_when < NOW()'); // eg.. not if future..
+            }
+    
+            $w->orderBy('act_when ASC'); // oldest first.
+            $w->limit(1000); // we can run 1000 ...
+        } else {
+            $w->orderBy('act_when DESC'); // latest first
+            $w->limit(50); // we can run 1000 ...
+        }
+        if (!empty($this->evtype)) {
+            $w->evtype = $this->evtype;
+        }
+        
+        $w->autoJoin();
+        
+        
+        $ar = $w->fetchAll();
+        
+        if (!empty($opts['list'])) {
+            if (empty($ar)) {
+                die("Nothing in Queue\n");
+            }
+            foreach($ar as $w) {
+                $o = $w->object();
+                
+                
+                echo "$w->id : $w->person_id_email email    : ".
+                        $o->toEventString()."    ". $w->status() . "\n";
+            }
+            exit;
+        }
+        
         
-        $w = DB_DataObject::factory('core_notify');
-        $w->whereAdd('act_when < sent');
-        $w->orderBy('act_when ASC'); // oldest first.
-        $w->limit(1000); // we can run 1000 ...
-        $ar = $w->fetchAll('id');
         
         while (true) {
             if (empty($ar)) {
                 break;
             }
+            
+            $p = array_shift($ar);
             if (!$this->poolfree()) {
-                sleep(10);
+                array_unshift($ar,$p); /// put it back on..
+                sleep(3);
                 continue;
             }
-            $p = array_shift($ar);
-            $this->run($p);
+            if ($this->poolHasDomain($p->person_id_email)) {
+                $ar[] = $p; // push it on the end..
+                echo "domain {$p->person_id_email} already on queue, pushing to end.\n";
+                sleep(3);
+                continue;
+            }
+            
+            
+            $this->run($p->id,$p->person_id_email);
+        }
+        while(count($this->pool)) {
+            $this->poolfree();
+            sleep(3);
         }
-         
+        
+        die("DONE\n");
     }
     
-    function run($id)
+    function run($id, $email)
     {
-        
+       // phpinfo();exit;
+        $tn = tempnam(ini_get('session.save_path'),'stdout') . '.stdout';
+        $descriptorspec = array(
+            0 => array("pipe", 'r'),  // stdin is a pipe that the child will read from
+            1 => array("file", $tn, 'w'),  // stdout is a pipe that the child will write to
+            2 => array("pipe", "w") // stderr is a file to write to
+         );
         $php = $_SERVER["_"];
-        $cwd = realpath(dirname(__FILE__) . '/../..');
-        $app = $cwd . '/'. $_SERVER["SCRIPT_NAME"] . '  Core/NotifySend/'. $id;
-        $cmd = $php . ' ' . $app;
+        $sn =  $_SERVER["SCRIPT_NAME"];
+        
+        $cwd = $sn[0] == '/' ? dirname($sn) : dirname(realpath(getcwd() . $sn)); // same as run on.. (so script should end up being same relatively..)
+        $app = $cwd . '/' . basename($_SERVER["SCRIPT_NAME"]) . '  ' . $this->target . '/'. $id;
+        if ($this->force) {
+            $app .= ' -f';
+        }
+        if (!empty($this->send_to)) {
+            $app .= ' --sent-to='.escapeshellarg($this->send_to);
+        }
+        $cmd = $php . ' ' . $app. ' &';
+        
         echo $cmd . "\n";
-        $p = proc_open($cmd, $cwd );
-        $this->pool[] = $p;
+        $pipe = array();
+        $p = proc_open($cmd, $descriptorspec, $pipes, $cwd );
+        $this->pool[] = array(
+                'proc' => $p,
+                'out' => $tn,
+                'cmd' => $cmd,
+                'email' => $email
+        );
     }
     
-    function poolfree() {
+    function poolfree()
+    {
         $pool = array();
         foreach($this->pool as $p) {
-            $ar = proc_get_Status($p);
-            if (!$p['running']) {
+            $ar = proc_get_status($p['proc']);
+           // print_r($p);
+            //print_r($ar);
+            if ($ar['running']) {
                 $pool[] = $p;
+                continue;
             }
+            echo $p['cmd'] . " : " . file_get_contents($p['out']);
+            //unlink($p['out']);
         }
         $this->pool = $pool;
         if (count($pool) < 10) {
@@ -82,6 +215,22 @@ class Pman_Core_Notify extends Pman
         return false;
         
     }
-    
-    
+    /**
+     * see if pool is already trying to deliver to this domain.?
+     * -- if so it get's pushed to the end of the queue.
+     *
+     */
+    function poolHasDomain($email)
+    {
+        $dom = strtolower(array_pop(explode('@',$email)));
+        foreach($this->pool as $p) {
+            $mdom = strtolower(array_pop(explode('@',$p['email'])));
+            if ($mdom == $dom) {
+                return true;
+            }
+        }
+        return false;
+        
+    }
+
 }
\ No newline at end of file