|
Packit |
a8ec6b |
# -*- coding: utf-8 -*-
|
|
Packit |
a8ec6b |
#
|
|
Packit |
a8ec6b |
# Copyright (C) 2011-2016 Red Hat, Inc.
|
|
Packit |
a8ec6b |
#
|
|
Packit |
a8ec6b |
# Authors:
|
|
Packit |
a8ec6b |
# Thomas Woerner <twoerner@redhat.com>
|
|
Packit |
a8ec6b |
#
|
|
Packit |
a8ec6b |
# This program is free software; you can redistribute it and/or modify
|
|
Packit |
a8ec6b |
# it under the terms of the GNU General Public License as published by
|
|
Packit |
a8ec6b |
# the Free Software Foundation; either version 2 of the License, or
|
|
Packit |
a8ec6b |
# (at your option) any later version.
|
|
Packit |
a8ec6b |
#
|
|
Packit |
a8ec6b |
# This program is distributed in the hope that it will be useful,
|
|
Packit |
a8ec6b |
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
Packit |
a8ec6b |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
Packit |
a8ec6b |
# GNU General Public License for more details.
|
|
Packit |
a8ec6b |
#
|
|
Packit |
a8ec6b |
# You should have received a copy of the GNU General Public License
|
|
Packit |
a8ec6b |
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
Packit |
a8ec6b |
#
|
|
Packit |
a8ec6b |
|
|
Packit |
a8ec6b |
"""FirewallCommand class for command line client simplification"""
|
|
Packit |
a8ec6b |
|
|
Packit |
a8ec6b |
__all__ = [ "FirewallCommand" ]
|
|
Packit |
a8ec6b |
|
|
Packit |
a8ec6b |
import sys
|
|
Packit |
a8ec6b |
|
|
Packit |
a8ec6b |
from firewall import errors
|
|
Packit |
a8ec6b |
from firewall.errors import FirewallError
|
|
Packit |
a8ec6b |
from dbus.exceptions import DBusException
|
|
Packit |
a8ec6b |
from firewall.functions import checkIPnMask, checkIP6nMask, check_mac, \
|
|
Packit |
a8ec6b |
check_port, check_single_address
|
|
Packit |
a8ec6b |
|
|
Packit |
a8ec6b |
class FirewallCommand(object):
    """Helper for the command line clients.

    Bundles output helpers (honouring the quiet/verbose flags), sequence
    runners that apply an add/remove/query method to every item given for
    an option, parsers/validators for option values and pretty printers
    for settings objects.
    """

    def __init__(self, quiet=False, verbose=False):
        """Store the output flags; no firewall client is attached yet."""
        self.quiet = quiet
        self.verbose = verbose
        # While True, exception_handler() processes messages itself; the
        # sequence runners switch it off temporarily to see the raw
        # exceptions of the methods they call.
        self.__use_exception_handler = True
        self.fw = None

    def set_fw(self, fw):
        """Attach the client object used for authorizeAll() in sequences."""
        self.fw = fw

    def set_quiet(self, flag):
        """Enable or disable quiet mode (suppresses print_msg output)."""
        self.quiet = flag

    def get_quiet(self):
        """Return the quiet flag."""
        return self.quiet

    def set_verbose(self, flag):
        """Enable or disable verbose mode."""
        self.verbose = flag

    def get_verbose(self):
        """Return the verbose flag."""
        return self.verbose

    def print_msg(self, msg=None):
        """Write msg and a newline to stdout unless quiet or msg is None."""
        if msg is not None and not self.quiet:
            sys.stdout.write(msg + "\n")

    def print_error_msg(self, msg=None):
        """Write msg and a newline to stderr unless quiet or msg is None."""
        if msg is not None and not self.quiet:
            sys.stderr.write(msg + "\n")

    def print_warning(self, msg=None):
        """Print msg to stderr, coloured red when stderr is a terminal."""
        FAIL = '\033[91m'
        END = '\033[00m'
        # Only decorate real messages: concatenating None would raise a
        # TypeError before print_error_msg() gets the chance to ignore it.
        if msg is not None and sys.stderr.isatty():
            msg = FAIL + msg + END
        self.print_error_msg(msg)

    def print_and_exit(self, msg=None, exit_code=0):
        """Print msg and terminate the process with exit_code.

        Exit codes greater than 1 are reported as warnings on stderr,
        everything else goes to stdout.  Always raises SystemExit.
        """
        #OK = '\033[92m'
        #END = '\033[00m'
        if exit_code > 1:
            self.print_warning(msg)
        else:
            #if sys.stdout.isatty():
            #   msg = OK + msg + END
            self.print_msg(msg)
        sys.exit(exit_code)

    def fail(self, msg=None):
        """Print msg as a warning and exit with code 2."""
        self.print_and_exit(msg, 2)

    def print_if_verbose(self, msg=None):
        """Write msg to stdout only in verbose mode (ignores quiet)."""
        if msg is not None and self.verbose:
            sys.stdout.write(msg + "\n")

    def __cmd_sequence(self, cmd_type, option, action_method, query_method, # pylint: disable=W0613, R0913, R0914
                       parse_method, message, start_args=None, end_args=None, # pylint: disable=W0613
                       no_exit=False):
        """Call action_method for every item of option.

        Each item is first converted with parse_method (if not None) and
        then passed to action_method as start_args + item + end_args, where
        a list/tuple item is expanded into several arguments.

        With more than one item, failures are only reported as warnings and
        the remaining items are still processed.  With a single item a
        failure terminates the process, except for the "already set" style
        codes, which are treated as success and only produce a warning.

        Unless no_exit is set, the process exits at the end if every item
        failed: with the common error code if there was exactly one, with
        errors.UNKNOWN_ERROR if there were several different ones.

        cmd_type, query_method and message are accepted but not used here.
        """
        if self.fw is not None:
            self.fw.authorizeAll()
        items = [ ]
        _errors = 0
        _error_codes = [ ]
        for item in option:
            if parse_method is not None:
                try:
                    item = parse_method(item)
                except Exception as msg:
                    code = FirewallError.get_code(str(msg))
                    if len(option) > 1:
                        self.print_warning("Warning: %s" % msg)
                    else:
                        self.print_and_exit("Error: %s" % msg, code)
                    if code not in _error_codes:
                        _error_codes.append(code)
                    _errors += 1
                    continue

            items.append(item)

        for item in items:
            call_item = [ ]
            if start_args is not None:
                call_item += start_args
            if not isinstance(item, list) and not isinstance(item, tuple):
                call_item.append(item)
            else:
                call_item += item
            if end_args is not None:
                call_item += end_args
            self.deactivate_exception_handler()
            try:
                action_method(*call_item)
            except (DBusException, Exception) as msg:
                if isinstance(msg, DBusException):
                    self.fail_if_not_authorized(msg.get_dbus_name())
                    msg = msg.get_dbus_message()
                else:
                    msg = str(msg)
                code = FirewallError.get_code(msg)
                if code in [ errors.ALREADY_ENABLED, errors.NOT_ENABLED,
                             errors.ZONE_ALREADY_SET, errors.ALREADY_SET ]:
                    code = 0
                if len(option) > 1:
                    self.print_warning("Warning: %s" % msg)
                elif code == 0:
                    self.print_warning("Warning: %s" % msg)
                    return
                else:
                    self.print_and_exit("Error: %s" % msg, code)
                if code not in _error_codes:
                    _error_codes.append(code)
                _errors += 1
            finally:
                # Restore the handler on every path, including the early
                # return above, so later commands are not left without it.
                self.activate_exception_handler()

        if not no_exit:
            if len(option) > _errors or 0 in _error_codes:
                # There have been more options than errors or there
                # was at least one error code 0, return.
                return
            elif len(_error_codes) == 1:
                # Exactly one error code, use it.
                sys.exit(_error_codes[0])
            elif len(_error_codes) > 1:
                # There is more than one error, exit using
                # UNKNOWN_ERROR. This could happen within sequences
                # where parsing failed with different errors like
                # INVALID_PORT and INVALID_PROTOCOL.
                sys.exit(errors.UNKNOWN_ERROR)

    def add_sequence(self, option, action_method, query_method, parse_method, # pylint: disable=R0913
                     message, no_exit=False):
        """Run action_method for every item of option (see __cmd_sequence)."""
        self.__cmd_sequence("add", option, action_method, query_method,
                            parse_method, message, no_exit=no_exit)

    def x_add_sequence(self, x, option, action_method, query_method, # pylint: disable=R0913
                       parse_method, message, no_exit=False):
        """Like add_sequence, with x prepended to every call's arguments."""
        self.__cmd_sequence("add", option, action_method, query_method,
                            parse_method, message, start_args=[x],
                            no_exit=no_exit)

    def zone_add_timeout_sequence(self, zone, option, action_method, # pylint: disable=R0913
                                  query_method, parse_method, message,
                                  timeout, no_exit=False):
        """Like add_sequence, calling action_method(zone, <item>, timeout)."""
        self.__cmd_sequence("add", option, action_method, query_method,
                            parse_method, message, start_args=[zone],
                            end_args=[timeout], no_exit=no_exit)

    def remove_sequence(self, option, action_method, query_method, # pylint: disable=R0913
                        parse_method, message, no_exit=False):
        """Run action_method for every item of option (see __cmd_sequence)."""
        self.__cmd_sequence("remove", option, action_method, query_method,
                            parse_method, message, no_exit=no_exit)

    def x_remove_sequence(self, x, option, action_method, query_method, # pylint: disable=R0913
                          parse_method, message, no_exit=False):
        """Like remove_sequence, with x prepended to every call's arguments."""
        self.__cmd_sequence("remove", option, action_method, query_method,
                            parse_method, message, start_args=[x],
                            no_exit=no_exit)


    def __query_sequence(self, option, query_method, parse_method, message, # pylint: disable=R0913
                         start_args=None, no_exit=False):
        """Call query_method for every item of option and print the result.

        Items are converted with parse_method (if not None) and passed to
        query_method as start_args + item, a list/tuple item being expanded.

        With more than one item, each result is printed as
        "<message % item>: yes|no", failing items are reported as warnings
        and skipped, and the process exits with 0 at the end unless no_exit
        is set.  With a single item the result is handed to
        print_query_result(), which always exits (regardless of no_exit),
        and any failure exits with the code of the error.
        """
        items = [ ]
        for item in option:
            if parse_method is not None:
                try:
                    item = parse_method(item)
                except Exception as msg:
                    if len(option) > 1:
                        self.print_warning("Warning: %s" % msg)
                        continue
                    else:
                        code = FirewallError.get_code(str(msg))
                        self.print_and_exit("Error: %s" % msg, code)
            items.append(item)

        for item in items:
            call_item = [ ]
            if start_args is not None:
                call_item += start_args
            if not isinstance(item, list) and not isinstance(item, tuple):
                call_item.append(item)
            else:
                call_item += item
            self.deactivate_exception_handler()
            try:
                res = query_method(*call_item)
            except DBusException as msg:
                self.fail_if_not_authorized(msg.get_dbus_name())
                code = FirewallError.get_code(msg.get_dbus_message())
                if len(option) > 1:
                    self.print_warning("Warning: %s" % msg.get_dbus_message())
                    continue
                else:
                    self.print_and_exit("Error: %s" % msg.get_dbus_message(),
                                        code)
            except Exception as msg:
                code = FirewallError.get_code(str(msg))
                if len(option) > 1:
                    self.print_warning("Warning: %s" % msg)
                    # There is no result for this item; without skipping it
                    # the code below would use an unbound or stale 'res'.
                    continue
                else:
                    self.print_and_exit("Error: %s" % msg, code)
            finally:
                # Restore the handler on every path, also when an item is
                # skipped with 'continue'.
                self.activate_exception_handler()
            if len(option) > 1:
                self.print_msg("%s: %s" % (message % item, ("no", "yes")[res]))
            else:
                self.print_query_result(res)
        if not no_exit:
            sys.exit(0)

    def query_sequence(self, option, query_method, parse_method, message, # pylint: disable=R0913
                       no_exit=False):
        """Query every item of option (see __query_sequence)."""
        self.__query_sequence(option, query_method, parse_method,
                              message, no_exit=no_exit)

    def x_query_sequence(self, x, option, query_method, parse_method, # pylint: disable=R0913
                         message, no_exit=False):
        """Like query_sequence, with x prepended to every call's arguments."""
        self.__query_sequence(option, query_method, parse_method,
                              message, start_args=[x], no_exit=no_exit)


    def parse_source(self, value):
        """Return value if it is accepted as a source, else raise.

        Accepted are values passing checkIPnMask, checkIP6nMask or
        check_mac, and "ipset:<name>" with a non-empty name.

        Raises FirewallError(INVALID_ADDR) otherwise.
        """
        if not checkIPnMask(value) and not checkIP6nMask(value) \
           and not check_mac(value) and not \
           (value.startswith("ipset:") and len(value) > 6):
            raise FirewallError(errors.INVALID_ADDR,
                                "'%s' is no valid IPv4, IPv6 or MAC address, nor an ipset" % value)
        return value

    def parse_port(self, value, separator="/"):
        """Split "port<separator>protocol" into a (port, proto) tuple.

        Raises FirewallError(INVALID_PORT) if value does not consist of
        exactly two parts or the port is rejected by check_port, and
        FirewallError(INVALID_PROTOCOL) for a protocol other than tcp, udp,
        sctp or dccp.
        """
        try:
            (port, proto) = value.split(separator)
        except ValueError:
            raise FirewallError(errors.INVALID_PORT, "bad port (most likely "
                                "missing protocol), correct syntax is "
                                "portid[-portid]%sprotocol" % separator)
        if not check_port(port):
            raise FirewallError(errors.INVALID_PORT, port)
        if proto not in [ "tcp", "udp", "sctp", "dccp" ]:
            raise FirewallError(errors.INVALID_PROTOCOL,
                                "'%s' not in {'tcp'|'udp'|'sctp'|'dccp'}" % \
                                proto)
        return (port, proto)

    def parse_forward_port(self, value, compat=False):
        """Parse "port=..:proto=..:toport=..:toaddr=.." into a tuple.

        Returns (port, protocol, toport, toaddr); toport and toaddr are None
        when not given, but at least one of them is required.  In compat
        mode an "if" option is accepted and ignored, and toaddr has to be
        an IPv4 address.

        Raises FirewallError with INVALID_FORWARD (unknown option, missing
        port, protocol or destination), INVALID_PORT, INVALID_PROTOCOL or
        INVALID_ADDR.
        """
        port = None
        protocol = None
        toport = None
        toaddr = None
        i = 0
        while ("=" in value[i:]):
            opt = value[i:].split("=", 1)[0]
            i += len(opt) + 1
            # A value ends at the next ':' only while another option
            # follows; the last value takes the rest of the string, so it
            # may contain ':' itself.
            if "=" in value[i:]:
                val = value[i:].split(":", 1)[0]
            else:
                val = value[i:]
            i += len(val) + 1

            if opt == "port":
                port = val
            elif opt == "proto":
                protocol = val
            elif opt == "toport":
                toport = val
            elif opt == "toaddr":
                toaddr = val
            elif opt == "if" and compat:
                # ignore if option in compat mode
                pass
            else:
                raise FirewallError(errors.INVALID_FORWARD,
                                    "invalid forward port arg '%s'" % (opt))
        if not port:
            raise FirewallError(errors.INVALID_FORWARD, "missing port")
        if not protocol:
            raise FirewallError(errors.INVALID_FORWARD, "missing protocol")
        if not (toport or toaddr):
            raise FirewallError(errors.INVALID_FORWARD, "missing destination")

        if not check_port(port):
            raise FirewallError(errors.INVALID_PORT, port)
        if protocol not in [ "tcp", "udp", "sctp", "dccp" ]:
            raise FirewallError(errors.INVALID_PROTOCOL,
                                "'%s' not in {'tcp'|'udp'|'sctp'|'dccp'}" % \
                                protocol)
        if toport and not check_port(toport):
            raise FirewallError(errors.INVALID_PORT, toport)
        if toaddr and not check_single_address("ipv4", toaddr):
            if compat or not check_single_address("ipv6", toaddr):
                raise FirewallError(errors.INVALID_ADDR, toaddr)

        return (port, protocol, toport, toaddr)

    def parse_ipset_option(self, value):
        """Split an ipset option "key[=value]" into key and value.

        Returns the tuple (key, "") when there is no '=', and the
        two-element list [key, value] for exactly one '='.

        Raises FirewallError(INVALID_OPTION) for more than one '='.
        """
        args = value.split("=")
        if len(args) == 1:
            return (args[0], "")
        elif len(args) == 2:
            return args
        else:
            raise FirewallError(errors.INVALID_OPTION,
                                "invalid ipset option '%s'" % (value))

    def check_destination_ipv(self, value):
        """Return value if it is "ipv4" or "ipv6", else raise INVALID_IPV."""
        ipvs = [ "ipv4", "ipv6", ]
        if value not in ipvs:
            raise FirewallError(errors.INVALID_IPV,
                                "invalid argument: %s (choose from '%s')" % \
                                (value, "', '".join(ipvs)))
        return value

    def parse_service_destination(self, value):
        """Split "ipv:address[/mask]" into an (ipv, destination) tuple.

        Only the ipv part is validated (see check_destination_ipv).

        Raises FirewallError(INVALID_DESTINATION) if there is no ':'.
        """
        try:
            (ipv, destination) = value.split(":", 1)
        except ValueError:
            raise FirewallError(errors.INVALID_DESTINATION,
                                "destination syntax is ipv:address[/mask]")
        return (self.check_destination_ipv(ipv), destination)

    def check_ipv(self, value):
        """Return value if it is "ipv4", "ipv6" or "eb", else raise INVALID_IPV."""
        ipvs = [ "ipv4", "ipv6", "eb" ]
        if value not in ipvs:
            raise FirewallError(errors.INVALID_IPV,
                                "invalid argument: %s (choose from '%s')" % \
                                (value, "', '".join(ipvs)))
        return value

    def check_helper_family(self, value):
        """Return value if it is "", "ipv4" or "ipv6", else raise INVALID_IPV."""
        ipvs = [ "", "ipv4", "ipv6" ]
        if value not in ipvs:
            raise FirewallError(errors.INVALID_IPV,
                                "invalid argument: %s (choose from '%s')" % \
                                (value, "', '".join(ipvs)))
        return value

    def check_module(self, value):
        """Return value if it names a module "nf_conntrack_<something>".

        Raises FirewallError(INVALID_MODULE) if the prefix is missing or
        nothing is left after removing every "nf_conntrack_" from value.
        """
        if not value.startswith("nf_conntrack_"):
            raise FirewallError(
                errors.INVALID_MODULE,
                "'%s' does not start with 'nf_conntrack_'" % value)
        if len(value.replace("nf_conntrack_", "")) < 1:
            raise FirewallError(errors.INVALID_MODULE,
                                "Module name '%s' too short" % value)
        return value

    @staticmethod
    def _rich_rule_priority(rule):
        """Return the integer following "priority=" in rule, 0 if absent.

        The number runs up to the next space or, if there is none, to the
        end of the rule; double quotes around it are dropped.
        """
        search_str = "priority="
        try:
            i = rule.index(search_str)
        except ValueError:
            return 0
        i += len(search_str)
        return int(rule[i:].split(" ", 1)[0].replace("\"", ""))

    def print_zone_info(self, zone, settings, default_zone=None, extra_interfaces=[]): # pylint: disable=R0914
        """Print the zone name with its attributes and all zone settings.

        The name is followed by "(default)" and/or "(active)" when
        default_zone is given and the zone is the default zone and/or has
        interfaces or sources.  extra_interfaces are listed together with
        the interfaces from settings; the argument is only read, never
        modified.  Summary and description are shown in verbose mode only.
        Rich rules are listed ordered by their priority.
        """
        target = settings.getTarget()
        icmp_block_inversion = settings.getIcmpBlockInversion()
        interfaces = sorted(set(settings.getInterfaces() + extra_interfaces))
        sources = settings.getSources()
        services = settings.getServices()
        ports = settings.getPorts()
        protocols = settings.getProtocols()
        masquerade = settings.getMasquerade()
        forward_ports = settings.getForwardPorts()
        source_ports = settings.getSourcePorts()
        icmp_blocks = settings.getIcmpBlocks()
        rules = settings.getRichRules()
        description = settings.getDescription()
        short_description = settings.getShort()

        attributes = []
        if default_zone is not None:
            if zone == default_zone:
                attributes.append("default")
        if interfaces or sources:
            attributes.append("active")
        if attributes:
            zone = zone + " (%s)" % ", ".join(attributes)
        self.print_msg(zone)
        if self.verbose:
            self.print_msg(" summary: " + short_description)
            self.print_msg(" description: " + description)
        self.print_msg(" target: " + target)
        self.print_msg(" icmp-block-inversion: %s" % \
                       ("yes" if icmp_block_inversion else "no"))
        self.print_msg(" interfaces: " + " ".join(interfaces))
        self.print_msg(" sources: " + " ".join(sources))
        self.print_msg(" services: " + " ".join(sorted(services)))
        self.print_msg(" ports: " + " ".join(["%s/%s" % (port[0], port[1])
                                              for port in ports]))
        self.print_msg(" protocols: " + " ".join(sorted(protocols)))
        self.print_msg(" masquerade: %s" % ("yes" if masquerade else "no"))
        self.print_msg(" forward-ports: " +
                       "\n\t".join(["port=%s:proto=%s:toport=%s:toaddr=%s" % \
                                    (port, proto, toport, toaddr)
                                    for (port, proto, toport, toaddr) in \
                                    forward_ports]))
        self.print_msg(" source-ports: " +
                       " ".join(["%s/%s" % (port[0], port[1])
                                 for port in source_ports]))
        self.print_msg(" icmp-blocks: " + " ".join(icmp_blocks))
        self.print_msg(" rich rules: \n\t" + "\n\t".join(
            sorted(rules, key=self._rich_rule_priority)))

    def print_service_info(self, service, settings):
        """Print the service name followed by all service settings.

        Summary and description are shown in verbose mode only.
        """
        ports = settings.getPorts()
        protocols = settings.getProtocols()
        source_ports = settings.getSourcePorts()
        modules = settings.getModules()
        description = settings.getDescription()
        destinations = settings.getDestinations()
        short_description = settings.getShort()
        includes = settings.getIncludes()
        helpers = settings.getHelpers()
        self.print_msg(service)
        if self.verbose:
            self.print_msg(" summary: " + short_description)
            self.print_msg(" description: " + description)
        self.print_msg(" ports: " + " ".join(["%s/%s" % (port[0], port[1])
                                              for port in ports]))
        self.print_msg(" protocols: " + " ".join(protocols))
        self.print_msg(" source-ports: " +
                       " ".join(["%s/%s" % (port[0], port[1])
                                 for port in source_ports]))
        self.print_msg(" modules: " + " ".join(modules))
        self.print_msg(" destination: " +
                       " ".join(["%s:%s" % (k, v)
                                 for k, v in destinations.items()]))
        self.print_msg(" includes: " + " ".join(sorted(includes)))
        self.print_msg(" helpers: " + " ".join(sorted(helpers)))

    def print_icmptype_info(self, icmptype, settings):
        """Print the icmptype name and its destinations.

        An empty destination list is shown as "ipv4 ipv6".  Summary and
        description are shown in verbose mode only.
        """
        destinations = settings.getDestinations()
        description = settings.getDescription()
        short_description = settings.getShort()
        if len(destinations) == 0:
            destinations = [ "ipv4", "ipv6" ]
        self.print_msg(icmptype)
        if self.verbose:
            self.print_msg(" summary: " + short_description)
            self.print_msg(" description: " + description)
        self.print_msg(" destination: " + " ".join(destinations))

    def print_ipset_info(self, ipset, settings):
        """Print the ipset name, type, options and entries.

        Options with an empty value are shown by key only.  Summary and
        description are shown in verbose mode only.
        """
        ipset_type = settings.getType()
        options = settings.getOptions()
        entries = settings.getEntries()
        description = settings.getDescription()
        short_description = settings.getShort()
        self.print_msg(ipset)
        if self.verbose:
            self.print_msg(" summary: " + short_description)
            self.print_msg(" description: " + description)
        self.print_msg(" type: " + ipset_type)
        self.print_msg(" options: " + " ".join(["%s=%s" % (k, v) if v else k
                                                for k, v in options.items()]))
        self.print_msg(" entries: " + " ".join(entries))

    def print_helper_info(self, helper, settings):
        """Print the helper name, family, module and ports.

        Summary and description are shown in verbose mode only.
        """
        ports = settings.getPorts()
        module = settings.getModule()
        family = settings.getFamily()
        description = settings.getDescription()
        short_description = settings.getShort()
        self.print_msg(helper)
        if self.verbose:
            self.print_msg(" summary: " + short_description)
            self.print_msg(" description: " + description)
        self.print_msg(" family: " + family)
        self.print_msg(" module: " + module)
        self.print_msg(" ports: " + " ".join(["%s/%s" % (port[0], port[1])
                                              for port in ports]))

    def print_query_result(self, value):
        """Print "yes" and exit with 0 if value is true, else "no" and exit 1."""
        if value:
            self.print_and_exit("yes")
        else:
            self.print_and_exit("no", 1)

    def exception_handler(self, exception_message):
        """Report exception_message as a warning or as a fatal error.

        While the handler is deactivated, the exception currently being
        handled is re-raised instead; the bare raise requires this method
        to be called from within an except block in that case.

        The "already set" style codes only produce a warning, all other
        codes terminate the process with that code.
        """
        if not self.__use_exception_handler:
            raise
        self.fail_if_not_authorized(exception_message)
        code = FirewallError.get_code(str(exception_message))
        if code in [ errors.ALREADY_ENABLED, errors.NOT_ENABLED,
                     errors.ZONE_ALREADY_SET, errors.ALREADY_SET ]:
            self.print_warning("Warning: %s" % exception_message)
        else:
            self.print_and_exit("Error: %s" % exception_message, code)

    def fail_if_not_authorized(self, exception_message):
        """Exit with NOT_AUTHORIZED if the message reports a failed authorization.

        exception_message is converted with str() first, so None or an
        exception object is accepted as well as a string.
        """
        if "NotAuthorizedException" in str(exception_message):
            msg = ("Authorization failed.\n"
                   "Make sure polkit agent is running or run the application as superuser.")
            self.print_and_exit(msg, errors.NOT_AUTHORIZED)

    def deactivate_exception_handler(self):
        """Make exception_handler() re-raise instead of handling."""
        self.__use_exception_handler = False

    def activate_exception_handler(self):
        """Make exception_handler() handle messages itself again."""
        self.__use_exception_handler = True

    def get_ipset_entries_from_file(self, filename):
        """Return the unique entries of filename in order of first appearance.

        Every line is stripped; empty lines and lines starting with '#' or
        ';' are ignored.
        """
        entries = [ ]
        entries_set = set()
        # The context manager closes the file even if reading fails.
        with open(filename) as f:
            for line in f:
                line = line.strip()
                if len(line) < 1 or line[0] in ['#', ';']:
                    continue
                if line not in entries_set:
                    entries.append(line)
                    entries_set.add(line)
        return entries
|