09db236e40eb347f5bda16e629bba44ef8a63d62
[Pman.Core] / DataObjects / Core_notify_server.php
1 <?php
2 /**
3  * Table Definition for core_notify_server
4  */
5 class_exists('DB_DataObject') ? '' : require_once 'DB/DataObject.php';
6
7 class Pman_Core_DataObjects_Core_notify_server extends DB_DataObject 
8 {
9     ###START_AUTOCODE
10     /* the code below is auto generated do not remove the above tag */
11
12     public $__table = 'core_notify_server';    // table name
13     public $id;                              // int(11)  not_null primary_key auto_increment
14     public $hostname;
15     public $helo;
16     public $poolname;
17     public $is_active;
18     public $last_send;
19     
20     
21     
22     function  applyFilters($q, $au, $roo)
23     {
24         if (isset($q['_with_queue_size'])) {
25             $this->addQueueSize();
26         }
27     }
28     
29     
30     function addQueueSize()
31     {
32         // look for database links for server_id (which should find core_notify + others..)
33         $cn = get_class(DB_DataObject::factory('core_notify'));
34         $tables = array();
35         foreach($this->databaseLinks() as $tbl => $kv) {
36             foreach($kv as $k=>$v) {
37                 if ($v != 'core_notify_server:id') {
38                     continue;
39                 }
40                 
41                 $test = DB_DAtaObject::factory($tbl);
42                 if (!is_a($test, $cn)) {
43                     break;
44                 }
45                 $tables[] = $tbl;
46             }
47         }
48         if (empty($tables)) {
49             die("OOPS - no tables for notify_server references");
50         }
51         $totals = array();
52         foreach($tables as $t) {
53             $totals[] = "
54                 COALESCE((SELECT
55                     count(id)
56                 FROM
57                     $t
58                 WHERE
59                     server_id = core_notify_server.id
60                 AND
61                     sent < '1970-01-01' OR sent IS NULL
62                  and
63                         event_id = 0
64                 ),0)
65             ";
66         }
67         $this->selectAdd("(" . implode(" + ", $totals) . ") as in_queue ");
68         
69         
70         
71         
72         
73     }
74     
75     
76     // most services should call this first..
77     
78     function getCurrent($notify, $force = false)
79     {
80         static $current = false;;
81         
82         if ($current !== false) {
83             return $current;
84         }
85         
86         $ns = DB_DataObject::factory('core_notify_server');
87         
88         $ns->poolname = $notify->poolname;
89         $ns->is_active = 1;
90         $ns->hostname = gethostname();
91         $ns->limit(1);
92         if ($ns->find(true)) {
93             $current = $ns;
94             return $ns;
95         }
96         if (!$force) {
97             $notify->jerr("Server not found for this server " .  gethostname() . " in core_notify_server" );
98         }
99         // fallback to any server - if we are using force. (this is so helo will work...)
100         
101         $ns = DB_DataObject::factory('core_notify_server');
102         $ns->is_active = 1;
103         $ns->hostname = gethostname();
104         if (!$ns->find(true)) {
105             $notify->jerr("Server not found for this server " .  gethostname() . " in core_notify_server" );
106         }
107         $current = $ns;
108         return $ns;
109     }
110     
111     
112     function isFirstServer()
113     {
114         $servers = $this->availableServers();
115         if (empty($servers)) {
116             return false;
117         }
118         // only run this on the first server...
119         return $this->id == $servers[0]->id;
120     }
121     
122     
123     // called on current server.
124     function assignQueues($notify)
125     {
126          
127         
128         $servers = $this->availableServers();
129         $ids = array();
130         foreach($servers as $s) {
131             $ids[] = $s->id;
132         }
133         
134         
135         if (empty($ids)) {
136             $notify->jerr("no configured servers in core_notify_server for poolname = {$notify->poolname}");
137             
138         }
139          
140         // only run this on the first server...
141         if ($this->id != $ids[0]) {
142             return; 
143         }
144         foreach($ids as $rn) {
145             $up[$rn]  = array();
146         }
147         
148         $num_servers = count($ids);
149         
150         if ($num_servers == 1) {
151             $p = DB_DataObject::factory($notify->table);
152             $p->query("
153                 UPDATE
154                     {$notify->table}
155                 SET
156                     server_id = {$ids[0]}
157                 WHERE
158                     sent < '2000-01-01'
159                     and
160                     event_id = 0
161                     and
162                     act_start < NOW() +  INTERVAL 3 HOUR 
163                     and
164                     server_id != {$ids[0]}
165             ");
166             return;
167         }
168         
169         
170         
171         $p = DB_DataObject::factory($notify->table);
172         $p->whereAdd("
173                 sent < '2000-01-01'
174                 and
175                 event_id = 0
176                 and
177                 act_start < NOW() +  INTERVAL 3 HOUR 
178                 and
179                 server_id NOT IN (" . implode(",", $ids) . ")
180         ");
181         $p->orderBy('act_when asc'); //?
182         $total_add = $p->count();
183         if ($total_add < 1) {
184             return;
185         }
186         
187         $to_add = $p->fetchAll('id');
188         
189         $p = DB_DataObject::factory($notify->table);
190         $p->whereAdd("
191                 sent < '2000-01-01'
192                 and
193                 event_id = 0
194         
195                 and
196                 server_id IN (" . implode(",", $ids) . ")
197         ");
198         $p->selectAdd();
199         $p->selectAdd('server_id, count(id) as  n');
200         $p->groupBy('server_id');
201         $in_q = $p->fetchAll('server_id', 'n');
202         
203         // if queue is empty it will not get allocated anything.
204         foreach($ids as $sid) {
205             if (!isset($in_q[$sid])) {
206                 $in_q[$sid] = 0;
207             }
208         }
209         $totalq = 0;
210         foreach($in_q as $sid => $n) {
211             $totalq += $n;
212         }
213         
214         
215         // new average queue
216         $target_len = floor(  ($totalq + $total_add) / $num_servers );
217         
218         foreach($in_q as $sid => $cq) {
219             if ( $cq > $target_len) {
220                 continue;
221             }
222             $up[ $sid ] = array_slice($to_add, 0, $target_len - $cq);
223         }
224         
225         // add the reminder evently
226         foreach($to_add as $n=>$i) {
227             $up[  $ids[$n % $num_servers] ][] = $i;
228         }
229         
230         // distribution needs to go to ones that have the shortest queues. - so to balance out the queues
231         
232          
233         
234         foreach($up as $sid => $nids) {
235             if (empty($nids)) {
236                 continue;
237             }
238             $p = DB_DataObject::factory($notify->table);
239             $p->query("
240                 UPDATE
241                     {$notify->table}
242                 SET
243                     server_id = $sid
244                 WHERE
245                     id IN (". implode(',', $nids). ')'
246             );
247         }
248          
249         DB_DataObject::factory("core_notify_blacklist")->prune();
250         
251     }
252         // called on current server.
253
254     function availableServers()
255     {
256         $ns = DB_DataObject::factory('core_notify_server');
257         $ns->poolname = $this->poolname;
258         $ns->is_active = 1;
259         $ns->orderBy('id ASC');
260         return  $ns->fetchAll();
261         
262     }
263     
264     function updateNotifyToNextServer( $cn , $when = false, $allow_same = false)
265     {
266         // fixme - this should take into account blacklisted - and return false if no more servers are available
267         $email = empty($cn->to_email) ? ($cn->person() ? $cn->person()->email : $cn->to_email) : $cn->to_email;
268
269         $w = DB_DataObject::factory($cn->tableName());
270         $w->get($cn->id);
271         
272         $servers = $this->availableServers();
273         $start = 0;
274         foreach($servers as $i => $s) {
275             if ($s->id == $this->id) {
276                 $start = $i;
277             }
278         }
279         
280         $offset = ($start + 1)  % count($servers);
281         $good = false;
282         while ($offset  != $start) {
283             $s = $servers[$offset];
284             if (!$s->isBlacklisted($email)) {
285                 $good = $s;
286                 break;
287             }
288             $offset = ($offset + 1)  % count($servers);
289             var_dump($offset);
290         }
291         if ($good == false && $allow_same) {
292             $good = $this;
293         }
294         
295         if ($good == false) {
296             return false;
297         }
298         
299         
300         // next server..
301         $pp = clone($w);
302         $w->server_id = $good->id;   
303         $w->act_when = $when === false ? $w->sqlValue('NOW() + INTERVAL 1 MINUTE') : $when;
304         $w->update($pp);
305         return true;
306     }
307     
308     
309     function isBlacklisted($email)
310     {
311         // return current server id..
312         static $cache = array();
313          // get the domain..
314         $ea = explode('@',$email);
315         $dom = strtolower(array_pop($ea));
316         if (isset( $cache[$this->id . '-'. $dom])) {
317             return  $cache[$this->id . '-'. $dom];
318         }
319         
320         $cd = DB_DataObject::factory('core_domain')->loadOrCreate($dom);
321         
322         $bl = DB_DataObject::factory('core_notify_blacklist');
323         $bl->server_id = $this->id;
324         $bl->domain_id = $cd->id;
325         if ($bl->count()) {
326             $cache[$this->id . '-'. $dom] = true;
327             return true;
328         }
329         
330         return false; 
331     }
332     function initHelo()
333     {
334         $ff = HTML_FlexyFramework::get();
335         $ff->Mail['helo'] = $this->helo;
336         
337     }
338     function checkSmtpResponse($errmsg, $core_domain)
339     {
340         $bl = DB_DataObject::factory('core_notify_blacklist');
341         $bl->server_id = $this->id;
342         $bl->domain_id = $core_domain->id;
343         if ($bl->count()) {
344             return true;
345         }
346         // is it a blacklist message
347         if (!$bl->messageIsBlacklisted($errmsg)) {
348             return false;
349         }
350         $bl->error_str = $errmsg;
351         $bl->added_dt = $bl->sqlValue("NOW()");
352         $bl->insert();
353         return true;
354         
355     }
356     
357 }