Mon Sep 20 2010 00:22:39

Asterisk developer's documentation


evt.c

Go to the documentation of this file.
00001 /*
00002  * Asterisk -- An open source telephony toolkit.
00003  *
00004  * Copyright (C) 2007, Digium, Inc.
00005  *
00006  * Russell Bryant <russell@digium.com>
00007  *
00008  * See http://www.asterisk.org for more information about
00009  * the Asterisk project. Please do not directly contact
00010  * any of the maintainers of this project for assistance;
00011  * the project provides a web site, mailing lists and IRC
00012  * channels for your use.
00013  *
00014  * This program is free software, distributed under the terms of
00015  * the GNU General Public License Version 2. See the LICENSE file
00016  * at the top of the source tree.
00017  */
00018 
00019 /*! 
00020  * \file
00021  * \author Russell Bryant <russell@digium.com>
00022  *
00023  * \brief Usage of the SAForum AIS (Application Interface Specification)
00024  *
00025  * \arg http://www.openais.org/
00026  *
00027  * This file contains the code specific to the use of the EVT 
00028  * (Event) Service.
00029  */
00030 
00031 #include "asterisk.h"
00032 
00033 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 184632 $");
00034 
00035 #include <stdlib.h>
00036 #include <stdio.h>
00037 #include <string.h>
00038 #include <unistd.h>
00039 #include <errno.h>
00040 
00041 #include "ais.h"
00042 
00043 #include "asterisk/module.h"
00044 #include "asterisk/utils.h"
00045 #include "asterisk/cli.h"
00046 #include "asterisk/logger.h"
00047 #include "asterisk/event.h"
00048 #include "asterisk/config.h"
00049 #include "asterisk/linkedlists.h"
00050 #include "asterisk/devicestate.h"
00051 
00052 #ifndef AST_MODULE
00053 /* XXX HACK */
00054 #define AST_MODULE "res_ais"
00055 #endif
00056 
00057 SaEvtHandleT evt_handle;
00058 
00059 void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
00060    SaAisErrorT error);
00061 void evt_event_deliver_cb(SaEvtSubscriptionIdT subscription_id,
00062    const SaEvtEventHandleT event_handle, const SaSizeT event_datalen);
00063 
00064 static const SaEvtCallbacksT evt_callbacks = {
00065    .saEvtChannelOpenCallback  = evt_channel_open_cb,
00066    .saEvtEventDeliverCallback = evt_event_deliver_cb, 
00067 };
00068 
00069 static const struct {
00070    const char *str;
00071    enum ast_event_type type;
00072 } supported_event_types[] = {
00073    { "mwi", AST_EVENT_MWI },
00074    { "device_state", AST_EVENT_DEVICE_STATE_CHANGE },
00075 };
00076 
/*! Counter from which every egress subscription draws its unique ID.
 *  It is advanced with ast_atomic_fetchadd_int(). */
static int unique_id;
00079 
00080 struct subscribe_event {
00081    AST_LIST_ENTRY(subscribe_event) entry;
00082    /*! This is a unique identifier to identify this subscription in the event
00083     *  channel through the different API calls, subscribe, unsubscribe, and
00084     *  the event deliver callback. */
00085    SaEvtSubscriptionIdT id;
00086    enum ast_event_type type;
00087 };
00088 
00089 struct publish_event {
00090    AST_LIST_ENTRY(publish_event) entry;
00091    /*! We subscribe to events internally so that we can publish them
00092     *  on this event channel. */
00093    struct ast_event_sub *sub;
00094    enum ast_event_type type;
00095 };
00096 
00097 struct event_channel {
00098    AST_RWLIST_ENTRY(event_channel) entry;
00099    AST_LIST_HEAD_NOLOCK(, subscribe_event) subscribe_events;
00100    AST_LIST_HEAD_NOLOCK(, publish_event) publish_events;
00101    SaEvtChannelHandleT handle;
00102    char name[1];
00103 };
00104 
00105 static AST_RWLIST_HEAD_STATIC(event_channels, event_channel);
00106 
00107 void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
00108    SaAisErrorT error)
00109 {
00110 
00111 }
00112 
/*!
 * \brief Pass an event received from the event channel on to Asterisk.
 *
 * \param ast_event event to hand to ast_event_queue_and_cache()
 */
