addQueueSize(); } } function addQueueSize() { // look for database links for server_id (which should find core_notify + others..) $cn = get_class(DB_DataObject::factory('core_notify')); $tables = array(); foreach($this->databaseLinks() as $tbl => $kv) { foreach($kv as $k=>$v) { if ($v != 'core_notify_server:id') { continue; } $test = DB_DAtaObject::factory($tbl); if (!is_a($test, $cn)) { break; } $tables[] = $tbl; } } if (empty($tables)) { die("OOPS - no tables for notify_server references"); } $totals = array(); foreach($tables as $t) { $totals[] = " COALESCE((SELECT count(id) FROM $t WHERE server_id = core_notify_server.id AND sent < '1970-01-01' OR sent IS NULL and event_id = 0 ),0) "; } $this->selectAdd("(" . implode(" + ", $totals) . ") as in_queue "); } // most services should call this first.. function getCurrent($notify, $force = false) { static $current = false;; if ($current !== false) { return $current; } $ns = DB_DataObject::factory('core_notify_server'); if (!$ns->count()) { $ns->id = 0; return $ns; } $ns->poolname = $notify->poolname; $ns->is_active = 1; $ns->hostname = gethostname(); $ns->limit(1); if ($ns->find(true)) { $current = $ns; return $ns; } if (!$force) { $notify->jerr("Server not found for this server " . gethostname() . " in core_notify_server" ); } // fallback to any server - if we are using force. (this is so helo will work...) $ns = DB_DataObject::factory('core_notify_server'); $ns->is_active = 1; $ns->hostname = gethostname(); if (!$ns->find(true)) { $notify->jerr("Server not found for this server " . gethostname() . " in core_notify_server" ); } $current = $ns; return $ns; } function isFirstServer() { if (!$this->id) { return true; } $servers = $this->availableServers(); if (empty($servers)) { return false; } // only run this on the first server... return $this->id == $servers[0]->id; } // called on current server. function assignQueues($notify) { if (!$this->id) { return true; } $servers = $this->availableServers(); $ids = array(); $up = array(); foreach($servers as $s) { $ids[] = $s->id; } if (empty($ids)) { $notify->jerr("no configured servers in core_notify_server for poolname = {$notify->poolname}"); } // only run this on the first server... if ($this->id != $ids[0]) { return; } foreach($ids as $rn) { $up[$rn] = array(); } $num_servers = count($ids); if ($num_servers == 1) { $p = DB_DataObject::factory($notify->table); $p->query(" UPDATE {$notify->table} SET server_id = {$ids[0]} WHERE sent < '2000-01-01' and event_id = 0 and act_start < NOW() + INTERVAL 3 HOUR and server_id != {$ids[0]} "); return; } $p = DB_DataObject::factory($notify->table); $p->whereAdd(" sent < '2000-01-01' and event_id = 0 and act_start < NOW() + INTERVAL 3 HOUR and server_id NOT IN (" . implode(",", $ids) . ") "); $p->orderBy('act_when asc'); //? $total_add = $p->count(); if ($total_add < 1) { return; } $to_add = $p->fetchAll('id'); $p = DB_DataObject::factory($notify->table); $p->whereAdd(" sent < '2000-01-01' and event_id = 0 and server_id IN (" . implode(",", $ids) . ") "); $p->selectAdd(); $p->selectAdd('server_id, count(id) as n'); $p->groupBy('server_id'); $in_q = $p->fetchAll('server_id', 'n'); // if queue is empty it will not get allocated anything. foreach($ids as $sid) { if (!isset($in_q[$sid])) { $in_q[$sid] = 0; } } $totalq = 0; foreach($in_q as $sid => $n) { $totalq += $n; } // new average queue $target_len = floor( ($totalq + $total_add) / $num_servers ); foreach($in_q as $sid => $cq) { if ( $cq > $target_len) { continue; } $up[ $sid ] = array_slice($to_add, 0, $target_len - $cq); } // add the reminder evently foreach($to_add as $n=>$i) { $up[ $ids[$n % $num_servers] ][] = $i; } // distribution needs to go to ones that have the shortest queues. - so to balance out the queues foreach($up as $sid => $nids) { if (empty($nids)) { continue; } $p = DB_DataObject::factory($notify->table); $p->query(" UPDATE {$notify->table} SET server_id = $sid WHERE id IN (". implode(',', $nids). ')' ); } DB_DataObject::factory("core_notify_blacklist")->prune(); } // called on current server. function availableServers() { $ns = DB_DataObject::factory('core_notify_server'); $ns->poolname = $this->poolname; $ns->is_active = 1; $ns->orderBy('id ASC'); return $ns->fetchAll(); } function updateNotifyToNextServer( $cn , $when = false, $allow_same = false) { if (!$this->id) { return; } // fixme - this should take into account blacklisted - and return false if no more servers are available $email = empty($cn->to_email) ? ($cn->person() ? $cn->person()->email : $cn->to_email) : $cn->to_email; $w = DB_DataObject::factory($cn->tableName()); $w->get($cn->id); $servers = $this->availableServers(); $start = 0; foreach($servers as $i => $s) { if ($s->id == $this->id) { $start = $i; } } $offset = ($start + 1) % count($servers); $good = false; while ($offset != $start) { $s = $servers[$offset]; if (!$s->isBlacklisted($email)) { $good = $s; break; } $offset = ($offset + 1) % count($servers); var_dump($offset); } if ($good == false && $allow_same) { $good = $this; } if ($good == false) { return false; } // next server.. $pp = clone($w); $w->server_id = $good->id; $w->act_when = $when === false ? $w->sqlValue('NOW() + INTERVAL 1 MINUTE') : $when; $w->update($pp); return true; } function isBlacklisted($email) { if (!$this->id) { return false; } // return current server id.. static $cache = array(); // get the domain.. $ea = explode('@',$email); $dom = strtolower(array_pop($ea)); if (isset( $cache[$this->id . '-'. $dom])) { return $cache[$this->id . '-'. $dom]; } $cd = DB_DataObject::factory('core_domain')->loadOrCreate($dom); $bl = DB_DataObject::factory('core_notify_blacklist'); $bl->server_id = $this->id; $bl->domain_id = $cd->id; if ($bl->count()) { $cache[$this->id . '-'. $dom] = true; return true; } return false; } function initHelo() { if (!$this->id) { return; } $ff = HTML_FlexyFramework::get(); $ff->Mail['helo'] = $this->helo; } function checkSmtpResponse($errmsg, $core_domain) { if (!$this->id) { return false; } $bl = DB_DataObject::factory('core_notify_blacklist'); $bl->server_id = $this->id; $bl->domain_id = $core_domain->id; if ($bl->count()) { return true; } // is it a blacklist message if (!$bl->messageIsBlacklisted($errmsg)) { return false; } $bl->error_str = $errmsg; $bl->added_dt = $bl->sqlValue("NOW()"); $bl->insert(); return true; } }