# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 - Philippe Proulx <pproulx@efficios.com>
# Copyright (C) 2014 - David Goulet <dgoulet@efficios.com>
#
# This library is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; version 2.1 of the License.
#
# This library is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this library; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import division
import lttngust.debug as dbg
import lttngust.loghandler
import lttngust.cmd
from io import open
import threading
import logging
import socket
import time
import sys
import os
try:
# Python 2
import Queue as queue
except ImportError:
# Python 3
import queue
# Agent protocol parameters passed to the session daemon in the
# registration command: the domain, then the protocol major/minor version.
_PROTO_DOMAIN, _PROTO_MAJOR, _PROTO_MINOR = 5, 2, 0
def _get_env_value_ms(key, default_s):
try:
val = int(os.getenv(key, default_s * 1000)) / 1000
except:
val = -1
if val < 0:
fmt = 'invalid ${} value; {} seconds will be used'
dbg._pwarning(fmt.format(key, default_s))
val = default_s
return val
# Registration timeout (how long _init_threads() blocks) and reconnection
# delay (how long a client sleeps before retrying), both in seconds. Each
# can be overridden by an environment variable holding milliseconds.
_REG_TIMEOUT, _RETRY_REG_DELAY = (
    _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_TIMEOUT', 5),
    _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_RETRY_DELAY', 3),
)
class _TcpClient(object):
def __init__(self, name, host, port, reg_queue):
super(self.__class__, self).__init__()
self._name = name
self._host = host
self._port = port
try:
self._log_handler = lttngust.loghandler._Handler()
except (OSError) as e:
dbg._pwarning('cannot load library: {}'.format(e))
raise e
self._root_logger = logging.getLogger()
self._root_logger.setLevel(logging.NOTSET)
self._ref_count = 0
self._sessiond_sock = None
self._reg_queue = reg_queue
self._server_cmd_handlers = {
lttngust.cmd._ServerCmdRegistrationDone: self._handle_server_cmd_reg_done,
lttngust.cmd._ServerCmdEnable: self._handle_server_cmd_enable,
lttngust.cmd._ServerCmdDisable: self._handle_server_cmd_disable,
lttngust.cmd._ServerCmdList: self._handle_server_cmd_list,
}
def _debug(self, msg):
return 'client "{}": {}'.format(self._name, msg)
def run(self):
while True:
try:
# connect to the session daemon
dbg._pdebug(self._debug('connecting to session daemon'))
self._connect_to_sessiond()
# register to the session daemon after a successful connection
dbg._pdebug(self._debug('registering to session daemon'))
self._register()
# wait for commands from the session daemon
self._wait_server_cmd()
except (Exception) as e:
# Whatever happens here, we have to close the socket and
# retry to connect to the session daemon since either
# the socket was closed, a network timeout occured, or
# invalid data was received.
dbg._pdebug(self._debug('got exception: {}'.format(e)))
self._cleanup_socket()
dbg._pdebug(self._debug('sleeping for {} s'.format(_RETRY_REG_DELAY)))
time.sleep(_RETRY_REG_DELAY)
def _recv_server_cmd_header(self):
data = self._sessiond_sock.recv(lttngust.cmd._SERVER_CMD_HEADER_SIZE)
if not data:
dbg._pdebug(self._debug('received empty server command header'))
return None
assert(len(data) == lttngust.cmd._SERVER_CMD_HEADER_SIZE)
dbg._pdebug(self._debug('received server command header ({} bytes)'.format(len(data))))
return lttngust.cmd._server_cmd_header_from_data(data)
def _recv_server_cmd(self):
server_cmd_header = self._recv_server_cmd_header()
if server_cmd_header is None:
return None
dbg._pdebug(self._debug('server command header: data size: {} bytes'.format(server_cmd_header.data_size)))
dbg._pdebug(self._debug('server command header: command ID: {}'.format(server_cmd_header.cmd_id)))
dbg._pdebug(self._debug('server command header: command version: {}'.format(server_cmd_header.cmd_version)))
data = bytes()
if server_cmd_header.data_size > 0:
data = self._sessiond_sock.recv(server_cmd_header.data_size)
assert(len(data) == server_cmd_header.data_size)
return lttngust.cmd._server_cmd_from_data(server_cmd_header, data)
def _send_cmd_reply(self, cmd_reply):
data = cmd_reply.get_data()
dbg._pdebug(self._debug('sending command reply ({} bytes)'.format(len(data))))
self._sessiond_sock.sendall(data)
def _handle_server_cmd_reg_done(self, server_cmd):
dbg._pdebug(self._debug('got "registration done" server command'))
if self._reg_queue is not None:
dbg._pdebug(self._debug('notifying _init_threads()'))
try:
self._reg_queue.put(True)
except (Exception) as e:
# read side could be closed by now; ignore it
pass
self._reg_queue = None
def _handle_server_cmd_enable(self, server_cmd):
dbg._pdebug(self._debug('got "enable" server command'))
self._ref_count += 1
if self._ref_count == 1:
dbg._pdebug(self._debug('adding our handler to the root logger'))
self._root_logger.addHandler(self._log_handler)
dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))
return lttngust.cmd._ClientCmdReplyEnable()
def _handle_server_cmd_disable(self, server_cmd):
dbg._pdebug(self._debug('got "disable" server command'))
self._ref_count -= 1
if self._ref_count < 0:
# disable command could be sent again when a session is destroyed
self._ref_count = 0
if self._ref_count == 0:
dbg._pdebug(self._debug('removing our handler from the root logger'))
self._root_logger.removeHandler(self._log_handler)
dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))
return lttngust.cmd._ClientCmdReplyDisable()
def _handle_server_cmd_list(self, server_cmd):
dbg._pdebug(self._debug('got "list" server command'))
names = logging.Logger.manager.loggerDict.keys()
dbg._pdebug(self._debug('found {} loggers'.format(len(names))))
cmd_reply = lttngust.cmd._ClientCmdReplyList(names=names)
return cmd_reply
def _handle_server_cmd(self, server_cmd):
cmd_reply = None
if server_cmd is None:
dbg._pdebug(self._debug('bad server command'))
status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
cmd_reply = lttngust.cmd._ClientCmdReply(status)
elif type(server_cmd) in self._server_cmd_handlers:
cmd_reply = self._server_cmd_handlers[type(server_cmd)](server_cmd)
else:
dbg._pdebug(self._debug('unknown server command'))
status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
cmd_reply = lttngust.cmd._ClientCmdReply(status)
if cmd_reply is not None:
self._send_cmd_reply(cmd_reply)
def _wait_server_cmd(self):
while True:
try:
server_cmd = self._recv_server_cmd()
except socket.timeout:
# simply retry here; the protocol has no KA and we could
# wait for hours
continue
self._handle_server_cmd(server_cmd)
def _cleanup_socket(self):
try:
self._sessiond_sock.shutdown(socket.SHUT_RDWR)
self._sessiond_sock.close()
except:
pass
self._sessiond_sock = None
def _connect_to_sessiond(self):
# create session daemon TCP socket
if self._sessiond_sock is None:
self._sessiond_sock = socket.socket(socket.AF_INET,
socket.SOCK_STREAM)
# Use str(self._host) here. Since this host could be a string
# literal, and since we're importing __future__.unicode_literals,
# we want to make sure the host is a native string in Python 2.
# This avoids an indirect module import (unicode module to
# decode the unicode string, eventually imported by the
# socket module if needed), which is not allowed in a thread
# directly created by a module in Python 2 (our case).
#
# tl;dr: Do NOT remove str() here, or this call in Python 2
# _will_ block on an interpreter's mutex until the waiting
# register queue timeouts.
self._sessiond_sock.connect((str(self._host), self._port))
def _register(self):
cmd = lttngust.cmd._ClientRegisterCmd(_PROTO_DOMAIN, os.getpid(),
_PROTO_MAJOR, _PROTO_MINOR)
data = cmd.get_data()
self._sessiond_sock.sendall(data)
def _get_port_from_file(path):
    """Return the TCP port stored on the first line of the file at *path*.

    :param path: path of a session daemon "agent.port" file
    :returns: the port as an int, or None when the file cannot be read,
        its first line is not an integer, or the value is outside the
        valid TCP port range 1-65535
    """
    dbg._pdebug('reading port from file "{}"'.format(path))

    try:
        # `with` closes the file even when readline()/int() fails
        with open(path) as f:
            r_port = int(f.readline())
    except (IOError, OSError, ValueError):
        # best effort: a missing or malformed port file simply means no
        # session daemon is reachable through it
        return None

    # The original test used `or`, which accepted every integer.
    if 0 < r_port <= 65535:
        return r_port

    return None
