00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #include "asterisk.h"
00033
00034 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
00035
00036 #include <stdio.h>
00037 #include <stdlib.h>
00038 #include <string.h>
00039 #include <sys/types.h>
00040 #include <sys/stat.h>
00041 #include <fcntl.h>
00042
00043 #include "asterisk/module.h"
00044 #include "asterisk/channel.h"
00045 #include "asterisk/bridging.h"
00046 #include "asterisk/bridging_technology.h"
00047 #include "asterisk/frame.h"
00048 #include "asterisk/astobj2.h"
00049
00050
00051 #define MULTIPLEXED_BUCKETS 53
00052
00053
00054 #define MULTIPLEXED_MAX_CHANNELS 8
00055
00056
00057 struct multiplexed_thread {
00058
00059 pthread_t thread;
00060
00061 int pipe[2];
00062
00063 struct ast_channel *chans[MULTIPLEXED_MAX_CHANNELS];
00064
00065 unsigned int count;
00066
00067 unsigned int waiting:1;
00068
00069 unsigned int service_count;
00070 };
00071
00072
00073 static struct ao2_container *multiplexed_threads;
00074
00075
00076 static int find_multiplexed_thread(void *obj, void *arg, int flags)
00077 {
00078 struct multiplexed_thread *multiplexed_thread = obj;
00079 return (multiplexed_thread->count <= (MULTIPLEXED_MAX_CHANNELS - 2)) ? CMP_MATCH | CMP_STOP : 0;
00080 }
00081
00082
00083 static void destroy_multiplexed_thread(void *obj)
00084 {
00085 struct multiplexed_thread *multiplexed_thread = obj;
00086
00087 if (multiplexed_thread->pipe[0] > -1) {
00088 close(multiplexed_thread->pipe[0]);
00089 }
00090 if (multiplexed_thread->pipe[1] > -1) {
00091 close(multiplexed_thread->pipe[1]);
00092 }
00093
00094 return;
00095 }
00096
00097
00098 static int multiplexed_bridge_create(struct ast_bridge *bridge)
00099 {
00100 struct multiplexed_thread *multiplexed_thread;
00101
00102 ao2_lock(multiplexed_threads);
00103
00104
00105 if (!(multiplexed_thread = ao2_callback(multiplexed_threads, 0, find_multiplexed_thread, NULL))) {
00106 int flags;
00107
00108
00109 if (!(multiplexed_thread = ao2_alloc(sizeof(*multiplexed_thread), destroy_multiplexed_thread))) {
00110 ast_debug(1, "Failed to find or create a new multiplexed thread for bridge '%p'\n", bridge);
00111 ao2_unlock(multiplexed_threads);
00112 return -1;
00113 }
00114
00115 multiplexed_thread->pipe[0] = multiplexed_thread->pipe[1] = -1;
00116
00117 if (pipe(multiplexed_thread->pipe)) {
00118 ast_debug(1, "Failed to create a pipe for poking a multiplexed thread for bridge '%p'\n", bridge);
00119 ao2_ref(multiplexed_thread, -1);
00120 ao2_unlock(multiplexed_threads);
00121 return -1;
00122 }
00123
00124
00125 flags = fcntl(multiplexed_thread->pipe[0], F_GETFL);
00126 if (fcntl(multiplexed_thread->pipe[0], F_SETFL, flags | O_NONBLOCK) < 0) {
00127 ast_log(LOG_WARNING, "Failed to setup first nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
00128 ao2_ref(multiplexed_thread, -1);
00129 ao2_unlock(multiplexed_threads);
00130 return -1;
00131 }
00132 flags = fcntl(multiplexed_thread->pipe[1], F_GETFL);
00133 if (fcntl(multiplexed_thread->pipe[1], F_SETFL, flags | O_NONBLOCK) < 0) {
00134 ast_log(LOG_WARNING, "Failed to setup second nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
00135 ao2_ref(multiplexed_thread, -1);
00136 ao2_unlock(multiplexed_threads);
00137 return -1;
00138 }
00139
00140
00141 multiplexed_thread->thread = AST_PTHREADT_NULL;
00142
00143
00144 ao2_link(multiplexed_threads, multiplexed_thread);
00145 ast_debug(1, "Created multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge);
00146 } else {
00147 ast_debug(1, "Found multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge);
00148 }
00149
00150
00151 multiplexed_thread->count += 2;
00152
00153 ao2_unlock(multiplexed_threads);
00154
00155 bridge->bridge_pvt = multiplexed_thread;
00156
00157 return 0;
00158 }
00159
00160
00161 static void multiplexed_nudge(struct multiplexed_thread *multiplexed_thread)
00162 {
00163 int nudge = 0;
00164
00165 if (multiplexed_thread->thread == AST_PTHREADT_NULL) {
00166 return;
00167 }
00168
00169 if (write(multiplexed_thread->pipe[1], &nudge, sizeof(nudge)) != sizeof(nudge)) {
00170 ast_log(LOG_ERROR, "We couldn't poke multiplexed thread '%p'... something is VERY wrong\n", multiplexed_thread);
00171 }
00172
00173 while (multiplexed_thread->waiting) {
00174 sched_yield();
00175 }
00176
00177 return;
00178 }
00179
00180
00181 static int multiplexed_bridge_destroy(struct ast_bridge *bridge)
00182 {
00183 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00184
00185 ao2_lock(multiplexed_threads);
00186
00187 multiplexed_thread->count -= 2;
00188
00189 if (!multiplexed_thread->count) {
00190 ast_debug(1, "Unlinking multiplexed thread '%p' since nobody is using it anymore\n", multiplexed_thread);
00191 ao2_unlink(multiplexed_threads, multiplexed_thread);
00192 }
00193
00194 multiplexed_nudge(multiplexed_thread);
00195
00196 ao2_unlock(multiplexed_threads);
00197
00198 ao2_ref(multiplexed_thread, -1);
00199
00200 return 0;
00201 }
00202
00203
00204 static void *multiplexed_thread_function(void *data)
00205 {
00206 struct multiplexed_thread *multiplexed_thread = data;
00207 int fds = multiplexed_thread->pipe[0];
00208
00209 ao2_lock(multiplexed_thread);
00210
00211 ast_debug(1, "Starting actual thread for multiplexed thread '%p'\n", multiplexed_thread);
00212
00213 while (multiplexed_thread->thread != AST_PTHREADT_STOP) {
00214 struct ast_channel *winner = NULL, *first = multiplexed_thread->chans[0];
00215 int to = -1, outfd = -1;
00216
00217
00218 memmove(multiplexed_thread->chans, multiplexed_thread->chans + 1, sizeof(struct ast_channel *) * (multiplexed_thread->service_count - 1));
00219 multiplexed_thread->chans[multiplexed_thread->service_count - 1] = first;
00220
00221 multiplexed_thread->waiting = 1;
00222 ao2_unlock(multiplexed_thread);
00223 winner = ast_waitfor_nandfds(multiplexed_thread->chans, multiplexed_thread->service_count, &fds, 1, NULL, &outfd, &to);
00224 multiplexed_thread->waiting = 0;
00225 ao2_lock(multiplexed_thread);
00226
00227 if (outfd > -1) {
00228 int nudge;
00229
00230 if (read(multiplexed_thread->pipe[0], &nudge, sizeof(nudge)) < 0) {
00231 if (errno != EINTR && errno != EAGAIN) {
00232 ast_log(LOG_WARNING, "read() failed for pipe on multiplexed thread '%p': %s\n", multiplexed_thread, strerror(errno));
00233 }
00234 }
00235 }
00236 if (winner && winner->bridge) {
00237 ast_bridge_handle_trip(winner->bridge, NULL, winner, -1);
00238 }
00239 }
00240
00241 multiplexed_thread->thread = AST_PTHREADT_NULL;
00242
00243 ast_debug(1, "Stopping actual thread for multiplexed thread '%p'\n", multiplexed_thread);
00244
00245 ao2_unlock(multiplexed_thread);
00246 ao2_ref(multiplexed_thread, -1);
00247
00248 return NULL;
00249 }
00250
00251
00252 static void multiplexed_add_or_remove(struct multiplexed_thread *multiplexed_thread, struct ast_channel *chan, int add)
00253 {
00254 int i, removed = 0;
00255 pthread_t thread = AST_PTHREADT_NULL;
00256
00257 ao2_lock(multiplexed_thread);
00258
00259 multiplexed_nudge(multiplexed_thread);
00260
00261 for (i = 0; i < MULTIPLEXED_MAX_CHANNELS; i++) {
00262 if (multiplexed_thread->chans[i] == chan) {
00263 if (!add) {
00264 multiplexed_thread->chans[i] = NULL;
00265 multiplexed_thread->service_count--;
00266 removed = 1;
00267 }
00268 break;
00269 } else if (!multiplexed_thread->chans[i] && add) {
00270 multiplexed_thread->chans[i] = chan;
00271 multiplexed_thread->service_count++;
00272 break;
00273 }
00274 }
00275
00276 if (multiplexed_thread->service_count && multiplexed_thread->thread == AST_PTHREADT_NULL) {
00277 ao2_ref(multiplexed_thread, +1);
00278 if (ast_pthread_create(&multiplexed_thread->thread, NULL, multiplexed_thread_function, multiplexed_thread)) {
00279 ao2_ref(multiplexed_thread, -1);
00280 ast_debug(1, "Failed to create an actual thread for multiplexed thread '%p', trying next time\n", multiplexed_thread);
00281 }
00282 } else if (!multiplexed_thread->service_count && multiplexed_thread->thread != AST_PTHREADT_NULL) {
00283 thread = multiplexed_thread->thread;
00284 multiplexed_thread->thread = AST_PTHREADT_STOP;
00285 } else if (!add && removed) {
00286 memmove(multiplexed_thread->chans + i, multiplexed_thread->chans + i + 1, sizeof(struct ast_channel *) * (MULTIPLEXED_MAX_CHANNELS - (i + 1)));
00287 }
00288
00289 ao2_unlock(multiplexed_thread);
00290
00291 if (thread != AST_PTHREADT_NULL) {
00292 pthread_join(thread, NULL);
00293 }
00294
00295 return;
00296 }
00297
00298
00299 static int multiplexed_bridge_join(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
00300 {
00301 struct ast_channel *c0 = AST_LIST_FIRST(&bridge->channels)->chan, *c1 = AST_LIST_LAST(&bridge->channels)->chan;
00302 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00303
00304 ast_debug(1, "Adding channel '%s' to multiplexed thread '%p' for monitoring\n", bridge_channel->chan->name, multiplexed_thread);
00305
00306 multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 1);
00307
00308
00309 if (c0 == c1) {
00310 return 0;
00311 }
00312
00313 if (((c0->writeformat == c1->readformat) && (c0->readformat == c1->writeformat) && (c0->nativeformats == c1->nativeformats))) {
00314 return 0;
00315 }
00316
00317 return ast_channel_make_compatible(c0, c1);
00318 }
00319
00320
00321 static int multiplexed_bridge_leave(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
00322 {
00323 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00324
00325 ast_debug(1, "Removing channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread);
00326
00327 multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 0);
00328
00329 return 0;
00330 }
00331
00332
00333 static void multiplexed_bridge_suspend(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
00334 {
00335 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00336
00337 ast_debug(1, "Suspending channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread);
00338
00339 multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 0);
00340
00341 return;
00342 }
00343
00344
00345 static void multiplexed_bridge_unsuspend(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
00346 {
00347 struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
00348
00349 ast_debug(1, "Unsuspending channel '%s' from multiplexed thread '%p'\n", bridge_channel->chan->name, multiplexed_thread);
00350
00351 multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 1);
00352
00353 return;
00354 }
00355
00356
00357 static enum ast_bridge_write_result multiplexed_bridge_write(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel, struct ast_frame *frame)
00358 {
00359 struct ast_bridge_channel *other;
00360
00361 if (AST_LIST_FIRST(&bridge->channels) == AST_LIST_LAST(&bridge->channels)) {
00362 return AST_BRIDGE_WRITE_FAILED;
00363 }
00364
00365 if (!(other = (AST_LIST_FIRST(&bridge->channels) == bridge_channel ? AST_LIST_LAST(&bridge->channels) : AST_LIST_FIRST(&bridge->channels)))) {
00366 return AST_BRIDGE_WRITE_FAILED;
00367 }
00368
00369 if (other->state == AST_BRIDGE_CHANNEL_STATE_WAIT) {
00370 ast_write(other->chan, frame);
00371 }
00372
00373 return AST_BRIDGE_WRITE_SUCCESS;
00374 }
00375
00376 static struct ast_bridge_technology multiplexed_bridge = {
00377 .name = "multiplexed_bridge",
00378 .capabilities = AST_BRIDGE_CAPABILITY_1TO1MIX,
00379 .preference = AST_BRIDGE_PREFERENCE_HIGH,
00380 .formats = AST_FORMAT_AUDIO_MASK | AST_FORMAT_VIDEO_MASK | AST_FORMAT_TEXT_MASK,
00381 .create = multiplexed_bridge_create,
00382 .destroy = multiplexed_bridge_destroy,
00383 .join = multiplexed_bridge_join,
00384 .leave = multiplexed_bridge_leave,
00385 .suspend = multiplexed_bridge_suspend,
00386 .unsuspend = multiplexed_bridge_unsuspend,
00387 .write = multiplexed_bridge_write,
00388 };
00389
00390 static int unload_module(void)
00391 {
00392 int res = ast_bridge_technology_unregister(&multiplexed_bridge);
00393
00394 ao2_ref(multiplexed_threads, -1);
00395
00396 return res;
00397 }
00398
00399 static int load_module(void)
00400 {
00401 if (!(multiplexed_threads = ao2_container_alloc(MULTIPLEXED_BUCKETS, NULL, NULL))) {
00402 return AST_MODULE_LOAD_DECLINE;
00403 }
00404
00405 return ast_bridge_technology_register(&multiplexed_bridge);
00406 }
00407
00408 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Multiplexed two channel bridging module");