#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention Management Console
# simple UMCP server implementation
#
# Copyright 2006-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
"""
Defines the basic class for an UMC server.
"""
import os
import errno
import fcntl
import signal
import socket
import resource
import traceback
import multiprocessing
from types import TracebackType # noqa: F401
from typing import Dict, List, Optional, Tuple, Type # noqa: F401
from tornado import process
import notifier
import notifier.signals as signals
from OpenSSL import SSL
from OpenSSL.crypto import X509 # noqa: F401
from univention.lib.i18n import Translation
from .message import Message, IncompleteMessageError, ParseError
from .session import SessionHandler
from .definitions import RECV_BUFFER_SIZE
from ..resources import moduleManager, categoryManager
from ..log import CORE, CRYPT, RESOURCES
from ..config import ucr, SERVER_MAX_CONNECTIONS, SERVER_CONNECTION_TIMEOUT
# `_` is the translate method of the Translation object created for
# 'univention.management.console'.
_ = Translation('univention.management.console').translate
class MagicBucket(object):
    '''Manages a connection (session) to the UMC server. Therefore it
    ensures that without successful authentication no other command is
    accepted. All commands are passed to the :class:`~SessionHandler`. After the user
    has authenticated the commands are passed on to the Processor.'''

    def __init__(self):
        # type: () -> None
        # one State per connected client socket
        self.__states = {}  # type: Dict[socket.socket, State]

    def new(self, client, sock):
        # type: (str, socket.socket) -> None
        """Is called by the Server object to announce a new incoming
        connection.

        :param str client: IP address + port
        :param socket.socket sock: a socket object
        """
        CORE.info('Established connection: %s' % client)
        state = State(client, sock)
        # responses produced by the session are written back via _response()
        state.session.signal_connect('success', notifier.Callback(self._response, state))
        self.__states[sock] = state
        notifier.socket_add(sock, self._receive)
        self.reset_connection_timeout(state)

    def reset_connection_timeout(self, state):
        # type: (State) -> None
        """Restart the inactivity timer of the given connection.

        :param State state: the connection state whose timer is re-armed
        """
        state.reset_connection_timeout()
        notifier.timer_remove(state._timer)
        state._timer = notifier.timer_add(state.timeout * 1000, notifier.Callback(self._timed_out, state))

    def _timed_out(self, state):
        # type: (State) -> bool
        """Closes the connection after a specified timeout

        :returns: `True` to keep the timer running (requests are still
            pending), `False` once the connection has been closed.
        """
        if not state.active:
            CORE.info('Session %r timed out' % (state,))
            self._cleanup(state.socket)
        else:
            CORE.info('Session %r timed out: There are open requests. Postpone session shutdown' % (state,))
            return True

        return False

    def exit(self):
        # type: () -> None
        '''Closes all open connections.'''
        # iterate over a copy: _cleanup() removes entries from the dict
        for sock in list(self.__states):
            CORE.info('Shutting down connection %s' % sock)
            self._cleanup(sock)

    def _receive(self, sock):
        # type: (socket.socket) -> bool
        """Signal callback: Handles incoming data. Processes SSL events
        and parses the incoming data. If a valid UMCP was found it is
        passed to _handle.

        :param socket.socket sock: socket object that reported incoming data
        :returns: `True` to keep the socket registered, `False` after the
            connection has been cleaned up or is unknown.
        """
        try:
            data = sock.recv(RECV_BUFFER_SIZE)
        except SSL.WantReadError:
            # this error can be ignored (SSL need to do something)
            return True
        except (SSL.SysCallError, SSL.Error) as exc:
            if exc.args and exc.args[0] == -1:
                CRYPT.warn('The socket was closed by the client.')
            else:
                CRYPT.error('SSL error in _receive: %s.' % (exc,))
            self._cleanup(sock)
            return False
        except socket.error as exc:
            CORE.warn('Socket error in _receive: %s. Probably close (114).' % (exc,))
            self._cleanup(sock)
            return False

        # an empty read means the peer closed the connection
        if not data:
            self._cleanup(sock)
            return False

        try:
            state = self.__states[sock]
        except KeyError:
            return False

        state.buffer += data
        self.reset_connection_timeout(state)

        try:
            # the buffer may contain several complete messages
            while state.buffer:
                msg = Message()
                state.buffer = msg.parse(state.buffer)
                state.requests[msg.id] = msg
                state.session.execute('handle', msg)
        except (KeyboardInterrupt, SystemExit, SyntaxError):
            raise  # let the UMC-server crash/exit
        except IncompleteMessageError as exc:
            # keep the partial data in the buffer and wait for more
            CORE.info('MagicBucket: incomplete message: %s' % (exc,))
        except ParseError as exc:
            CORE.process('Parse error: %r' % (exc,))
            if msg.id is None:
                # close the connection in case we use could not parse the header
                self._cleanup(sock)
                return False
            state.requests[msg.id] = msg
            state.session.execute('parse_error', msg, exc)

        return True

    def _do_send(self, sock):
        # type: (socket.socket) -> bool
        """Signal callback: sends the next queued chunk once the socket is writable.

        :param socket.socket sock: socket object that became writable
        :returns: `True` while queued data remains (or a retry is needed),
            `False` otherwise.
        """
        try:
            state = self.__states[sock]
        except KeyError:
            CORE.warn('The socket was already removed.')
            return False

        try:
            request_id, chunk = state.resend_queue.pop(0)
        except IndexError:
            CORE.error('The response queue for %r is empty.' % (state,))
            return False

        try:
            sent = sock.send(chunk)
            if sent < len(chunk):
                # partial write: put the remainder back at the front
                state.resend_queue.insert(0, (request_id, chunk[sent:]))
            elif request_id != -1:
                # tolerate an already removed request instead of raising
                # KeyError inside the notifier callback
                state.requests.pop(request_id, None)
        except (SSL.WantReadError, SSL.WantWriteError, SSL.WantX509LookupError):
            CRYPT.info('UMCP: SSL error during re-send')
            state.resend_queue.insert(0, (request_id, chunk))
            return True
        except (SSL.SysCallError, SSL.Error) as error:
            CRYPT.warn('SSL error in _do_send: %s. Probably the socket was closed by the client.' % str(error))
            self._cleanup(sock)
            return False
        except socket.error as exc:
            CORE.warn('socket.error in _do_send: %s. Probably the socket was closed by the client.' % (exc,))
            self._cleanup(sock)
            return False

        return bool(state.resend_queue)

    def _response(self, msg, state):
        # type: (Message, State) -> None
        ''' Send UMCP response to client. If the status code is 250 the
        module process is asking for exit. This method forfills the
        request.

        :param Message msg: the response to send
        :param State state: the connection the response belongs to
        '''
        if msg.id not in state.requests and msg.id != -1:
            CORE.info('The given response is invalid or not known (%s)' % (msg.id,))
            return

        self.reset_connection_timeout(state)

        # number of bytes already written; must be bound before send() so
        # that the SSL "want" handler below can safely slice the data
        ret = 0
        try:
            data = bytes(msg)
            # there is no data from another request in the send queue
            if not state.resend_queue:
                ret = state.socket.send(data)

            # not all data could be send; retry later
            if ret < len(data):
                if not state.resend_queue:
                    notifier.socket_add(state.socket, self._do_send, notifier.IO_WRITE)
                state.resend_queue.append((msg.id, data[ret:]))
            elif msg.id != -1:
                del state.requests[msg.id]
        except (SSL.WantReadError, SSL.WantWriteError, SSL.WantX509LookupError):
            CRYPT.info('UMCP: SSL error need to re-send chunk')
            try:
                notifier.socket_add(state.socket, self._do_send, notifier.IO_WRITE)
                state.resend_queue.append((msg.id, data[ret:]))
            except socket.error as error:
                CRYPT.error('Socket error in _response: %s. Probably the socket was closed by the client.' % str(error))
                self._cleanup(state.socket)
        except (SSL.SysCallError, SSL.Error) as error:
            CRYPT.warn('SSL error in _response: %s. Probably the socket was closed by the client.' % str(error))
            self._cleanup(state.socket)
        except socket.error as exc:
            CORE.warn('socket error in _response: %s. Probably the socket was closed by the client.' % (exc,))
            self._cleanup(state.socket)
        except Exception:  # close the connection to the client. we can't do anything else
            CORE.error('FATAL ERROR: %s' % (traceback.format_exc(),))
            self._cleanup(state.socket)

    def _cleanup(self, sock):
        # type: (socket.socket) -> None
        """Forget the connection belonging to `sock` and close it.

        Does nothing if the socket is not (or no longer) known.

        :param socket.socket sock: the client socket to shut down
        """
        state = self.__states.pop(sock, None)
        if state is None:
            return

        state.session.close_session()

        notifier.socket_remove(sock)
        try:
            sock.close()
        except Exception:
            # best effort: the socket may already be closed
            pass

        state.session.signal_disconnect('success', self._response)
