#!/usr/bin/python

# Farstream simple network signalling library for the demo GUI
#
# Copyright (C) 2007 Collabora, Nokia
# @author: Olivier Crete <olivier.crete@collabora.co.uk>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
#

#
# This is the signalling code used by fs-gui.py
#

import sys, os, pwd, os.path
import socket, struct
import gc

import gi
gi.require_version('Farstream', '0.2')
from gi.repository import Farstream, GLib

class FsUIConnect:
    """Framed signalling channel on top of an already connected socket.

    Every message on the wire is a header of six network-order unsigned
    32-bit integers -- magic (0xDEADBEEF), source id, destination id,
    message type, media id and payload length -- followed by that many
    payload bytes.

    ``callbacks`` is a sequence indexed by message type (ERROR, CODECS,
    CANDIDATE, CANDIDATES_DONE, INTRO).  A received message is dispatched as
    ``callbacks[type](src, dest, media, data)`` where ``data`` is ``None``
    for an empty payload, a list of codecs for CODECS, a candidate for
    CANDIDATE and the raw payload otherwise.  Local connection failures are
    reported as ``callbacks[ERROR](participant_id)``.
    """

    ERROR = 0
    CODECS = 1
    CANDIDATE = 2
    CANDIDATES_DONE = 3
    INTRO = 4

    # Wire header layout: magic, src, dest, type, media, payload length.
    _HEADER_FORMAT = "!IIIIII"
    _MAGIC = 0xDEADBEEF

    def __reset(self):
        """Put the receive state back to "waiting for a header"."""
        # type is None while the header is being read; it is set once the
        # header has been parsed and the payload is being collected.
        self.type = None
        self.media = None
        self.size = struct.calcsize(self._HEADER_FORMAT)
        self.data = b""
        self.dest = -1
        self.src = -1

    def __init__(self, sock, callbacks, myid=0):
        """Wrap *sock* and start watching it from the GLib main loop.

        sock      -- connected socket; it is switched to non-blocking mode
        callbacks -- sequence of callables indexed by message type
        myid      -- id used as the default source of outgoing messages
        """
        self.sock = sock
        self.__reset()
        self.callbacks = callbacks
        self.myid = myid
        self.partid = 1
        self.is_server = True
        sock.setblocking(0)
        GLib.io_add_watch(self.sock.fileno(), GLib.IO_IN,
                          self.__data_in)
        GLib.io_add_watch(self.sock.fileno(),
                          GLib.IO_ERR | GLib.IO_HUP,
                          self.__error)

    def __to_bytes(self, value):
        """Return *value* as bytes, UTF-8 encoding it when it is text."""
        if isinstance(value, bytes):
            return value
        return value.encode("utf-8")

    def __to_text(self, value):
        """Return *value* as a native ``str``, UTF-8 decoding raw bytes."""
        if isinstance(value, str):
            return value
        return value.decode("utf-8")

    def __report_error(self):
        """Invoke the ERROR callback for the peer this failure belongs to."""
        # While a message is in flight src holds its sender; otherwise fall
        # back to the id of the participant behind this connection.
        if self.src >= 0:
            self.callbacks[self.ERROR](self.src)
        else:
            self.callbacks[self.ERROR](self.partid)

    def __error(self, source, condition):
        """IO_ERR / IO_HUP watch: report the failure and drop the watch."""
        print("have error")
        self.__report_error()
        return False

    def __data_in(self, source, condition):
        """IO_IN watch: read available bytes and dispatch complete messages.

        Returns False (after reporting an error) when the peer closed the
        connection, True otherwise.
        """
        chunk = self.sock.recv(self.size - len(self.data))

        if not chunk:
            print("received nothing")
            self.__report_error()
            return False

        self.data += chunk
        if len(self.data) < self.size:
            # Header or payload still incomplete; wait for more data.
            return True

        if self.type is None:
            # A complete header has been collected.
            (check,
             self.src,
             self.dest,
             self.type,
             self.media,
             self.size) = struct.unpack(self._HEADER_FORMAT, self.data)
            if check != self._MAGIC:
                print("CORRUPTION")
                sys.exit(1)
            if self.myid > 1 and self.dest != self.myid:
                print("GOT MESSAGE FOR %d, but I am %d" % (self.dest,
                                                           self.myid))
                sys.exit(1)
            self.data = b""
            if self.size == 0:
                self.callbacks[self.type](self.src, self.dest,
                                          self.media, None)
                self.__reset()
            return True

        # A complete payload has been collected.  Parse the whole accumulated
        # buffer, not just the last chunk: a payload may arrive in several
        # reads.
        if self.type == self.CODECS:
            payload = self.__codecs_from_string(self.__to_text(self.data))
        elif self.type == self.CANDIDATE:
            payload = self.__candidate_from_string(self.__to_text(self.data))
        else:
            payload = self.data
        self.callbacks[self.type](self.src, self.dest,
                                  self.media, payload)
        self.__reset()
        return True

    def __send_data(self, dest, msg_type, media=0, data="", src=None):
        """Send one framed message; *src* defaults to this end's id.

        Raises Exception when the source id is 0 and the message is not an
        INTRO.  Socket errors are not raised; they are reported through the
        ERROR callback with this connection's participant id.
        """
        if src is None:
            src = self.myid
        if src == 0 and msg_type != self.INTRO:
            raise Exception("cannot send a non-INTRO message with source id 0")
        payload = self.__to_bytes(data)
        try:
            self.sock.sendall(struct.pack(self._HEADER_FORMAT,
                                          self._MAGIC,
                                          int(src),
                                          int(dest),
                                          int(msg_type),
                                          int(media),
                                          len(payload)))
            self.sock.sendall(payload)
        except socket.error:
            print("have error")
            self.callbacks[self.ERROR](self.partid)

    def send_error(self, dest, src):
        """Send an ERROR message about participant *src* to *dest*."""
        self.__send_data(dest, self.ERROR, src=src)

    def send_intro(self, dest, src=None):
        """Send an INTRO message to *dest*."""
        self.__send_data(dest, self.INTRO, src=src)

    def send_codecs(self, dest, media, codecs, src=None):
        """Send the serialized *codecs* for *media* to *dest*."""
        self.__send_data(dest, self.CODECS,
                         media=media,
                         data=self.__codecs_to_string(codecs), src=src)

    def send_candidate(self, dest, media, candidate, src=None):
        """Send one serialized *candidate* for *media* to *dest*."""
        self.__send_data(dest, self.CANDIDATE, media=media,
                         data=self.__candidate_to_string(candidate), src=src)

    def send_candidates_done(self, dest, media, src=None):
        """Send a CANDIDATES_DONE message for *media* to *dest*."""
        self.__send_data(dest, self.CANDIDATES_DONE, media=media, src=src)

    def __del__(self):
        try:
            self.sock.close()
        except AttributeError:
            # __init__ never got as far as storing the socket.
            pass

    def __candidate_to_string(self, candidate):
        """Serialize a candidate as eleven "|"-separated fields.

        Unset string fields are written as empty strings.
        """
        return "|".join((
            candidate.foundation or "",
            str(candidate.component_id),
            candidate.ip or "",
            str(candidate.port),
            candidate.base_ip or "",
            str(candidate.base_port),
            str(int(candidate.proto)),
            str(candidate.priority),
            str(int(candidate.type)),
            candidate.username or "",
            candidate.password or ""))

    def __candidate_from_string(self, string):
        """Build a Farstream.Candidate from the "|"-separated wire form.

        Raises ValueError when *string* does not hold exactly eleven fields.
        """
        (foundation,
         component_id,
         ip,
         port,
         base_ip,
         base_port,
         proto,
         priority,
         cand_type,
         username,
         password) = string.split("|")
        return Farstream.Candidate.new_full(
            foundation,
            int(component_id),
            ip,
            int(port),
            base_ip,
            int(base_port),
            int(proto),
            int(priority),
            int(cand_type),
            username,
            password,
            0)  # No multicast, ttl=0

    def __codecs_to_string(self, codecs):
        """Serialize codecs, one per line.

        Each line is "id name media_type clock_rate channels|k=v;k=v".
        """
        lines = []
        for codec in codecs:
            head = " ".join((str(codec.id),
                             codec.encoding_name,
                             str(int(codec.media_type)),
                             str(codec.clock_rate),
                             str(codec.channels)))
            params = ";".join([param.name + "=" + param.value
                               for param in codec.optional_params])
            lines.append(head + "|" + params)
        return "\n".join(lines)

    def __codecs_from_string(self, string):
        """Parse the text produced by __codecs_to_string into codecs."""
        codecs = []
        for line in string.split("\n"):
            if not line:
                # Nothing to parse on a blank line.
                continue
            # Split on the first "|" only so a parameter value may contain
            # that character.
            (head, params) = line.split("|", 1)
            (codec_id, encoding_name, media_type,
             clock_rate, channels) = head.split(" ")
            codec = Farstream.Codec.new(int(codec_id), encoding_name,
                                        int(media_type), int(clock_rate))
            codec.channels = int(channels)
            for param in params.split(";"):
                if param:
                    codec.add_optional_parameter(*param.split("=", 1))
            codecs.append(codec)
        return codecs