static void queue_event(struct ast_event *ast_event)
{
   ast_event_queue_and_cache(ast_event);

   return;
}
00117 
00118 void evt_event_deliver_cb(SaEvtSubscriptionIdT sub_id,
00119    const SaEvtEventHandleT event_handle, const SaSizeT event_datalen)
00120 {
00121    /* It is important to note that this works because we *know* that this
00122     * function will only be called by a single thread, the dispatch_thread.
00123     * If this module gets changed such that this is no longer the case, this
00124     * should get changed to a thread-local buffer, instead. */
00125    static unsigned char buf[4096];
00126    struct ast_event *event_dup, *event = (void *) buf;
00127    SaAisErrorT ais_res;
00128    SaSizeT len = sizeof(buf);
00129 
00130    if (event_datalen > len) {
00131       ast_log(LOG_ERROR, "Event received with size %u, which is too big\n"
00132          "for the allocated size %u. Change the code to increase the size.\n",
00133          (unsigned int) event_datalen, (unsigned int) len);
00134       return;
00135    }
00136 
00137    ais_res = saEvtEventDataGet(event_handle, event, &len);
00138    if (ais_res != SA_AIS_OK) {
00139       ast_log(LOG_ERROR, "Error retrieving event payload: %s\n", 
00140          ais_err2str(ais_res));
00141       return;
00142    }
00143 
00144    if (!ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
00145       /* Don't feed events back in that originated locally. */
00146       return;
00147    }
00148 
00149    if (!(event_dup = ast_malloc(len)))
00150       return;
00151    
00152    memcpy(event_dup, event, len);
00153 
00154    queue_event(event_dup);
00155 }
00156 
00157 static const char *type_to_filter_str(enum ast_event_type type)
00158 {
00159    const char *filter_str = NULL;
00160    int i;
00161 
00162    for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
00163       if (supported_event_types[i].type == type) {
00164          filter_str = supported_event_types[i].str;
00165          break;
00166       }
00167    }
00168 
00169    return filter_str;
00170 }
00171 
00172 static void ast_event_cb(const struct ast_event *ast_event, void *data)
00173 {
00174    SaEvtEventHandleT event_handle;
00175    SaAisErrorT ais_res;
00176    struct event_channel *event_channel = data;
00177    SaClmClusterNodeT local_node;
00178    SaEvtEventPatternArrayT pattern_array;
00179    SaEvtEventPatternT pattern;
00180    SaSizeT len;
00181    const char *filter_str;
00182    SaEvtEventIdT event_id;
00183 
00184    ast_log(LOG_DEBUG, "Got an event to forward\n");
00185 
00186    if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID))) {
00187       /* If the event didn't originate from this server, don't send it back out. */
00188       ast_log(LOG_DEBUG, "Returning here\n");
00189       return;
00190    }
00191 
00192    ais_res = saEvtEventAllocate(event_channel->handle, &event_handle);
00193    if (ais_res != SA_AIS_OK) {
00194       ast_log(LOG_ERROR, "Error allocating event: %s\n", ais_err2str(ais_res));
00195       ast_log(LOG_DEBUG, "Returning here\n");
00196       return;
00197    }
00198    
00199    ais_res = saClmClusterNodeGet(clm_handle, SA_CLM_LOCAL_NODE_ID, 
00200       SA_TIME_ONE_SECOND, &local_node);
00201    if (ais_res != SA_AIS_OK) {
00202       ast_log(LOG_ERROR, "Error getting local node name: %s\n", ais_err2str(ais_res));
00203       goto return_event_free;
00204    }
00205 
00206    filter_str = type_to_filter_str(ast_event_get_type(ast_event));
00207    len = strlen(filter_str) + 1;
00208    pattern.pattern = (SaUint8T *) filter_str;
00209    pattern.patternSize = len;
00210    pattern.allocatedSize = len;
00211 
00212    pattern_array.allocatedNumber = 1;
00213    pattern_array.patternsNumber = 1;
00214    pattern_array.patterns = &pattern;
00215 
00216    /*! 
00217     * /todo Make retention time configurable 
00218     * /todo Make event priorities configurable
00219     */
00220    ais_res = saEvtEventAttributesSet(event_handle, &pattern_array,
00221       SA_EVT_LOWEST_PRIORITY, SA_TIME_ONE_MINUTE, &local_node.nodeName);
00222    if (ais_res != SA_AIS_OK) {
00223       ast_log(LOG_ERROR, "Error setting event attributes: %s\n", ais_err2str(ais_res));
00224       goto return_event_free;
00225    }
00226 
00227    ais_res = saEvtEventPublish(event_handle, 
00228       ast_event, ast_event_get_size(ast_event), &event_id);
00229    if (ais_res != SA_AIS_OK) {
00230       ast_log(LOG_ERROR, "Error publishing event: %s\n", ais_err2str(ais_res));
00231       goto return_event_free;
00232    }
00233 
00234 return_event_free:
00235    ais_res = saEvtEventFree(event_handle);
00236    if (ais_res != SA_AIS_OK) {
00237       ast_log(LOG_ERROR, "Error freeing allocated event: %s\n", ais_err2str(ais_res));
00238    }
00239    ast_log(LOG_DEBUG, "Returning here (event_free)\n");
00240 }
00241 
00242 static char *ais_evt_show_event_channels(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00243 {
00244    struct event_channel *event_channel;
00245 
00246    switch (cmd) {
00247    case CLI_INIT:
00248       e->command = "ais show evt event channels";
00249       e->usage =
00250          "Usage: ais show evt event channels\n"
00251          "       List configured event channels for the (EVT) Eventing service.\n";
00252       return NULL;
00253 
00254    case CLI_GENERATE:
00255       return NULL;   /* no completion */
00256    }
00257 
00258    if (a->argc != e->args)
00259       return CLI_SHOWUSAGE;
00260 
00261    ast_cli(a->fd, "\n"
00262                "=============================================================\n"
00263                "=== Event Channels ==========================================\n"
00264                "=============================================================\n"
00265                "===\n");
00266 
00267    AST_RWLIST_RDLOCK(&event_channels);
00268    AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) {
00269       struct publish_event *publish_event;
00270       struct subscribe_event *subscribe_event;
00271 
00272       ast_cli(a->fd, "=== ---------------------------------------------------------\n"
00273                      "=== Event Channel Name: %s\n", event_channel->name);
00274 
00275       AST_LIST_TRAVERSE(&event_channel->publish_events, publish_event, entry) {
00276          ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n", 
00277             type_to_filter_str(publish_event->type));
00278       }
00279       
00280       AST_LIST_TRAVERSE(&event_channel->subscribe_events, subscribe_event, entry) {
00281          ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n", 
00282             type_to_filter_str(subscribe_event->type));
00283       }
00284 
00285       ast_cli(a->fd, "=== ---------------------------------------------------------\n"
00286                      "===\n");
00287    }
00288    AST_RWLIST_UNLOCK(&event_channels);
00289 
00290    ast_cli(a->fd, "=============================================================\n"
00291                   "\n");
00292 
00293    return CLI_SUCCESS;
00294 }
00295 
00296 static struct ast_cli_entry ais_cli[] = {
00297    AST_CLI_DEFINE(ais_evt_show_event_channels, "Show configured event channels"),
00298 };
00299 
00300 static void add_publish_event(struct event_channel *event_channel, const char *event_type)
00301 {
00302    int i;
00303    enum ast_event_type type = -1;
00304    struct publish_event *publish_event;
00305 
00306    for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
00307       if (!strcasecmp(event_type, supported_event_types[i].str)) {
00308          type = supported_event_types[i].type;
00309          break;
00310       }
00311    }
00312 
00313    if (type == -1) {
00314       ast_log(LOG_WARNING, "publish_event option given with invalid value '%s'\n", event_type);
00315       return;
00316    }
00317 
00318    if (type == AST_EVENT_DEVICE_STATE_CHANGE && ast_enable_distributed_devstate()) {
00319       return;
00320    }
00321 
00322    if (!(publish_event = ast_calloc(1, sizeof(*publish_event)))) {
00323       return;
00324    }
00325 
00326    publish_event->type = type;
00327    ast_log(LOG_DEBUG, "Subscribing to event type %d\n", type);
00328    publish_event->sub = ast_event_subscribe(type, ast_event_cb, event_channel,
00329       AST_EVENT_IE_END);
00330    ast_event_dump_cache(publish_event->sub);
00331 
00332    AST_LIST_INSERT_TAIL(&event_channel->publish_events, publish_event, entry);
00333 }
00334 
00335 static SaAisErrorT set_egress_subscription(struct event_channel *event_channel,
00336    struct subscribe_event *subscribe_event)
00337 {
00338    SaAisErrorT ais_res;
00339    SaEvtEventFilterArrayT filter_array;
00340    SaEvtEventFilterT filter;
00341    const char *filter_str = NULL;
00342    SaSizeT len;
00343 
00344    /* We know it's going to be valid.  It was checked earlier. */
00345    filter_str = type_to_filter_str(subscribe_event->type);
00346 
00347    filter.filterType = SA_EVT_EXACT_FILTER;
00348    len = strlen(filter_str) + 1;
00349    filter.filter.allocatedSize = len;
00350    filter.filter.patternSize = len;
00351    filter.filter.pattern = (SaUint8T *) filter_str;
00352 
00353    filter_array.filtersNumber = 1;
00354    filter_array.filters = &filter;
00355 
00356    ais_res = saEvtEventSubscribe(event_channel->handle, &filter_array, 
00357       subscribe_event->id);
00358 
00359    return ais_res;
00360 }
00361 
00362 static void add_subscribe_event(struct event_channel *event_channel, const char *event_type)
00363 {
00364    int i;
00365    enum ast_event_type type = -1;
00366    struct subscribe_event *subscribe_event;
00367    SaAisErrorT ais_res;
00368 
00369    for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
00370       if (!strcasecmp(event_type, supported_event_types[i].str)) {
00371          type = supported_event_types[i].type;
00372          break;
00373       }
00374    }
00375 
00376    if (type == -1) {
00377       ast_log(LOG_WARNING, "subscribe_event option given with invalid value '%s'\n", event_type);
00378       return;
00379    }
00380 
00381    if (type == AST_EVENT_DEVICE_STATE_CHANGE && ast_enable_distributed_devstate()) {
00382       return;
00383    }
00384 
00385    if (!(subscribe_event = ast_calloc(1, sizeof(*subscribe_event)))) {
00386       return;
00387    }
00388 
00389    subscribe_event->type = type;
00390    subscribe_event->id = ast_atomic_fetchadd_int(&unique_id, +1);
00391 
00392    ais_res = set_egress_subscription(event_channel, subscribe_event);
00393    if (ais_res != SA_AIS_OK) {
00394       ast_log(LOG_ERROR, "Error setting up egress subscription: %s\n",
00395          ais_err2str(ais_res));
00396       free(subscribe_event);
00397       return;
00398    }
00399 
00400    AST_LIST_INSERT_TAIL(&event_channel->subscribe_events, subscribe_event, entry);
00401 }
00402 
00403 static void build_event_channel(struct ast_config *cfg, const char *cat)
00404 {
00405    struct ast_variable *var;
00406    struct event_channel *event_channel;
00407    SaAisErrorT ais_res;
00408    SaNameT sa_name = { 0, };
00409 
00410    AST_RWLIST_WRLOCK(&event_channels);
00411    AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) {
00412       if (!strcasecmp(event_channel->name, cat))
00413          break;
00414    }
00415    AST_RWLIST_UNLOCK(&event_channels);
00416    if (event_channel) {
00417       ast_log(LOG_WARNING, "Event channel '%s' was specified twice in "
00418          "configuration.  Second instance ignored.\n", cat);
00419       return;
00420    }
00421 
00422    if (!(event_channel = ast_calloc(1, sizeof(*event_channel) + strlen(cat))))
00423       return;
00424 
00425    strcpy(event_channel->name, cat);
00426    ast_copy_string((char *) sa_name.value, cat, sizeof(sa_name.value));
00427    sa_name.length = strlen((char *) sa_name.value);
00428    ais_res = saEvtChannelOpen(evt_handle, &sa_name, 
00429       SA_EVT_CHANNEL_PUBLISHER | SA_EVT_CHANNEL_SUBSCRIBER | SA_EVT_CHANNEL_CREATE,
00430       SA_TIME_MAX, &event_channel->handle);
00431    if (ais_res != SA_AIS_OK) {
00432       ast_log(LOG_ERROR, "Error opening event channel: %s\n", ais_err2str(ais_res));
00433       free(event_channel);
00434       return;
00435    }
00436 
00437    for (var = ast_variable_browse(cfg, cat); var; var = var->next) {
00438       if (!strcasecmp(var->name, "type")) {
00439          continue;
00440       } else if (!strcasecmp(var->name, "publish_event")) {
00441          add_publish_event(event_channel, var->value);
00442       } else if (!strcasecmp(var->name, "subscribe_event")) {
00443          add_subscribe_event(event_channel, var->value);
00444       } else {
00445          ast_log(LOG_WARNING, "Event channel '%s' contains invalid option '%s'\n",
00446             event_channel->name, var->name);
00447       }
00448    }
00449 
00450    AST_RWLIST_WRLOCK(&event_channels);
00451    AST_RWLIST_INSERT_TAIL(&event_channels, event_channel, entry);
00452    AST_RWLIST_UNLOCK(&event_channels);
00453 }
00454 
00455 static void load_config(void)
00456 {
00457    static const char filename[] = "ais.conf";
00458    struct ast_config *cfg;
00459    const char *cat = NULL;
00460    struct ast_flags config_flags = { 0 };
00461 
00462    if (!(cfg = ast_config_load(filename, config_flags)) || cfg == CONFIG_STATUS_FILEINVALID)
00463       return;
00464 
00465    while ((cat = ast_category_browse(cfg, cat))) {
00466       const char *type;
00467 
00468       if (!strcasecmp(cat, "general"))
00469          continue;
00470 
00471       if (!(type = ast_variable_retrieve(cfg, cat, "type"))) {
00472          ast_log(LOG_WARNING, "Invalid entry in %s defined with no type!\n",
00473             filename);
00474          continue;
00475       }
00476 
00477       if (!strcasecmp(type, "event_channel")) {
00478          build_event_channel(cfg, cat);
00479       } else {
00480          ast_log(LOG_WARNING, "Entry in %s defined with invalid type '%s'\n", 
00481             filename, type);
00482       }
00483    }
00484 
00485    ast_config_destroy(cfg);
00486 }
00487 
00488 static void publish_event_destroy(struct publish_event *publish_event)
00489 {
00490    ast_event_unsubscribe(publish_event->sub);
00491 
00492    free(publish_event);
00493 }
00494 
00495 static void subscribe_event_destroy(const struct event_channel *event_channel,
00496    struct subscribe_event *subscribe_event)
00497 {
00498    SaAisErrorT ais_res;
00499 
00500    /* saEvtChannelClose() will actually do this automatically, but it just
00501     * feels cleaner to go ahead and do it manually ... */
00502    ais_res = saEvtEventUnsubscribe(event_channel->handle, subscribe_event->id);
00503    if (ais_res != SA_AIS_OK) {
00504       ast_log(LOG_ERROR, "Error unsubscribing: %s\n", ais_err2str(ais_res));
00505    }
00506 
00507    free(subscribe_event);
00508 }
00509 
00510 static void event_channel_destroy(struct event_channel *event_channel)
00511 {
00512    struct publish_event *publish_event;
00513    struct subscribe_event *subscribe_event;
00514    SaAisErrorT ais_res;
00515 
00516    while ((publish_event = AST_LIST_REMOVE_HEAD(&event_channel->publish_events, entry)))
00517       publish_event_destroy(publish_event);
00518    while ((subscribe_event = AST_LIST_REMOVE_HEAD(&event_channel->subscribe_events, entry)))
00519       subscribe_event_destroy(event_channel, subscribe_event);
00520 
00521    ais_res = saEvtChannelClose(event_channel->handle);
00522    if (ais_res != SA_AIS_OK) {
00523       ast_log(LOG_ERROR, "Error closing event channel '%s': %s\n",
00524          event_channel->name, ais_err2str(ais_res));
00525    }
00526 
00527    free(event_channel);
00528 }
00529 
00530 static void destroy_event_channels(void)
00531 {
00532    struct event_channel *event_channel;
00533 
00534    AST_RWLIST_WRLOCK(&event_channels);
00535    while ((event_channel = AST_RWLIST_REMOVE_HEAD(&event_channels, entry)))
00536       event_channel_destroy(event_channel);
00537    AST_RWLIST_UNLOCK(&event_channels);
00538 }
00539 
00540 int ast_ais_evt_load_module(void)
00541 {
00542    SaAisErrorT ais_res;
00543 
00544    ais_res = saEvtInitialize(&evt_handle, &evt_callbacks, &ais_version);
00545    if (ais_res != SA_AIS_OK) {
00546       ast_log(LOG_ERROR, "Could not initialize eventing service: %s\n",
00547          ais_err2str(ais_res));
00548       return -1;
00549    }
00550    
00551    load_config();
00552 
00553    ast_cli_register_multiple(ais_cli, ARRAY_LEN(ais_cli));
00554 
00555    return 0;
00556 }
00557 
00558 int ast_ais_evt_unload_module(void)
00559 {
00560    SaAisErrorT ais_res;
00561 
00562    destroy_event_channels();
00563 
00564    ais_res = saEvtFinalize(evt_handle);
00565    if (ais_res != SA_AIS_OK) {
00566       ast_log(LOG_ERROR, "Problem stopping eventing service: %s\n", 
00567          ais_err2str(ais_res));
00568       return -1;
00569    }
00570 
00571    return 0;   
00572 }