def _get_user_home_path():
# $LTTNG_HOME overrides $HOME if it exists
return os.getenv('LTTNG_HOME', os.path.expanduser('~'))
# Local address at which the session daemons are contacted.
_SESSIOND_HOST = '127.0.0.1'

# Set to True by _init_threads() so the agent is started only once.
_initialized = False
def _client_thread_target(name, port, reg_queue):
    """Thread entry point: build a TCP client named *name* and run it.

    :param name: client name ("system" or "user" when started by
        _init_threads())
    :param port: session daemon TCP port on _SESSIOND_HOST
    :param reg_queue: queue notified when registration is done
    """
    dbg._pdebug('creating client "{}" using TCP port {}'.format(name, port))
    tcp_client = _TcpClient(name=name, host=_SESSIOND_HOST, port=port,
                            reg_queue=reg_queue)

    dbg._pdebug('starting client "{}"'.format(name))
    # run() loops forever; only a non-Exception BaseException escapes it
    tcp_client.run()
def _start_client_thread(name, port, reg_queue):
    # Create and start one daemon thread running the client `name` on `port`.
    dbg._pdebug('creating {} client thread'.format(name))
    t = threading.Thread(target=_client_thread_target,
                         args=(name, port, reg_queue))
    t.name = name
    t.daemon = True
    t.start()
    dbg._pdebug('created and started {} client thread'.format(name))