class FsUIConnectClient (FsUIConnect):
    """FsUIConnect that opens its own outgoing connection to (ip, port)."""

    def __init__(self, ip, port, callbacks):
        """Connect to *ip*:*port* and start watching the new socket.

        Any exception raised by the connection attempt is propagated after
        the socket has been closed.
        """
        sock = socket.socket()
        try:
            sock.connect((ip, port))
        except Exception:
            # Do not leave the descriptor open when the connection fails.
            sock.close()
            raise
        FsUIConnect.__init__(self, sock, callbacks)
        self.is_server = False

class FsUIListener:
    """Listening socket that hands every accepted connection to a callback.

    Binding starts at ``port`` and moves up one port at a time for as long
    as bind() fails with a socket error; the port finally bound is stored in
    ``self.port``.  Each accepted connection results in a call to
    ``callback(sock, *args)``.
    """

    def __init__(self, port, callback, *args):
        self.sock = socket.socket()
        self.callback = callback
        self.args = args
        while True:
            try:
                self.sock.bind(("", port))
            except socket.error:
                # Could not bind this port, try the next one.
                port += 1
            else:
                break
        self.port = port
        print("Bound to port  %s" % (port,))
        self.sock.setblocking(0)
        GLib.io_add_watch(self.sock.fileno(), GLib.IO_IN, self.data_in)
        GLib.io_add_watch(self.sock.fileno(),
                          GLib.IO_ERR | GLib.IO_HUP,
                          self.error)
        self.sock.listen(3)

    def error(self, source, condition):
        """IO_ERR / IO_HUP watch on the listening socket: exit the process."""
        print("Error on listen")
        sys.exit(1)

    def data_in(self, source, condition):
        """IO_IN watch: accept one connection and pass it to the callback."""
        (sock, _addr) = self.sock.accept()
        self.callback(sock, *self.args)
        return True
    