class Server(signals.Provider):
    """Creates an UMC server. It handles incoming connections on UNIX or
    TCP sockets and passes the control to an external session handler
    (e.g. :class:`.MagicBucket`)

    :param int port: port to listen to
    :param bool ssl: if SSL should be used
    :param str unix: if given it must be the filename of the UNIX socket to use
    :param bool magic: if an external session handler should be used
    :param class magicClass: a reference to the class for the external session handler
    :param bool load_ressources: if the modules and categories definitions should be loaded
    :param int processes: Enable multi-process mode.
    """

    def __init__(self, port=6670, ssl=True, unix=None, magic=True, magicClass=MagicBucket, load_ressources=True, processes=1):
        # type: (int, bool, Optional[str], bool, Type[MagicBucket], bool, int) -> None
        '''Initializes the socket to listen for requests'''
        signals.Provider.__init__(self)

        # loading resources
        if load_ressources:
            CORE.info('Loading resources ...')
            self.reload()

        self.__port = port
        self.__unix = unix
        self.__realtcpsocket = None  # type: Optional[socket.socket]
        self.__realunixsocket = None  # type: Optional[socket.socket]
        self.__ssl = ssl
        self.__processes = processes
        self._child_number = None  # type: Optional[int]
        self._children = {}  # type: Dict[int, int]
        self.__magic = magic
        self.__magicClass = magicClass
        self.__bucket = None  # type: Optional[MagicBucket]
        self.crypto_context = None  # type: Optional[SSL.Context]

    def __enter__(self):
        # type: () -> Server
        """Create, bind and register the listening sockets.

        Optionally forks worker processes and instantiates the session
        handler before the sockets are added to the notifier main loop.

        :returns: the server instance itself
        """
        CORE.info('Initialising server process')
        if self.__unix:
            CORE.info('Using a UNIX socket')
            self.__realunixsocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

        if self.__port:
            CORE.info('Using a TCP socket')
            try:
                self.__realtcpsocket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
            except Exception:
                CORE.warn('Cannot open socket with AF_INET6 (Python reports socket.has_ipv6 is %s), trying AF_INET' % socket.has_ipv6)
                self.__realtcpsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        for sock in (self.__realtcpsocket, self.__realunixsocket):
            if sock is None:
                continue
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setblocking(False)
            # do not leak the listening sockets into exec'ed children
            fd = sock.fileno()
            flags = fcntl.fcntl(fd, fcntl.F_GETFD)
            fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)

        if self.__ssl and self.__realtcpsocket is not None:
            CORE.info('Setting up SSL configuration')
            # NOTE(review): per the pyOpenSSL documentation TLSv1_METHOD
            # restricts the context to TLS 1.0; consider a version-flexible
            # method - confirm client compatibility first.
            self.crypto_context = SSL.Context(SSL.TLSv1_METHOD)
            self.crypto_context.set_cipher_list(ucr.get('umc/server/ssl/ciphers', 'DEFAULT'))
            self.crypto_context.set_options(SSL.OP_NO_SSLv2)
            self.crypto_context.set_options(SSL.OP_NO_SSLv3)
            self.crypto_context.set_verify(SSL.VERIFY_PEER, self.__verify_cert_cb)
            # host certificates live in a directory named after the FQDN
            cert_dir = '/etc/univention/ssl/%(hostname)s.%(domainname)s' % ucr
            try:
                self.crypto_context.use_privatekey_file(os.path.join(cert_dir, 'private.key'))
                self.crypto_context.use_certificate_file(os.path.join(cert_dir, 'cert.pem'))
                self.crypto_context.load_verify_locations('/etc/univention/ssl/ucsCA/CAcert.pem')
            except SSL.Error as exc:
                # SSL is not possible
                CRYPT.error('Setting up SSL configuration failed: %s' % (exc,))
                CRYPT.warn('Communication will not be encrypted!')
                self.__ssl = False
                self.crypto_context = None
                self.__realtcpsocket.bind(('', self.__port))
                CRYPT.info('Server listening to unencrypted connections')
                self.__realtcpsocket.listen(SERVER_MAX_CONNECTIONS)

            if self.crypto_context:
                self.connection = SSL.Connection(self.crypto_context, self.__realtcpsocket)
                self.connection.setblocking(0)
                self.connection.bind(('', self.__port))
                self.connection.set_accept_state()
                CRYPT.info('Server listening to SSL connections')
                self.connection.listen(SERVER_MAX_CONNECTIONS)
        elif not self.__ssl and self.__realtcpsocket is not None:
            self.crypto_context = None
            self.__realtcpsocket.bind(('', self.__port))
            CRYPT.info('Server listening to TCP connections')
            self.__realtcpsocket.listen(SERVER_MAX_CONNECTIONS)

        if self.__unix and self.__realunixsocket is not None:
            # ensure that the UNIX socket is only accessible by root
            old_umask = os.umask(0o077)
            try:
                try:
                    self.__realunixsocket.bind(self.__unix)
                except EnvironmentError:
                    if os.path.exists(self.__unix):
                        # a stale socket file blocks the address: remove it
                        # and retry once, otherwise listen() below would be
                        # called on an unbound socket
                        os.unlink(self.__unix)
                        self.__realunixsocket.bind(self.__unix)
            finally:
                # restore old umask
                os.umask(old_umask)
            CRYPT.info('Server listening to UNIX connections')
            self.__realunixsocket.listen(SERVER_MAX_CONNECTIONS)

        if self.__processes != 1:
            # shared mapping: child number -> PID
            self._children = multiprocessing.Manager().dict()
            try:
                self._child_number = process.fork_processes(self.__processes, 0)
            except RuntimeError as exc:
                CORE.warn('Child process died: %s' % (exc,))
                os.kill(os.getpid(), signal.SIGTERM)
                raise SystemExit(str(exc))
            if self._child_number is not None:
                self._children[self._child_number] = os.getpid()

        if self.__magic:
            self.__bucket = self.__magicClass()
        else:
            self.signal_new('session_new')

        # self.connection only exists when a TCP port is served via SSL
        # (same condition as used in exit())
        if self.__ssl and self.__port:
            notifier.socket_add(self.connection, self._connection)
        elif self.__port:
            notifier.socket_add(self.__realtcpsocket, self._connection)

        if self.__unix:
            notifier.socket_add(self.__realunixsocket, self._connection)

        return self

    def __verify_cert_cb(self, conn, cert, errnum, depth, ok):
        # type: (SSL.Connection, X509, int, int, int) -> bool
        """Certificate verification callback: logs the certificate details
        and passes the given verification result through unchanged."""
        CORE.info('__verify_cert_cb: Got certificate: %s' % cert.get_subject())
        CORE.info('__verify_cert_cb: Got certificate issuer: %s' % cert.get_issuer())
        CORE.info('__verify_cert_cb: errnum=%d depth=%d ok=%d' % (errnum, depth, ok))
        return ok  # FIXME: should return true if verification passes and false otherwise.

    def _connection(self, sock):
        # type: (socket.socket) -> bool
        '''Signal callback: Invoked on incoming connections.

        :param socket.socket sock: the listening socket that became readable
        :returns: always `True` so the listening socket stays registered
        '''
        try:
            conn, addr = sock.accept()
        except EnvironmentError as exc:
            if exc.errno == errno.EAGAIN:
                # got an EAGAIN --> try again later
                return True
            CORE.error('Cannot accept new connection: %s' % (exc,))
            if exc.errno == errno.EMFILE:
                # got an EMFILE --> Too many open files
                # If the process permanently lacks free file descriptors, incoming
                # connections waiting in the listening socket backlog will starve.
                # Therefore the limit is temporarily increased by 2 and the connection
                # waiting in the backlog is temporarily accepted and immediately
                # closed again to provoke an error message in the user's browser.
                soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
                resource.setrlimit(resource.RLIMIT_NOFILE, (soft + 2, hard + 2))
                try:
                    pending, _addr = sock.accept()
                    pending.close()
                except EnvironmentError:
                    pass
                finally:
                    resource.setrlimit(resource.RLIMIT_NOFILE, (soft, hard))
            else:
                # unknown errno - log traceback and continue
                CORE.error(traceback.format_exc())
            return True

        fd = conn.fileno()
        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)

        conn.setblocking(False)
        # a falsy address yields an empty client description
        if addr:
            client = '%s:%d' % (addr[0], addr[1])
        else:
            client = ''
        CORE.info('Incoming connection from %s' % client)
        if self.__bucket is not None:
            self.__bucket.new(client, conn)
        else:
            self.signal_emit('session_new', client, conn)
        return True

    def exit(self):
        # type: () -> None
        '''Shuts down all open connections.'''
        CORE.warn('Shutting down all open connections')
        if self.__bucket:
            self.__bucket.exit()

        if self._child_number is not None:
            self._children.pop(self._child_number, None)

        if self.__ssl and self.__port:
            notifier.socket_remove(self.connection)
            self.connection.close()
        elif not self.__ssl and self.__port and self.__realtcpsocket:
            notifier.socket_remove(self.__realtcpsocket)
            self.__realtcpsocket.close()
            self.__realtcpsocket = None
        if self.__unix:
            if self.__realunixsocket is not None:
                notifier.socket_remove(self.__realunixsocket)
                self.__realunixsocket.close()
                self.__realunixsocket = None
            # only a process without a child number removes the socket file
            if self._child_number is None and os.path.exists(self.__unix):
                os.unlink(self.__unix)
            self.__unix = None

        self.__bucket = None

    def __exit__(self, etype, exc, etraceback):
        # type: (Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]) -> None
        """Shut the server down when leaving the ``with`` block."""
        self.exit()

    @staticmethod
    def reload():
        # type: () -> None
        """Reloads resources like module and category definitions"""
        CORE.info('Reloading resources: modules, categories')
        moduleManager.load()
        categoryManager.load()
        RESOURCES.info('Reloading UCR variables')
        ucr.load()

    @staticmethod
    def analyse_memory():
        # type: () -> None
        """Print the number of living UMC objects. Helpful when analysing memory leaks."""
        components = (
            'protocol.server.State', 'protocol.session.ModuleProcess',
            'protocol.session.Processor', 'protocol.session.SessionHandler',
            'protocol.message.Message', 'protocol.message.Request', 'protocol.message.Response',
            'auth.AuthHandler', 'pam.PamAuth', 'protocol.client.Client',
            'locales.I18N', 'locales.I18N_Manager', 'module.Command', 'module.Flavor', 'module.Module',
            'tools.JSON_List',
        )
        # objgraph is optional; silently do nothing if it is not installed
        try:
            import objgraph
        except ImportError:
            return
        CORE.warn('')
        for component in components:
            CORE.warn('%s: %d' % (component, len(objgraph.by_type('univention.management.console.%s' % (component,)))))
