00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include "asterisk.h"
00032
00033 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 184632 $");
00034
00035 #include <stdlib.h>
00036 #include <stdio.h>
00037 #include <string.h>
00038 #include <unistd.h>
00039 #include <errno.h>
00040
00041 #include "ais.h"
00042
00043 #include "asterisk/module.h"
00044 #include "asterisk/utils.h"
00045 #include "asterisk/cli.h"
00046 #include "asterisk/logger.h"
00047 #include "asterisk/event.h"
00048 #include "asterisk/config.h"
00049 #include "asterisk/linkedlists.h"
00050 #include "asterisk/devicestate.h"
00051
00052 #ifndef AST_MODULE
00053
00054 #define AST_MODULE "res_ais"
00055 #endif
00056
00057 SaEvtHandleT evt_handle;
00058
00059 void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
00060 SaAisErrorT error);
00061 void evt_event_deliver_cb(SaEvtSubscriptionIdT subscription_id,
00062 const SaEvtEventHandleT event_handle, const SaSizeT event_datalen);
00063
00064 static const SaEvtCallbacksT evt_callbacks = {
00065 .saEvtChannelOpenCallback = evt_channel_open_cb,
00066 .saEvtEventDeliverCallback = evt_event_deliver_cb,
00067 };
00068
00069 static const struct {
00070 const char *str;
00071 enum ast_event_type type;
00072 } supported_event_types[] = {
00073 { "mwi", AST_EVENT_MWI },
00074 { "device_state", AST_EVENT_DEVICE_STATE_CHANGE },
00075 };
00076
00077
00078 static int unique_id;
00079
00080 struct subscribe_event {
00081 AST_LIST_ENTRY(subscribe_event) entry;
00082
00083
00084
00085 SaEvtSubscriptionIdT id;
00086 enum ast_event_type type;
00087 };
00088
00089 struct publish_event {
00090 AST_LIST_ENTRY(publish_event) entry;
00091
00092
00093 struct ast_event_sub *sub;
00094 enum ast_event_type type;
00095 };
00096
00097 struct event_channel {
00098 AST_RWLIST_ENTRY(event_channel) entry;
00099 AST_LIST_HEAD_NOLOCK(, subscribe_event) subscribe_events;
00100 AST_LIST_HEAD_NOLOCK(, publish_event) publish_events;
00101 SaEvtChannelHandleT handle;
00102 char name[1];
00103 };
00104
00105 static AST_RWLIST_HEAD_STATIC(event_channels, event_channel);
00106
00107 void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
00108 SaAisErrorT error)
00109 {
00110
00111 }
00112
/*!
 * \brief Pass an event to the Asterisk event core to be queued and cached.
 *
 * \param ast_event event to hand over
 */
