1 // ========================================================================
2 // Copyright 2008 Mort Bay Consulting Pty. Ltd.
3 // ------------------------------------------------------------------------
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 // http://www.apache.org/licenses/LICENSE-2.0
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 //========================================================================
14 package org.mortbay.cometd;
15
16 import java.lang.reflect.Method;
17 import java.util.Map;
18 import java.util.concurrent.ConcurrentHashMap;
19
20 import org.cometd.Bayeux;
21 import org.cometd.Channel;
22 import org.cometd.Client;
23 import org.cometd.Listener;
24 import org.cometd.Message;
25 import org.cometd.MessageListener;
26 import org.mortbay.component.LifeCycle;
27 import org.mortbay.log.Log;
28 import org.mortbay.thread.QueuedThreadPool;
29 import org.mortbay.thread.ThreadPool;
30
31 /* ------------------------------------------------------------ */
/**
 * Abstract Bayeux Service class. This is a base class to assist with the
 * creation of server side {@link Bayeux} clients that provide services to
 * remote Bayeux clients. The class provides a Bayeux {@link Client} and
 * {@link Listener} together with convenience methods to map subscriptions to
 * methods on the derived class and to send responses to those methods.
 *
 * <p>
 * If a thread pool is set (see {@link #setThreadPool(ThreadPool)}), then
 * messages are handled in their own threads. This is desirable if the
 * handling of a message can take considerable time and it is desired not to
 * hold up the delivering thread (typically an HTTP request handling thread).
 *
 * <p>
 * If the BayeuxService is constructed asynchronously (the default), then
 * messages are delivered unsynchronized and multiple simultaneous calls to
 * handling methods may occur.
 *
 * <p>
 * If the BayeuxService is constructed as a synchronous service, then message
 * delivery is synchronized on the internal {@link Client} instances used and
 * only a single call will be made to the handler method at a time (unless a
 * thread pool is used).
 *
 * @see MessageListener
 * @author gregw
 *
 */
