Blob Blame History Raw
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2010-2012 Red Hat, Inc.
#
# Authors:
# Thomas Woerner <twoerner@redhat.com>
# Jiri Popelka <jpopelka@redhat.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

# To use in git tree: PYTHONPATH=.. python firewalld-test.py

import dbus
import sys
import time
import unittest

from firewall.config.dbus import DBUS_PATH, DBUS_PATH_CONFIG, DBUS_INTERFACE, \
                                 DBUS_INTERFACE_ZONE, DBUS_INTERFACE_CONFIG
from firewall.dbus_utils import dbus_to_python
from pprint import pprint

class TestFirewallD(unittest.TestCase):
    """
    For testing of temporary changes, ie. those that disappear with restart:
    adding/removing interfaces to zones, setting/changing of default zone
    adding/removing of services, ports, forward ports, icmp blocks
    """
    def setUp(self):
        unittest.TestCase.setUp(self)
        bus = dbus.SystemBus()
        dbus_obj = bus.get_object(DBUS_INTERFACE, DBUS_PATH)
        dbus_obj_config = bus.get_object(DBUS_INTERFACE, DBUS_PATH_CONFIG)
        self.fw = dbus.Interface(dbus_obj, dbus_interface=DBUS_INTERFACE)
        self.fw_zone = dbus.Interface(dbus_obj,
                                     dbus_interface=DBUS_INTERFACE_ZONE)
        self.config_properties = dbus.Interface(dbus_obj_config,
                                    dbus_interface='org.freedesktop.DBus.Properties')
        self.config_properties.Set(DBUS_INTERFACE_CONFIG, "FlushAllOnReload", "no")
        self.fw.reload()

    def test_get_setDefaultZone(self):
        old_zone = dbus_to_python(self.fw.getDefaultZone())
        print ("\nCurrent default zone is '%s'" % old_zone)

        self.fw_zone.addInterface("", "foo")
        self.fw_zone.addInterface(old_zone, "bar")

        print ("Setting default zone to 'external'")
        self.fw.setDefaultZone("external")

        # make sure the default zone was properly set
        self.assertEqual(self.fw.getDefaultZone(), "external")
        # check that *only* foo interface was moved to new default zone
        self.assertTrue(self.fw_zone.queryInterface("external", "foo"))
        self.assertTrue(self.fw_zone.queryInterface(old_zone, "bar"))

        print ("Re-setting default zone back to '%s'" % old_zone)
        self.fw.setDefaultZone(old_zone)
        self.fw_zone.removeInterface("", "foo")
        self.fw_zone.removeInterface("", "bar")

    def test_zone_getActiveZones(self):
        interface = "baz"
        zone = "home"

        print ("\nAdding interface '%s' to '%s' zone" % (interface, zone))
        self.fw_zone.addInterface(zone, interface)

        print ("Getting active zones: ")
        ret = self.fw_zone.getActiveZones()
        self.assertTrue(len(ret)>0)
        pprint (dbus_to_python(ret))

        self.fw_zone.removeInterface(zone, interface) #cleanup

    def test_zone_getZones(self):
        z = self.fw_zone.getZones()
        print ("\nZones:")
        pprint(dbus_to_python(z))

    def test_zone_add_remove_queryInterface(self):
        interface = "foo"
        zone = "trusted"

        print ("\nAdding interface '%s' to '%s' zone" % (interface, zone))
        ret = self.fw_zone.addInterface(zone, interface)
        self.assertEqual(ret, zone)
        self.assertTrue(self.fw_zone.queryInterface(zone, interface))

        print ("Re-adding")
        self.assertRaisesRegexp(Exception, 'ZONE_ALREADY_SET', self.fw_zone.addInterface, zone, interface)

        zone = "block"
        print ("Re-adding interface '%s' to '%s' zone" % (interface, zone))
        self.assertRaisesRegexp(Exception, 'ZONE_CONFLICT', self.fw_zone.addInterface, zone, interface)

        print ("Removing interface '%s' from '%s' zone" % (interface, zone))
        self.assertRaisesRegexp(Exception, 'ZONE_CONFLICT', self.fw_zone.removeInterface, zone, interface)

        zone = "trusted"
        print ("Removing interface '%s' from '%s' zone" % (interface, zone))
        ret = self.fw_zone.removeInterface(zone, interface)
        self.assertEqual(ret, zone)
        self.assertFalse(self.fw_zone.queryInterface(zone, interface))
        print ("Re-removing")
        self.assertRaises(Exception, self.fw_zone.removeInterface, zone, interface)

        print ("Add again and remove interface '%s' from zone it belongs to" % interface)
        self.fw_zone.addInterface(zone, interface)
        self.assertTrue(self.fw_zone.queryInterface(zone, interface))
        ret = self.fw_zone.removeInterface("", interface)
        self.assertEqual(ret, zone)
        self.assertFalse(self.fw_zone.queryInterface(zone, interface))
        print ("Re-removing")
        self.assertRaises(Exception, self.fw_zone.removeInterface, "", interface)

    def test_zone_change_queryZone(self):
        interface = "foo"
        zone = "internal"

        print ("\nChanging zone of interface '%s' to '%s'" % (interface, zone))
        ret = self.fw_zone.changeZone(zone, interface)
        self.assertEqual(ret, zone)
        self.assertTrue(self.fw_zone.queryInterface(zone, interface))

        print ("Get zone of interface '%s': " % (interface))
        ret = self.fw_zone.getZoneOfInterface(interface)
        self.assertEqual(ret, zone)
        print (dbus_to_python(ret))

        self.fw_zone.removeInterface(zone, interface) #cleanup

    def test_zone_add_get_query_removeService(self):
        service = "samba"
        zone = "external"
        print ("\nAdding service '%s' to '%s' zone" % (service, zone))
        ret = self.fw_zone.addService(zone, service, 0)
        self.assertEqual(ret, zone)
        print ("Re-adding")
        self.assertRaisesRegexp(Exception, 'ALREADY_ENABLED', self.fw_zone.addService, zone, service, 0)

        print ("Get services of zone '%s'" % (zone))
        ret = self.fw_zone.getServices(zone)
        self.assertTrue(len(ret)>0)
        pprint (dbus_to_python(ret))

        print ("Removing service '%s' from '%s' zone" % (service, zone))
        ret = self.fw_zone.removeService(zone, service)
        self.assertEqual(ret, zone)
        print ("Re-removing")
        self.assertRaisesRegexp(Exception, 'NOT_ENABLED', self.fw_zone.removeService, zone, service)

        zone = "dmz"
        timeout = 2
        print ("Adding timed service '%s' to '%s' zone, active for %d seconds" % (service, zone, timeout))
        ret = self.fw_zone.addService(zone, service, timeout)
        self.assertEqual(ret, zone)
        self.assertTrue(self.fw_zone.queryService(zone, service))
        time.sleep(timeout+1)
        print ("Checking if timeout has been working")
        self.assertFalse(self.fw_zone.queryService(zone, service))

    def test_zone_add_get_query_removePort(self):
        port = "443"
        protocol="tcp"
        zone = "public"
        print ("\nAdding port '%s/%s' to '%s' zone" % (port, protocol, zone))
        ret = self.fw_zone.addPort(zone, port, protocol, 0)
        self.assertEqual(ret, zone)
        print ("Re-adding port")
        self.assertRaisesRegexp(Exception, 'ALREADY_ENABLED', self.fw_zone.addPort, zone, port, protocol, 0)

        print ("Get ports of zone '%s': " % (zone))
        ret = self.fw_zone.getPorts(zone)
        self.assertTrue(len(ret)>0)
        pprint (dbus_to_python(ret))

        print ("Removing port '%s/%s' from '%s' zone" % (port, protocol, zone))
        ret = self.fw_zone.removePort(zone, port, protocol)
        self.assertEqual(ret, zone)
        print ("Re-removing")
        self.assertRaisesRegexp(Exception, 'NOT_ENABLED', self.fw_zone.removePort, zone, port, protocol)

        port = "443-445"
        protocol="udp"
        zone = "dmz"
        timeout = 2
        print ("Adding timed port '%s/%s' to '%s' zone, active for %d seconds" % (port, protocol, zone, timeout))
        ret = self.fw_zone.addPort(zone, port, protocol, timeout)
        self.assertEqual(ret, zone)
        self.assertTrue(self.fw_zone.queryPort(zone, port, protocol))
        time.sleep(timeout+1)
        print ("Checking if timeout has been working")
        self.assertFalse(self.fw_zone.queryPort(zone, port, protocol))

    def test_zone_add_query_removeMasquerade(self):
        zone = "public"
        print ("\nAdd masquerade to '%s' zone" % (zone))
        ret = self.fw_zone.addMasquerade(zone, 0)
        self.assertEqual(ret, zone)
        print ("Re-adding")
        self.assertRaisesRegexp(Exception, 'ALREADY_ENABLED', self.fw_zone.addMasquerade, zone, 0)

        print ("Checking if masquerade is added to zone '%s'" % (zone))
        self.assertTrue(self.fw_zone.queryMasquerade(zone))

        print ("Remove masquerade from '%s' zone" % (zone))
        ret = self.fw_zone.removeMasquerade(zone)
        self.assertEqual(ret, zone)
        print ("Re-adding")
        self.assertRaisesRegexp(Exception, 'NOT_ENABLED', self.fw_zone.removeMasquerade, zone)

        zone = "dmz"
        timeout = 2
        print ("Add timed masquerade to '%s' zone, active for %d seconds" % (zone, timeout))
        ret = self.fw_zone.addMasquerade(zone, timeout)
        self.assertEqual(ret, zone)
        self.assertTrue(self.fw_zone.queryMasquerade(zone))
        time.sleep(timeout+1)
        print ("Checking if timeout has been working")
        self.assertFalse(self.fw_zone.queryMasquerade(zone))

    def test_zone_add_get_query_removeForwardPort(self):
        port = "443"
        protocol="tcp"
        toport = "441"
        toaddr = "192.168.0.2"
        zone = "public"
        print ("\nAdding forward port '%s/%s' to '%s:%s' to '%s' zone" % (port, protocol, toaddr, toport, zone))
        ret = self.fw_zone.addForwardPort(zone, port, protocol, toport, toaddr, 0)
        self.assertEqual(ret, zone)
        print ("Re-adding")
        self.assertRaisesRegexp(Exception, 'ALREADY_ENABLED', self.fw_zone.addForwardPort, zone, port, protocol, toport, toaddr, 0)

        print ("Get forward ports of zone '%s': " % (zone))
        ret = self.fw_zone.getForwardPorts(zone)
        self.assertTrue(len(ret)>0)
        pprint (dbus_to_python(ret))

        print ("Removing forward port '%s/%s' to '%s:%s' from '%s' zone" % (port, protocol, toaddr, toport, zone))
        ret = self.fw_zone.removeForwardPort(zone, port, protocol, toport, toaddr)
        self.assertEqual(ret, zone)
        print ("Re-removing")
        self.assertRaisesRegexp(Exception, 'NOT_ENABLED', self.fw_zone.removeForwardPort, zone, port, protocol, toport, toaddr)

        port = "443-445"
        protocol="udp"
        toport = ""
        toaddr = "192.168.0.3"
        zone = "dmz"
        timeout = 2
        print ("Adding timed forward port '%s/%s' to '%s:%s' to '%s' zone, active for %d seconds" % (port, protocol, toaddr, toport, zone, timeout))
        ret = self.fw_zone.addForwardPort(zone, port, protocol, toport, toaddr, timeout)
        self.assertEqual(ret, zone)
        self.assertTrue(self.fw_zone.queryForwardPort(zone, port, protocol, toport, toaddr))
        time.sleep(timeout+1)
        print ("Checking if timeout has been working")
        self.assertFalse(self.fw_zone.queryForwardPort(zone, port, protocol, toport, toaddr))

    def test_zone_add_get_query_removeIcmpBlock(self):
        icmp = "parameter-problem"
        zone = "external"
        print ("\nAdding icmp block '%s' to '%s' zone" % (icmp, zone))
        ret = self.fw_zone.addIcmpBlock(zone, icmp, 0)
        self.assertEqual(ret, zone)
        print ("Re-adding")
        self.assertRaisesRegexp(Exception, 'ALREADY_ENABLED', self.fw_zone.addIcmpBlock, zone, icmp, 0)

        print ("Get icmp blocks of zone '%s': " % (zone))
        ret = self.fw_zone.getIcmpBlocks(zone)
        self.assertTrue(len(ret)>0)
        pprint (dbus_to_python(ret))

        print ("Removing icmp block '%s' from '%s' zone" % (icmp, zone))
        ret = self.fw_zone.removeIcmpBlock(zone, icmp)
        self.assertEqual(ret, zone)
        print ("Re-removing")
        self.assertRaisesRegexp(Exception, 'NOT_ENABLED', self.fw_zone.removeIcmpBlock, zone, icmp)

        icmp = "redirect"
        zone = "dmz"
        timeout = 2
        print ("Adding timed icmp block '%s' to '%s' zone, active for %d seconds: " % (icmp, zone, timeout))
        ret = self.fw_zone.addIcmpBlock(zone, icmp, timeout)
        self.assertEqual(ret, zone)
        self.assertTrue(self.fw_zone.queryIcmpBlock(zone, icmp))
        time.sleep(timeout+1)
        print ("Checking if timeout has been working: ")
        self.assertFalse(self.fw_zone.queryIcmpBlock(zone, icmp))

    def test_reload(self):
        interface = "foo"
        zone = "work"

        self.fw_zone.addInterface(zone, interface)
        self.fw.reload()
        print ("\nChecking if interface remains in zone after service reload: ")
        self.assertTrue(self.fw_zone.queryInterface(zone, interface))

        self.fw_zone.removeInterface(zone, interface) #cleanup

if __name__ == '__main__':
    suite = unittest.TestLoader().loadTestsFromTestCase(TestFirewallD)
    results = unittest.TextTestRunner(verbosity=2).run(suite)
    sys.exit(0 if results.wasSuccessful() else 1)