#!/usr/bin/python
#   Copyright 2008 Red Hat, Inc.
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import socket
import pickle
import sys
import os
import time
import qpid
import syslog
import re
import threading
import signal
import zipfile
import getopt
from jobhooks.functions import *
from qpid.util import connect
from qpid.connection import Connection
from qpid.datatypes import Message, RangedSet, uuid4
from qpid.queue import Empty
from cStringIO import StringIO

class exit_signal(Exception):
   """Raised from a signal handler to request that the daemon stop.

   The reason text is stored in ``msg`` and is also passed to
   ``Exception.__init__`` so that ``str(exc)`` and ``exc.args`` carry it.
   """
   def __init__(self, str):
      # The parameter keeps its historical name 'str' for caller
      # compatibility; it only shadows the builtin inside this method.
      Exception.__init__(self, str)
      self.msg = str

class work_data(object):
   """Tracks one AMQP work message that has been handed out to a slot.

   Reading or assigning ``AMQP_msg`` or ``slot``, and any blocking
   lock()/unlock(), stamps ``access_time`` with the current time;
   lease_monitor compares that stamp against the maximum lease time.
   ``access_time`` is read-only, and reading it does not update it.
   """

   def __init__(self, msg, slot_num):
      self.__AMQP_msg__ = msg
      self.__slot__ = slot_num
      self.__access_time__ = time.time()
      self.__access_lock__ = threading.Lock()

   def _touch(self):
      """Record the current time as the most recent access."""
      self.__access_time__ = time.time()

   def lock(self, wait=True):
      """Acquire the per-item lock.

      With wait=True (the default) this blocks, and refreshes access_time
      before waiting; with wait=False it makes a single attempt.  Returns
      the result of Lock.acquire (True when the lock was obtained)."""
      if wait == True:
         self._touch()
      return self.__access_lock__.acquire(wait)

   def unlock(self, wait=True):
      """Release the per-item lock.  access_time is refreshed unless wait
      is False (the non-blocking path used by lease_monitor)."""
      if wait == True:
         self._touch()
      self.__access_lock__.release()

   def __get_AMQP_msg__(self):
      self._touch()
      return self.__AMQP_msg__

   def __set_AMQP_msg__(self, msg):
      self._touch()
      self.__AMQP_msg__ = msg

   # The AMQP message being processed; any access counts as activity.
   AMQP_msg = property(__get_AMQP_msg__, __set_AMQP_msg__)

   def __get_slot__(self):
      self._touch()
      return self.__slot__

   def __set_slot__(self, slot_num):
      self._touch()
      self.__slot__ = slot_num

   # The slot working on this message; any access counts as activity.
   slot = property(__get_slot__, __set_slot__)

   def __get_access_time__(self):
      return self.__access_time__

   # Time of the last recorded access, as returned by time.time().
   access_time = property(__get_access_time__)

class global_data(object):
   """Shared daemon state.

   Holds the table of outstanding work (work_data objects keyed by AMQP
   message id) and the current broker socket, connection, session and
   configuration.  The work-table methods do no locking of their own; the
   handlers in this file bracket them with lock()/unlock().
   """
   def __init__(self):
      self.__work_list__ = {}
      self.__access_lock__ = threading.Lock()
      self.__amqp_socket__ = None
      self.__amqp_con__ = None
      self.__amqp_session__ = None
      self.__amqp_config__ = {}

   def lock(self, wait=True):
      """Acquires the lock controlling access to the stored data.  Returns
         True if the lock was obtained (always the case when wait is True)."""
      return(self.__access_lock__.acquire(wait))

   def unlock(self):
      """Releases the lock controlling access to the stored data"""
      self.__access_lock__.release()

   def add_work(self, key, AMQP_msg, slot):
      """Add work information to the list of known work items.  Raises a
         general_exception if the key already exists."""
      if self.__find_work__(key):
         raise general_exception(syslog.LOG_WARNING, 'Key %s already exists.' % key)
      self.__work_list__[key] = work_data(AMQP_msg, slot)

   def remove_work(self, key):
      """Remove work information from the list of known work items and
         return the work removed.  If no work has the specified key, None
         is returned."""
      return(self.__work_list__.pop(key, None))

   def get_work(self, key):
      """Get work information from the list of known work items.  If no
         work has the given key, None is returned."""
      return(self.__work_list__.get(key))

   def slot_in_use(self, slot_num):
      """Returns True if the given slot is currently processing work,
         False otherwise.

         The slot is compared through the raw attribute rather than the
         work_data.slot property.  Reading the property refreshes the item's
         access time, and this scan runs for every work request, so using the
         property would keep resetting leases and stop lease_monitor from
         ever expiring them."""
      for work in self.__work_list__.values():
         if work.__slot__ == slot_num:
            return(True)
      return(False)

   def __find_work__(self, key):
      """Returns True if the desired key exists, False otherwise"""
      return(key in self.__work_list__)

   def values(self):
      """Returns a list of the known work_data objects.  The list is a
         snapshot copy, so callers (e.g. lease_monitor) may remove work
         while iterating over it."""
      return(list(self.__work_list__.values()))

   def set_amqp_socket(self, sock):
      self.__amqp_socket__ = sock

   def get_amqp_socket(self):
      return(self.__amqp_socket__)

   # Socket connected to the broker (None when not connected).
   amqp_socket = property(get_amqp_socket, set_amqp_socket)

   def set_amqp_con(self, con):
      self.__amqp_con__ = con

   def get_amqp_con(self):
      return(self.__amqp_con__)

   # qpid Connection to the broker (None when not connected).
   amqp_con = property(get_amqp_con, set_amqp_con)

   def set_amqp_session(self, session):
      self.__amqp_session__ = session

   def get_amqp_session(self):
      return(self.__amqp_session__)

   # qpid session in use; replaced by connect_to_broker() on reconnect.
   amqp_session = property(get_amqp_session, set_amqp_session)

   def set_amqp_config(self, config):
      self.__amqp_config__ = config

   def get_amqp_config(self):
      return(self.__amqp_config__)

   # Broker configuration dict used by connect_to_broker().
   amqp_config = property(get_amqp_config, set_amqp_config)