static void queue_event(struct ast_event *ast_event)
{
	ast_event_queue_and_cache(ast_event);
}
00117
00118 void evt_event_deliver_cb(SaEvtSubscriptionIdT sub_id,
00119 const SaEvtEventHandleT event_handle, const SaSizeT event_datalen)
00120 {
00121
00122
00123
00124
00125 static unsigned char buf[4096];
00126 struct ast_event *event_dup, *event = (void *) buf;
00127 SaAisErrorT ais_res;
00128 SaSizeT len = sizeof(buf);
00129
00130 if (event_datalen > len) {
00131 ast_log(LOG_ERROR, "Event received with size %u, which is too big\n"
00132 "for the allocated size %u. Change the code to increase the size.\n",
00133 (unsigned int) event_datalen, (unsigned int) len);
00134 return;
00135 }
00136
00137 ais_res = saEvtEventDataGet(event_handle, event, &len);
00138 if (ais_res != SA_AIS_OK) {
00139 ast_log(LOG_ERROR, "Error retrieving event payload: %s\n",
00140 ais_err2str(ais_res));
00141 return;
00142 }
00143
00144 if (!ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
00145
00146 return;
00147 }
00148
00149 if (!(event_dup = ast_malloc(len)))
00150 return;
00151
00152 memcpy(event_dup, event, len);
00153
00154 queue_event(event_dup);
00155 }
00156
00157 static const char *type_to_filter_str(enum ast_event_type type)
00158 {
00159 const char *filter_str = NULL;
00160 int i;
00161
00162 for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
00163 if (supported_event_types[i].type == type) {
00164 filter_str = supported_event_types[i].str;
00165 break;
00166 }
00167 }
00168
00169 return filter_str;
00170 }
00171
00172 static void ast_event_cb(const struct ast_event *ast_event, void *data)
00173 {
00174 SaEvtEventHandleT event_handle;
00175 SaAisErrorT ais_res;
00176 struct event_channel *event_channel = data;
00177 SaClmClusterNodeT local_node;
00178 SaEvtEventPatternArrayT pattern_array;
00179 SaEvtEventPatternT pattern;
00180 SaSizeT len;
00181 const char *filter_str;
00182 SaEvtEventIdT event_id;
00183
00184 ast_log(LOG_DEBUG, "Got an event to forward\n");
00185
00186 if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID))) {
00187
00188 ast_log(LOG_DEBUG, "Returning here\n");
00189 return;
00190 }
00191
00192 ais_res = saEvtEventAllocate(event_channel->handle, &event_handle);
00193 if (ais_res != SA_AIS_OK) {
00194 ast_log(LOG_ERROR, "Error allocating event: %s\n", ais_err2str(ais_res));
00195 ast_log(LOG_DEBUG, "Returning here\n");
00196 return;
00197 }
00198
00199 ais_res = saClmClusterNodeGet(clm_handle, SA_CLM_LOCAL_NODE_ID,
00200 SA_TIME_ONE_SECOND, &local_node);
00201 if (ais_res != SA_AIS_OK) {
00202 ast_log(LOG_ERROR, "Error getting local node name: %s\n", ais_err2str(ais_res));
00203 goto return_event_free;
00204 }
00205
00206 filter_str = type_to_filter_str(ast_event_get_type(ast_event));
00207 len = strlen(filter_str) + 1;
00208 pattern.pattern = (SaUint8T *) filter_str;
00209 pattern.patternSize = len;
00210 pattern.allocatedSize = len;
00211
00212 pattern_array.allocatedNumber = 1;
00213 pattern_array.patternsNumber = 1;
00214 pattern_array.patterns = &pattern;
00215
00216
00217
00218
00219
00220 ais_res = saEvtEventAttributesSet(event_handle, &pattern_array,
00221 SA_EVT_LOWEST_PRIORITY, SA_TIME_ONE_MINUTE, &local_node.nodeName);
00222 if (ais_res != SA_AIS_OK) {
00223 ast_log(LOG_ERROR, "Error setting event attributes: %s\n", ais_err2str(ais_res));
00224 goto return_event_free;
00225 }
00226
00227 ais_res = saEvtEventPublish(event_handle,
00228 ast_event, ast_event_get_size(ast_event), &event_id);
00229 if (ais_res != SA_AIS_OK) {
00230 ast_log(LOG_ERROR, "Error publishing event: %s\n", ais_err2str(ais_res));
00231 goto return_event_free;
00232 }
00233
00234 return_event_free:
00235 ais_res = saEvtEventFree(event_handle);
00236 if (ais_res != SA_AIS_OK) {
00237 ast_log(LOG_ERROR, "Error freeing allocated event: %s\n", ais_err2str(ais_res));
00238 }
00239 ast_log(LOG_DEBUG, "Returning here (event_free)\n");
00240 }
00241
00242 static char *ais_evt_show_event_channels(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
00243 {
00244 struct event_channel *event_channel;
00245
00246 switch (cmd) {
00247 case CLI_INIT:
00248 e->command = "ais show evt event channels";
00249 e->usage =
00250 "Usage: ais show evt event channels\n"
00251 " List configured event channels for the (EVT) Eventing service.\n";
00252 return NULL;
00253
00254 case CLI_GENERATE:
00255 return NULL;
00256 }
00257
00258 if (a->argc != e->args)
00259 return CLI_SHOWUSAGE;
00260
00261 ast_cli(a->fd, "\n"
00262 "=============================================================\n"
00263 "=== Event Channels ==========================================\n"
00264 "=============================================================\n"
00265 "===\n");
00266
00267 AST_RWLIST_RDLOCK(&event_channels);
00268 AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) {
00269 struct publish_event *publish_event;
00270 struct subscribe_event *subscribe_event;
00271
00272 ast_cli(a->fd, "=== ---------------------------------------------------------\n"
00273 "=== Event Channel Name: %s\n", event_channel->name);
00274
00275 AST_LIST_TRAVERSE(&event_channel->publish_events, publish_event, entry) {
00276 ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n",
00277 type_to_filter_str(publish_event->type));
00278 }
00279
00280 AST_LIST_TRAVERSE(&event_channel->subscribe_events, subscribe_event, entry) {
00281 ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n",
00282 type_to_filter_str(subscribe_event->type));
00283 }
00284
00285 ast_cli(a->fd, "=== ---------------------------------------------------------\n"
00286 "===\n");
00287 }
00288 AST_RWLIST_UNLOCK(&event_channels);
00289
00290 ast_cli(a->fd, "=============================================================\n"
00291 "\n");
00292
00293 return CLI_SUCCESS;
00294 }
00295
00296 static struct ast_cli_entry ais_cli[] = {
00297 AST_CLI_DEFINE(ais_evt_show_event_channels, "Show configured event channels"),
00298 };
00299
00300 static void add_publish_event(struct event_channel *event_channel, const char *event_type)
00301 {
00302 int i;
00303 enum ast_event_type type = -1;
00304 struct publish_event *publish_event;
00305
00306 for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
00307 if (!strcasecmp(event_type, supported_event_types[i].str)) {
00308 type = supported_event_types[i].type;
00309 break;
00310 }
00311 }
00312
00313 if (type == -1) {
00314 ast_log(LOG_WARNING, "publish_event option given with invalid value '%s'\n", event_type);
00315 return;
00316 }
00317
00318 if (type == AST_EVENT_DEVICE_STATE_CHANGE && ast_enable_distributed_devstate()) {
00319 return;
00320 }
00321
00322 if (!(publish_event = ast_calloc(1, sizeof(*publish_event)))) {
00323 return;
00324 }
00325
00326 publish_event->type = type;
00327 ast_log(LOG_DEBUG, "Subscribing to event type %d\n", type);
00328 publish_event->sub = ast_event_subscribe(type, ast_event_cb, event_channel,
00329 AST_EVENT_IE_END);
00330 ast_event_dump_cache(publish_event->sub);
00331
00332 AST_LIST_INSERT_TAIL(&event_channel->publish_events, publish_event, entry);
00333 }
00334
00335 static SaAisErrorT set_egress_subscription(struct event_channel *event_channel,
00336 struct subscribe_event *subscribe_event)
00337 {
00338 SaAisErrorT ais_res;
00339 SaEvtEventFilterArrayT filter_array;
00340 SaEvtEventFilterT filter;
00341 const char *filter_str = NULL;
00342 SaSizeT len;
00343
00344
00345 filter_str = type_to_filter_str(subscribe_event->type);
00346
00347 filter.filterType = SA_EVT_EXACT_FILTER;
00348 len = strlen(filter_str) + 1;
00349 filter.filter.allocatedSize = len;
00350 filter.filter.patternSize = len;
00351 filter.filter.pattern = (SaUint8T *) filter_str;
00352
00353 filter_array.filtersNumber = 1;
00354 filter_array.filters = &filter;
00355
00356 ais_res = saEvtEventSubscribe(event_channel->handle, &filter_array,
00357 subscribe_event->id);
00358
00359 return ais_res;
00360 }
00361
00362 static void add_subscribe_event(struct event_channel *event_channel, const char *event_type)
00363 {
00364 int i;
00365 enum ast_event_type type = -1;
00366 struct subscribe_event *subscribe_event;
00367 SaAisErrorT ais_res;
00368
00369 for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
00370 if (!strcasecmp(event_type, supported_event_types[i].str)) {
00371 type = supported_event_types[i].type;
00372 break;
00373 }
00374 }
00375
00376 if (type == -1) {
00377 ast_log(LOG_WARNING, "subscribe_event option given with invalid value '%s'\n", event_type);
00378 return;
00379 }
00380
00381 if (type == AST_EVENT_DEVICE_STATE_CHANGE && ast_enable_distributed_devstate()) {
00382 return;
00383 }
00384
00385 if (!(subscribe_event = ast_calloc(1, sizeof(*subscribe_event)))) {
00386 return;
00387 }
00388
00389 subscribe_event->type = type;
00390 subscribe_event->id = ast_atomic_fetchadd_int(&unique_id, +1);
00391
00392 ais_res = set_egress_subscription(event_channel, subscribe_event);
00393 if (ais_res != SA_AIS_OK) {
00394 ast_log(LOG_ERROR, "Error setting up egress subscription: %s\n",
00395 ais_err2str(ais_res));
00396 free(subscribe_event);
00397 return;
00398 }
00399
00400 AST_LIST_INSERT_TAIL(&event_channel->subscribe_events, subscribe_event, entry);
00401 }
00402
00403 static void build_event_channel(struct ast_config *cfg, const char *cat)
00404 {
00405 struct ast_variable *var;
00406 struct event_channel *event_channel;
00407 SaAisErrorT ais_res;
00408 SaNameT sa_name = { 0, };
00409
00410 AST_RWLIST_WRLOCK(&event_channels);
00411 AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) {
00412 if (!strcasecmp(event_channel->name, cat))
00413 break;
00414 }
00415 AST_RWLIST_UNLOCK(&event_channels);
00416 if (event_channel) {
00417 ast_log(LOG_WARNING, "Event channel '%s' was specified twice in "
00418 "configuration. Second instance ignored.\n", cat);
00419 return;
00420 }
00421
00422 if (!(event_channel = ast_calloc(1, sizeof(*event_channel) + strlen(cat))))
00423 return;
00424
00425 strcpy(event_channel->name, cat);
00426 ast_copy_string((char *) sa_name.value, cat, sizeof(sa_name.value));
00427 sa_name.length = strlen((char *) sa_name.value);
00428 ais_res = saEvtChannelOpen(evt_handle, &sa_name,
00429 SA_EVT_CHANNEL_PUBLISHER | SA_EVT_CHANNEL_SUBSCRIBER | SA_EVT_CHANNEL_CREATE,
00430 SA_TIME_MAX, &event_channel->handle);
00431 if (ais_res != SA_AIS_OK) {
00432 ast_log(LOG_ERROR, "Error opening event channel: %s\n", ais_err2str(ais_res));
00433 free(event_channel);
00434 return;
00435 }
00436
00437 for (var = ast_variable_browse(cfg, cat); var; var = var->next) {
00438 if (!strcasecmp(var->name, "type")) {
00439 continue;
00440 } else if (!strcasecmp(var->name, "publish_event")) {
00441 add_publish_event(event_channel, var->value);
00442 } else if (!strcasecmp(var->name, "subscribe_event")) {
00443 add_subscribe_event(event_channel, var->value);
00444 } else {
00445 ast_log(LOG_WARNING, "Event channel '%s' contains invalid option '%s'\n",
00446 event_channel->name, var->name);
00447 }
00448 }
00449
00450 AST_RWLIST_WRLOCK(&event_channels);
00451 AST_RWLIST_INSERT_TAIL(&event_channels, event_channel, entry);
00452 AST_RWLIST_UNLOCK(&event_channels);
00453 }
00454
00455 static void load_config(void)
00456 {
00457 static const char filename[] = "ais.conf";
00458 struct ast_config *cfg;
00459 const char *cat = NULL;
00460 struct ast_flags config_flags = { 0 };
00461
00462 if (!(cfg = ast_config_load(filename, config_flags)) || cfg == CONFIG_STATUS_FILEINVALID)
00463 return;
00464
00465 while ((cat = ast_category_browse(cfg, cat))) {
00466 const char *type;
00467
00468 if (!strcasecmp(cat, "general"))
00469 continue;
00470
00471 if (!(type = ast_variable_retrieve(cfg, cat, "type"))) {
00472 ast_log(LOG_WARNING, "Invalid entry in %s defined with no type!\n",
00473 filename);
00474 continue;
00475 }
00476
00477 if (!strcasecmp(type, "event_channel")) {
00478 build_event_channel(cfg, cat);
00479 } else {
00480 ast_log(LOG_WARNING, "Entry in %s defined with invalid type '%s'\n",
00481 filename, type);
00482 }
00483 }
00484
00485 ast_config_destroy(cfg);
00486 }
00487
00488 static void publish_event_destroy(struct publish_event *publish_event)
00489 {
00490 ast_event_unsubscribe(publish_event->sub);
00491
00492 free(publish_event);
00493 }
00494
00495 static void subscribe_event_destroy(const struct event_channel *event_channel,
00496 struct subscribe_event *subscribe_event)
00497 {
00498 SaAisErrorT ais_res;
00499
00500
00501
00502 ais_res = saEvtEventUnsubscribe(event_channel->handle, subscribe_event->id);
00503 if (ais_res != SA_AIS_OK) {
00504 ast_log(LOG_ERROR, "Error unsubscribing: %s\n", ais_err2str(ais_res));
00505 }
00506
00507 free(subscribe_event);
00508 }
00509
00510 static void event_channel_destroy(struct event_channel *event_channel)
00511 {
00512 struct publish_event *publish_event;
00513 struct subscribe_event *subscribe_event;
00514 SaAisErrorT ais_res;
00515
00516 while ((publish_event = AST_LIST_REMOVE_HEAD(&event_channel->publish_events, entry)))
00517 publish_event_destroy(publish_event);
00518 while ((subscribe_event = AST_LIST_REMOVE_HEAD(&event_channel->subscribe_events, entry)))
00519 subscribe_event_destroy(event_channel, subscribe_event);
00520
00521 ais_res = saEvtChannelClose(event_channel->handle);
00522 if (ais_res != SA_AIS_OK) {
00523 ast_log(LOG_ERROR, "Error closing event channel '%s': %s\n",
00524 event_channel->name, ais_err2str(ais_res));
00525 }
00526
00527 free(event_channel);
00528 }
00529
00530 static void destroy_event_channels(void)
00531 {
00532 struct event_channel *event_channel;
00533
00534 AST_RWLIST_WRLOCK(&event_channels);
00535 while ((event_channel = AST_RWLIST_REMOVE_HEAD(&event_channels, entry)))
00536 event_channel_destroy(event_channel);
00537 AST_RWLIST_UNLOCK(&event_channels);
00538 }
00539
00540 int ast_ais_evt_load_module(void)
00541 {
00542 SaAisErrorT ais_res;
00543
00544 ais_res = saEvtInitialize(&evt_handle, &evt_callbacks, &ais_version);
00545 if (ais_res != SA_AIS_OK) {
00546 ast_log(LOG_ERROR, "Could not initialize eventing service: %s\n",
00547 ais_err2str(ais_res));
00548 return -1;
00549 }
00550
00551 load_config();
00552
00553 ast_cli_register_multiple(ais_cli, ARRAY_LEN(ais_cli));
00554
00555 return 0;
00556 }
00557
00558 int ast_ais_evt_unload_module(void)
00559 {
00560 SaAisErrorT ais_res;
00561
00562 destroy_event_channels();
00563
00564 ais_res = saEvtFinalize(evt_handle);
00565 if (ais_res != SA_AIS_OK) {
00566 ast_log(LOG_ERROR, "Problem stopping eventing service: %s\n",
00567 ais_err2str(ais_res));
00568 return -1;
00569 }
00570
00571 return 0;
00572 }