class State(object):
    """Holds information about the state of an active session

    :param str client: IP address + port
    :param socket.socket sock: socket object
    """

    __slots__ = ('client', 'socket', 'buffer', 'requests', 'resend_queue', 'session', 'timeout', '_timer')

    def __init__(self, client, sock):
        # type: (str, socket.socket) -> None
        self.client = client
        self.socket = sock
        # bytes received but not yet parsed into a complete message
        self.buffer = b''
        # requests keyed by message id
        self.requests = {}  # type: Dict
        # (message id, remaining bytes) tuples that still have to be sent
        self.resend_queue = []  # type: List[Tuple[str, bytes]]
        self.session = SessionHandler()
        # notifier timer handle; None until a timer has been stored here
        self._timer = None
        self.reset_connection_timeout()

    def reset_connection_timeout(self):
        # type: () -> None
        """Reset the timeout to the configured `SERVER_CONNECTION_TIMEOUT`."""
        self.timeout = SERVER_CONNECTION_TIMEOUT

    @property
    def active(self):
        # type: () -> bool
        """`True` if requests are stored in :attr:`requests` or the session
        reports active module processes."""
        return bool(self.requests or self.session.has_active_module_processes())

    def __repr__(self):
        # type: () -> str
        return '<State(%s %r buffer=%d requests=%d processes=%s)>' % (self.client, self.socket, len(self.buffer), len(self.requests), self.session.has_active_module_processes())