def shutdown_sockets(sock1, sock2, amqp_con, amqp_session):
   """Best-effort teardown of the daemon's sockets and broker objects.

   sock1 is half-closed for writing, drained and closed.  sock2 is first
   sent the string 'shutdown', then half-closed, drained and closed.  The
   AMQP session is closed (waiting up to 10 seconds) before the AMQP
   connection that carries it.  Any argument may be None.  Errors from
   individual steps are ignored so the remaining resources still get
   released.
   """
   def drain_and_close(sock, farewell=None):
      # Optionally announce the shutdown, stop writing, read until the peer
      # closes its side, then close.  Failures are ignored on purpose.
      try:
         if farewell is not None:
            sock.send(farewell)
         # python 2.3 doesn't have SHUT_WR defined, so use its value (1)
         sock.shutdown(1)
         data = sock.recv(4096)
         while len(data) != 0:
            data = sock.recv(4096)
      except Exception:
         pass
      try:
         sock.close()
      except Exception:
         pass

   if sock1 is not None:
      drain_and_close(sock1)
   if sock2 is not None:
      drain_and_close(sock2, 'shutdown')

   # Close the session while its connection is still open; the session's
   # close handshake travels over that connection.
   if amqp_session is not None:
      try:
         amqp_session.close(timeout=10)
      except Exception:
         pass
   if amqp_con is not None:
      try:
         amqp_con.close()
      except Exception:
         pass

def lease_monitor(data, max_lease_time, interval, log):
   """Monitor all work for lease expiration.  If a lease expired, the work
      is released.

      Every `interval` seconds this walks the work items in `data` (a
      global_data).  An item whose lock can be taken without blocking and
      whose access_time is more than `max_lease_time` seconds old is removed
      from `data`, and its AMQP message is released on the broker so another
      consumer can take it.  Loops forever (presumably run in its own
      thread)."""
   func_name = "lease_monitor"
   while True:
      current_time = float(time.time())
      if log == True:
         print('%s: max_lease_time = %s' % (func_name, str(max_lease_time)))
         print('%s: Lease check started at %s' % (func_name, str(current_time)))
         print('%s: acquiring list lock' % func_name)
      data.lock()
      try:
         if log == True:
            print('%s: acquired list lock' % func_name)
         # Re-read the session on every pass: connect_to_broker() replaces
         # data.amqp_session on reconnect, and a reference captured once at
         # startup would keep pointing at the dead session.
         broker_connection = data.amqp_session
         for item in data.values():
            if log == True:
               print('%s: access time = %s' % (func_name, str(item.access_time)))
               print('%s: current time = %s' % (func_name, str(current_time)))
            # Skip items another thread is working on right now.
            if item.lock(False) != True:
               continue
            try:
               if (float(item.access_time) + float(max_lease_time)) < current_time:
                  # The lease has expired, so forget the work and hand the
                  # message back to the broker.
                  msg_id = item.AMQP_msg.get('message_properties').message_id
                  if log == True:
                     print('%s: Expiring %s' % (func_name, str(msg_id)))
                  data.remove_work(msg_id)

                  # Release the message so it can be consumed by another process
                  try:
                     broker_connection.message_release(RangedSet(item.AMQP_msg.id))
                  except qpid.session.SessionDetached:
                     pass
                  except Exception:
                     # Keep the monitor alive; the message stays with the
                     # broker session until that session goes away.
                     syslog.syslog(syslog.LOG_WARNING, '%s: Unable to release expired message %s' % (func_name, str(msg_id)))
            finally:
               item.unlock(False)
      finally:
         # Always drop the global lock; leaving it held would stall every
         # handler that needs the work list.
         if log == True:
            print('%s: releasing list lock' % func_name)
         data.unlock()
      if log == True:
         print('%s: released list lock' % func_name)
      time.sleep(int(interval))

def parse_data_into_headers(data, session):
   """Takes a set of data and parses it into the application_headers in an
      AMQP message property.  Returns the message property with the headers
      filled in.

      Each 'key = value' line of `data` becomes one header.  Keys and values
      are stripped of surrounding whitespace, and lines without '=' are
      ignored.  A value made only of digits becomes an int.  A value made of
      digits and dots becomes a float when float() accepts it (e.g. '2.5');
      otherwise it stays a string (e.g. a version such as '7.0.1').  Returns
      session.message_properties(application_headers=<headers>)."""
   headers = {}
   for line in data.split('\n'):
      found = grep(r'^([^=]*)\s*=\s*(.*)$', line)
      if found is None or found[0] is None or found[1] is None:
         continue
      key = found[0].strip()
      value = found[1].strip()
      if grep(r'^\d+$', value) is not None:
         value = int(value)
      elif grep(r'^[\d\.]+$', value) is not None:
         try:
            value = float(value)
         except ValueError:
            # e.g. '7.0.1' or '..': not a number, keep the original text
            pass
      headers[key] = value
   return(session.message_properties(application_headers=headers))

