add queue sizes to server list
[Pman.Core] / DataObjects / Core_notify_server.php
1 <?php
2 /**
3  * Table Definition for core_notify_server
4  */
5 class_exists('DB_DataObject') ? '' : require_once 'DB/DataObject.php';
6
7 class Pman_Core_DataObjects_Core_notify_server extends DB_DataObject 
8 {
9     ###START_AUTOCODE
10     /* the code below is auto generated do not remove the above tag */
11
12     public $__table = 'core_notify_server';    // table name
13     public $id;                              // int(11)  not_null primary_key auto_increment
14     public $hostname;
15     public $helo;
16     public $poolname;
17     public $is_active;
18     public $last_send;
19     
20     
21     
22     function  applyFilters($q, $au, $roo)
23     {
24         if (isset($q['_with_queue_size'])) {
25             $this->addQueueSize();
26         }
27     }
28     
29     
30     function addQueueSize()
31     {
32         // look for database links for server_id (which should find core_notify + others..)
33         $cn = get_class(DB_DataObject::factory('core_notify'));
34         $tables = array();
35         foreach($this->databaseLinks() as $tbl => $kv) {
36             foreach($kv as $k=>$v) {
37                 if ($v != 'core_notify_server:id') {
38                     continue;
39                 }
40                 
41                 $test = DB_DAtaObject::factory($tbl);
42                 if (!is_a($test, $cn)) {
43                     break;
44                 }
45                 $tables[] = $tbl;
46             }
47         }
48         if (empty($tables)) {
49             die("OOPS - no tables for notify_server references");
50         }
51         $totals = array();
52         foreach($tables as $t) {
53             $totals[] = "
54                 COALESCE((SELECT
55                     count(id)
56                 FROM
57                     $t
58                 WHERE
59                     server_id = core_notify_server.id
60                 AND
61                     sent < '1970-01-01' OR sent IS NULL
62                  and
63                         event_id = 0
64                 ),0)
65             ";
66         }
67         $this->selectAdd("(" . implode(" + ", $totals) . ") as in_queue ");
68         
69         
70         
71         
72         
73     }
74     
75     
76     // most services should call this first..
77     
78     function getCurrent($notify)
79     {
80         static $current = false;;
81         
82         if ($current !== false) {
83             return $current;
84         }
85         
86         $ns = DB_DataObject::factory('core_notify_server');
87         $ns->poolname = $notify->poolname;
88         $ns->is_active = 1;
89         $ns->hostname = gethostname();
90         if (!$ns->count()) {
91             $notify->jerr("Server not found for this server " .  gethostname() . " in core_notify_server" );
92         }
93         $ns->find(true);
94         $current = $ns;
95         return $ns;
96     }
97     
98     
99     function isFirstServer()
100     {
101         $servers = $this->availableServers();
102         if (empty($servers)) {
103             return false;
104         }
105         // only run this on the first server...
106         return $this->id == $servers[0]->id;
107     }
108     
109     
110     // called on current server.
111     function assignQueues($notify)
112     {
113          
114         
115         $servers = $this->availableServers();
116         $ids = array();
117         foreach($servers as $s) {
118             $ids[] = $s->id;
119         }
120         
121         
122         if (empty($ids)) {
123             $notify->jerr("no configured servers in core_notify_server for poolname = {$notify->poolname}");
124             
125         }
126          
127         // only run this on the first server...
128         if ($this->id != $ids[0]) {
129             return; 
130         }
131         foreach($ids as $rn) {
132             $up[$rn]  = array();
133         }
134         
135         $num_servers = count($ids);
136         
137         if ($num_servers == 1) {
138             $p = DB_DataObject::factory($notify->table);
139             $p->query("
140                 UPDATE
141                     {$notify->table}
142                 SET
143                     server_id = {$ids[0]}
144                 WHERE
145                     sent < '2000-01-01'
146                     and
147                     event_id = 0
148                     and
149                     act_start < NOW() +  INTERVAL 3 HOUR 
150                     and
151                     server_id != {$ids[0]}
152             ");
153             return;
154         }
155         
156         // ((@row_number := CASE WHEN @row_number IS NULL THEN 0 ELSE @row_number END  +1) % {$num_servers})
157         
158         
159         
160         $p = DB_DataObject::factory($notify->table);
161         $p->whereAdd("
162                 sent < '2000-01-01'
163                 and
164                 event_id = 0
165                 and
166                 act_start < NOW() +  INTERVAL 3 HOUR 
167                 and
168                 server_id NOT IN (" . implode(",", $ids) . ")
169         ");
170         if ($p->count() < 1) {
171             return;
172         }
173         
174         $p->selectAdd();
175         $p->selectAdd("id, ((@row_number := CASE WHEN @row_number IS NULL THEN 0 ELSE @row_number END  +1) % {$num_servers})  as rn");
176         $kv = $p->fetchAll('id','rn');
177         foreach($kv as $id => $r) {
178             $up[ $ids[$r] ][] = $id;
179         }
180         foreach($up as $sid => $nids) {
181             if (empty($nids)) {
182                 continue;
183             }
184             $p = DB_DataObject::factory($notify->table);
185             $p->query("
186                 UPDATE
187                     {$notify->table}
188                 SET
189                     server_id = $sid
190                 WHERE
191                     id IN (". implode(',', $nids). ')'
192             );
193         }
194          
195         DB_DataObject::factory("core_notify_blacklist")->prune();
196         
197     }
198         // called on current server.
199
200     function availableServers()
201     {
202         $ns = DB_DataObject::factory('core_notify_server');
203         $ns->poolname = $this->poolname;
204         $ns->is_active = 1;
205         $ns->orderBy('id ASC');
206         return  $ns->fetchAll();
207         
208     }
209     
210     function updateNotifyToNextServer( $cn , $when = false, $allow_same = false)
211     {
212         // fixme - this should take into account blacklisted - and return false if no more servers are available
213         $email = empty($cn->to_email) ? ($cn->person() ? $cn->person()->email : $cn->to_email) : $cn->to_email;
214
215         $w = DB_DataObject::factory($cn->tableName());
216         $w->get($cn->id);
217         
218         $servers = $this->availableServers();
219         $start = 0;
220         foreach($servers as $i => $s) {
221             if ($s->id == $this->id) {
222                 $start = $i;
223             }
224         }
225         
226         $offset = ($start + 1)  % count($servers);
227         $good = false;
228         while ($offset  != $start) {
229             $s = $servers[$offset];
230             if (!$s->isBlacklisted($email)) {
231                 $good = $s;
232                 break;
233             }
234         }
235         if ($good == false && $allow_same) {
236             $good = $this;
237         }
238         
239         if ($good == false) {
240             return false;
241         }
242         
243         
244         // next server..
245         $pp = clone($w);
246         $w->server_id = $good->id;   
247         $w->act_when = $when === false ? $w->sqlValue('NOW() + INTERVAL 1 MINUTE') : $when;
248         $w->update($pp);
249         return true;
250     }
251     
252     
253     function isBlacklisted($email)
254     {
255         // return current server id..
256         static $cache = array();
257         
258         // get the domain..
259         $ea = explode('@',$email);
260         $dom = strtolower(array_pop($ea));
261         if (isset($cache[$dom])) {
262             return $cache[$dom];
263         }
264         
265         $cd = DB_DataObject::factory('core_domain')->loadOrCreate($dom);
266         
267         $bl = DB_DataObject::factory('core_notify_blacklist');
268         $bl->server_id = $this->id;
269         $bl->domain_id = $cd->id;
270         if ($bl->count()) {
271             $cache[$dom] = true;
272             return true;
273         }
274         
275         return false; 
276     }
277     function initHelo()
278     {
279         $ff = HTML_FlexyFramework::get();
280         $ff->Mail['helo'] = $this->helo;
281         
282     }
283     function checkSmtpResponse($errmsg, $core_domain)
284     {
285         $bl = DB_DataObject::factory('core_notify_blacklist');
286         $bl->server_id = $this->id;
287         $bl->domain_id = $core_domain->id;
288         if ($bl->count()) {
289             return true;
290         }
291         // is it a blacklist message
292         if (!$bl->messageIsBlacklisted($errmsg)) {
293             return false;
294         }
295         $bl->error_str = $errmsg;
296         $bl->added_dt = $bl->sqlValue("NOW()");
297         $bl->insert();
298         return true;
299         
300     }
301     
302 }