Sun Oct 16 2011 08:41:31

Asterisk developer's documentation


bridge_multiplexed.c
Go to the documentation of this file.
00001 /*
00002  * Asterisk -- An open source telephony toolkit.
00003  *
00004  * Copyright (C) 2008, Digium, Inc.
00005  *
00006  * Joshua Colp <jcolp@digium.com>
00007  *
00008  * See http://www.asterisk.org for more information about
00009  * the Asterisk project. Please do not directly contact
00010  * any of the maintainers of this project for assistance;
00011  * the project provides a web site, mailing lists and IRC
00012  * channels for your use.
00013  *
00014  * This program is free software, distributed under the terms of
00015  * the GNU General Public License Version 2. See the LICENSE file
00016  * at the top of the source tree.
00017  */
00018 
00019 /*! \file
00020  *
00021  * \brief Two channel bridging module which groups bridges into batches of threads
00022  *
00023  * \author Joshua Colp <jcolp@digium.com>
00024  *
00025  * \ingroup bridges
00026  */
00027 
00028 /*** MODULEINFO
00029    <support_level>core</support_level>
00030  ***/
00031 
00032 #include "asterisk.h"
00033 
00034 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
00035 
00036 #include <stdio.h>
00037 #include <stdlib.h>
00038 #include <string.h>
00039 #include <sys/types.h>
00040 #include <sys/stat.h>
00041 #include <fcntl.h>
00042 
00043 #include "asterisk/module.h"
00044 #include "asterisk/channel.h"
00045 #include "asterisk/bridging.h"
00046 #include "asterisk/bridging_technology.h"
00047 #include "asterisk/frame.h"
00048 #include "asterisk/astobj2.h"
00049 
/*! \brief Bucket count passed to ao2_container_alloc() for the container of multiplexed threads */
#define MULTIPLEXED_BUCKETS 53