def send_AMQP_msg(sess, orig_req, msg_properties, config, data=None):
   """Send a message to the reply_to destination of orig_req.

      sess           -- broker session used for the first attempt
      orig_req       -- the original AMQP request; its reply_to supplies the
                        exchange and routing key, and its message_id becomes
                        the correlation_id of the reply
      msg_properties -- message properties to send; correlation_id and a
                        fresh message_id (uuid4) are set here
      config         -- global_data; used to reconnect to the broker and to
                        pick up the new session after a detach
      data           -- optional message body

      The message is delivered with delivery_mode=2.  Up to 5 attempts are
      made.  When the session is detached, it waits (attempt number * 10
      seconds), reconnects and retries.  Raises general_exception if the
      message could not be sent."""
   max_attempts = 5
   reply_to = orig_req.get('message_properties').reply_to
   delivery_props = sess.delivery_properties(routing_key=reply_to['routing_key'], delivery_mode=2)
   msg_properties.correlation_id = str(orig_req.get('message_properties').message_id)
   msg_properties.message_id = uuid4()
   sent = False
   attempts = 0
   while not sent and attempts < max_attempts:
      try:
         sess.message_transfer(destination=reply_to['exchange'], message=Message(msg_properties, delivery_props, data))
         sent = True
      except qpid.session.SessionDetached:
         # The connection to the broker has been lost for some reason.
         # Attempt to reconnect to the broker and send the message
         time.sleep(attempts * 10)
         try:
            connect_to_broker(config)
         except general_exception:
            pass
         sess = config.amqp_session
      except Exception:
         # Any other transfer failure: retry on the same session.
         pass
      attempts += 1

   # Test the outcome rather than the counter.  A success on the final
   # attempt also leaves attempts == max_attempts.
   if not sent:
      # Failed to send the results
      raise general_exception(syslog.LOG_ERR, 'Failed to send AMQP message after %d attempts for job id %s' % (attempts, str(msg_properties.correlation_id)))

def exit_signal_handler(signum, frame):
   """Signal handler that converts the received signal into an exit_signal
   exception (presumably caught by the main loop to shut down).  `frame`
   is unused."""
   reason = 'Exit signal %s received' % signum
   raise exit_signal(reason)

def handle_get_work(req_socket, reply, amqp_queue, known_items, log):
   """Retrieve a message from an AMQP queue and send it back to the
      requesting client"""
   func_name = "handle_get_work"
   if log == True:
      print '%s called at %s' % (func_name, str(time.localtime()))

   # List of message headers that need special treatment
   special = ['Owner', 'HookKeyword']

   broker_connection = known_items.amqp_session
   try:
      # Figure out the SlotID that is requesting work, and don't get any
      # more work if it is still processing work from a previous call
      slots = grep('^SlotID\s*=\s*(.+)$', reply.data)
      if slots == None:
         syslog.syslog(syslog.LOG_ERR, '%s: Unable to determine SlotID for request.' % func_name)
         return(FAILURE)
      else:
         slot = slots[0].rstrip().lstrip()

      if log == True:
         print '%s: Checking if slot %s is doing work' % (func_name, str(slot))
         print '%s: Acquiring global message lock for slot check %s' % (func_name, str(slot))
      known_items.lock()
      if log == True:
         print '%s: Acquired global message lock for slot check %s' % (func_name, str(slot))
      if known_items.slot_in_use(slot) == True:
         if log == True:
            print '%s: slot %s is already doing work' % (func_name, str(slot))
         reply.data = ''
         req_socket.send(pickle.dumps(reply, 2))
         close_socket(req_socket)
         known_items.unlock()
         if log == True:
            print '%s: Released global message lock for slot check %s' % (func_name, str(slot))
         return(SUCCESS)
      known_items.unlock()
      if log == True:
         print '%s: slot %s is not doing work' % (func_name, str(slot))
         print '%s: Released global message lock for slot check %s' % (func_name, str(slot))

      # Get the work off the AMQP work queue if it exists
      try:
         msg = amqp_queue.get(timeout=1)
         job_data = msg.get('message_properties').application_headers
      except qpid.session.Closed:
         try:
            connect_to_broker(known_items)
         except general_exception:
            pass
         reply.data = ''
         req_socket.send(pickle.dumps(reply, 2))
         close_socket(req_socket)
         return(SUCCESS)
      except Empty:
         reply.data = ''
         req_socket.send(pickle.dumps(reply, 2))
         close_socket(req_socket)
         return(SUCCESS)
      except:
         if amqp_queue == None:
            try:
               connect_to_broker(known_items)
            except general_exception:
               pass
         return(SUCCESS)

      reply_to = msg.get('message_properties').reply_to
      if reply_to == None or reply_to == '':
         err_msg = 'ERROR: Work Request does not have a reply_to field and is unusable.  Discarding'
         reply.data = ''
         broker_connection.message_accept(RangedSet(msg.id))
         req_socket.send(pickle.dumps(reply, 2))
         close_socket(req_socket)
         raise general_exception(syslog.LOG_ERR, err_msg)
      elif msg.get('message_properties').message_id == None:
         err_msg = 'ERROR: Work Request does not have a unique message_id and is unusable.  Discarding'
         msg_props = msg.get('message_properties')
         reply.data = ''
         try:
            send_AMQP_msg(broker_connection, msg, msg_props, known_items, err_msg)
         except general_exception, error:
            log_messages(error)
         
         try:
            broker_connection.message_accept(RangedSet(msg.id))
         except:
            pass
         req_socket.send(pickle.dumps(reply, 2))
         close_socket(req_socket)
         raise general_exception(syslog.LOG_ERR, err_msg)
#      elif msg.properties.has_key('expiration') == True and msg['expiration'] > time.time():
#      elif msg.expiration > time.time():
#         print '%s: Message has expired and shouldn't be processed' % func_name
      else:
         # Create the ClassAd to send to the requesting client
         msg_num = str(msg.get('message_properties').message_id)
         reply.data = 'AMQPID = "%s"\n' % msg_num
         reply.data += 'WF_REQ_SLOT = "%s"\n' % slot
         if job_data.has_key('JobUniverse') == False:
            reply.data += 'JobUniverse = 5\n'