class FsUIClient:
    """Client end of the signalling: connects to a server and tracks peers.

    ``get_participant(connect, id, *args)`` is called to create the local
    object for every participant that introduces itself; the result is
    stored in ``self.participants`` keyed by participant id.  Id 1 is the
    server.
    """

    def __init__(self, ip, port, get_participant, *args):
        self.participants = {}
        self.get_participant = get_participant
        self.args = args
        # The tuple order must match the FsUIConnect message type constants.
        self.connect = FsUIConnectClient(ip, port, (self.__error,
                                                    self.__codecs,
                                                    self.__candidate,
                                                    self.__candidate_done,
                                                    self.__intro))
        self.connect.send_intro(1)

    def __codecs(self, src, dest, media, data):
        """Hand received codecs to the participant that sent them."""
        print("Got codec Src:%d dest:%d data:%s" % (src, dest, data))
        self.participants[src].codecs(media, data)

    def __candidate(self, src, dest, media, data):
        """Hand a received candidate to the participant that sent it."""
        self.participants[src].candidate(media, data)

    def __candidate_done(self, src, dest, media, data):
        """Tell the sending participant that its candidates are complete."""
        self.participants[src].candidates_done(media)

    def __intro(self, src, dest, media, data):
        """Handle an introduction from *src*.

        An introduction from the server (id 1) carries this client's own id
        in *dest*.  A participant not seen before is created, and any
        participant other than the server gets an introduction back.
        """
        print("Got Intro from %s, I am %d" % (src, dest))
        if src == 1:
            self.connect.myid = dest
        if src not in self.participants:
            if src != 1:
                self.connect.send_intro(src)
            self.participants[src] = self.get_participant(self.connect, src,
                                                          *self.args)

    def __error(self, participantid, *arg):
        """Handle a failure concerning *participantid*.

        Errors about participants that are not (or no longer) known are
        ignored, so the same failure may safely be reported more than once.
        """
        print("Client Error %s" % (participantid,))
        if participantid not in self.participants:
            return
        if participantid == 1:
            # Communication error with server, it's over
            self.participants[participantid].error()
        else:
            self.participants[participantid].destroy()
            self.participants.pop(participantid, None)
            gc.collect()
            gc.collect()