/*! \brief Capacity of the per-thread channel array (each bridge reserves two of these slots) */
#define MULTIPLEXED_MAX_CHANNELS 8
00055 
00056 /*! \brief Structure which represents a single thread handling multiple 2 channel bridges */
00057 struct multiplexed_thread {
00058    /*! Thread itself */
00059    pthread_t thread;
00060    /*! Pipe used to wake up the multiplexed thread */
00061    int pipe[2];
00062    /*! Channels in this thread */
00063    struct ast_channel *chans[MULTIPLEXED_MAX_CHANNELS];
00064    /*! Number of channels in this thread */
00065    unsigned int count;
00066    /*! Bit used to indicate that the thread is waiting on channels */
00067    unsigned int waiting:1;
00068    /*! Number of channels actually being serviced by this thread */
00069    unsigned int service_count;
00070 };
00071 
/*! \brief ao2 container holding every multiplexed thread structure currently in use */
static struct ao2_container *multiplexed_threads;
00074 
00075 /*! \brief Callback function for finding a free multiplexed thread */
00076 static int find_multiplexed_thread(void *obj, void *arg, int flags)
00077 {
00078    struct multiplexed_thread *multiplexed_thread = obj;
00079    return (multiplexed_thread->count <= (MULTIPLEXED_MAX_CHANNELS - 2)) ? CMP_MATCH | CMP_STOP : 0;
00080 }
00081 
00082 /*! \brief Destroy callback for a multiplexed thread structure */
00083 static void destroy_multiplexed_thread(void *obj)
00084 {
00085    struct multiplexed_thread *multiplexed_thread = obj;
00086 
00087    if (multiplexed_thread->pipe[0] > -1) {
00088       close(multiplexed_thread->pipe[0]);
00089    }
00090    if (multiplexed_thread->pipe[1] > -1) {
00091       close(multiplexed_thread->pipe[1]);
00092    }
00093 
00094    return;
00095 }
00096 
00097 /*! \brief Create function which finds/reserves/references a multiplexed thread structure */
00098 static int multiplexed_bridge_create(struct ast_bridge *bridge)
00099 {
00100    struct multiplexed_thread *multiplexed_thread;
00101 
00102    ao2_lock(multiplexed_threads);
00103 
00104    /* Try to find an existing thread to handle our additional channels */
00105    if (!(multiplexed_thread = ao2_callback(multiplexed_threads, 0, find_multiplexed_thread, NULL))) {
00106       int flags;
00107 
00108       /* If we failed we will have to create a new one from scratch */
00109       if (!(multiplexed_thread = ao2_alloc(sizeof(*multiplexed_thread), destroy_multiplexed_thread))) {
00110          ast_debug(1, "Failed to find or create a new multiplexed thread for bridge '%p'\n", bridge);
00111          ao2_unlock(multiplexed_threads);
00112          return -1;
00113       }
00114 
00115       multiplexed_thread->pipe[0] = multiplexed_thread->pipe[1] = -1;
00116       /* Setup a pipe so we can poke the thread itself when needed */
00117       if (pipe(multiplexed_thread->pipe)) {
00118          ast_debug(1, "Failed to create a pipe for poking a multiplexed thread for bridge '%p'\n", bridge);
00119          ao2_ref(multiplexed_thread, -1);
00120          ao2_unlock(multiplexed_threads);
00121          return -1;
00122       }
00123 
00124       /* Setup each pipe for non-blocking operation */
00125       flags = fcntl(multiplexed_thread->pipe[0], F_GETFL);
00126       if (fcntl(multiplexed_thread->pipe[0], F_SETFL, flags | O_NONBLOCK) < 0) {
00127          ast_log(LOG_WARNING, "Failed to setup first nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
00128          ao2_ref(multiplexed_thread, -1);
00129          ao2_unlock(multiplexed_threads);
00130          return -1;
00131       }
00132       flags = fcntl(multiplexed_thread->pipe[1], F_GETFL);
00133       if (fcntl(multiplexed_thread->pipe[1], F_SETFL, flags | O_NONBLOCK) < 0) {
00134          ast_log(LOG_WARNING, "Failed to setup second nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
00135          ao2_ref(multiplexed_thread, -1);
00136          ao2_unlock(multiplexed_threads);
00137          return -1;
00138       }
00139 
00140       /* Set up default parameters */
00141       multiplexed_thread->thread = AST_PTHREADT_NULL;
00142 
00143       /* Finally link us into the container so others may find us */
00144       ao2_link(multiplexed_threads, multiplexed_thread);
00145       ast_debug(1, "Created multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge);
00146    } else {
00147       ast_debug(1, "Found multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge);
00148    }
00149 
00150    /* Bump the count of the thread structure up by two since the channels for this bridge will be joining shortly */
00151    multiplexed_thread->count += 2;
00152 
00153    ao2_unlock(multiplexed_threads);
00154 
00155    bridge->bridge_pvt = multiplexed_thread;
00156 
00157    return 0;
00158 }
00159 
00160 /*! \brief Internal function which nudges the thread */
00161 static void multiplexed_nudge(struct multiplexed_thread *multiplexed_thread)
00162 {
00163    int nudge = 0;
00164 
00165    if (multiplexed_thread->thread == AST_PTHREADT_NULL) {
00166       return;
00167    }
00168 
00169    if (write(multiplexed_thread->pipe[1], &nudge, sizeof(nudge)) != sizeof(nudge)) {
00170       ast_log(LOG_ERROR, "We couldn't poke multiplexed thread '%p'... something is VERY wrong\n", multiplexed_thread);
00171    }
00172 
00173    while (multiplexed_thread->waiting) {
00174       sched_yield();
00175    }
00176 
00177    return;
00178 }
00179 
00180 /*! \brief Destroy function which unreserves/unreferences/removes a multiplexed thread structure */
00181 static int multiplexed_bridge_destroy(struct ast_bridge *bridge)
00182 {
00183    struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00184 
00185    ao2_lock(multiplexed_threads);
00186 
00187    multiplexed_thread->count -= 2;
00188 
00189    if (!multiplexed_thread->count) {
00190       ast_debug(1, "Unlinking multiplexed thread '%p' since nobody is using it anymore\n", multiplexed_thread);
00191       ao2_unlink(multiplexed_threads, multiplexed_thread);
00192    }
00193 
00194    multiplexed_nudge(multiplexed_thread);
00195 
00196    ao2_unlock(multiplexed_threads);
00197 
00198    ao2_ref(multiplexed_thread, -1);
00199 
00200    return 0;
00201 }
00202 
00203 /*! \brief Thread function that executes for multiplexed threads */
00204 static void *multiplexed_thread_function(void *data)
00205 {
00206    struct multiplexed_thread *multiplexed_thread = data;
00207    int fds = multiplexed_thread->pipe[0];
00208 
00209    ao2_lock(multiplexed_thread);
00210 
00211    ast_debug(1, "Starting actual thread for multiplexed thread '%p'\n", multiplexed_thread);
00212 
00213    while (multiplexed_thread->thread != AST_PTHREADT_STOP) {
00214       struct ast_channel *winner = NULL, *first = multiplexed_thread->chans[0];
00215       int to = -1, outfd = -1;
00216 
00217       /* Move channels around so not just the first one gets priority */
00218       memmove(multiplexed_thread->chans, multiplexed_thread->chans + 1, sizeof(struct ast_channel *) * (multiplexed_thread->service_count - 1));
00219       multiplexed_thread->chans[multiplexed_thread->service_count - 1] = first;
00220 
00221       multiplexed_thread->waiting = 1;
00222       ao2_unlock(multiplexed_thread);
00223       winner = ast_waitfor_nandfds(multiplexed_thread->chans, multiplexed_thread->service_count, &fds, 1, NULL, &outfd, &to);
00224       multiplexed_thread->waiting = 0;
00225       ao2_lock(multiplexed_thread);
00226 
00227       if (outfd > -1) {
00228          int nudge;
00229 
00230          if (read(multiplexed_thread->pipe[0], &nudge, sizeof(nudge)) < 0) {
00231             if (errno != EINTR && errno != EAGAIN) {
00232                ast_log(LOG_WARNING, "read() failed for pipe on multiplexed thread '%p': %s\n", multiplexed_thread, strerror(errno));
00233             }
00234          }
00235       }
00236       if (winner && winner->bridge) {
00237          ast_bridge_handle_trip(winner->bridge, NULL, winner, -1);
00238       }
00239    }
00240 
00241    multiplexed_thread->thread = AST_PTHREADT_NULL;
00242 
00243    ast_debug(1, "Stopping actual thread for multiplexed thread '%p'\n", multiplexed_thread);
00244 
00245    ao2_unlock(multiplexed_thread);
00246    ao2_ref(multiplexed_thread, -1);
00247 
00248    return NULL;
00249 }
00250 
00251 /*! \brief Helper function which adds or removes a channel and nudges the thread */
00252 static void multiplexed_add_or_remove(struct multiplexed_thread *multiplexed_thread, struct ast_channel *chan, int add)
00253 {
00254    int i, removed = 0;
00255    pthread_t thread = AST_PTHREADT_NULL;
00256 
00257    ao2_lock(multiplexed_thread);
00258 
00259    multiplexed_nudge(multiplexed_thread);
00260 
00261    for (i = 0; i < MULTIPLEXED_MAX_CHANNELS; i++) {
00262       if (multiplexed_thread->chans[i] == chan) {
00263          if (!add) {
00264             multiplexed_thread->chans[i] = NULL;
00265             multiplexed_thread->service_count--;
00266             removed = 1;
00267          }
00268          break;
00269       } else if (!multiplexed_thread->chans[i] && add) {
00270          multiplexed_thread->chans[i] = chan;
00271          multiplexed_thread->service_count++;
00272          break;
00273       }
00274    }
00275 
00276    if (multiplexed_thread->service_count && multiplexed_thread->thread == AST_PTHREADT_NULL) {
00277       ao2_ref(multiplexed_thread, +1);
00278       if (ast_pthread_create(&multiplexed_thread->thread, NULL, multiplexed_thread_function, multiplexed_thread)) {
00279          ao2_ref(multiplexed_thread, -1);
00280          ast_debug(1, "Failed to create an actual thread for multiplexed thread '%p', trying next time\n", multiplexed_thread);
00281       }
00282    } else if (!multiplexed_thread->service_count && multiplexed_thread->thread != AST_PTHREADT_NULL) {
00283       thread = multiplexed_thread->thread;
00284       multiplexed_thread->thread = AST_PTHREADT_STOP;
00285    } else if (!add && removed) {
00286       memmove(multiplexed_thread->chans + i, multiplexed_thread->chans + i + 1, sizeof(struct ast_channel *) * (MULTIPLEXED_MAX_CHANNELS - (i + 1)));
00287    }
00288 
00289    ao2_unlock(multiplexed_thread);
00290 
00291    if (thread != AST_PTHREADT_NULL) {
00292       pthread_join(thread, NULL);
00293    }
00294 
00295    return;
00296 }
00297 
00298 /*! \brief Join function which actually adds the channel into the array to be monitored */
00299 static int multiplexed_bridge_join(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
00300 {
00301    struct ast_channel *c0 = AST_LIST_FIRST(&bridge->channels)->chan, *c1 = AST_LIST_LAST(&bridge->channels)->chan;
00302    struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00303 
00304    ast_debug(1, "Adding channel '%s' to multiplexed thread '%p' for monitoring\n", bridge_channel->chan->name, multiplexed_thread);
00305 
00306    multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 1);
00307 
00308    /* If the second channel has not yet joined do not make things compatible */
00309    if (c0 == c1) {
00310       return 0;
00311    }
00312 
00313    if (((c0->writeformat == c1->readformat) && (c0->readformat == c1->writeformat) && (c0->nativeformats == c1->nativeformats))) {
00314       return 0;
00315    }
00316 
00317    return ast_channel_make_compatible(c0, c1);
00318 }
00319 
00320 /*! \brief Leave function which actually removes the channel from the array */
00321 static int multiplexed_bridge_leave(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
00322 {
00323    struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00324 
00325    ast_debug(1, "Removing channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread);
00326 
00327    multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 0);
00328 
00329    return 0;
00330 }
00331 
00332 /*! \brief Suspend function which means control of the channel is going elsewhere */
00333 static void multiplexed_bridge_suspend(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
00334 {
00335    struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00336 
00337    ast_debug(1, "Suspending channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread);
00338 
00339    multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 0);
00340 
00341    return;
00342 }
00343 
00344 /*! \brief Unsuspend function which means control of the channel is coming back to us */
00345 static void multiplexed_bridge_unsuspend(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
00346 {
00347    struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00348 
00349    ast_debug(1, "Unsuspending channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread);
00350 
00351    multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 1);
00352 
00353    return;
00354 }
00355 
00356 /*! \brief Write function for writing frames into the bridge */
00357 static enum ast_bridge_write_result multiplexed_bridge_write(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel, struct ast_frame *frame)
00358 {
00359    struct ast_bridge_channel *other;
00360 
00361    if (AST_LIST_FIRST(&bridge->channels) == AST_LIST_LAST(&bridge->channels)) {
00362       return AST_BRIDGE_WRITE_FAILED;
00363    }
00364 
00365    if (!(other = (AST_LIST_FIRST(&bridge->channels) == bridge_channel ? AST_LIST_LAST(&bridge->channels) : AST_LIST_FIRST(&bridge->channels)))) {
00366       return AST_BRIDGE_WRITE_FAILED;
00367    }
00368 
00369    if (other->state == AST_BRIDGE_CHANNEL_STATE_WAIT) {
00370       ast_write(other->chan, frame);
00371    }
00372 
00373    return AST_BRIDGE_WRITE_SUCCESS;
00374 }
00375 
00376 static struct ast_bridge_technology multiplexed_bridge = {
00377    .name = "multiplexed_bridge",
00378    .capabilities = AST_BRIDGE_CAPABILITY_1TO1MIX,
00379    .preference = AST_BRIDGE_PREFERENCE_HIGH,
00380    .formats = AST_FORMAT_AUDIO_MASK | AST_FORMAT_VIDEO_MASK | AST_FORMAT_TEXT_MASK,
00381    .create = multiplexed_bridge_create,
00382    .destroy = multiplexed_bridge_destroy,
00383    .join = multiplexed_bridge_join,
00384    .leave = multiplexed_bridge_leave,
00385    .suspend = multiplexed_bridge_suspend,
00386    .unsuspend = multiplexed_bridge_unsuspend,
00387    .write = multiplexed_bridge_write,
00388 };
00389 
00390 static int unload_module(void)
00391 {
00392    int res = ast_bridge_technology_unregister(&multiplexed_bridge);
00393 
00394    ao2_ref(multiplexed_threads, -1);
00395 
00396    return res;
00397 }
00398 
00399 static int load_module(void)
00400 {
00401    if (!(multiplexed_threads = ao2_container_alloc(MULTIPLEXED_BUCKETS, NULL, NULL))) {
00402       return AST_MODULE_LOAD_DECLINE;
00403    }
00404 
00405    return ast_bridge_technology_register(&multiplexed_bridge);
00406 }
00407 
00408 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Multiplexed two channel bridging module");