Blob Blame History Raw

# cfg.py - module for accessing configuration information
#
# Copyright (C) 2010-2017 Arthur de Jong
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

import logging
import re

import ldap


# the number of threads to start
threads = 5
# the user id nslcd should be run as
uid = None
# the group id nslcd should be run as
gid = None
# the configured loggers
logs = []

# the LDAP server to use
uri = None  # FIXME: support multiple servers and have a fail-over mechanism
# LDAP protocol version to use (perhaps fix at 3?)
ldap_version = ldap.VERSION3
# the DN to use when binding
binddn = None  # FIXME: add support
bindpw = None  # FIXME: add support
# the DN to use to perform password modifications as root
rootpwmoddn = None
rootpwmodpw = None

# SASL configuration
sasl_mech = None  # FIXME: add support
sasl_realm = None  # FIXME: add support
sasl_authcid = None  # FIXME: add support
sasl_authzid = None  # FIXME: add support
sasl_secprops = None  # FIXME: add support
sasl_canonicalize = None  # FIXME: add support

# LDAP bases to search
bases = []
# default search scope for searches
scope = ldap.SCOPE_SUBTREE

deref = ldap.DEREF_NEVER
referrals = True

# timing configuration
bind_timelimit = 10  # FIXME: add support
timelimit = ldap.NO_LIMIT
idle_timelimit = 0  # FIXME: add support
reconnect_sleeptime = 1  # FIXME: add support
reconnect_retrytime = 10

# SSL/TLS options
ssl = None
tls_reqcert = None
tls_cacertdir = None
tls_cacertfile = None
tls_randfile = None
tls_ciphers = None
tls_cert = None
tls_key = None

# other options
pagesize = 0  # FIXME: add support
nss_initgroups_ignoreusers = set()
nss_min_uid = 0
nss_uid_offset = 0
nss_gid_offset = 0
nss_nested_groups = False
nss_getgrent_skipmembers = False
nss_disable_enumeration = False
validnames = re.compile(r'^[a-z0-9._@$][a-z0-9._@$ \\~-]{0,98}[a-z0-9._@$~-]$', re.IGNORECASE)
pam_authz_searches = []
pam_password_prohibit_message = None
reconnect_invalidate = set()


# allowed boolean values
_boolean_options = {'on': True, 'yes': True, 'true': True, '1': True,
                    'off': False, 'no': False, 'false': False, '0': False}

# allowed log levels (we log notice which is unsupported in Python to warning)
_log_levels = {'crit': logging.CRITICAL, 'error': logging.ERROR,
               'err': logging.ERROR, 'warning': logging.WARNING,
               'notice': logging.WARNING, 'info': logging.INFO,
               'debug': logging.DEBUG, 'none': logging.INFO}

# allowed values for scope option
if not hasattr(ldap, 'SCOPE_CHILDREN') and ldap.VENDOR_VERSION >= 20400:
    ldap.SCOPE_CHILDREN = 3   # OpenLDAP extension
_scope_options = dict(sub=ldap.SCOPE_SUBTREE, subtree=ldap.SCOPE_SUBTREE,
                      one=ldap.SCOPE_ONELEVEL, onelevel=ldap.SCOPE_ONELEVEL,
                      base=ldap.SCOPE_BASE)
if hasattr(ldap, 'SCOPE_CHILDREN'):
    _scope_options['children'] = ldap.SCOPE_CHILDREN

# allowed values for the deref option
_deref_options = dict(never=ldap.DEREF_NEVER,
                      searching=ldap.DEREF_SEARCHING,
                      finding=ldap.DEREF_FINDING,
                      always=ldap.DEREF_ALWAYS)

# allowed values for the ssl option
_ssl_options = dict(start_tls='STARTTLS', starttls='STARTTLS',
                    on='LDAPS', off=None)

# allowed values for the tls_reqcert option
_tls_reqcert_options = {'never': ldap.OPT_X_TLS_NEVER,
                        'no': ldap.OPT_X_TLS_NEVER,
                        'allow': ldap.OPT_X_TLS_ALLOW,
                        'try': ldap.OPT_X_TLS_TRY,
                        'demand': ldap.OPT_X_TLS_DEMAND,
                        'yes': ldap.OPT_X_TLS_DEMAND,
                        'hard': ldap.OPT_X_TLS_HARD}