#         reply.data += 'JobLeaseDuration = 500000\n'
#         reply.data += 'JobLeaseDuration = 3\n'
         reply.data += 'IsFeatched = TRUE\n'
         for field in [header for header in job_data.keys() if header not in special]:
            reply.data += field + ' = ' + str(job_data[field]) + '\n'
         if job_data.has_key('Owner') == True and str(job_data['Owner']) != '':
            reply.data += 'Owner = ' + str(job_data['Owner']) + '\n'
         else:
            reply.data += 'Owner = "nobody"\n'
         reply.data += 'HookKeyword = "LOW_LATENCY_JOB"\n'

         # Preserve the work data was processed so it can be
         # acknowledged, expired, or released as needed
         if log == True:
            print '%s: Adding msg id %s to known items' % (func_name, msg_num)
         known_items.lock()
         try:
            known_items.add_work(msg_num, msg, slot)
         except general_exception, error:
            known_items.unlock()
            try:
               broker_connection.message_release(RangedSet(msg.id))
            except qpid.session.SessionDetached:
               pass
            raise general_exception(syslog.LOG_ERR, '%s: %s' % (func_name, error.msgs))
         known_items.unlock()
         if log == True:
            print '%s: Released global message lock for %s' % (func_name, msg_num)
   
      # Send the work to the requesting client
      req_socket.send(pickle.dumps(reply, 2))
      close_socket(req_socket)
      return(SUCCESS)

   except general_exception, error:
      log_messages(error)
      return(FAILURE)

def handle_reply_fetch(msg, known_items, log):
   """Send the data from a reply claim hook to a results AMQP queue.  Release
      the lock on the receiving AMQP queue in the case of a reject"""
   func_name = "handle_reply_fetch"
   if log == True:
      print '%s called at %s' % (func_name, str(time.localtime()))

   broker_connection = known_items.amqp_session
   try:
      # Find the AMQPID in the message received
      message_ids = grep('^AMQPID\s*=\s*"(.+)"$', msg.data)
      if message_ids == None:
         raise general_exception(syslog.LOG_ERR, msg.data, '%s: Unable to find AMQPID in exit message' % func_name)
      else:
         message_id = message_ids[0].rstrip().lstrip()

      if log == True:
         print '%s: Acquiring global message lock for %s' % (func_name, str(message_id))
      known_items.lock()
      if log == True:
         print '%s: Acquired global message lock for %s' % (func_name, str(message_id))
      if msg.type == condor_wf_types.reply_claim_reject:
         saved_work = known_items.remove_work(message_id)
      else:
         saved_work = known_items.get_work(message_id)

      if saved_work == None:
         # Couldn't find the AMQP message that corresponds to the AMQPID
         # in the exit message.  This is bad and shouldn't happen.
         known_items.unlock()
         if log == True:
            print '%s: Released global message lock for %s' % (func_name, str(message_id))
         raise general_exception(syslog.LOG_ERR, '%s: Unable to find stored AMQP message with AMQPID "%s"' % (func_name, str(message_id)))
      else:
         if log == True:
            print '%s: Acquiring lock for %s' % (func_name, str(message_id))
         saved_work.lock()
         if log == True:
            print '%s: Acquired lock for %s' % (func_name, str(message_id))
         known_items.unlock()
         if log == True:
            print '%s: Released global message lock for %s' % (func_name, str(message_id))

         # Set the JobStatus appropriately
         if msg.type == condor_wf_types.reply_claim_reject:
            msg.data += 'JobStatus = 1'
         else:
            msg.data += 'JobStatus = 2'

         try:
            # Place the data into the appropriate headers for the
            # results message.
            msg_props = parse_data_into_headers(msg.data, broker_connection)

            # Send the results to the appropriate exchange
            send_AMQP_msg(broker_connection, saved_work.AMQP_msg, msg_props, known_items)
         except:
            saved_work.unlock()
            raise general_exception(syslog.LOG_ERR, '%s: Unable to send reply messages for id "%s"' % (func_name, str(message_id)))

         if msg.type == condor_wf_types.reply_claim_reject:
            try:
               broker_connection.message_release(RangedSet(saved_work.AMQP_msg.id))
            except qpid.session.SessionDetached:
               pass
            if log == True:
               print '%s: Work rejected %s' % (func_name, str(message_id))

         if log == True:
            print '%s: Releasing lock on %s' % (func_name, str(message_id))
         saved_work.unlock()

      return(SUCCESS)

   except general_exception, error:
      log_messages(error)
      return(FAILURE)

