#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import os
import getopt
import sys
import locale
import socket
import re
from qmf.console import Session

_host = "localhost"
_connTimeout = 10
_stopId = None
_stopAll = False
_force = False
_numeric = False
_showConn = False
_delConn = None

def Usage ():
    print "Usage:  qpid-cluster [OPTIONS] [broker-addr]"
    print
    print "             broker-addr is in the form:   [username/password@] hostname | ip-address [:<port>]"
    print "             ex:  localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
    print
    print "Options:"
    print "          --timeout seconds (10)  Maximum time to wait for broker connection"
    print "          -C [--all-connections]  View client connections to all cluster members"
    print "          -c [--connections] ID   View client connections to specified member"
    print "          -d [--del-connection] HOST:PORT"
    print "                                  Disconnect a client connection"
    print "          -s [--stop] ID          Stop one member of the cluster by its ID"
    print "          -k [--all-stop]         Shut down the whole cluster"
    print "          -f [--force]            Suppress the 'are-you-sure?' prompt"
    print "          -n [--numeric]          Don't resolve names"
    print
    sys.exit (1)


class IpAddr:
    def __init__(self, text):
        if text.find("@") != -1:
            tokens = text.split("@")
            text = tokens[1]
        if text.find(":") != -1:
            tokens = text.split(":")
            text = tokens[0]
            self.port = int(tokens[1])
        else:
            self.port = 5672
        self.dottedQuad = socket.gethostbyname(text)
        nums = self.dottedQuad.split(".")
        self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])

    def bestAddr(self, addrPortList):
        bestDiff = 0xFFFFFFFFL
        bestAddr = None
        for addrPort in addrPortList:
            diff = IpAddr(addrPort[0]).addr ^ self.addr
            if diff < bestDiff:
                bestDiff = diff
                bestAddr = addrPort
        return bestAddr

class BrokerManager:
    def __init__(self):
        self.brokerName = None
        self.qmf        = None
        self.broker     = None

    def SetBroker(self, brokerUrl):
        self.url = brokerUrl
        self.qmf = Session()
        self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
        agents = self.qmf.getAgents()
        for a in agents:
            if a.getAgentBank() == 0:
                self.brokerAgent = a

    def Disconnect(self):
        if self.broker:
            self.qmf.delBroker(self.broker)

    def _getClusters(self):
        packages = self.qmf.getPackages()
        if "org.apache.qpid.cluster" not in packages:
            print "Clustering is not installed on the broker."
            sys.exit(0)

        clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
        if len(clusters) == 0:
            print "Clustering is installed but not enabled on the broker."
            sys.exit(0)

        return clusters

    def _getHostList(self, urlList):
        hosts = []
        hostAddr = IpAddr(_host)
        for url in urlList:
            if url.find("amqp:") != 0:
                raise Exception("Invalid URL 1")
            url = url[5:]
            addrs = str(url).split(",")
            addrList = []
            for addr in addrs:
                tokens = addr.split(":")
                if len(tokens) != 3:
                    raise Exception("Invalid URL 2")
                addrList.append((tokens[1], tokens[2]))

            # Find the address in the list that is most likely to be in the same subnet as the address
            # with which we made the original QMF connection.  This increases the probability that we will
            # be able to reach the cluster member.

            best = hostAddr.bestAddr(addrList)
            bestUrl = best[0] + ":" + best[1]
            hosts.append(bestUrl)
        return hosts

    def overview(self):
        clusters = self._getClusters()
        cluster = clusters[0]
        memberList = cluster.members.split(";")
        idList = cluster.memberIDs.split(";")

        print "  Cluster Name: %s" % cluster.clusterName
        print "Cluster Status: %s" % cluster.status
        print "  Cluster Size: %d" % cluster.clusterSize
        print "       Members: ID=%s URL=%s" % (idList[0], memberList[0])
        for idx in range(1,len(idList)):
            print "              : ID=%s URL=%s" % (idList[idx], memberList[idx])

    def stopMember(self, id):
        clusters = self._getClusters()
        cluster = clusters[0]
        idList = cluster.memberIDs.split(";")
        if id not in idList:
            print "No member with matching ID found"
            sys.exit(1)

        if not _force:
            prompt = "Warning: "
            if len(idList) == 1:
                prompt += "This command will shut down the last running cluster member."
            else:
                prompt += "This command will shut down a cluster member."
            prompt += " Are you sure? [N]: "

            confirm = raw_input(prompt)
            if len(confirm) == 0 or confirm[0].upper() != 'Y':
                print "Operation canceled"
                sys.exit(1)

        cluster.stopClusterNode(id)

    def stopAll(self):
        clusters = self._getClusters()
        if not _force:
            prompt = "Warning: This command will shut down the entire cluster."
            prompt += " Are you sure? [N]: "

            confirm = raw_input(prompt)
            if len(confirm) == 0 or confirm[0].upper() != 'Y':
                print "Operation canceled"
                sys.exit(1)

        cluster = clusters[0]
        cluster.stopFullCluster()

    def showConnections(self):
        clusters = self._getClusters()
        cluster = clusters[0]
        memberList = cluster.members.split(";")
        idList = cluster.memberIDs.split(";")
        displayList = []
        hostList = self._getHostList(memberList)
        self.qmf.delBroker(self.broker)
        self.broker = None
        self.brokers = []
        pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")

        idx = 0
        for host in hostList:
            if _showConn == "all" or _showConn == idList[idx] or _delConn:
                self.brokers.append(self.qmf.addBroker(host, _connTimeout))
                displayList.append(idList[idx])
            idx += 1

        idx = 0
        found = False
        for broker in self.brokers:
            if not _delConn:
                print "Clients on Member: ID=%s:" % displayList[idx]
            connList = self.qmf.getObjects(_class="connection", _package="org.apache.qpid.broker", _broker=broker)
            for conn in connList:
                if pattern.match(conn.address):
                    if _numeric or _delConn:
                        a = conn.address
                    else:
                        tokens = conn.address.split(":")
                        try:
                            hostList = socket.gethostbyaddr(tokens[0])
                            host = hostList[0]
                        except:
                            host = tokens[0]
                        a = host + ":" + tokens[1]
                    if _delConn:
                        tokens = _delConn.split(":")
                        ip = socket.gethostbyname(tokens[0])
                        toDelete = ip + ":" + tokens[1]
                        if a == toDelete:
                            print "Closing connection from client: %s" % a
                            conn.close()
                            found = True
                    else:
                        print "    %s" % a
            idx += 1
            if not _delConn:
                print
        if _delConn and not found:
            print "Client connection '%s' not found" % _delConn

        for broker in self.brokers:
            self.qmf.delBroker(broker)