def _get_maps():
    # separate function as not to pollute the namespace and avoid import loops
    import alias, ether, group, host, netgroup, network, passwd
    import protocol, rpc, service, shadow
    import sys
    return dict(
            alias=alias, aliases=alias,
            ether=ether, ethers=ether,
            group=group,
            host=host, hosts=host,
            netgroup=netgroup,
            network=network, networks=network,
            passwd=passwd,
            protocol=protocol, protocols=protocol,
            rpc=rpc,
            service=service, services=service,
            shadow=shadow,
            none=sys.modules[__name__]
        )


class ParseError(Exception):

    def __init__(self, filename, lineno, message):
        self.message = '%s:%d: %s' % (filename, lineno, message)

    def __repr__(self):
        return self.message

    __str__ = __repr__


def read(filename):
    maps = _get_maps()
    lineno = 0
    for line in open(filename, 'r'):
        lineno += 1
        line = line.strip()
        # skip comments and blank lines
        if re.match('(#.*)?$', line, re.IGNORECASE):
            continue
        # parse options with a single integer argument
        m = re.match('(?P<keyword>threads|ldap_version|bind_timelimit|timelimit|idle_timelimit|reconnect_sleeptime|reconnect_retrytime|pagesize|nss_min_uid|nss_uid_offset|nss_gid_offset)\s+(?P<value>\d+)',
                     line, re.IGNORECASE)
        if m:
            globals()[m.group('keyword').lower()] = int(m.group('value'))
            continue
        # parse options with a single boolean argument
        m = re.match('(?P<keyword>referrals|nss_nested_groups|nss_getgrent_skipmembers|nss_disable_enumeration)\s+(?P<value>%s)' %
                         '|'.join(_boolean_options.keys()),
                     line, re.IGNORECASE)
        if m:
            globals()[m.group('keyword').lower()] = _boolean_options[m.group('value').lower()]
            continue
        # parse options with a single no-space value
        m = re.match('(?P<keyword>uid|gid|bindpw|rootpwmodpw|sasl_mech)\s+(?P<value>\S+)',
                     line, re.IGNORECASE)
        if m:
            globals()[m.group('keyword').lower()] = m.group('value')
            continue
        # parse options with a single value that can contain spaces
        m = re.match('(?P<keyword>binddn|rootpwmoddn|sasl_realm|sasl_authcid|sasl_authzid|sasl_secprops|krb5_ccname|tls_cacertdir|tls_cacertfile|tls_randfile|tls_ciphers|tls_cert|tls_key|pam_password_prohibit_message)\s+(?P<value>\S.*)',
                     line, re.IGNORECASE)
        if m:
            globals()[m.group('keyword').lower()] = m.group('value')
            continue
        # log <SCHEME> [<LEVEL>]
        m = re.match('log\s+(?P<scheme>syslog|/\S*)(\s+(?P<level>%s))?' %
                         '|'.join(_log_levels.keys()),
                     line, re.IGNORECASE)
        if m:
            logs.append((m.group('scheme'), _log_levels[str(m.group('level')).lower()]))
            continue
        # uri <URI>
        m = re.match('uri\s+(?P<uri>\S+)', line, re.IGNORECASE)
        if m:
            # FIXME: support multiple URI values
            # FIXME: support special DNS and DNS:domain values
            global uri
            uri = m.group('uri')
            continue
        # base <MAP>? <BASEDN>
        m = re.match('base\s+((?P<map>%s)\s+)?(?P<value>\S.*)' %
                         '|'.join(maps.keys()),
                     line, re.IGNORECASE)
        if m:
            mod = maps[str(m.group('map')).lower()]
            if not hasattr(mod, 'bases'):
                mod.bases = []
            mod.bases.append(m.group('value'))
            continue
        # filter <MAP> <SEARCHFILTER>
        m = re.match('filter\s+(?P<map>%s)\s+(?P<value>\S.*)' %
                         '|'.join(maps.keys()),
                     line, re.IGNORECASE)
        if m:
            mod = maps[m.group('map').lower()]
            mod.filter = m.group('value')
            continue
        # scope <MAP>? <SCOPE>
        m = re.match('scope\s+((?P<map>%s)\s+)?(?P<value>%s)' % (
                         '|'.join(maps.keys()),
                         '|'.join(_scope_options.keys())),
                     line, re.IGNORECASE)
        if m:
            mod = maps[str(m.group('map')).lower()]
            mod.scope = _scope_options[m.group('value').lower()]
            continue
        # map <MAP> <ATTRIBUTE> <ATTMAPPING>
        m = re.match('map\s+(?P<map>%s)\s+(?P<attribute>\S+)\s+(?P<value>\S.*)' %
                         '|'.join(maps.keys()),
                     line, re.IGNORECASE)
        if m:
            mod = maps[m.group('map').lower()]
            attribute = m.group('attribute')
            if attribute not in mod.attmap:
                raise ParseError(filename, lineno, 'attribute %s unknown' % attribute)
            mod.attmap[attribute] = m.group('value')
            # TODO: filter out attributes that cannot be an expression
            continue
        # deref <DEREF>
        m = re.match('deref\s+(?P<value>%s)' % '|'.join(_deref_options.keys()),
                     line, re.IGNORECASE)
        if m:
            global deref
            deref = _deref_options[m.group('value').lower()]
            continue
        # nss_initgroups_ignoreusers <USER,USER>|<ALLLOCAL>
        m = re.match('nss_initgroups_ignoreusers\s+(?P<value>\S.*)',
                     line, re.IGNORECASE)
        if m:
            users = m.group('value')
            if users.lower() == 'alllocal':
                # get all users known to the system currently (since nslcd
                # isn't yet running, this should work)
                import pwd
                users = (x.pw_name for x in pwd.getpwall())
            else:
                users = users.split(',')
                # TODO: warn about unknown users
            nss_initgroups_ignoreusers.update(users)
            continue
        # pam_authz_search <FILTER>
        m = re.match('pam_authz_search\s+(?P<value>\S.*)', line, re.IGNORECASE)
        if m:
            from expr import Expression
            pam_authz_searches.append(Expression(m.group('value')))
            # TODO: check pam_authz_search expression to only contain
            # username, service, ruser, rhost, tty, hostname, fqdn, dn or
            # uid variables
            continue
        # ssl <on|off|start_tls>
        m = re.match('ssl\s+(?P<value>%s)' % '|'.join(_ssl_options.keys()),
                     line, re.IGNORECASE)
        if m:
            global ssl
            ssl = _ssl_options[m.group('value').lower()]
            continue
        # sasl_canonicalize yes|no
        m = re.match('(ldap_?)?sasl_(?P<no>no)?canon(icali[sz]e)?\s+(?P<value>%s)' %
                         '|'.join(_boolean_options.keys()),
                     line, re.IGNORECASE)
        if m:
            global sasl_canonicalize
            sasl_canonicalize = _boolean_options[m.group('value').lower()]
            if m.group('no'):
                sasl_canonicalize = not sasl_canonicalize
            continue
        # tls_reqcert <demand|hard|yes...>
        m = re.match('tls_reqcert\s+(?P<value>%s)' %
                         '|'.join(_tls_reqcert_options.keys()),
                     line, re.IGNORECASE)
        if m:
            global tls_reqcert
            tls_reqcert = _tls_reqcert_options[m.group('value').lower()]
            continue
        # validnames /REGEX/i?
        m = re.match('validnames\s+/(?P<value>.*)/(?P<flags>[i]?)$',
                     line, re.IGNORECASE)
        if m:
            global validnames
            flags = 0 | re.IGNORECASE if m.group('flags') == 'i' else 0
            validnames = re.compile(m.group('value'), flags=flags)
            continue
        # reconnect_invalidate <MAP>,<MAP>,...
        m = re.match('reconnect_invalidate\s+(?P<value>\S.*)',
                     line, re.IGNORECASE)
        if m:
            dbs = re.split('[ ,]+', m.group('value').lower())
            for db in dbs:
                if db not in maps.keys() + ['nfsidmap']:
                    raise ParseError(filename, lineno, 'map %s unknown' % db)
            reconnect_invalidate.update(dbs)
            continue
        # unrecognised line
        raise ParseError(filename, lineno, 'error parsing line %r' % line)
    # if logging is not configured, default to syslog
    if not logs:
        logs.append(('syslog', logging.INFO))
    # dump config (debugging code)
    for k, v in globals().items():
        if not k.startswith('_'):
            logging.debug('%s=%r', k, v)


def get_usergid():
    """Return user info and group id."""
    import pwd
    import grp
    u = pwd.getpwnam(uid)
    if gid is None:
        g = u.pw_gid
    else:
        g = grp.getgrnam(gid).gr_gid
    return u, g