def handle_prepare_job(req_socket, reply, known_items, log):
   """Prepare the environment for the job.  This includes extracting any
      archived data sent with the job request, as well as running a
      presetup script if one exists."""
   func_name = "handle_prepare_job"
   if log == True:
      print '%s called at %s' % (func_name, str(time.localtime()))

   broker_connection = known_items.amqp_session
   try:
      # Find the AMQPID in the message received
      message_ids = grep('^AMQPID\s*=\s*"(.+)"$', reply.data)
      if message_ids == None:
         raise general_exception(syslog.LOG_ERR, reply.data, '%s: Unable to find AMQPID in prepare job message' % func_name)
      else:
         message_id = message_ids[0].rstrip().lstrip()

      # Find the Current Working Directory  of the originating process
      # in the message received
      work_cwd = grep('^OriginatingCWD\s*=\s*"(.+)"$', reply.data)[0]

      if log == True:
         print '%s: Acquiring global message lock for %s' % (func_name, str(message_id))
      known_items.lock()
      if log == True:
         print '%s: Acquired global message lock for %s' % (func_name, str(message_id))
      saved_work = known_items.get_work(message_id)
      if saved_work == None:
         # Couldn't find the AMQP message that corresponds to the AMQPID
         # in the exit message.  This is bad and shouldn't happen.
         known_items.unlock()
         if log == True:
            print '%s: Released global message lock for %s' % (func_name, str(message_id))
         raise general_exception(syslog.LOG_ERR, '%s: Unable to find stored AMQP message with AMQPID "%s"' % (func_name, str(message_id)))
      else:
         if log == True:
            print '%s: Acquiring lock for %s' % (func_name, str(message_id))
         saved_work.lock()
         if log == True:
            print '%s: Acquired lock for %s' % (func_name, str(message_id))
         known_items.unlock()
         if log == True:
            print '%s: Released global message lock for %s' % (func_name, str(message_id))
         try:
            # Place the body of the message, which should contain an archived
            # file, into the directory for the job
            reply.data = ''
            if saved_work.AMQP_msg.body != '' and \
               saved_work.AMQP_msg.body != None:
               # Write the message contents (which should be an archived
               # file) to disk
               input_filename = work_cwd + '/data.zip'
               write_file(input_filename, saved_work.AMQP_msg.body)
               reply.data = input_filename

            # Send the information about the archive file to the requester
            req_socket.send(pickle.dumps(reply, 2))
            close_socket(req_socket)
         except:
            saved_work.unlock()
            known_items.remove_work(message_id)
            try:
               broker_connection.message_release(RangedSet(saved_work.AMQP_msg.id))
            except qpid.session.SessionDetached:
               pass
            raise general_exception(syslog.LOG_ERR, '%s: Error extracting message body contents for job id "%s"' % (func_name, str(message_id)))

         if log == True:
            print '%s: Releasing lock on %s' % (func_name, str(message_id))
         saved_work.unlock()
      return(SUCCESS)

   except general_exception, error:
      log_messages(error)
      return(FAILURE)

def handle_update_job_status(msg, known_items, log):
   """Send the job status update information to a results AMQP queue."""
   func_name = "handle_update_job_status"
   if log == True:
      print '%s called at %s' % (func_name, str(time.localtime()))

   broker_connection = known_items.amqp_session
   try:
      # Find the AMQPID in the message received
      message_ids = grep('^AMQPID\s*=\s*"(.+)"$', msg.data)
      if message_ids == None:
         raise general_exception(syslog.LOG_ERR, msg.data, '%s: Unable to find AMQPID in exit message' % func_name)
      else:
         message_id = message_ids[0].rstrip().lstrip()

      if log == True:
         print '%s: Acquiring global message lock for %s' % (func_name, str(message_id))
      known_items.lock()
      if log == True:
         print '%s: Acquired global message lock for %s' % (func_name, str(message_id))
      saved_work = known_items.get_work(message_id)
      if saved_work == None:
         # Couldn't find the AMQP message that corresponds to the AMQPID
         # in the exit message.  This is bad and shouldn't happen.
         known_items.unlock()
         if log == True:
            print '%s: Released global message lock for %s' % (func_name, str(message_id))
         raise general_exception(syslog.LOG_ERR, '%s: Unable to find stored AMQP message with AMQPID "%s"' % (func_name, str(message_id)))
      else:
         if log == True:
            print '%s: Acquiring lock for %s' % (func_name, str(message_id))
         saved_work.lock()
         if log == True:
            print '%s: Acquired lock for %s' % (func_name, str(message_id))
         known_items.unlock()
         if log == True:
            print '%s: Released global message lock for %s' % (func_name, str(message_id))
         msg.data += 'JobStatus = 2'
         try:
            # Place the data into the appropriate headers for the
            # results message.
            msg_props = parse_data_into_headers(msg.data, broker_connection)

            # Send the results to the appropriate exchange
            send_AMQP_msg(broker_connection, saved_work.AMQP_msg, msg_props, known_items)
         except:
            saved_work.unlock()
            raise general_exception(syslog.LOG_ERR, '%s: Unable to send update message for id "%s"' % (func_name, str(message_id)))
          
         if log == True:
            print '%s: Releasing lock on %s' % (func_name, str(message_id))
         saved_work.unlock()
      return(SUCCESS)

   except general_exception, error:
      log_messages(error)
      return(FAILURE)