def _init_threads():
    """Start the agent's client threads and wait for them to register.

    Runs at most once (guarded by ``_initialized``). One client thread is
    started per session daemon whose port can be read: the system one
    (/var/run/lttng/agent.port) and the user one (.lttng/agent.port under
    $LTTNG_HOME or the home directory). The call then blocks until every
    started client reported "registration done", or until _REG_TIMEOUT
    seconds elapsed.
    """
    global _initialized

    dbg._pdebug('entering')

    if _initialized:
        dbg._pdebug('agent is already initialized')
        return

    # This makes sure that the appropriate modules for encoding and
    # decoding strings/bytes are imported now, since no import should
    # happen within a thread at import time (our case).
    'lttng'.encode().decode()

    _initialized = True
    sys_port = _get_port_from_file('/var/run/lttng/agent.port')
    user_port_file = os.path.join(_get_user_home_path(), '.lttng', 'agent.port')
    user_port = _get_port_from_file(user_port_file)
    reg_queue = queue.Queue()
    reg_expecting = 0

    dbg._pdebug('system session daemon port: {}'.format(sys_port))
    dbg._pdebug('user session daemon port: {}'.format(user_port))

    if sys_port == user_port and sys_port is not None:
        # The two session daemon ports are the same. This is not normal.
        # Connect to only one.
        dbg._pdebug('both user and system session daemon have the same port')
        sys_port = None

    try:
        for name, port in (('system', sys_port), ('user', user_port)):
            if port is not None:
                _start_client_thread(name, port, reg_queue)
                reg_expecting += 1
    except Exception:
        # cannot create threads for some reason; stop this initialization
        dbg._pwarning('cannot create client threads')
        return

    if reg_expecting == 0:
        # early exit: looks like there's not even one valid port
        dbg._pwarning('no valid LTTng session daemon port found (is the session daemon started?)')
        return

    # time.clock() was removed in Python 3.8, and on Unix it measured CPU
    # time, not the wall time spent blocked in get(), so the remaining
    # timeout barely decreased. Use a wall clock, monotonic when available
    # (Python 3.3+).
    clock = getattr(time, 'monotonic', time.time)
    cur_timeout = _REG_TIMEOUT

    # We block here to make sure the agent is properly registered to
    # the session daemon. If we timeout, the client threads will still
    # continue to try to connect and register to the session daemon,
    # but there is no guarantee that all following logging statements
    # will make it to LTTng-UST.
    #
    # When a client thread receives a "registration done" confirmation
    # from the session daemon it's connected to, it puts True in
    # reg_queue.
    while True:
        try:
            dbg._pdebug('waiting for registration done (expecting {}, timeout is {} s)'.format(reg_expecting,
                                                                                                cur_timeout))
            t1 = clock()
            reg_queue.get(timeout=cur_timeout)
            t2 = clock()
            reg_expecting -= 1
            dbg._pdebug('unblocked')

            if reg_expecting == 0:
                # done!
                dbg._pdebug('successfully registered to session daemon(s)')
                break

            cur_timeout -= (t2 - t1)

            if cur_timeout <= 0:
                # timeout
                dbg._pdebug('ran out of time')
                break
        except queue.Empty:
            dbg._pdebug('ran out of time')
            break

    dbg._pdebug('leaving')
# Start the agent as a side effect of importing this module: spawn the
# session daemon client threads and block (up to _REG_TIMEOUT seconds)
# until they report registration. Must stay after all definitions above.
_init_threads()