3 * Table Definition for core_notify_server
5 class_exists('DB_DataObject') ? '' : require_once 'DB/DataObject.php';
7 class Pman_Core_DataObjects_Core_notify_server extends DB_DataObject
10 /* the code below is auto generated do not remove the above tag */
12 public $__table = 'core_notify_server'; // table name
13 public $id; // int(11) not_null primary_key auto_increment
22 function applyFilters($q, $au, $roo)
24 if (isset($q['_with_queue_size'])) {
25 $this->addQueueSize();
30 function addQueueSize()
32 // look for database links for server_id (which should find core_notify + others..)
33 $cn = get_class(DB_DataObject::factory('core_notify'));
35 foreach($this->databaseLinks() as $tbl => $kv) {
36 foreach($kv as $k=>$v) {
37 if ($v != 'core_notify_server:id') {
41 $test = DB_DAtaObject::factory($tbl);
42 if (!is_a($test, $cn)) {
49 die("OOPS - no tables for notify_server references");
52 foreach($tables as $t) {
59 server_id = core_notify_server.id
61 sent < '1970-01-01' OR sent IS NULL
67 $this->selectAdd("(" . implode(" + ", $totals) . ") as in_queue ");
76 // most services should call this first..
78 function getCurrent($notify, $force = false)
80 static $current = false;;
82 if ($current !== false) {
86 $ns = DB_DataObject::factory('core_notify_server');
88 $ns->poolname = $notify->poolname;
90 $ns->hostname = gethostname();
92 if ($ns->find(true)) {
97 $notify->jerr("Server not found for this server " . gethostname() . " in core_notify_server" );
99 // fallback to any server - if we are using force. (this is so helo will work...)
101 $ns = DB_DataObject::factory('core_notify_server');
103 $ns->hostname = gethostname();
104 if (!$ns->find(true)) {
105 $notify->jerr("Server not found for this server " . gethostname() . " in core_notify_server" );
112 function isFirstServer()
114 $servers = $this->availableServers();
115 if (empty($servers)) {
118 // only run this on the first server...
119 return $this->id == $servers[0]->id;
123 // called on current server.
124 function assignQueues($notify)
128 $servers = $this->availableServers();
130 foreach($servers as $s) {
136 $notify->jerr("no configured servers in core_notify_server for poolname = {$notify->poolname}");
140 // only run this on the first server...
141 if ($this->id != $ids[0]) {
144 foreach($ids as $rn) {
148 $num_servers = count($ids);
150 if ($num_servers == 1) {
151 $p = DB_DataObject::factory($notify->table);
156 server_id = {$ids[0]}
162 act_start < NOW() + INTERVAL 3 HOUR
164 server_id != {$ids[0]}
171 $p = DB_DataObject::factory($notify->table);
177 act_start < NOW() + INTERVAL 3 HOUR
179 server_id NOT IN (" . implode(",", $ids) . ")
181 $p->orderBy('act_when asc'); //?
182 $total_add = $p->count();
183 if ($total_add < 1) {
187 $to_add = $p->fetchAll('id');
189 $p = DB_DataObject::factory($notify->table);
196 server_id IN (" . implode(",", $ids) . ")
199 $p->selectAdd('server_id, count(id) as n');
200 $p->groupBy('server_id');
201 $in_q = $p->fetchAll('server_id', 'n');
203 // if queue is empty it will not get allocated anything.
204 foreach($ids as $sid) {
205 if (!isset($in_q[$sid])) {
210 foreach($in_q as $sid => $n) {
216 $target_len = floor( ($totalq + $total_add) / $num_servers );
218 foreach($in_q as $sid => $cq) {
219 if ( $cq > $target_len) {
222 $up[ $sid ] = array_slice($to_add, 0, $target_len - $cq);
225 // add the reminder evently
226 foreach($to_add as $n=>$i) {
227 $up[ $ids[$n % $num_servers] ][] = $i;
230 // distribution needs to go to ones that have the shortest queues. - so to balance out the queues
234 foreach($up as $sid => $nids) {
238 $p = DB_DataObject::factory($notify->table);
245 id IN (". implode(',', $nids). ')'
249 DB_DataObject::factory("core_notify_blacklist")->prune();
252 // called on current server.
254 function availableServers()
256 $ns = DB_DataObject::factory('core_notify_server');
257 $ns->poolname = $this->poolname;
259 $ns->orderBy('id ASC');
260 return $ns->fetchAll();
264 function updateNotifyToNextServer( $cn , $when = false, $allow_same = false)
266 // fixme - this should take into account blacklisted - and return false if no more servers are available
267 $email = empty($cn->to_email) ? ($cn->person() ? $cn->person()->email : $cn->to_email) : $cn->to_email;
269 $w = DB_DataObject::factory($cn->tableName());
272 $servers = $this->availableServers();
274 foreach($servers as $i => $s) {
275 if ($s->id == $this->id) {
280 $offset = ($start + 1) % count($servers);
282 while ($offset != $start) {
283 $s = $servers[$offset];
284 if (!$s->isBlacklisted($email)) {
288 $offset = ($offset + 1) % count($servers);
291 if ($good == false && $allow_same) {
295 if ($good == false) {
302 $w->server_id = $good->id;
303 $w->act_when = $when === false ? $w->sqlValue('NOW() + INTERVAL 1 MINUTE') : $when;
309 function isBlacklisted($email)
311 // return current server id..
312 static $cache = array();
314 $ea = explode('@',$email);
315 $dom = strtolower(array_pop($ea));
316 if (isset( $cache[$this->id . '-'. $dom])) {
317 return $cache[$this->id . '-'. $dom];
320 $cd = DB_DataObject::factory('core_domain')->loadOrCreate($dom);
322 $bl = DB_DataObject::factory('core_notify_blacklist');
323 $bl->server_id = $this->id;
324 $bl->domain_id = $cd->id;
326 $cache[$this->id . '-'. $dom] = true;
334 $ff = HTML_FlexyFramework::get();
335 $ff->Mail['helo'] = $this->helo;
338 function checkSmtpResponse($errmsg, $core_domain)
340 $bl = DB_DataObject::factory('core_notify_blacklist');
341 $bl->server_id = $this->id;
342 $bl->domain_id = $core_domain->id;
346 // is it a blacklist message
347 if (!$bl->messageIsBlacklisted($errmsg)) {
350 $bl->error_str = $errmsg;
351 $bl->added_dt = $bl->sqlValue("NOW()");