def handle_exit(req_socket, msg, known_items, log):
   """The job exited, so handle the reasoning appropriately.  If the
      job exited normally, then remove the work job from the AMQP queue,
      otherwise release the lock on the work.  Always place the results
      on the sender's reply_to AMQP queue"""
   func_name = "handle_exit"
   if log == True:
      print '%s called at %s' % (func_name, str(time.localtime()))

   file_list = []
   broker_connection = known_items.amqp_session

   try:
      # Determine the slot that is reporting results
      slots = grep ('^WF_REQ_SLOT\s*=\s*"(.+)"$', msg.data)
      if slots == None:
         syslog.syslog(syslog.LOG_WARNING, msg.data, 'Unable to determine SlotID for results.')
      else:
         # Verify the slot sending results is known to be in use.  If not,
         # somehow results have been send from an unknown slot.
         slot = slots[0]
         known_items.lock()
         if known_items.slot_in_use(slot) == False:
            syslog.syslog(syslog.LOG_WARNING, 'Received exit message from unknown slot %s' % slot)
         known_items.unlock()

      # Find the AMQPID in the message we received
      message_ids = grep('^AMQPID\s*=\s*"(.+)"$', msg.data)
      if message_ids == None:
         raise general_exception(syslog.LOG_ERR, msg.data, '%s: Unable to find AMQPID in exit message' % func_name)
      else:
         message_id = message_ids[0].rstrip().lstrip()

      # Find the Current Working Directory of the originating process
      # in the message received
      work_cwd = grep('^OriginatingCWD\s*=\s*"(.+)"$', msg.data)[0]

      # Set the JobStatus
      if msg.type == condor_wf_types.exit_exit:
         msg.data += 'JobStatus = 4'
      else:
         msg.data += 'JobStatus = 1'

      # Place the data into the appropriate headers for the
      # results message.
      msg_props = parse_data_into_headers(msg.data, broker_connection)

      # Retrieve the AMQP message from the list of known messages so it
      # can be acknowledged or released
      if log == True:
         print '%s: Acquiring global message lock for %s' % (func_name, str(message_id))
      known_items.lock()
      if log == True:
         print '%s: Acquired global message lock for %s' % (func_name, str(message_id))
      saved_work = known_items.remove_work(message_id)
      if saved_work == None:
         # Couldn't find the AMQP message that corresponds to the AMQPID
         # in the exit message.  This is bad and shouldn't happen.
         known_items.unlock()
         if log == True:
            print '%s: Released global message lock for %s' % (func_name, str(message_id))
         raise general_exception(syslog.LOG_ERR, '%s: Unable to find stored AMQP message with AMQPID "%s".  Message cannot be acknowledged nor results sent!' % (func_name, str(message_id)))
      else:
         if log == True:
            print '%s: Acquiring lock for %s' % (func_name, str(message_id))
         saved_work.lock()
         if log == True:
            print '%s: Acquired lock for %s' % (func_name, str(message_id))
         known_items.unlock()
         if log == True:
            print '%s: Released global message lock for %s' % (func_name, str(message_id))
         # If any files were specified to be retrieved side from
         # what is created in the working directory, then create an archive
         # of the files (if they exist) and place it in the body of the
         # results message
         ack_msg = saved_work.AMQP_msg
         app_hdrs = ack_msg.get('message_properties').application_headers
         data = None

         # Generate the list of files to return
         if os.path.exists(work_cwd) == False:
            saved_work.unlock()
            try:
               broker_connection.message_release(RangedSet(ack_msg.id))
            except qpid.session.SessionDetached:
               pass
            raise general_exception(syslog.LOG_ERR, '%s: Unable to change to exe dir "%s" for id "%s"' % (func_name, work_cwd, str(message_id)))
               
         try:
            os.chdir(work_cwd)
            transfer_files = grep('^TransferOutput\s*=\s*"(.*)"$', msg.data)
            if transfer_files != None and transfer_files[0] != None:
               if os.path.exists('_condor_stderr') == True:
                  file_list.append('_condor_stderr')
               else:
                  err_file = grep('^Err\s+=\s+"(.*)"$', msg.data)
                  if err_file != None and err_file[0] != None:
                     file_list.append(os.path.basename(err_file[0].rstrip()))
               if os.path.exists('_condor_stdout') == True:
                  file_list.append('_condor_stdout')
               else:
                  out_file = grep('^Out\s+=\s+"(.*)"$', msg.data)
                  if out_file != None and out_file[0] != None:
                     file_list.append(os.path.basename(out_file[0].rstrip()))
               for file in transfer_files[0].split(','):
                  if file not in file_list:
                     file_list.append(os.path.basename(file.rstrip()))
            else:
               file_list = os.listdir(".")
         except:
            saved_work.unlock()
            try:
               broker_connection.message_release(RangedSet(ack_msg.id))
            except qpid.session.SessionDetached:
               pass
            raise general_exception(syslog.LOG_ERR, '%s: Failed to create list of files for results archive for id "%s"' % (func_name, str(message_id)))

         # Create an archive of the files if they exist
         filename = '%s/results.zip' % work_cwd
         try:
            zip = zipfile.ZipFile(filename, 'w')
         except:
            saved_work.unlock()
            try:
               broker_connection.message_release(RangedSet(ack_msg.id))
            except qpid.session.SessionDetached:
               pass
            raise general_exception(syslog.LOG_ERR, '%s: Failed to open archive file for writing for id "%s"' % (func_name, str(message_id)))
         
         for result_file in file_list:
            if os.path.exists(result_file) == True:
               try:
                  zip.write(result_file)
               except:
                  saved_work.unlock()
                  try:
                     broker_connection.message_release(RangedSet(ack_msg.id))
                  except qpid.session.SessionDetached:
                     pass
                  raise general_exception(syslog.LOG_ERR, '%s: Failed to write "%s" to archive for id "%s"' % (func_name, result_file, str(message_id)))
                  
         try:
            zip.close()
            archived_file = open(filename, 'rb')
            data = archived_file.read()
            archived_file.close()
            os.remove(filename)
         except:
            saved_work.unlock()
            try:
               broker_connection.message_release(RangedSet(ack_msg.id))
            except qpid.session.SessionDetached:
               pass
            raise general_exception(syslog.LOG_ERR, '%s: Failed to write archive to disk for id "%s"' % (func_name, str(message_id)))

         try:
            # Send the results to the appropriate exchange
            send_AMQP_msg(broker_connection, ack_msg, msg_props, known_items, data)
         except general_exception, error:
            saved_work.unlock()
            try:
               broker_connection.message_release(RangedSet(ack_msg.id))
            except qpid.session.SessionDetached:
               pass
            raise general_exception(syslog.LOG_ERR, '%s: Unable to send exit message for id "%s": %s' % (func_name, str(message_id), error.msgs[0]))

         try:
            if msg.type == condor_wf_types.exit_exit:
               # Job exited normally, so grab the result files, transfer the
               # results to the specified AMQP reply_to queue, and ackowledge
               # the message on the AMQP work queue
               if log == True:
                  print '%s: Normal exit' % func_name

               # Acknowledge the message
               broker_connection.message_accept(RangedSet(ack_msg.id))
            else:
               # Job didn't exit normally, so release the lock for the message
               if log == True:
                  print '%s: Not normal exit: %s' % (func_name, str(msg.type))
               try:
                  broker_connection.message_release(RangedSet(ack_msg.id))
               except qpid.session.SessionDetached:
                  pass
         except:
            saved_work.unlock()
            try:
               broker_connection.message_release(RangedSet(ack_msg.id))
            except qpid.session.SessionDetached:
               pass
            raise general_exception(syslog.LOG_ERR, '%s: Unable to remove/release id "%s" on the broker' % (func_name, str(message_id)))

         if log == True:
            print '%s: Releasing lock on %s' % (func_name, str(message_id))
         saved_work.unlock()

      # Send acknowledgement to the originator that exit work is complete
      req_socket.send('Completed')
      close_socket(req_socket)
      return(SUCCESS)

   except general_exception, error:
      log_messages(error)
      return(FAILURE)