60 public abstract class BayeuxService
61 {
62 private String _name;
63 private Bayeux _bayeux;
64 private Client _client;
65 private Map<String,Method> _methods=new ConcurrentHashMap<String,Method>();
66 private ThreadPool _threadPool;
67 private MessageListener _listener;
68 private boolean _seeOwn=false;
69
70 /* ------------------------------------------------------------ */
71 /**
72 * Instantiate the service. Typically the derived constructor will call @
73 * #subscribe(String, String)} to map subscriptions to methods.
74 *
75 * @param bayeux
76 * The bayeux instance.
77 * @param name
78 * The name of the service (used as client ID prefix).
79 */
80 public BayeuxService(Bayeux bayeux, String name)
81 {
82 this(bayeux,name,0,false);
83 }
84
85 /* ------------------------------------------------------------ */
86 /**
87 * Instantiate the service. Typically the derived constructor will call @
88 * #subscribe(String, String)} to map subscriptions to methods.
89 *
90 * @param bayeux
91 * The bayeux instance.
92 * @param name
93 * The name of the service (used as client ID prefix).
94 * @param maxThreads
95 * The size of a ThreadPool to create to handle messages.
96 */
97 public BayeuxService(Bayeux bayeux, String name, int maxThreads)
98 {
99 this(bayeux,name,maxThreads,false);
100 }
101
102 /* ------------------------------------------------------------ */
103 /**
104 * Instantiate the service. Typically the derived constructor will call @
105 * #subscribe(String, String)} to map subscriptions to methods.
106 *
107 * @param bayeux
108 * The bayeux instance.
109 * @param name
110 * The name of the service (used as client ID prefix).
111 * @param maxThreads
112 * The size of a ThreadPool to create to handle messages.
113 * @param synchronous
114 * True if message delivery will be synchronized on the client.
115 */
116 public BayeuxService(Bayeux bayeux, String name, int maxThreads, boolean synchronous)
117 {
118 if (maxThreads > 0)
119 setThreadPool(new QueuedThreadPool(maxThreads));
120 _name=name;
121 _bayeux=bayeux;
122 _client=_bayeux.newClient(name);
123 _listener=(synchronous)?new SyncListen():new AsyncListen();
124 _client.addListener(_listener);
125
126 }
127
128 /* ------------------------------------------------------------ */
129 public Bayeux getBayeux()
130 {
131 return _bayeux;
132 }
133
134 /* ------------------------------------------------------------ */
135 public Client getClient()
136 {
137 return _client;
138 }
139
140 /* ------------------------------------------------------------ */
141 public ThreadPool getThreadPool()
142 {
143 return _threadPool;
144 }
145
146 /* ------------------------------------------------------------ */
147 /**
148 * Set the threadpool. If the {@link ThreadPool} is a {@link LifeCycle},
149 * then it is started by this method.
150 *
151 * @param pool
152 */
153 public void setThreadPool(ThreadPool pool)
154 {
155 try
156 {
157 if (pool instanceof LifeCycle)
158 if (!((LifeCycle)pool).isStarted())
159 ((LifeCycle)pool).start();
160 }
161 catch(Exception e)
162 {
163 throw new IllegalStateException(e);
164 }
165 _threadPool=pool;
166 }
167
168 /* ------------------------------------------------------------ */
169 public boolean isSeeOwnPublishes()
170 {
171 return _seeOwn;
172 }
173
174 /* ------------------------------------------------------------ */
175 public void setSeeOwnPublishes(boolean own)
176 {
177 _seeOwn=own;
178 }
179
180 /* ------------------------------------------------------------ */
181 /**
182 * Subscribe to a channel. Subscribe to channel and map a method to handle
183 * received messages. The method must have a unique name and one of the
184 * following signatures:
185 * <ul>
186 * <li><code>myMethod(Client fromClient,Object data)</code></li>
187 * <li><code>myMethod(Client fromClient,Object data,String id)</code></li>
188 * <li>
189 * <code>myMethod(Client fromClient,String channel,Object data,String id)</code>
190 * </li>
191 * </li>
192 *
193 * The data parameter can be typed if the type of the data object published
194 * by the client is known (typically Map<String,Object>). If the type of the
195 * data parameter is {@link Message} then the message object itself is
196 * passed rather than just the data.
197 * <p>
198 * Typically a service will subscribe to a channel in the "/service/**"
199 * space which is not a broadcast channel. Messages published to these
200 * channels are only delivered to server side clients like this service.
201 *
202 * <p>
203 * Any object returned by a mapped subscription method is delivered to the
204 * calling client and not broadcast. If the method returns void or null,
205 * then no response is sent. A mapped subscription method may also call
206 * {@link #send(Client, String, Object, String)} to deliver a response
207 * message(s) to different clients and/or channels. It may also publish
208 * methods via the normal {@link Bayeux} API.
209 * <p>
210 *
211 *
212 * @param channelId
213 * The channel to subscribe to
214 * @param methodName
215 * The name of the method on this object to call when messages
216 * are recieved.
217 */
218 protected void subscribe(String channelId, String methodName)
219 {
220 Method method=null;
221
222 Class<?> c=this.getClass();
223 while(c != null && c != Object.class)
224 {
225 Method[] methods=c.getDeclaredMethods();
226 for (int i=methods.length; i-- > 0;)
227 {
228 if (methodName.equals(methods[i].getName()))
229 {
230 if (method != null)
231 throw new IllegalArgumentException("Multiple methods called '" + methodName + "'");
232 method=methods[i];
233 }
234 }
235 c=c.getSuperclass();
236 }
237
238 if (method == null)
239 throw new NoSuchMethodError(methodName);
240 int params=method.getParameterTypes().length;
241 if (params < 2 || params > 4)
242 throw new IllegalArgumentException("Method '" + methodName + "' does not have 2or3 parameters");
243 if (!Client.class.isAssignableFrom(method.getParameterTypes()[0]))
244 throw new IllegalArgumentException("Method '" + methodName + "' does not have Client as first parameter");
245
246 Channel channel=_bayeux.getChannel(channelId,true);
247
248 if (((ChannelImpl)channel).getChannelId().isWild())
249 {
250 final Method m=method;
251 Client wild_client=_bayeux.newClient(_name + "-wild");
252 wild_client.addListener(_listener instanceof MessageListener.Asynchronous?new AsyncWildListen(wild_client,m):new SyncWildListen(wild_client,m));
253 channel.subscribe(wild_client);
254 }
255 else
256 {
257 _methods.put(channelId,method);
258 channel.subscribe(_client);
259 }
260 }
261
262 /* ------------------------------------------------------------ */
263 /**
264 * Send data to a individual client. The data passed is sent to the client
265 * as the "data" member of a message with the given channel and id. The
266 * message is not published on the channel and is thus not broadcast to all
267 * channel subscribers. However to the target client, the message appears as
268 * if it was broadcast.
269 * <p>
270 * Typcially this method is only required if a service method sends
271 * response(s) to channels other than the subscribed channel. If the
272 * response is to be sent to the subscribed channel, then the data can
273 * simply be returned from the subscription method.
274 *
275 * @param toClient
276 * The target client
277 * @param onChannel
278 * The channel the message is for
279 * @param data
280 * The data of the message
281 * @param id
282 * The id of the message (or null for a random id).
283 */
284 protected void send(Client toClient, String onChannel, Object data, String id)
285 {
286 toClient.deliver(getClient(),onChannel,data,id);
287 }
288
289 /* ------------------------------------------------------------ */
290 /**
291 * Handle Exception. This method is called when a mapped subscription method
292 * throws and exception while handling a message.
293 *
294 * @param fromClient
295 * @param toClient
296 * @param msg
297 * @param th
298 */
299 protected void exception(Client fromClient, Client toClient, Map<String,Object> msg, Throwable th)
300 {
301 System.err.println(msg);
302 th.printStackTrace();
303 }
304
305 /* ------------------------------------------------------------ */
306 private void invoke(final Method method, final Client fromClient, final Client toClient, final Message msg)
307 {
308 if (_threadPool == null)
309 doInvoke(method,fromClient,toClient,msg);
310 else
311 {
312 _threadPool.dispatch(new Runnable()
313 {
314 public void run()
315 {
316 try
317 {
318 ((MessageImpl)msg).incRef();
319 doInvoke(method,fromClient,toClient,msg);
320 }
321 finally
322 {
323 ((MessageImpl)msg).decRef();
324 }
325 }
326 });
327 }
328 }
329
330 /* ------------------------------------------------------------ */
331 private void doInvoke(Method method, Client fromClient, Client toClient, Message msg)
332 {
333 String channel=(String)msg.get(Bayeux.CHANNEL_FIELD);
334 Object data=msg.get(Bayeux.DATA_FIELD);
335 String id=msg.getId();
336
337 if (method != null)
338 {
339 try
340 {
341 Class<?>[] args=method.getParameterTypes();
342 Object arg=Message.class.isAssignableFrom(args[1])?msg:data;
343
344 Object reply=null;
345 switch(method.getParameterTypes().length)
346 {
347 case 2:
348 reply=method.invoke(this,fromClient,arg);
349 break;
350 case 3:
351 reply=method.invoke(this,fromClient,arg,id);
352 break;
353 case 4:
354 reply=method.invoke(this,fromClient,channel,arg,id);
355 break;
356 }
357
358 if (reply != null)
359 send(fromClient,channel,reply,id);
360 }
361 catch(Exception e)
362 {
363 Log.debug("method",method);
364 exception(fromClient,toClient,msg,e);
365 }
366 catch(Error e)
367 {
368 Log.debug("method",method);
369 exception(fromClient,toClient,msg,e);
370 }
371 }
372 }
373
374 /* ------------------------------------------------------------ */
375 /* ------------------------------------------------------------ */
376 private class AsyncListen implements MessageListener, MessageListener.Asynchronous
377 {
378 public void deliver(Client fromClient, Client toClient, Message msg)
379 {
380 if (!_seeOwn && fromClient == getClient())
381 return;
382 String channel=(String)msg.get(Bayeux.CHANNEL_FIELD);
383 Method method=_methods.get(channel);
384 invoke(method,fromClient,toClient,msg);
385 }
386 }
387
388 /* ------------------------------------------------------------ */
389 /* ------------------------------------------------------------ */
390 private class SyncListen implements MessageListener, MessageListener.Synchronous
391 {
392 public void deliver(Client fromClient, Client toClient, Message msg)
393 {
394 if (!_seeOwn && fromClient == getClient())
395 return;
396 String channel=(String)msg.get(Bayeux.CHANNEL_FIELD);
397 Method method=_methods.get(channel);
398 invoke(method,fromClient,toClient,msg);
399 }
400 }
401
402 /* ------------------------------------------------------------ */
403 /* ------------------------------------------------------------ */
404 private class SyncWildListen implements MessageListener, MessageListener.Synchronous
405 {
406 Client _client;
407 Method _method;
408
409 public SyncWildListen(Client client, Method method)
410 {
411 _client=client;
412 _method=method;
413 }
414
415 public void deliver(Client fromClient, Client toClient, Message msg)
416 {
417 if (!_seeOwn && (fromClient == _client || fromClient == getClient()))
418 return;
419 invoke(_method,fromClient,toClient,msg);
420 }
421 };
422
423 /* ------------------------------------------------------------ */
424 /* ------------------------------------------------------------ */
425 private class AsyncWildListen implements MessageListener, MessageListener.Asynchronous
426 {
427 Client _client;
428 Method _method;
429
430 public AsyncWildListen(Client client, Method method)
431 {
432 _client=client;
433 _method=method;
434 }
435
436 public void deliver(Client fromClient, Client toClient, Message msg)
437 {
438 if (!_seeOwn && (fromClient == _client || fromClient == getClient()))
439 return;
440 invoke(_method,fromClient,toClient,msg);
441 }
442 };
443
444 }