function get($r,$opts=array())
{
+
+ // if ($this->database_is_locked()) {
+ // die("LATER - DATABASE IS LOCKED");
+ //}
+
+
$this->parseArgs($opts);
//date_default_timezone_set('UTC');
if (!$this->force) {
$w->whereAdd('act_when < NOW()'); // eg.. not if future..
}
+
$w->orderBy('act_when ASC'); // oldest first.
$total = min($w->count(), $opts['limit']);
$black = $this->server->isBlacklisted($email);
if ($black !== false) {
-
+ $this->logecho("Blacklisted - try giving it to next server");
if (false === $this->server->updateNotifyToNextServer($p)) {
$ev = $this->addEvent('NOTIFY', $p, 'BLACKLISTED FROM our DB');
- $this->server->updateNotifyToNextServer($w, strtotime('NOW + 5 MINUTES'),true);
+ // we dont have an althenative server to update it with.
+ $this->logecho("Blacklisted - next server did not work - try again in 30 mins");
+ $this->server->updateNotifyToNextServer($w, date("Y-m-d H:i:s", strtotime('NOW + 30 MINUTES')),true);
// $this->errorHandler( $ev->remarks);
+
}
continue;
foreach($this->pool as $p) {
//echo "CHECK PID: " . $p['pid'] . "\n";
+
+
$info = proc_get_status($p['proc']);
//var_dump($info);
//fclose($p['pipes'][1]);
proc_close($p['proc']);
-
-
+ sleep(1);
+ clearstatcache();
+ if (file_exists('/proc/'. $p['pid'])) {
+ $this->logecho("proc PID={$p['pid']} still here - trying to wait");
+ pcntl_waitpid($p['pid'], $status, WNOHANG);
+ }
+
//clearstatcache();
//if (file_exists('/proc/'.$p['pid'])) {
// $pool[] = $p;
// continue;
//}
- $this->logecho("ENDED: ({$p['pid']}) {$p['email']} " . $p['cmd'] . " : " . file_get_contents($p['out']) );
+ $this->logecho("ENDED: ({$p['pid']}) {$p['email']} " . $p['cmd'] . " : " . file_get_contents($p['out']) . " : " . file_get_contents($p['oute']));
@unlink($p['out']);
@unlink($p['oute']);
// at this point we could pop onto the queue the
function clearOld()
{
if ($this->server->isFirstServer()) {
+
$p = DB_DataObject::factory($this->table);
$p->whereAdd("
sent < '2000-01-01'