def connect_to_broker(data, config=None):
   """Open an AMQP session to the broker and subscribe to the work queue.

   data   -- shared state container.  On success this function sets
             data.amqp_socket, data.amqp_con, data.amqp_session and stores
             the incoming work queue in data.amqp_config['work_queue'].
   config -- optional broker configuration dict (keys read here: 'ip',
             'port', 'queue', 'work_queue_name').  When given it replaces
             data.amqp_config and the stored connection attributes are reset
             to None before connecting.

   Raises general_exception if the broker session or the socket fails.
   On failure any partially-opened connection/socket is closed and the
   connection attributes of data are not updated.
   """
   if config != None:
      data.amqp_config = config
      data.amqp_socket = None
      data.amqp_con = None
      data.amqp_session = None

   # Pre-bind so the error handlers can always test them, even when the
   # failure happens before the objects were created.
   server_socket = None
   connection = None
   try:
      server_socket = connect(str(data.amqp_config['ip']), int(data.amqp_config['port']))
      connection = Connection(sock=server_socket)
      connection.start()
      session = connection.session(str(uuid4()))
      session.queue_declare(queue=data.amqp_config['queue'], exclusive=False, durable=True)
      session.message_subscribe(queue=data.amqp_config['queue'], destination=data.amqp_config['work_queue_name'], accept_mode=session.accept_mode.explicit)

      # Grant the maximum credit for credit units 0 and 1 on the
      # subscription.  These calls talk to the broker too, so they must be
      # inside the try to get the same error handling as the setup above.
      session.message_flow(data.amqp_config['work_queue_name'], 0, 0xFFFFFFFF)
      session.message_flow(data.amqp_config['work_queue_name'], 1, 0xFFFFFFFF)
      work_queue = session.incoming(data.amqp_config['work_queue_name'])
   except qpid.session.SessionException, error:
      if connection != None:
         connection.close()
      elif server_socket != None:
         server_socket.close()
      raise general_exception(syslog.LOG_ERR, 'Broker Connection Error %d: %s' % (error[0].error_code, error[0].description))
   except socket.error, error:
      # The transport is unusable; close the raw socket instead of running
      # an AMQP close handshake over it.  Best-effort: the socket error being
      # reported below is the interesting one.
      if server_socket != None:
         try:
            server_socket.close()
         except socket.error:
            pass
      # Some socket errors (e.g. timeouts) carry only a message, no errno.
      if len(error.args) >= 2:
         raise general_exception(syslog.LOG_ERR, 'socket error %d: %s' % (error[0], error[1]))
      raise general_exception(syslog.LOG_ERR, 'socket error: %s' % str(error))

   data.amqp_config['work_queue'] = work_queue
   data.amqp_socket = server_socket
   data.amqp_con = connection
   data.amqp_session = session