class FsUIServer:
    """Server side of one client connection.

    The participant table and the id counter are class attributes, so they
    are shared by every FsUIServer instance.  The server itself uses id 1;
    clients are given ids starting at 2.
    """

    nextid = 2
    participants = {}

    def __init__(self, sock, get_participant, *args):
        self.get_participant = get_participant
        self.args = args
        # The tuple order must match the FsUIConnect message type constants.
        self.connect = FsUIConnect(sock, (self.__error,
                                          self.__codecs,
                                          self.__candidate,
                                          self.__candidate_done,
                                          self.__intro), 1)

    def __codecs(self, src, dest, media, data):
        """Hand received codecs to the participant that sent them."""
        FsUIServer.participants[src].codecs(media, data)

    def __candidate(self, src, dest, media, data):
        """Consume a candidate addressed to the server, forward any other."""
        if dest == 1:
            FsUIServer.participants[src].candidate(media, data)
        else:
            print(data)
            FsUIServer.participants[dest].connect.send_candidate(dest,
                                                                 media,
                                                                 data,
                                                                 src)

    def __candidate_done(self, src, dest, media, data):
        """Consume or forward a CANDIDATES_DONE, depending on *dest*."""
        if dest == 1:
            FsUIServer.participants[src].candidates_done(media)
        else:
            FsUIServer.participants[dest].connect.send_candidates_done(dest,
                                                                       media,
                                                                       src)

    def __intro(self, src, dest, media, data):
        """Handle an introduction.

        A message from id 0 to the server registers a new client: it is
        given the next free id, announced to every known participant, told
        its id, and sent the local codecs.  An introduction addressed to
        another participant is forwarded, followed by the sender's codecs.
        """
        print("Got Intro from %s to %s" % (src, dest))
        if src == 0 and dest == 1:
            newid = FsUIServer.nextid
            # Forward the introduction to all other participants.  Iterate
            # over a snapshot: a failing send reports an error, which may
            # remove entries from the shared table.
            for pid in list(FsUIServer.participants):
                participant = FsUIServer.participants.get(pid)
                if participant is None:
                    continue
                print("Sending from %d to %d" % (newid, pid))
                participant.connect.send_intro(pid, newid)
            self.connect.send_intro(newid)
            self.connect.partid = newid
            FsUIServer.participants[newid] = self.get_participant(self.connect,
                                                                  newid,
                                                                  *self.args)
            FsUIServer.participants[newid].send_local_codecs()
            FsUIServer.nextid += 1
        elif dest != 1:
            FsUIServer.participants[dest].connect.send_intro(dest, src)
            FsUIServer.participants[src].send_codecs_to(
                        FsUIServer.participants[dest])
        else:
            print("ERROR SRC != 0")

    def __error(self, participantid, *args):
        """Drop *participantid* and tell the remaining participants.

        Errors about participants that are not (or no longer) registered are
        ignored, so the same failure may safely be reported more than once.
        """
        print("Server Error %s" % (participantid,))
        if participantid not in FsUIServer.participants:
            return
        FsUIServer.participants[participantid].destroy()
        FsUIServer.participants.pop(participantid, None)
        gc.collect()
        # Snapshot the ids: send_error may fail and re-enter this method,
        # changing the table while it is being walked.
        for pid in list(FsUIServer.participants):
            participant = FsUIServer.participants.get(pid)
            if participant is not None:
                participant.connect.send_error(pid, participantid)