##
## Main Program
##

try:
    longOpts = ("stop=", "all-stop", "force", "connections=", "all-connections" "del-connection=", "numeric", "timeout=")
    (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "s:kfCc:d:n", longOpts)
except:
    Usage()

try:
    encoding = locale.getpreferredencoding()
    cargs = [a.decode(encoding) for a in encArgs]
except:
    cargs = encArgs

count = 0
for opt in optlist:
    if opt[0] == "--timeout":
        _connTimeout = int(opt[1])
        if _connTimeout == 0:
            _connTimeout = None
    if opt[0] == "-s" or opt[0] == "--stop":
        _stopId = opt[1]
        if len(_stopId.split(":")) != 2:
            print "Member ID must be of form: <host or ip>:<number>"
            sys.exit(1)
        count += 1
    if opt[0] == "-k" or opt[0] == "--all-stop":
        _stopAll = True
        count += 1
    if opt[0] == "-f" or opt[0] == "--force":
        _force = True
    if opt[0] == "-n" or opt[0] == "--numeric":
        _numeric = True
    if opt[0] == "-C" or opt[0] == "--all-connections":
        _showConn = "all"
        count += 1
    if opt[0] == "-c" or opt[0] == "--connections":
        _showConn = opt[1]
        if len(_showConn.split(":")) != 2:
            print "Member ID must be of form: <host or ip>:<number>"
            sys.exit(1)
        count += 1
    if opt[0] == "-d" or opt[0] == "--del-connection":
        _delConn = opt[1]
        if len(_delConn.split(":")) != 2:
            print "Connection must be of form: <host or ip>:<port>"
            sys.exit(1)
        count += 1

if count > 1:
    print "Only one command option may be supplied"
    print
    Usage()

nargs = len(cargs)
bm    = BrokerManager()

if nargs == 1:
    _host = cargs[0]

try:
    bm.SetBroker(_host)
    if _stopId:
        bm.stopMember(_stopId)
    elif _stopAll:
        bm.stopAll()
    elif _showConn or _delConn:
        bm.showConnections()
    else:
        bm.overview()
except KeyboardInterrupt:
    print
except Exception,e:
    if e.__repr__().find("connection aborted") > 0:
        # we expect this when asking the connected broker to shut down
        sys.exit(0)
    print "Failed: %s - %s" % (e.__class__.__name__, e)
    sys.exit(1)

bm.Disconnect()