def main(argv=None):
   """Run the carod daemon.

   Parses the command line, optionally daemonizes, loads the broker and
   daemon configuration (condor config first, falling back to
   /etc/condor/carod.conf), connects to the AMQP broker, starts the lease
   monitor thread and then serves requests from condor on a TCP socket,
   dispatching each request to a worker thread.

   Options:
      -d/--daemon     detach via double fork + setsid
      -l/--logdebug   pass debug_logging=True to the lease monitor and workers
      -h/--help       print usage and return
      -p/--pidfile    write this process's pid to the given file

   Returns SUCCESS on help, in the parent processes when daemonizing, and
   on exit via exit_signal; returns FAILURE on a getopt error or when a
   general_exception is raised.
   """
   conf_file = '/etc/condor/carod.conf'
   dest = 'work_requests'
   listen_socket = None
   sock = None
   share_data = None
   pidfile = ''
   daemon_mode = 0

   if argv is None:
      argv = sys.argv

   def cleanup():
      """Close the sockets and broker session, then remove the pidfile."""
      # The broker objects live in share_data (set by connect_to_broker);
      # they may be missing if we fail before connecting.
      connection = None
      session = None
      if share_data is not None:
         connection = getattr(share_data, 'amqp_con', None)
         session = getattr(share_data, 'amqp_session', None)
      shutdown_sockets(listen_socket, sock, connection, session)
      if pidfile != '' and os.path.exists(pidfile) == True:
         os.remove(pidfile)

   try:
      try:
         opts, args = getopt.getopt(argv[1:], 'dlhp:', ['daemon', 'logdebug', 'help', 'pidfile='])
      except getopt.GetoptError, error:
         print str(error)
         return(FAILURE)

      debug_logging = False
      for option, arg in opts:
         if option in ('-d', '--daemon'):
            daemon_mode = 1
         if option in ('-l', '--logdebug'):
            debug_logging = True
         if option in ('-h', '--help'):
            print 'usage: ' + os.path.basename(argv[0]) + ' [-d|--daemon] [-l|--logdebug] [-h|--help] [-p|--pidfile= <pidfile>]'
            return(SUCCESS)
         if option in ('-p', '--pidfile'):
            pidfile = arg

      if daemon_mode == 1:
         try:
            pid = os.fork()
         except OSError, error:
            raise general_exception(syslog.LOG_ERR, "%s [%d]" % (error.strerror, error.errno))
         if pid == 0:
            # Child process.  Start a new session and fork again so the
            # daemon is not a session leader.
            os.setsid()
            try:
               pid = os.fork()
            except OSError, error:
               raise general_exception(syslog.LOG_ERR, "%s [%d]" % (error.strerror, error.errno))
            if pid == 0:
               os.chdir('/')
            else:
               return(SUCCESS)
         else:
            return(SUCCESS)

      # Create the pidfile
      if pidfile != '':
         pid_file = open(pidfile, 'w')
         try:
            pid_file.write('%s\n' % str(os.getpid()))
         finally:
            pid_file.close()

      # Open a connection to the system logger
      syslog.openlog(os.path.basename(argv[0]))

      # Set signal handlers
      signal.signal(signal.SIGINT, exit_signal_handler)
      signal.signal(signal.SIGTERM, exit_signal_handler)

      try:
         broker = read_condor_config('LL_BROKER', ['IP', 'PORT', 'QUEUE'])
      except config_err, error:
         syslog.syslog(syslog.LOG_INFO, *(error.msg))
         syslog.syslog(syslog.LOG_INFO, 'Attempting to retrieve config from %s' % conf_file)
         try:
            broker = read_config_file(conf_file, 'Broker')
         except config_err, error:
            raise general_exception(syslog.LOG_ERR, *(error.msg + ('Exiting.','')))
      broker['work_queue_name'] = dest

      try:
         server = read_condor_config('LL_DAEMON', ['IP', 'PORT', 'QUEUED_CONNECTIONS', 'LEASE_TIME', 'LEASE_CHECK_INTERVAL'])
      except config_err, error:
         syslog.syslog(syslog.LOG_INFO, *(error.msg))
         syslog.syslog(syslog.LOG_INFO, 'Attempting to retrieve config from %s' % conf_file)
         try:
            server = read_config_file(conf_file, 'Daemon')
         except config_err, error:
            raise general_exception(syslog.LOG_ERR, *(error.msg + ('Exiting.','')))

      # Create a container to share data between threads
      share_data = global_data()

      # Setup the AMQP connections
      try:
         connect_to_broker(share_data, broker)
      except general_exception:
         # Failure to connect at startup isn't a big deal.  Each attempt
         # to talk to the broker will establish a connection if one
         # isn't established already
         share_data.amqp_config['work_queue'] = None

      # Create a thread to monitor work expiration times
      monitor_thread = threading.Thread(target=lease_monitor, args=(share_data, server['lease_time'], server['lease_check_interval'], debug_logging))
      monitor_thread.setDaemon(True)
      monitor_thread.start()

      # Setup the socket for communication with condor
      listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
      try:
         listen_socket.bind((server['ip'], int(server['port'])))
         listen_socket.listen(int(server['queued_connections']))
      except socket.error, error:
         raise general_exception(syslog.LOG_ERR, 'socket error %d: %s' % (error[0], error[1]), 'Failed to listen on %s:%s' % (server['ip'], server['port']))

      # Accept all incoming connections and act accordingly
      while True:
         sock, address = listen_socket.accept()
         recv_data = socket_read_all(sock)
         # SECURITY NOTE(review): recv_data comes straight off a TCP
         # connection accepted on server['ip']:server['port'];
         # pickle.loads can execute arbitrary code if an untrusted peer can
         # connect.  Restrict the bind address / firewall this port.
         condor_msg = pickle.loads(recv_data)

         # Set up a child thread to perform the desired action
         if condor_msg.type == condor_wf_types.get_work:
            child = threading.Thread(target=handle_get_work, args=(sock, condor_msg, share_data.amqp_config['work_queue'], share_data, debug_logging))
         elif condor_msg.type == condor_wf_types.reply_claim_accept or \
              condor_msg.type == condor_wf_types.reply_claim_reject:
            child = threading.Thread(target=handle_reply_fetch, args=(condor_msg, share_data, debug_logging))
         elif condor_msg.type == condor_wf_types.prepare_job:
            child = threading.Thread(target=handle_prepare_job, args=(sock, condor_msg, share_data, debug_logging))
         elif condor_msg.type == condor_wf_types.update_job_status:
            child = threading.Thread(target=handle_update_job_status, args=(condor_msg, share_data, debug_logging))
         elif condor_msg.type == condor_wf_types.exit_exit or \
              condor_msg.type == condor_wf_types.exit_remove or \
              condor_msg.type == condor_wf_types.exit_hold or \
              condor_msg.type == condor_wf_types.exit_evict:
            child = threading.Thread(target=handle_exit, args=(sock, condor_msg, share_data, debug_logging))
         else:
            # syslog has no LOG_WARN constant; LOG_WARNING is the correct
            # name.  str() keeps the log call safe for non-integer types.
            syslog.syslog(syslog.LOG_WARNING, 'Received unknown request: %s' % str(condor_msg.type))
            # No worker thread owns this connection, so close it here.
            sock.close()
            sock = None
            continue
         child.setDaemon(True)
         child.start()

   except exit_signal:
      # Close the session before exiting
      cleanup()
      return(SUCCESS)

   except general_exception, error:
      log_messages(error)
      # Close the session before exiting
      cleanup()
      return(FAILURE)

if __name__ == '__main__':
    # Run the daemon with the process arguments; main()'s SUCCESS/FAILURE
    # value becomes the process exit status.
    sys.exit(main(sys.argv))