if __name__ == "__main__":
    # Self-test.  Without arguments a server is started on the first port at
    # or above 9893 that can be bound; with a port number as argument a
    # client connects to 127.0.0.1 on that port.

    class TestMedia:
        """Dummy media stream that sends one candidate and prints what it gets."""

        def __init__(self, pid, id, connect):
            self.pid = pid
            self.id = id
            self.connect = connect
            candidate = Farstream.Candidate()
            candidate.component_id = 1
            connect.send_candidate(self.pid, self.id, candidate)
            connect.send_candidates_done(self.pid, self.id)

        def candidate(self, candidate):
            print("Got candidate %s" % (candidate,))

        def candidates_done(self):
            print("Got candidate done")

        def codecs(self, codecs):
            # Only clients answer; the server (id 1) just receives.
            if self.connect.myid != 1:
                self.connect.send_codecs(1, self.id,
                                         [Farstream.Codec.new(self.connect.myid,
                                                              "codec-name",
                                                              Farstream.MediaType.AUDIO,
                                                              self.id)])

        def send_local_codecs(self):
            print("Send local codecs to %s for media %s" % (self.pid, self.id))
            self.connect.send_codecs(self.pid, self.id,
                                     [Farstream.Codec.new(self.connect.myid,
                                                          "local_codec",
                                                          Farstream.MediaType.AUDIO,
                                                          self.id)])

        def get_codecs(self):
            # Built with Codec.new like the other test codecs in this class.
            # NOTE(review): the previous code passed self.pid in the
            # media-type position; AUDIO is used here to match the other
            # test codecs -- confirm that is the intent.
            return [Farstream.Codec.new(self.connect.myid,
                                        "nego-codecs",
                                        Farstream.MediaType.AUDIO,
                                        self.id)]

    class TestParticipant:
        """Dummy participant owning two TestMedia streams (ids 1 and 2)."""

        def __init__(self, connect, id, *args):
            self.id = id
            self.streams = {1: TestMedia(id, 1, connect),
                            2: TestMedia(id, 2, connect)}
            self.connect = connect
            print("New Participant %s" % (id))

        def candidate(self, media, candidate):
            self.streams[media].candidate(candidate)

        def candidates_done(self, media):
            self.streams[media].candidates_done()

        def codecs(self, media, codecs):
            self.streams[media].codecs(codecs)

        def send_local_codecs(self):
            for sid in self.streams:
                self.streams[sid].send_local_codecs()

        def send_codecs_to(self, participant):
            for sid in self.streams:
                print("to: %s from: %s" % (str(participant.id), (self.id)))
                participant.connect.send_codecs(participant.id,
                                                self.streams[sid].id,
                                                self.streams[sid].get_codecs(),
                                                self.id)

        def error(self):
            print("ERROR")
            sys.exit(1)

        def destroy(self):
            pass

    mainloop = GLib.MainLoop()
    if len(sys.argv) > 1:
        client = FsUIClient("127.0.0.1", int(sys.argv[1]),
                            TestParticipant)
    else:
        listener = FsUIListener(9893, FsUIServer, TestParticipant)
    mainloop.run()